#include "aos/events/logging/log_writer.h"

#include <dirent.h>

#include <functional>
#include <map>
#include <vector>

#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/team_number.h"
#include "aos/network/timestamp_channel.h"

namespace aos {
namespace logger {
namespace {
using message_bridge::RemoteMessage;
namespace chrono = std::chrono;
}  // namespace

Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
               std::function<bool(const Channel *)> should_log)
    : event_loop_(event_loop),
      configuration_(configuration),
      node_(configuration::GetNode(configuration_, event_loop->node())),
      node_index_(configuration::GetNodeIndex(configuration_, node_)),
      name_(network::GetHostname()),
      timer_handler_(event_loop_->AddTimer(
          [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
      server_statistics_fetcher_(
          configuration::MultiNode(event_loop_->configuration())
              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                    "/aos")
              : aos::Fetcher<message_bridge::ServerStatistics>()) {
  timer_handler_->set_name("channel_poll");
  VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);

  // When we are logging remote timestamps, we need to be able to translate
  // from the channel index that the event loop uses to the channel index in
  // the config in the log file.
  event_loop_to_logged_channel_index_.resize(
      event_loop->configuration()->channels()->size(), -1);
  for (size_t event_loop_channel_index = 0;
       event_loop_channel_index <
       event_loop->configuration()->channels()->size();
       ++event_loop_channel_index) {
    const Channel *event_loop_channel =
        event_loop->configuration()->channels()->Get(event_loop_channel_index);

    const Channel *logged_channel = aos::configuration::GetChannel(
        configuration_, event_loop_channel->name()->string_view(),
        event_loop_channel->type()->string_view(), "", node_);

    if (logged_channel != nullptr) {
      event_loop_to_logged_channel_index_[event_loop_channel_index] =
          configuration::ChannelIndex(configuration_, logged_channel);
    }
  }

  // Map used to match source channels with their timestamp logger channel.
  // The tuple records the node the timestamps come from, whether the
  // channel's contents should be treated as reliable, and a list of all
  // channels logged on it which should be treated as reliable.
  std::map<const Channel *, std::tuple<const Node *, bool, std::vector<bool>>>
      timestamp_logger_channels;
Austin Schuhb06f03b2021-02-17 22:00:37 -080066
Austin Schuh61e973f2021-02-21 21:43:56 -080067 message_bridge::ChannelTimestampFinder finder(event_loop_);
68 for (const Channel *channel : *event_loop_->configuration()->channels()) {
69 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080070 continue;
71 }
Austin Schuh61e973f2021-02-21 21:43:56 -080072 if (!channel->has_destination_nodes()) {
73 continue;
74 }
Austin Schuh01f3b392022-01-25 20:03:09 -080075 const size_t channel_index =
76 configuration::ChannelIndex(event_loop_->configuration(), channel);
77
Austin Schuh61e973f2021-02-21 21:43:56 -080078 for (const Connection *connection : *channel->destination_nodes()) {
79 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
80 connection, event_loop_->node())) {
81 const Node *other_node = configuration::GetNode(
Austin Schuh5b728b72021-06-16 14:57:15 -070082 configuration_, connection->name()->string_view());
Austin Schuh61e973f2021-02-21 21:43:56 -080083
84 VLOG(1) << "Timestamps are logged from "
85 << FlatbufferToJson(other_node);
Austin Schuh01f3b392022-01-25 20:03:09 -080086 // True if each channel's remote timestamps are split into a separate
87 // RemoteMessage channel.
88 const bool is_split =
89 finder.SplitChannelForChannel(channel, connection) != nullptr;
90
91 const Channel *const timestamp_logger_channel =
92 finder.ForChannel(channel, connection);
93
94 auto it = timestamp_logger_channels.find(timestamp_logger_channel);
95 if (it != timestamp_logger_channels.end()) {
96 CHECK(!is_split);
97 CHECK_LT(channel_index, std::get<2>(it->second).size());
98 std::get<2>(it->second)[channel_index] = (connection->time_to_live() == 0);
99 } else {
100 if (is_split) {
101 timestamp_logger_channels.insert(std::make_pair(
102 timestamp_logger_channel,
103 std::make_tuple(other_node, (connection->time_to_live() == 0),
104 std::vector<bool>())));
105 } else {
106 std::vector<bool> channel_reliable_contents(
107 event_loop->configuration()->channels()->size(), false);
108 channel_reliable_contents[channel_index] =
109 (connection->time_to_live() == 0);
110
111 timestamp_logger_channels.insert(std::make_pair(
112 timestamp_logger_channel,
113 std::make_tuple(other_node, false,
114 std::move(channel_reliable_contents))));
115 }
116 }
Austin Schuh61e973f2021-02-21 21:43:56 -0800117 }
118 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800119 }
120
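  // Now build a FetcherStruct for each channel in the logged config that this
  // logger cares about: channels whose data we log, channels whose delivery
  // timestamps we log, and RemoteMessage timestamp channels whose contents we
  // log directly.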
  for (size_t channel_index = 0;
       channel_index < configuration_->channels()->size(); ++channel_index) {
    const Channel *const config_channel =
        configuration_->channels()->Get(channel_index);
    // The MakeRawFetcher method needs a channel which is in the event loop
    // configuration() object, not the configuration_ object. Go look that up
    // from the config.
    const Channel *channel = aos::configuration::GetChannel(
        event_loop_->configuration(), config_channel->name()->string_view(),
        config_channel->type()->string_view(), "", event_loop_->node());
    CHECK(channel != nullptr)
        << ": Failed to look up channel "
        << aos::configuration::CleanedChannelToString(config_channel);
    if (!should_log(config_channel)) {
      continue;
    }

    FetcherStruct fs;
    fs.channel_index = channel_index;
    fs.channel = channel;

    const bool is_local =
        configuration::ChannelIsSendableOnNode(config_channel, node_);

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(config_channel, node_);
    const bool is_logged =
        configuration::ChannelMessageIsLoggedOnNode(config_channel, node_);
    const bool log_message = is_logged && is_readable;

    bool log_delivery_times = false;
    if (configuration::MultiNode(configuration_)) {
      const aos::Connection *connection =
          configuration::ConnectionToNode(config_channel, node_);

      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
          connection, event_loop_->node());

      CHECK_EQ(log_delivery_times,
               configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                   config_channel, node_, node_));

      if (connection) {
        fs.reliable_forwarding = (connection->time_to_live() == 0);
      }
    }

    // Now, detect a RemoteMessage timestamp logger where we should just log
    // the contents to a file directly.
    const bool log_contents = timestamp_logger_channels.find(channel) !=
                              timestamp_logger_channels.end();

    if (log_message || log_delivery_times || log_contents) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.wants_timestamp_writer = true;
        fs.timestamp_node_index = static_cast<int>(node_index_);
      }
      // Both the timestamp and data writers want data_node_index so they know
      // what the source node is.
      if (log_message || log_delivery_times) {
        if (!is_local) {
          const Node *source_node = configuration::GetNode(
              configuration_, channel->source_node()->string_view());
          fs.data_node_index =
              configuration::GetNodeIndex(configuration_, source_node);
        }
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.wants_writer = true;
        if (!is_local) {
          fs.log_type = LogType::kLogRemoteMessage;
        } else {
          fs.data_node_index = static_cast<int>(node_index_);
        }
      }
      if (log_contents) {
        VLOG(1) << "Timestamp logger channel "
                << configuration::CleanedChannelToString(channel);
        auto timestamp_logger_channel_info =
            timestamp_logger_channels.find(channel);
        CHECK(timestamp_logger_channel_info !=
              timestamp_logger_channels.end());
        fs.timestamp_node = std::get<0>(timestamp_logger_channel_info->second);
        fs.reliable_contents =
            std::get<1>(timestamp_logger_channel_info->second);
        fs.channel_reliable_contents =
            std::get<2>(timestamp_logger_channel_info->second);
        fs.wants_contents_writer = true;
        fs.contents_node_index =
            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
      }
      fetchers_.emplace_back(std::move(fs));
    }
  }
}

Logger::~Logger() {
  if (log_namer_) {
    // If we are replaying a log file, or in simulation, we want to force the
    // last bit of data to be logged. The easiest way to deal with this is to
    // poll everything as we go to destroy the class, i.e., shut down the
    // logger, and write it to disk.
    StopLogging(event_loop_->monotonic_now());
  }
}

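// Renames the base name of the log. Only the directory portion of the base
// name may change; the trailing file name component must stay the same. The
// directory is renamed on disk (or assumed to have already been renamed)
// before the new base name is handed to the LogNamer.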
bool Logger::RenameLogBase(std::string new_base_name) {
  if (new_base_name == log_namer_->base_name()) {
    return true;
  }
  std::string current_directory = std::string(log_namer_->base_name());
  std::string new_directory = new_base_name;

  auto current_path_split = current_directory.rfind("/");
  auto new_path_split = new_directory.rfind("/");

  CHECK(new_base_name.substr(new_path_split) ==
        current_directory.substr(current_path_split))
      << "Rename of file base from " << current_directory << " to "
      << new_directory << " is not supported.";

  current_directory.resize(current_path_split);
  new_directory.resize(new_path_split);
  DIR *dir = opendir(current_directory.c_str());
  if (dir) {
    closedir(dir);
    const int result =
        rename(current_directory.c_str(), new_directory.c_str());
    if (result != 0) {
      PLOG(ERROR) << "Unable to rename " << current_directory << " to "
                  << new_directory;
      return false;
    }
  } else {
    // Handle the case where the directory has already been renamed.
    dir = opendir(new_directory.c_str());
    if (!dir) {
      LOG(ERROR) << "Old directory " << current_directory
                 << " missing and new directory " << new_directory
                 << " not present.";
      return false;
    }
    closedir(dir);
  }

  log_namer_->set_base_name(new_base_name);
  Rotate();
  return true;
}

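// Starts logging with the given LogNamer, which controls where and how the
// individual log files are created. Rough usage sketch (the LogNamer
// subclass named here is an assumption for illustration, not something this
// file defines):
//
//   logger.StartLogging(std::make_unique<SomeLogNamerSubclass>(...));
//   ...
//   std::unique_ptr<LogNamer> namer = logger.StopLogging(end_time);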
void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
                          std::optional<UUID> log_start_uuid) {
  CHECK(!log_namer_) << ": Already logging";
  log_namer_ = std::move(log_namer);

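  // When the configuration is logged separately, serialize just the config
  // into its own size-prefixed LogFileHeader and hash it, so the main log
  // header can reference the config by sha256 instead of embedding it.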
  std::string config_sha256;
  if (separate_config_) {
    flatbuffers::FlatBufferBuilder fbb;
    flatbuffers::Offset<aos::Configuration> configuration_offset =
        CopyFlatBuffer(configuration_, &fbb);
    LogFileHeader::Builder log_file_header_builder(fbb);
    log_file_header_builder.add_configuration(configuration_offset);
    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
        fbb.Release());
    config_sha256 = Sha256(config_header.span());
    LOG(INFO) << "Config sha256 of " << config_sha256;
    log_namer_->WriteConfiguration(&config_header, config_sha256);
  }

  log_event_uuid_ = UUID::Random();
  log_start_uuid_ = log_start_uuid;
  VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);

  // We want to do as much work as possible before the initial Fetch. Any
  // time spent between that fetch and actually starting to log opens up the
  // possibility of falling off the end of the queue.

  for (FetcherStruct &f : fetchers_) {
    if (f.wants_writer) {
      f.writer = log_namer_->MakeWriter(f.channel);
    }
    if (f.wants_timestamp_writer) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
    }
    if (f.wants_contents_writer) {
      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
          f.channel, CHECK_NOTNULL(f.timestamp_node));
    }
  }

  log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));

  const aos::monotonic_clock::time_point beginning_time =
      event_loop_->monotonic_now();

  // Grab data from each channel right before we declare the log file started
  // so we can capture the latest message on each channel. This lets
  // non-periodic messages, like ones carrying configuration, still get logged.
  for (FetcherStruct &f : fetchers_) {
    const auto start = event_loop_->monotonic_now();
    const bool got_new = f.fetcher->Fetch();
    const auto end = event_loop_->monotonic_now();
    RecordFetchResult(start, end, got_new, &f);

    // If there is a message, we want to write it (written == false).
    // Channels with no data yet are marked as already written.
    f.written = f.fetcher->context().data == nullptr;
  }

  // Clear out any old timestamps in case we are re-starting logging.
  for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
    log_namer_->ClearStartTimes();
  }

  const aos::monotonic_clock::time_point fetch_time =
      event_loop_->monotonic_now();
  WriteHeader();
  const aos::monotonic_clock::time_point header_time =
      event_loop_->monotonic_now();

  LOG(INFO) << "Logging node as " << FlatbufferToJson(node_)
            << " start_time " << last_synchronized_time_ << ", took "
            << chrono::duration<double>(fetch_time - beginning_time).count()
            << " to fetch, "
            << chrono::duration<double>(header_time - fetch_time).count()
            << " to write headers, boot uuid " << event_loop_->boot_uuid();

  // Force logging up until the start of the log file now, so the messages at
  // the start are always ordered before the rest of the messages.
  // Note: this ship may have already sailed, but we don't have to make it
  // worse.
  // TODO(austin): Test...
  //
  // This is safe to call here since we have set last_synchronized_time_ as the
  // same time as in the header, and all the data before it should be logged
  // without ordering concerns.
  LogUntil(last_synchronized_time_);

  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
                        polling_period_);
}

std::unique_ptr<LogNamer> Logger::StopLogging(
    aos::monotonic_clock::time_point end_time) {
  CHECK(log_namer_) << ": Not logging right now";

  if (end_time != aos::monotonic_clock::min_time) {
    // Folks like to use the on_logged_period_ callback to trigger stop and
    // start events. We can't have those recurse and try to stop again.
    // Rather than making everything reentrant, just block the callback here.
    DoLogData(end_time, false);
  }
  timer_handler_->Disable();

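  // Clear out the cached per-channel writer pointers now that logging has
  // stopped; the LogNamer itself is returned to the caller below.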
  for (FetcherStruct &f : fetchers_) {
    f.writer = nullptr;
    f.timestamp_writer = nullptr;
    f.contents_writer = nullptr;
  }

  log_event_uuid_ = UUID::Zero();
  log_start_uuid_ = std::nullopt;

  return std::move(log_namer_);
}

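// Picks the point in time the log officially "starts" and records start
// times for every node the LogNamer knows about.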
void Logger::WriteHeader() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  }

  const aos::monotonic_clock::time_point monotonic_start_time =
      event_loop_->monotonic_now();
  const aos::realtime_clock::time_point realtime_start_time =
      event_loop_->realtime_now();

  // We need to pick a point in time to declare the log file "started". That
  // point is now. It needs to be after everything is fetched so that the
  // fetchers are all pointed at the most recent message before the start
  // time.
  last_synchronized_time_ = monotonic_start_time;

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
                         realtime_start_time);
  }
}

void Logger::WriteMissingTimestamps() {
  if (configuration::MultiNode(configuration_)) {
    server_statistics_fetcher_.Fetch();
  } else {
    return;
  }

  if (server_statistics_fetcher_.get() == nullptr) {
    return;
  }

  for (const Node *node : log_namer_->nodes()) {
    const int node_index = configuration::GetNodeIndex(configuration_, node);
    if (MaybeUpdateTimestamp(
            node, node_index,
            server_statistics_fetcher_.context().monotonic_event_time,
            server_statistics_fetcher_.context().realtime_event_time)) {
      VLOG(1) << "Timestamps changed on " << aos::FlatbufferToJson(node);
    }
  }
}

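// Sets the start times for the given node if they have not been set yet.
// For the local node (or a single-node configuration) the given times are
// used directly; for a remote node we wait until message_bridge reports the
// connection as CONNECTED and then shift the monotonic start time by the
// measured clock offset. Returns true if the start times were recorded.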
bool Logger::MaybeUpdateTimestamp(
    const Node *node, int node_index,
    aos::monotonic_clock::time_point monotonic_start_time,
    aos::realtime_clock::time_point realtime_start_time) {
  // Bail early if the start times are already set.
  if (node_ == node || !configuration::MultiNode(configuration_)) {
    if (log_namer_->monotonic_start_time(node_index,
                                         event_loop_->boot_uuid()) !=
        monotonic_clock::min_time) {
      return false;
    }
    // There are no offsets to compute for ourselves, so always succeed.
    log_namer_->SetStartTimes(node_index, event_loop_->boot_uuid(),
                              monotonic_start_time, realtime_start_time,
                              monotonic_start_time, realtime_start_time);
    return true;
  } else if (server_statistics_fetcher_.get() != nullptr) {
    // We must be a remote node now. Look for the connection and see if it is
    // connected.
    CHECK(server_statistics_fetcher_->has_connections());

    for (const message_bridge::ServerConnection *connection :
         *server_statistics_fetcher_->connections()) {
      if (connection->node()->name()->string_view() !=
          node->name()->string_view()) {
        continue;
      }

      if (connection->state() != message_bridge::State::CONNECTED) {
        VLOG(1) << node->name()->string_view()
                << " is not connected, can't start it yet.";
        break;
      }

      if (!connection->has_monotonic_offset()) {
        VLOG(1) << "Missing monotonic offset for setting start time for node "
                << aos::FlatbufferToJson(node);
        break;
      }

      CHECK(connection->has_boot_uuid());
      const UUID boot_uuid =
          UUID::FromString(connection->boot_uuid()->string_view());

      if (log_namer_->monotonic_start_time(node_index, boot_uuid) !=
          monotonic_clock::min_time) {
        break;
      }

      VLOG(1) << "Updating start time for "
              << aos::FlatbufferToJson(connection);

      // Found it and it is connected. Compensate and go.
      log_namer_->SetStartTimes(
          node_index, boot_uuid,
          monotonic_start_time +
              std::chrono::nanoseconds(connection->monotonic_offset()),
          realtime_start_time, monotonic_start_time, realtime_start_time);
      return true;
    }
  }
  return false;
}

aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
    std::string_view config_sha256) {
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  flatbuffers::Offset<aos::Configuration> configuration_offset;
  if (!separate_config_) {
    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
  } else {
    CHECK(!config_sha256.empty());
  }

  const flatbuffers::Offset<flatbuffers::String> name_offset =
      fbb.CreateString(name_);

  CHECK(log_event_uuid_ != UUID::Zero());
  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
      log_event_uuid_.PackString(&fbb);

  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
      logger_instance_uuid_.PackString(&fbb);

  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
  if (log_start_uuid_) {
    log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
  }

  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
  if (!config_sha256.empty()) {
    config_sha256_offset = fbb.CreateString(config_sha256);
  }

  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
      event_loop_->boot_uuid().PackString(&fbb);

  flatbuffers::Offset<Node> logger_node_offset;

  if (configuration::MultiNode(configuration_)) {
    logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(name_offset);

  // Only add the node if we are running in a multinode configuration.
  if (configuration::MultiNode(configuration_)) {
    log_file_header_builder.add_logger_node(logger_node_offset);
  }

  if (!configuration_offset.IsNull()) {
    log_file_header_builder.add_configuration(configuration_offset);
  }
  // The theoretical worst-case out-of-order duration is twice the polling
  // period: one message could get logged right after a boundary but belong
  // right before the next boundary, and the reverse could happen for another
  // message. Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::nanoseconds(3 * polling_period_).count());

  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
  if (!log_start_uuid_offset.IsNull()) {
    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
  }
  log_file_header_builder.add_logger_node_boot_uuid(
      logger_node_boot_uuid_offset);

  if (!config_sha256_offset.IsNull()) {
    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
  }

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
      fbb.Release());

  CHECK(result.Verify()) << ": Built a corrupted header.";

  return result;
}

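// Resets all of the logger's internal fetch/copy statistics back to their
// initial values.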
void Logger::ResetStatisics() {
  max_message_fetch_time_ = std::chrono::nanoseconds::zero();
  max_message_fetch_time_channel_ = -1;
  max_message_fetch_time_size_ = -1;
  total_message_fetch_time_ = std::chrono::nanoseconds::zero();
  total_message_fetch_count_ = 0;
  total_message_fetch_bytes_ = 0;
  total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
  total_nop_fetch_count_ = 0;
  max_copy_time_ = std::chrono::nanoseconds::zero();
  max_copy_time_channel_ = -1;
  max_copy_time_size_ = -1;
  total_copy_time_ = std::chrono::nanoseconds::zero();
  total_copy_count_ = 0;
  total_copy_bytes_ = 0;
}

void Logger::Rotate() {
  for (const Node *node : log_namer_->nodes()) {
    log_namer_->Rotate(node);
  }
}

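// Writes every available message with a monotonic event time before t out to
// the appropriate writers, then records t as the last synchronized time.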
void Logger::LogUntil(monotonic_clock::time_point t) {
  // Grab the latest ServerStatistics message. It always has the opportunity
  // to be >= the current time, so it will always reflect any reboots which
  // may have happened.
  WriteMissingTimestamps();

  // Write each channel to disk, one at a time.
  for (FetcherStruct &f : fetchers_) {
    while (true) {
      if (f.written) {
        const auto start = event_loop_->monotonic_now();
        const bool got_new = f.fetcher->FetchNext();
        const auto end = event_loop_->monotonic_now();
        RecordFetchResult(start, end, got_new, &f);
        if (!got_new) {
          VLOG(2) << "No new data on "
                  << configuration::CleanedChannelToString(
                         f.fetcher->channel());
          break;
        }
        f.written = false;
      }

      // TODO(james): Write tests to exercise this logic.
      if (f.fetcher->context().monotonic_event_time >= t) {
        break;
      }
      if (f.writer != nullptr) {
        const UUID source_node_boot_uuid =
            static_cast<int>(node_index_) != f.data_node_index
                ? f.fetcher->context().source_boot_uuid
                : event_loop_->boot_uuid();
        // Write!
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                           max_header_size_);
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index, f.log_type));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing data as node "
                << FlatbufferToJson(node_) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.writer->filename() << " data "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        max_header_size_ = std::max(max_header_size_,
                                    fbb.GetSize() - f.fetcher->context().size);
        f.writer->QueueMessage(&fbb, source_node_boot_uuid, end);
      }

      if (f.timestamp_writer != nullptr) {
        // And now handle timestamps.
        const auto start = event_loop_->monotonic_now();
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                           f.channel_index,
                                           LogType::kLogDeliveryTimeOnly));
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        VLOG(2) << "Writing timestamps as node "
                << FlatbufferToJson(node_) << " for channel "
                << configuration::CleanedChannelToString(f.fetcher->channel())
                << " to " << f.timestamp_writer->filename() << " timestamp "
                << FlatbufferToJson(
                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                           fbb.GetBufferPointer()));

        // Tell our writer that we know something about the remote boot.
        f.timestamp_writer->UpdateRemote(
            f.data_node_index, f.fetcher->context().source_boot_uuid,
            f.fetcher->context().monotonic_remote_time,
            f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
        f.timestamp_writer->QueueMessage(&fbb, event_loop_->boot_uuid(), end);
      }

      if (f.contents_writer != nullptr) {
        const auto start = event_loop_->monotonic_now();
        // And now handle the special message contents channel. Copy the
        // message into a FlatBufferBuilder and save it to disk.
        // TODO(austin): We can be more efficient here when we start to
        // care...
        flatbuffers::FlatBufferBuilder fbb;
        fbb.ForceDefaults(true);

        const RemoteMessage *msg =
            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);

        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);

        logger::MessageHeader::Builder message_header_builder(fbb);

        // TODO(austin): This needs to check the channel_index and confirm
        // that it should be logged before squirreling away the timestamp to
        // disk. We don't want to log irrelevant timestamps.

        // Note: this must match the same order as MessageBridgeServer and
        // PackMessage. We want identical headers to have identical
        // on-the-wire formats to make comparing them easier.

        // Translate from the channel index that the event loop uses to the
        // channel index in the log file.
        message_header_builder.add_channel_index(
            event_loop_to_logged_channel_index_[msg->channel_index()]);

        message_header_builder.add_queue_index(msg->queue_index());
        message_header_builder.add_monotonic_sent_time(
            msg->monotonic_sent_time());
        message_header_builder.add_realtime_sent_time(
            msg->realtime_sent_time());

        message_header_builder.add_monotonic_remote_time(
            msg->monotonic_remote_time());
        message_header_builder.add_realtime_remote_time(
            msg->realtime_remote_time());
        message_header_builder.add_remote_queue_index(
            msg->remote_queue_index());

        const aos::monotonic_clock::time_point monotonic_timestamp_time =
            f.fetcher->context().monotonic_event_time;
        message_header_builder.add_monotonic_timestamp_time(
            monotonic_timestamp_time.time_since_epoch().count());

        fbb.FinishSizePrefixed(message_header_builder.Finish());
        const auto end = event_loop_->monotonic_now();
        RecordCreateMessageTime(start, end, &f);

        // Timestamps tell us information about what happened too!
        // Capture any reboots so UpdateRemote is properly recorded.
        f.contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));

        // Start with recording info about the data flowing from our node to
        // the remote.
        const bool reliable =
            f.channel_reliable_contents.size() != 0u
                ? f.channel_reliable_contents[msg->channel_index()]
                : f.reliable_contents;

        f.contents_writer->UpdateRemote(
            node_index_, event_loop_->boot_uuid(),
            monotonic_clock::time_point(
                chrono::nanoseconds(msg->monotonic_remote_time())),
            monotonic_clock::time_point(
                chrono::nanoseconds(msg->monotonic_sent_time())),
            reliable, monotonic_timestamp_time);

        f.contents_writer->QueueMessage(
            &fbb, UUID::FromVector(msg->boot_uuid()), end);
      }

      f.written = true;
    }
  }
  last_synchronized_time_ = t;
}

void Logger::DoLogData(const monotonic_clock::time_point end_time,
                       bool run_on_logged) {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration. To do this, we need sync points. Every write
  // cycle should be a sync point.

  do {
    // Move the sync point up by at most polling_period. This forces one sync
    // per iteration, even if it is small.
    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));

    if (run_on_logged) {
      on_logged_period_();
    }

    // If we missed cycles, we could be pretty far behind. Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < end_time);
}

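// Accumulates statistics about a single Fetch/FetchNext call: no-op fetches
// are counted separately from fetches that returned new data, and the slowest
// fetch (with its channel and size) is tracked.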
void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
                               aos::monotonic_clock::time_point end,
                               bool got_new, FetcherStruct *fetcher) {
  const auto duration = end - start;
  if (!got_new) {
    ++total_nop_fetch_count_;
    total_nop_fetch_time_ += duration;
    return;
  }
  ++total_message_fetch_count_;
  total_message_fetch_bytes_ += fetcher->fetcher->context().size;
  total_message_fetch_time_ += duration;
  if (duration > max_message_fetch_time_) {
    max_message_fetch_time_ = duration;
    max_message_fetch_time_channel_ = fetcher->channel_index;
    max_message_fetch_time_size_ = fetcher->fetcher->context().size;
  }
}

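// Accumulates statistics about how long it took to copy a message into a
// FlatBufferBuilder, tracking the slowest copy along with its channel and
// size.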
void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
                                     aos::monotonic_clock::time_point end,
                                     FetcherStruct *fetcher) {
  const auto duration = end - start;
  total_copy_time_ += duration;
  ++total_copy_count_;
  total_copy_bytes_ += fetcher->fetcher->context().size;
  if (duration > max_copy_time_) {
    max_copy_time_ = duration;
    max_copy_time_channel_ = fetcher->channel_index;
    max_copy_time_size_ = fetcher->fetcher->context().size;
  }
}

}  // namespace logger
}  // namespace aos