blob: 8eaeb7387baf4670db472e6c5299036989085b01 [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_writer.h"
2
Austin Schuh6bb8a822021-03-31 23:04:39 -07003#include <dirent.h>
4
Austin Schuhb06f03b2021-02-17 22:00:37 -08005#include <functional>
6#include <map>
7#include <vector>
8
9#include "aos/configuration.h"
10#include "aos/events/event_loop.h"
11#include "aos/network/message_bridge_server_generated.h"
12#include "aos/network/team_number.h"
Austin Schuh61e973f2021-02-21 21:43:56 -080013#include "aos/network/timestamp_channel.h"
Austin Schuhb06f03b2021-02-17 22:00:37 -080014
15namespace aos {
16namespace logger {
17namespace {
18using message_bridge::RemoteMessage;
Austin Schuhbd06ae42021-03-31 22:48:21 -070019namespace chrono = std::chrono;
Austin Schuhb06f03b2021-02-17 22:00:37 -080020} // namespace
21
22Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
23 std::function<bool(const Channel *)> should_log)
24 : event_loop_(event_loop),
25 configuration_(configuration),
Austin Schuh5b728b72021-06-16 14:57:15 -070026 node_(configuration::GetNode(configuration_, event_loop->node())),
27 node_index_(configuration::GetNodeIndex(configuration_, node_)),
Austin Schuhb06f03b2021-02-17 22:00:37 -080028 name_(network::GetHostname()),
29 timer_handler_(event_loop_->AddTimer(
Austin Schuh30586902021-03-30 22:54:08 -070030 [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
Austin Schuhb06f03b2021-02-17 22:00:37 -080031 server_statistics_fetcher_(
32 configuration::MultiNode(event_loop_->configuration())
33 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
34 "/aos")
35 : aos::Fetcher<message_bridge::ServerStatistics>()) {
Austin Schuh58646e22021-08-23 23:51:46 -070036 timer_handler_->set_name("channel_poll");
Austin Schuh5b728b72021-06-16 14:57:15 -070037 VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080038
Austin Schuhb06f03b2021-02-17 22:00:37 -080039 std::map<const Channel *, const Node *> timestamp_logger_channels;
40
Austin Schuh61e973f2021-02-21 21:43:56 -080041 message_bridge::ChannelTimestampFinder finder(event_loop_);
42 for (const Channel *channel : *event_loop_->configuration()->channels()) {
43 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080044 continue;
45 }
Austin Schuh61e973f2021-02-21 21:43:56 -080046 if (!channel->has_destination_nodes()) {
47 continue;
48 }
49 for (const Connection *connection : *channel->destination_nodes()) {
50 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
51 connection, event_loop_->node())) {
52 const Node *other_node = configuration::GetNode(
Austin Schuh5b728b72021-06-16 14:57:15 -070053 configuration_, connection->name()->string_view());
Austin Schuh61e973f2021-02-21 21:43:56 -080054
55 VLOG(1) << "Timestamps are logged from "
56 << FlatbufferToJson(other_node);
57 timestamp_logger_channels.insert(
58 std::make_pair(finder.ForChannel(channel, connection), other_node));
59 }
60 }
Austin Schuhb06f03b2021-02-17 22:00:37 -080061 }
62
Austin Schuhb06f03b2021-02-17 22:00:37 -080063 for (size_t channel_index = 0;
64 channel_index < configuration_->channels()->size(); ++channel_index) {
65 const Channel *const config_channel =
66 configuration_->channels()->Get(channel_index);
67 // The MakeRawFetcher method needs a channel which is in the event loop
68 // configuration() object, not the configuration_ object. Go look that up
69 // from the config.
70 const Channel *channel = aos::configuration::GetChannel(
71 event_loop_->configuration(), config_channel->name()->string_view(),
72 config_channel->type()->string_view(), "", event_loop_->node());
73 CHECK(channel != nullptr)
74 << ": Failed to look up channel "
75 << aos::configuration::CleanedChannelToString(config_channel);
Austin Schuh5b728b72021-06-16 14:57:15 -070076 if (!should_log(config_channel)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080077 continue;
78 }
79
80 FetcherStruct fs;
81 fs.channel_index = channel_index;
82 fs.channel = channel;
83
84 const bool is_local =
Austin Schuh5b728b72021-06-16 14:57:15 -070085 configuration::ChannelIsSendableOnNode(config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080086
87 const bool is_readable =
Austin Schuh5b728b72021-06-16 14:57:15 -070088 configuration::ChannelIsReadableOnNode(config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080089 const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
Austin Schuh5b728b72021-06-16 14:57:15 -070090 config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080091 const bool log_message = is_logged && is_readable;
92
93 bool log_delivery_times = false;
Austin Schuh5b728b72021-06-16 14:57:15 -070094 if (configuration::MultiNode(configuration_)) {
Austin Schuh72211ae2021-08-05 14:02:30 -070095 const aos::Connection *connection =
Austin Schuh5b728b72021-06-16 14:57:15 -070096 configuration::ConnectionToNode(config_channel, node_);
Austin Schuh72211ae2021-08-05 14:02:30 -070097
Austin Schuhb06f03b2021-02-17 22:00:37 -080098 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
Austin Schuh72211ae2021-08-05 14:02:30 -070099 connection, event_loop_->node());
100
101 CHECK_EQ(log_delivery_times,
102 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
Austin Schuh5b728b72021-06-16 14:57:15 -0700103 config_channel, node_, node_));
Austin Schuh72211ae2021-08-05 14:02:30 -0700104
105 if (connection) {
106 fs.reliable_forwarding = (connection->time_to_live() == 0);
107 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800108 }
109
110 // Now, detect a RemoteMessage timestamp logger where we should just log the
111 // contents to a file directly.
112 const bool log_contents = timestamp_logger_channels.find(channel) !=
113 timestamp_logger_channels.end();
114
115 if (log_message || log_delivery_times || log_contents) {
116 fs.fetcher = event_loop->MakeRawFetcher(channel);
117 VLOG(1) << "Logging channel "
118 << configuration::CleanedChannelToString(channel);
119
120 if (log_delivery_times) {
121 VLOG(1) << " Delivery times";
122 fs.wants_timestamp_writer = true;
Austin Schuh5b728b72021-06-16 14:57:15 -0700123 fs.timestamp_node_index = static_cast<int>(node_index_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800124 }
Austin Schuhe46492f2021-07-31 19:49:41 -0700125 // Both the timestamp and data writers want data_node_index so it knows
126 // what the source node is.
127 if (log_message || log_delivery_times) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800128 if (!is_local) {
129 const Node *source_node = configuration::GetNode(
130 configuration_, channel->source_node()->string_view());
131 fs.data_node_index =
132 configuration::GetNodeIndex(configuration_, source_node);
Austin Schuhe46492f2021-07-31 19:49:41 -0700133 }
134 }
135 if (log_message) {
136 VLOG(1) << " Data";
137 fs.wants_writer = true;
138 if (!is_local) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800139 fs.log_type = LogType::kLogRemoteMessage;
140 } else {
Austin Schuh5b728b72021-06-16 14:57:15 -0700141 fs.data_node_index = static_cast<int>(node_index_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800142 }
143 }
144 if (log_contents) {
145 VLOG(1) << "Timestamp logger channel "
146 << configuration::CleanedChannelToString(channel);
147 fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
148 fs.wants_contents_writer = true;
149 fs.contents_node_index =
150 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
151 }
152 fetchers_.emplace_back(std::move(fs));
153 }
154 }
155
156 // When we are logging remote timestamps, we need to be able to translate from
157 // the channel index that the event loop uses to the channel index in the
158 // config in the log file.
159 event_loop_to_logged_channel_index_.resize(
160 event_loop->configuration()->channels()->size(), -1);
161 for (size_t event_loop_channel_index = 0;
162 event_loop_channel_index <
163 event_loop->configuration()->channels()->size();
164 ++event_loop_channel_index) {
165 const Channel *event_loop_channel =
166 event_loop->configuration()->channels()->Get(event_loop_channel_index);
167
168 const Channel *logged_channel = aos::configuration::GetChannel(
169 configuration_, event_loop_channel->name()->string_view(),
Austin Schuh5b728b72021-06-16 14:57:15 -0700170 event_loop_channel->type()->string_view(), "", node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800171
172 if (logged_channel != nullptr) {
173 event_loop_to_logged_channel_index_[event_loop_channel_index] =
174 configuration::ChannelIndex(configuration_, logged_channel);
175 }
176 }
177}
178
179Logger::~Logger() {
180 if (log_namer_) {
181 // If we are replaying a log file, or in simulation, we want to force the
182 // last bit of data to be logged. The easiest way to deal with this is to
183 // poll everything as we go to destroy the class, ie, shut down the logger,
184 // and write it to disk.
185 StopLogging(event_loop_->monotonic_now());
186 }
187}
188
Austin Schuh6bb8a822021-03-31 23:04:39 -0700189bool Logger::RenameLogBase(std::string new_base_name) {
190 if (new_base_name == log_namer_->base_name()) {
191 return true;
192 }
193 std::string current_directory = std::string(log_namer_->base_name());
194 std::string new_directory = new_base_name;
195
196 auto current_path_split = current_directory.rfind("/");
197 auto new_path_split = new_directory.rfind("/");
198
199 CHECK(new_base_name.substr(new_path_split) ==
200 current_directory.substr(current_path_split))
201 << "Rename of file base from " << current_directory << " to "
202 << new_directory << " is not supported.";
203
204 current_directory.resize(current_path_split);
205 new_directory.resize(new_path_split);
206 DIR *dir = opendir(current_directory.c_str());
207 if (dir) {
208 closedir(dir);
209 const int result = rename(current_directory.c_str(), new_directory.c_str());
210 if (result != 0) {
211 PLOG(ERROR) << "Unable to rename " << current_directory << " to "
212 << new_directory;
213 return false;
214 }
215 } else {
216 // Handle if directory was already renamed.
217 dir = opendir(new_directory.c_str());
218 if (!dir) {
219 LOG(ERROR) << "Old directory " << current_directory
220 << " missing and new directory " << new_directory
221 << " not present.";
222 return false;
223 }
224 closedir(dir);
225 }
226
227 log_namer_->set_base_name(new_base_name);
228 Rotate();
229 return true;
230}
231
Austin Schuhb06f03b2021-02-17 22:00:37 -0800232void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
Austin Schuh34f9e482021-03-31 22:54:18 -0700233 std::optional<UUID> log_start_uuid) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800234 CHECK(!log_namer_) << ": Already logging";
235 log_namer_ = std::move(log_namer);
236
237 std::string config_sha256;
238 if (separate_config_) {
239 flatbuffers::FlatBufferBuilder fbb;
240 flatbuffers::Offset<aos::Configuration> configuration_offset =
241 CopyFlatBuffer(configuration_, &fbb);
242 LogFileHeader::Builder log_file_header_builder(fbb);
243 log_file_header_builder.add_configuration(configuration_offset);
244 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
245 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
246 fbb.Release());
247 config_sha256 = Sha256(config_header.span());
248 LOG(INFO) << "Config sha256 of " << config_sha256;
249 log_namer_->WriteConfiguration(&config_header, config_sha256);
250 }
251
252 log_event_uuid_ = UUID::Random();
253 log_start_uuid_ = log_start_uuid;
Austin Schuh5b728b72021-06-16 14:57:15 -0700254 VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800255
256 // We want to do as much work as possible before the initial Fetch. Time
257 // between that and actually starting to log opens up the possibility of
258 // falling off the end of the queue during that time.
259
260 for (FetcherStruct &f : fetchers_) {
261 if (f.wants_writer) {
262 f.writer = log_namer_->MakeWriter(f.channel);
263 }
264 if (f.wants_timestamp_writer) {
265 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
266 }
267 if (f.wants_contents_writer) {
268 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
269 f.channel, CHECK_NOTNULL(f.timestamp_node));
270 }
271 }
272
Austin Schuh73340842021-07-30 22:32:06 -0700273 log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
Austin Schuhb06f03b2021-02-17 22:00:37 -0800274
Austin Schuha42ee962021-03-31 22:49:30 -0700275 const aos::monotonic_clock::time_point beginning_time =
276 event_loop_->monotonic_now();
277
Austin Schuhb06f03b2021-02-17 22:00:37 -0800278 // Grab data from each channel right before we declare the log file started
279 // so we can capture the latest message on each channel. This lets us have
280 // non periodic messages with configuration that now get logged.
281 for (FetcherStruct &f : fetchers_) {
282 const auto start = event_loop_->monotonic_now();
283 const bool got_new = f.fetcher->Fetch();
284 const auto end = event_loop_->monotonic_now();
285 RecordFetchResult(start, end, got_new, &f);
286
287 // If there is a message, we want to write it.
288 f.written = f.fetcher->context().data == nullptr;
289 }
290
291 // Clear out any old timestamps in case we are re-starting logging.
Austin Schuh572924a2021-07-30 22:32:12 -0700292 for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
Austin Schuh58646e22021-08-23 23:51:46 -0700293 log_namer_->ClearStartTimes();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800294 }
295
Austin Schuha42ee962021-03-31 22:49:30 -0700296 const aos::monotonic_clock::time_point fetch_time =
297 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800298 WriteHeader();
Austin Schuha42ee962021-03-31 22:49:30 -0700299 const aos::monotonic_clock::time_point header_time =
300 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800301
Austin Schuh5b728b72021-06-16 14:57:15 -0700302 LOG(INFO) << "Logging node as " << FlatbufferToJson(node_)
Austin Schuha42ee962021-03-31 22:49:30 -0700303 << " start_time " << last_synchronized_time_ << ", took "
304 << chrono::duration<double>(fetch_time - beginning_time).count()
305 << " to fetch, "
306 << chrono::duration<double>(header_time - fetch_time).count()
307 << " to write headers, boot uuid " << event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800308
309 // Force logging up until the start of the log file now, so the messages at
310 // the start are always ordered before the rest of the messages.
311 // Note: this ship may have already sailed, but we don't have to make it
312 // worse.
313 // TODO(austin): Test...
Austin Schuh855f8932021-03-19 22:01:32 -0700314 //
315 // This is safe to call here since we have set last_synchronized_time_ as the
316 // same time as in the header, and all the data before it should be logged
317 // without ordering concerns.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800318 LogUntil(last_synchronized_time_);
319
320 timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
321 polling_period_);
322}
323
324std::unique_ptr<LogNamer> Logger::StopLogging(
325 aos::monotonic_clock::time_point end_time) {
326 CHECK(log_namer_) << ": Not logging right now";
327
328 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh30586902021-03-30 22:54:08 -0700329 // Folks like to use the on_logged_period_ callback to trigger stop and
330 // start events. We can't have those then recurse and try to stop again.
331 // Rather than making everything reentrant, let's just instead block the
332 // callback here.
333 DoLogData(end_time, false);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800334 }
335 timer_handler_->Disable();
336
337 for (FetcherStruct &f : fetchers_) {
338 f.writer = nullptr;
339 f.timestamp_writer = nullptr;
340 f.contents_writer = nullptr;
341 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800342
343 log_event_uuid_ = UUID::Zero();
Austin Schuh34f9e482021-03-31 22:54:18 -0700344 log_start_uuid_ = std::nullopt;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800345
346 return std::move(log_namer_);
347}
348
349void Logger::WriteHeader() {
350 if (configuration::MultiNode(configuration_)) {
351 server_statistics_fetcher_.Fetch();
352 }
353
Austin Schuh73340842021-07-30 22:32:06 -0700354 const aos::monotonic_clock::time_point monotonic_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800355 event_loop_->monotonic_now();
Austin Schuh73340842021-07-30 22:32:06 -0700356 const aos::realtime_clock::time_point realtime_start_time =
Austin Schuhb06f03b2021-02-17 22:00:37 -0800357 event_loop_->realtime_now();
358
359 // We need to pick a point in time to declare the log file "started". This
360 // starts here. It needs to be after everything is fetched so that the
361 // fetchers are all pointed at the most recent message before the start
362 // time.
363 last_synchronized_time_ = monotonic_start_time;
364
365 for (const Node *node : log_namer_->nodes()) {
366 const int node_index = configuration::GetNodeIndex(configuration_, node);
367 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
368 realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800369 }
370}
371
Austin Schuhb06f03b2021-02-17 22:00:37 -0800372void Logger::WriteMissingTimestamps() {
373 if (configuration::MultiNode(configuration_)) {
374 server_statistics_fetcher_.Fetch();
375 } else {
376 return;
377 }
378
379 if (server_statistics_fetcher_.get() == nullptr) {
380 return;
381 }
382
383 for (const Node *node : log_namer_->nodes()) {
384 const int node_index = configuration::GetNodeIndex(configuration_, node);
385 if (MaybeUpdateTimestamp(
386 node, node_index,
387 server_statistics_fetcher_.context().monotonic_event_time,
388 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh58646e22021-08-23 23:51:46 -0700389 VLOG(1) << "Timestamps changed on " << aos::FlatbufferToJson(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800390 }
391 }
392}
393
Austin Schuhb06f03b2021-02-17 22:00:37 -0800394bool Logger::MaybeUpdateTimestamp(
395 const Node *node, int node_index,
396 aos::monotonic_clock::time_point monotonic_start_time,
397 aos::realtime_clock::time_point realtime_start_time) {
398 // Bail early if the start times are already set.
Austin Schuh58646e22021-08-23 23:51:46 -0700399 if (node_ == node || !configuration::MultiNode(configuration_)) {
400 if (log_namer_->monotonic_start_time(node_index,
401 event_loop_->boot_uuid()) !=
402 monotonic_clock::min_time) {
403 return false;
404 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800405 // There are no offsets to compute for ourself, so always succeed.
Austin Schuh58646e22021-08-23 23:51:46 -0700406 log_namer_->SetStartTimes(node_index, event_loop_->boot_uuid(),
407 monotonic_start_time, realtime_start_time,
408 monotonic_start_time, realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800409 return true;
410 } else if (server_statistics_fetcher_.get() != nullptr) {
411 // We must be a remote node now. Look for the connection and see if it is
412 // connected.
James Kuszmaul17607fb2021-10-15 20:00:32 -0700413 CHECK(server_statistics_fetcher_->has_connections());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800414
415 for (const message_bridge::ServerConnection *connection :
416 *server_statistics_fetcher_->connections()) {
417 if (connection->node()->name()->string_view() !=
418 node->name()->string_view()) {
419 continue;
420 }
421
422 if (connection->state() != message_bridge::State::CONNECTED) {
423 VLOG(1) << node->name()->string_view()
424 << " is not connected, can't start it yet.";
425 break;
426 }
427
Austin Schuhb06f03b2021-02-17 22:00:37 -0800428 if (!connection->has_monotonic_offset()) {
429 VLOG(1) << "Missing monotonic offset for setting start time for node "
430 << aos::FlatbufferToJson(node);
431 break;
432 }
433
James Kuszmaul17607fb2021-10-15 20:00:32 -0700434 CHECK(connection->has_boot_uuid());
Austin Schuh58646e22021-08-23 23:51:46 -0700435 const UUID boot_uuid =
436 UUID::FromString(connection->boot_uuid()->string_view());
437
438 if (log_namer_->monotonic_start_time(node_index, boot_uuid) !=
439 monotonic_clock::min_time) {
440 break;
441 }
442
443 VLOG(1) << "Updating start time for "
444 << aos::FlatbufferToJson(connection);
445
Austin Schuhb06f03b2021-02-17 22:00:37 -0800446 // Found it and it is connected. Compensate and go.
Austin Schuh73340842021-07-30 22:32:06 -0700447 log_namer_->SetStartTimes(
Austin Schuh58646e22021-08-23 23:51:46 -0700448 node_index, boot_uuid,
Austin Schuh73340842021-07-30 22:32:06 -0700449 monotonic_start_time +
450 std::chrono::nanoseconds(connection->monotonic_offset()),
451 realtime_start_time, monotonic_start_time, realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800452 return true;
453 }
454 }
455 return false;
456}
457
458aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
Austin Schuh73340842021-07-30 22:32:06 -0700459 std::string_view config_sha256) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800460 flatbuffers::FlatBufferBuilder fbb;
461 fbb.ForceDefaults(true);
462
463 flatbuffers::Offset<aos::Configuration> configuration_offset;
464 if (!separate_config_) {
465 configuration_offset = CopyFlatBuffer(configuration_, &fbb);
466 } else {
467 CHECK(!config_sha256.empty());
468 }
469
470 const flatbuffers::Offset<flatbuffers::String> name_offset =
471 fbb.CreateString(name_);
472
473 CHECK(log_event_uuid_ != UUID::Zero());
474 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800475 log_event_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800476
477 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800478 logger_instance_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800479
480 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
Austin Schuh34f9e482021-03-31 22:54:18 -0700481 if (log_start_uuid_) {
482 log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800483 }
484
485 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
486 if (!config_sha256.empty()) {
487 config_sha256_offset = fbb.CreateString(config_sha256);
488 }
489
490 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800491 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800492
Austin Schuhb06f03b2021-02-17 22:00:37 -0800493 flatbuffers::Offset<Node> logger_node_offset;
494
495 if (configuration::MultiNode(configuration_)) {
Austin Schuh5b728b72021-06-16 14:57:15 -0700496 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800497 }
498
499 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
500
501 log_file_header_builder.add_name(name_offset);
502
503 // Only add the node if we are running in a multinode configuration.
Austin Schuh73340842021-07-30 22:32:06 -0700504 if (configuration::MultiNode(configuration_)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800505 log_file_header_builder.add_logger_node(logger_node_offset);
506 }
507
508 if (!configuration_offset.IsNull()) {
509 log_file_header_builder.add_configuration(configuration_offset);
510 }
511 // The worst case theoretical out of order is the polling period times 2.
512 // One message could get logged right after the boundary, but be for right
513 // before the next boundary. And the reverse could happen for another
514 // message. Report back 3x to be extra safe, and because the cost isn't
515 // huge on the read side.
516 log_file_header_builder.add_max_out_of_order_duration(
517 std::chrono::nanoseconds(3 * polling_period_).count());
518
Austin Schuhb06f03b2021-02-17 22:00:37 -0800519 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
520 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
521 if (!log_start_uuid_offset.IsNull()) {
522 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
523 }
524 log_file_header_builder.add_logger_node_boot_uuid(
525 logger_node_boot_uuid_offset);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800526
527 if (!config_sha256_offset.IsNull()) {
528 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
529 }
530
531 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
532 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
533 fbb.Release());
534
535 CHECK(result.Verify()) << ": Built a corrupted header.";
536
537 return result;
538}
539
540void Logger::ResetStatisics() {
541 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
542 max_message_fetch_time_channel_ = -1;
543 max_message_fetch_time_size_ = -1;
544 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
545 total_message_fetch_count_ = 0;
546 total_message_fetch_bytes_ = 0;
547 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
548 total_nop_fetch_count_ = 0;
549 max_copy_time_ = std::chrono::nanoseconds::zero();
550 max_copy_time_channel_ = -1;
551 max_copy_time_size_ = -1;
552 total_copy_time_ = std::chrono::nanoseconds::zero();
553 total_copy_count_ = 0;
554 total_copy_bytes_ = 0;
555}
556
557void Logger::Rotate() {
558 for (const Node *node : log_namer_->nodes()) {
Austin Schuh73340842021-07-30 22:32:06 -0700559 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800560 }
561}
562
563void Logger::LogUntil(monotonic_clock::time_point t) {
564 // Grab the latest ServerStatistics message. This will always have the
565 // oppertunity to be >= to the current time, so it will always represent any
566 // reboots which may have happened.
567 WriteMissingTimestamps();
568
569 // Write each channel to disk, one at a time.
570 for (FetcherStruct &f : fetchers_) {
571 while (true) {
572 if (f.written) {
573 const auto start = event_loop_->monotonic_now();
574 const bool got_new = f.fetcher->FetchNext();
575 const auto end = event_loop_->monotonic_now();
576 RecordFetchResult(start, end, got_new, &f);
577 if (!got_new) {
578 VLOG(2) << "No new data on "
579 << configuration::CleanedChannelToString(
580 f.fetcher->channel());
581 break;
582 }
583 f.written = false;
584 }
585
586 // TODO(james): Write tests to exercise this logic.
587 if (f.fetcher->context().monotonic_event_time >= t) {
588 break;
589 }
590 if (f.writer != nullptr) {
Austin Schuh572924a2021-07-30 22:32:12 -0700591 const UUID source_node_boot_uuid =
Austin Schuh5b728b72021-06-16 14:57:15 -0700592 static_cast<int>(node_index_) != f.data_node_index
Austin Schuh572924a2021-07-30 22:32:12 -0700593 ? f.fetcher->context().source_boot_uuid
594 : event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800595 // Write!
596 const auto start = event_loop_->monotonic_now();
597 flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
598 max_header_size_);
599 fbb.ForceDefaults(true);
600
601 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
602 f.channel_index, f.log_type));
603 const auto end = event_loop_->monotonic_now();
604 RecordCreateMessageTime(start, end, &f);
605
606 VLOG(2) << "Writing data as node "
Austin Schuh5b728b72021-06-16 14:57:15 -0700607 << FlatbufferToJson(node_) << " for channel "
Austin Schuhb06f03b2021-02-17 22:00:37 -0800608 << configuration::CleanedChannelToString(f.fetcher->channel())
609 << " to " << f.writer->filename() << " data "
610 << FlatbufferToJson(
611 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
612 fbb.GetBufferPointer()));
613
614 max_header_size_ = std::max(max_header_size_,
615 fbb.GetSize() - f.fetcher->context().size);
Austin Schuhe46492f2021-07-31 19:49:41 -0700616 f.writer->QueueMessage(&fbb, source_node_boot_uuid, end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800617 }
618
619 if (f.timestamp_writer != nullptr) {
620 // And now handle timestamps.
621 const auto start = event_loop_->monotonic_now();
622 flatbuffers::FlatBufferBuilder fbb;
623 fbb.ForceDefaults(true);
624
625 fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
626 f.channel_index,
627 LogType::kLogDeliveryTimeOnly));
628 const auto end = event_loop_->monotonic_now();
629 RecordCreateMessageTime(start, end, &f);
630
631 VLOG(2) << "Writing timestamps as node "
Austin Schuh5b728b72021-06-16 14:57:15 -0700632 << FlatbufferToJson(node_) << " for channel "
Austin Schuhb06f03b2021-02-17 22:00:37 -0800633 << configuration::CleanedChannelToString(f.fetcher->channel())
634 << " to " << f.timestamp_writer->filename() << " timestamp "
635 << FlatbufferToJson(
636 flatbuffers::GetSizePrefixedRoot<MessageHeader>(
637 fbb.GetBufferPointer()));
638
Austin Schuhe46492f2021-07-31 19:49:41 -0700639 // Tell our writer that we know something about the remote boot.
Austin Schuh72211ae2021-08-05 14:02:30 -0700640 f.timestamp_writer->UpdateRemote(
641 f.data_node_index, f.fetcher->context().source_boot_uuid,
642 f.fetcher->context().monotonic_remote_time,
643 f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
Austin Schuhe46492f2021-07-31 19:49:41 -0700644 f.timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800645 }
646
647 if (f.contents_writer != nullptr) {
648 const auto start = event_loop_->monotonic_now();
649 // And now handle the special message contents channel. Copy the
650 // message into a FlatBufferBuilder and save it to disk.
651 // TODO(austin): We can be more efficient here when we start to
652 // care...
653 flatbuffers::FlatBufferBuilder fbb;
654 fbb.ForceDefaults(true);
655
656 const RemoteMessage *msg =
657 flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
658
659 CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800660
661 logger::MessageHeader::Builder message_header_builder(fbb);
662
663 // TODO(austin): This needs to check the channel_index and confirm
664 // that it should be logged before squirreling away the timestamp to
665 // disk. We don't want to log irrelevant timestamps.
666
667 // Note: this must match the same order as MessageBridgeServer and
668 // PackMessage. We want identical headers to have identical
669 // on-the-wire formats to make comparing them easier.
670
671 // Translate from the channel index that the event loop uses to the
672 // channel index in the log file.
673 message_header_builder.add_channel_index(
674 event_loop_to_logged_channel_index_[msg->channel_index()]);
675
676 message_header_builder.add_queue_index(msg->queue_index());
677 message_header_builder.add_monotonic_sent_time(
678 msg->monotonic_sent_time());
679 message_header_builder.add_realtime_sent_time(
680 msg->realtime_sent_time());
681
682 message_header_builder.add_monotonic_remote_time(
683 msg->monotonic_remote_time());
684 message_header_builder.add_realtime_remote_time(
685 msg->realtime_remote_time());
686 message_header_builder.add_remote_queue_index(
687 msg->remote_queue_index());
688
689 message_header_builder.add_monotonic_timestamp_time(
690 f.fetcher->context()
691 .monotonic_event_time.time_since_epoch()
692 .count());
693
694 fbb.FinishSizePrefixed(message_header_builder.Finish());
695 const auto end = event_loop_->monotonic_now();
696 RecordCreateMessageTime(start, end, &f);
697
Austin Schuh5e14d842022-01-21 12:02:15 -0800698 // Timestamps tell us information about what happened too!
699 // Capture any reboots so UpdateRemote is properly recorded.
700 f.contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
701
702 // Start with recording info about the data flowing from our node to the
703 // remote.
704 f.contents_writer->UpdateRemote(
705 node_index_, event_loop_->boot_uuid(),
706 monotonic_clock::time_point(
707 chrono::nanoseconds(msg->monotonic_remote_time())),
708 monotonic_clock::time_point(
709 chrono::nanoseconds(msg->monotonic_sent_time())),
710 f.reliable_forwarding);
711
Austin Schuhe46492f2021-07-31 19:49:41 -0700712 f.contents_writer->QueueMessage(
Austin Schuh572924a2021-07-30 22:32:12 -0700713 &fbb, UUID::FromVector(msg->boot_uuid()), end);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800714 }
715
716 f.written = true;
717 }
718 }
719 last_synchronized_time_ = t;
720}
721
Austin Schuh30586902021-03-30 22:54:08 -0700722void Logger::DoLogData(const monotonic_clock::time_point end_time,
723 bool run_on_logged) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800724 // We want to guarantee that messages aren't out of order by more than
725 // max_out_of_order_duration. To do this, we need sync points. Every write
726 // cycle should be a sync point.
727
728 do {
729 // Move the sync point up by at most polling_period. This forces one sync
730 // per iteration, even if it is small.
731 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
732
Austin Schuh30586902021-03-30 22:54:08 -0700733 if (run_on_logged) {
734 on_logged_period_();
735 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800736
737 // If we missed cycles, we could be pretty far behind. Spin until we are
738 // caught up.
739 } while (last_synchronized_time_ + polling_period_ < end_time);
740}
741
742void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
743 aos::monotonic_clock::time_point end,
744 bool got_new, FetcherStruct *fetcher) {
745 const auto duration = end - start;
746 if (!got_new) {
747 ++total_nop_fetch_count_;
748 total_nop_fetch_time_ += duration;
749 return;
750 }
751 ++total_message_fetch_count_;
752 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
753 total_message_fetch_time_ += duration;
754 if (duration > max_message_fetch_time_) {
755 max_message_fetch_time_ = duration;
756 max_message_fetch_time_channel_ = fetcher->channel_index;
757 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
758 }
759}
760
761void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
762 aos::monotonic_clock::time_point end,
763 FetcherStruct *fetcher) {
764 const auto duration = end - start;
765 total_copy_time_ += duration;
766 ++total_copy_count_;
767 total_copy_bytes_ += fetcher->fetcher->context().size;
768 if (duration > max_copy_time_) {
769 max_copy_time_ = duration;
770 max_copy_time_channel_ = fetcher->channel_index;
771 max_copy_time_size_ = fetcher->fetcher->context().size;
772 }
773}
774
775} // namespace logger
776} // namespace aos