blob: f840c2243848fc5c3bc339da4a59d84da255146f [file] [log] [blame]
#include "aos/events/logging/log_writer.h"

#include <dirent.h>

#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>

#include "aos/configuration.h"
#include "aos/events/event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/team_number.h"
#include "aos/network/timestamp_channel.h"
#include "aos/sha256.h"

Stephan Pleinesf63bde82024-01-13 15:59:33 -080016namespace aos::logger {
Austin Schuhb06f03b2021-02-17 22:00:37 -080017namespace {
18using message_bridge::RemoteMessage;
Austin Schuhbd06ae42021-03-31 22:48:21 -070019namespace chrono = std::chrono;
Austin Schuhb06f03b2021-02-17 22:00:37 -080020} // namespace
21
22Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
23 std::function<bool(const Channel *)> should_log)
24 : event_loop_(event_loop),
25 configuration_(configuration),
Austin Schuh5b728b72021-06-16 14:57:15 -070026 node_(configuration::GetNode(configuration_, event_loop->node())),
27 node_index_(configuration::GetNodeIndex(configuration_, node_)),
Austin Schuhb06f03b2021-02-17 22:00:37 -080028 name_(network::GetHostname()),
Austin Schuh2d612c82023-07-17 13:37:48 -070029 timer_handler_(event_loop_->AddTimer([this]() {
30 DoLogData(event_loop_->monotonic_now() - logging_delay_, true);
31 })),
Austin Schuhb06f03b2021-02-17 22:00:37 -080032 server_statistics_fetcher_(
Austin Schuh54ffea42023-08-23 13:27:04 -070033 configuration::NodesCount(event_loop_->configuration()) > 1u
Austin Schuhb06f03b2021-02-17 22:00:37 -080034 ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
35 "/aos")
36 : aos::Fetcher<message_bridge::ServerStatistics>()) {
Austin Schuh58646e22021-08-23 23:51:46 -070037 timer_handler_->set_name("channel_poll");
Austin Schuh5b728b72021-06-16 14:57:15 -070038 VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -080039
Naman Gupta41d70c22022-11-21 15:29:52 -080040 // When we are logging remote timestamps, we need to be able to translate
41 // from the channel index that the event loop uses to the channel index in
42 // the config in the log file.
Austin Schuh01f3b392022-01-25 20:03:09 -080043 event_loop_to_logged_channel_index_.resize(
44 event_loop->configuration()->channels()->size(), -1);
45 for (size_t event_loop_channel_index = 0;
46 event_loop_channel_index <
47 event_loop->configuration()->channels()->size();
48 ++event_loop_channel_index) {
49 const Channel *event_loop_channel =
50 event_loop->configuration()->channels()->Get(event_loop_channel_index);
51
52 const Channel *logged_channel = aos::configuration::GetChannel(
53 configuration_, event_loop_channel->name()->string_view(),
54 event_loop_channel->type()->string_view(), "", node_);
55
56 if (logged_channel != nullptr) {
57 event_loop_to_logged_channel_index_[event_loop_channel_index] =
58 configuration::ChannelIndex(configuration_, logged_channel);
59 }
60 }
61
62 // Map to match source channels with the timestamp logger, if the contents
63 // should be reliable, and a list of all channels logged on it to be treated
64 // as reliable.
65 std::map<const Channel *, std::tuple<const Node *, bool, std::vector<bool>>>
66 timestamp_logger_channels;
Austin Schuhb06f03b2021-02-17 22:00:37 -080067
Austin Schuh61e973f2021-02-21 21:43:56 -080068 message_bridge::ChannelTimestampFinder finder(event_loop_);
69 for (const Channel *channel : *event_loop_->configuration()->channels()) {
70 if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
Austin Schuhb06f03b2021-02-17 22:00:37 -080071 continue;
72 }
Austin Schuh61e973f2021-02-21 21:43:56 -080073 if (!channel->has_destination_nodes()) {
74 continue;
75 }
Austin Schuh01f3b392022-01-25 20:03:09 -080076 const size_t channel_index =
77 configuration::ChannelIndex(event_loop_->configuration(), channel);
78
Austin Schuh61e973f2021-02-21 21:43:56 -080079 for (const Connection *connection : *channel->destination_nodes()) {
80 if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
81 connection, event_loop_->node())) {
82 const Node *other_node = configuration::GetNode(
Austin Schuh5b728b72021-06-16 14:57:15 -070083 configuration_, connection->name()->string_view());
Austin Schuh61e973f2021-02-21 21:43:56 -080084
85 VLOG(1) << "Timestamps are logged from "
86 << FlatbufferToJson(other_node);
Austin Schuh01f3b392022-01-25 20:03:09 -080087 // True if each channel's remote timestamps are split into a separate
88 // RemoteMessage channel.
89 const bool is_split =
90 finder.SplitChannelForChannel(channel, connection) != nullptr;
91
92 const Channel *const timestamp_logger_channel =
93 finder.ForChannel(channel, connection);
94
95 auto it = timestamp_logger_channels.find(timestamp_logger_channel);
96 if (it != timestamp_logger_channels.end()) {
97 CHECK(!is_split);
98 CHECK_LT(channel_index, std::get<2>(it->second).size());
Brian Smartt796cca02022-04-12 15:07:21 -070099 std::get<2>(it->second)[channel_index] =
100 (connection->time_to_live() == 0);
Austin Schuh01f3b392022-01-25 20:03:09 -0800101 } else {
102 if (is_split) {
103 timestamp_logger_channels.insert(std::make_pair(
104 timestamp_logger_channel,
105 std::make_tuple(other_node, (connection->time_to_live() == 0),
106 std::vector<bool>())));
107 } else {
108 std::vector<bool> channel_reliable_contents(
109 event_loop->configuration()->channels()->size(), false);
110 channel_reliable_contents[channel_index] =
111 (connection->time_to_live() == 0);
112
113 timestamp_logger_channels.insert(std::make_pair(
114 timestamp_logger_channel,
115 std::make_tuple(other_node, false,
116 std::move(channel_reliable_contents))));
117 }
118 }
Austin Schuh61e973f2021-02-21 21:43:56 -0800119 }
120 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800121 }
122
Austin Schuhb06f03b2021-02-17 22:00:37 -0800123 for (size_t channel_index = 0;
124 channel_index < configuration_->channels()->size(); ++channel_index) {
125 const Channel *const config_channel =
126 configuration_->channels()->Get(channel_index);
127 // The MakeRawFetcher method needs a channel which is in the event loop
128 // configuration() object, not the configuration_ object. Go look that up
129 // from the config.
130 const Channel *channel = aos::configuration::GetChannel(
131 event_loop_->configuration(), config_channel->name()->string_view(),
132 config_channel->type()->string_view(), "", event_loop_->node());
133 CHECK(channel != nullptr)
134 << ": Failed to look up channel "
135 << aos::configuration::CleanedChannelToString(config_channel);
Austin Schuh5b728b72021-06-16 14:57:15 -0700136 if (!should_log(config_channel)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800137 continue;
138 }
139
140 FetcherStruct fs;
141 fs.channel_index = channel_index;
142 fs.channel = channel;
143
144 const bool is_local =
Austin Schuh5b728b72021-06-16 14:57:15 -0700145 configuration::ChannelIsSendableOnNode(config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800146
147 const bool is_readable =
Austin Schuh5b728b72021-06-16 14:57:15 -0700148 configuration::ChannelIsReadableOnNode(config_channel, node_);
Austin Schuh01f3b392022-01-25 20:03:09 -0800149 const bool is_logged =
150 configuration::ChannelMessageIsLoggedOnNode(config_channel, node_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800151 const bool log_message = is_logged && is_readable;
152
153 bool log_delivery_times = false;
Austin Schuh54ffea42023-08-23 13:27:04 -0700154 if (configuration::NodesCount(configuration_) > 1u) {
Austin Schuh72211ae2021-08-05 14:02:30 -0700155 const aos::Connection *connection =
Austin Schuh5b728b72021-06-16 14:57:15 -0700156 configuration::ConnectionToNode(config_channel, node_);
Austin Schuh72211ae2021-08-05 14:02:30 -0700157
Austin Schuhb06f03b2021-02-17 22:00:37 -0800158 log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
Austin Schuh72211ae2021-08-05 14:02:30 -0700159 connection, event_loop_->node());
160
161 CHECK_EQ(log_delivery_times,
162 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
Austin Schuh5b728b72021-06-16 14:57:15 -0700163 config_channel, node_, node_));
Austin Schuh72211ae2021-08-05 14:02:30 -0700164
165 if (connection) {
166 fs.reliable_forwarding = (connection->time_to_live() == 0);
167 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800168 }
169
Austin Schuh01f3b392022-01-25 20:03:09 -0800170 // Now, detect a RemoteMessage timestamp logger where we should just log
171 // the contents to a file directly.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800172 const bool log_contents = timestamp_logger_channels.find(channel) !=
173 timestamp_logger_channels.end();
174
175 if (log_message || log_delivery_times || log_contents) {
176 fs.fetcher = event_loop->MakeRawFetcher(channel);
177 VLOG(1) << "Logging channel "
178 << configuration::CleanedChannelToString(channel);
179
180 if (log_delivery_times) {
181 VLOG(1) << " Delivery times";
182 fs.wants_timestamp_writer = true;
Austin Schuh5b728b72021-06-16 14:57:15 -0700183 fs.timestamp_node_index = static_cast<int>(node_index_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800184 }
Austin Schuhe46492f2021-07-31 19:49:41 -0700185 // Both the timestamp and data writers want data_node_index so it knows
186 // what the source node is.
187 if (log_message || log_delivery_times) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800188 if (!is_local) {
189 const Node *source_node = configuration::GetNode(
190 configuration_, channel->source_node()->string_view());
191 fs.data_node_index =
192 configuration::GetNodeIndex(configuration_, source_node);
Austin Schuhe46492f2021-07-31 19:49:41 -0700193 }
194 }
195 if (log_message) {
196 VLOG(1) << " Data";
197 fs.wants_writer = true;
198 if (!is_local) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800199 fs.log_type = LogType::kLogRemoteMessage;
200 } else {
Austin Schuh5b728b72021-06-16 14:57:15 -0700201 fs.data_node_index = static_cast<int>(node_index_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800202 }
203 }
204 if (log_contents) {
205 VLOG(1) << "Timestamp logger channel "
206 << configuration::CleanedChannelToString(channel);
Austin Schuh01f3b392022-01-25 20:03:09 -0800207 auto timestamp_logger_channel_info =
208 timestamp_logger_channels.find(channel);
209 CHECK(timestamp_logger_channel_info != timestamp_logger_channels.end());
210 fs.timestamp_node = std::get<0>(timestamp_logger_channel_info->second);
211 fs.reliable_contents =
212 std::get<1>(timestamp_logger_channel_info->second);
213 fs.channel_reliable_contents =
214 std::get<2>(timestamp_logger_channel_info->second);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800215 fs.wants_contents_writer = true;
216 fs.contents_node_index =
217 configuration::GetNodeIndex(configuration_, fs.timestamp_node);
218 }
219 fetchers_.emplace_back(std::move(fs));
220 }
221 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800222}
223
224Logger::~Logger() {
225 if (log_namer_) {
226 // If we are replaying a log file, or in simulation, we want to force the
227 // last bit of data to be logged. The easiest way to deal with this is to
Austin Schuh01f3b392022-01-25 20:03:09 -0800228 // poll everything as we go to destroy the class, ie, shut down the
229 // logger, and write it to disk.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800230 StopLogging(event_loop_->monotonic_now());
231 }
232}
233
Austin Schuh3b2b5b52023-07-05 11:36:46 -0700234aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> PackConfiguration(
235 const Configuration *const configuration) {
236 flatbuffers::FlatBufferBuilder fbb;
237 flatbuffers::Offset<aos::Configuration> configuration_offset =
238 CopyFlatBuffer(configuration, &fbb);
239 LogFileHeader::Builder log_file_header_builder(fbb);
240 log_file_header_builder.add_configuration(configuration_offset);
241 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
242 return fbb.Release();
243}
244
Brian Smartt796cca02022-04-12 15:07:21 -0700245std::string Logger::WriteConfiguration(LogNamer *log_namer) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800246 std::string config_sha256;
Brian Smartt03c00da2022-02-24 10:25:00 -0800247
Austin Schuhb06f03b2021-02-17 22:00:37 -0800248 if (separate_config_) {
Austin Schuh3b2b5b52023-07-05 11:36:46 -0700249 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header =
250 PackConfiguration(configuration_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800251 config_sha256 = Sha256(config_header.span());
Alexei Strots6bd1e642023-07-20 19:36:27 -0700252 VLOG(1) << "Config sha256 of " << config_sha256;
Brian Smartt03c00da2022-02-24 10:25:00 -0800253 log_namer->WriteConfiguration(&config_header, config_sha256);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800254 }
255
Brian Smartt03c00da2022-02-24 10:25:00 -0800256 return config_sha256;
257}
258
259void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
260 std::optional<UUID> log_start_uuid) {
261 CHECK(!log_namer_) << ": Already logging";
262
263 VLOG(1) << "Starting logger for " << FlatbufferToJson(node_);
264
265 auto config_sha256 = WriteConfiguration(log_namer.get());
266
267 log_namer_ = std::move(log_namer);
268
Austin Schuhb06f03b2021-02-17 22:00:37 -0800269 log_event_uuid_ = UUID::Random();
270 log_start_uuid_ = log_start_uuid;
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700271 log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
Austin Schuhb06f03b2021-02-17 22:00:37 -0800272
273 // We want to do as much work as possible before the initial Fetch. Time
274 // between that and actually starting to log opens up the possibility of
275 // falling off the end of the queue during that time.
276
277 for (FetcherStruct &f : fetchers_) {
278 if (f.wants_writer) {
279 f.writer = log_namer_->MakeWriter(f.channel);
280 }
281 if (f.wants_timestamp_writer) {
282 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
283 }
284 if (f.wants_contents_writer) {
285 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
286 f.channel, CHECK_NOTNULL(f.timestamp_node));
287 }
288 }
289
Austin Schuha42ee962021-03-31 22:49:30 -0700290 const aos::monotonic_clock::time_point beginning_time =
291 event_loop_->monotonic_now();
292
Austin Schuh2d612c82023-07-17 13:37:48 -0700293 log_until_time_ = beginning_time;
294
Austin Schuhb06f03b2021-02-17 22:00:37 -0800295 // Grab data from each channel right before we declare the log file started
296 // so we can capture the latest message on each channel. This lets us have
297 // non periodic messages with configuration that now get logged.
298 for (FetcherStruct &f : fetchers_) {
299 const auto start = event_loop_->monotonic_now();
300 const bool got_new = f.fetcher->Fetch();
301 const auto end = event_loop_->monotonic_now();
302 RecordFetchResult(start, end, got_new, &f);
303
304 // If there is a message, we want to write it.
305 f.written = f.fetcher->context().data == nullptr;
306 }
307
308 // Clear out any old timestamps in case we are re-starting logging.
Austin Schuh572924a2021-07-30 22:32:12 -0700309 for (size_t i = 0; i < configuration::NodesCount(configuration_); ++i) {
Austin Schuh58646e22021-08-23 23:51:46 -0700310 log_namer_->ClearStartTimes();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800311 }
312
Austin Schuha42ee962021-03-31 22:49:30 -0700313 const aos::monotonic_clock::time_point fetch_time =
314 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800315 WriteHeader();
Austin Schuha42ee962021-03-31 22:49:30 -0700316 const aos::monotonic_clock::time_point header_time =
317 event_loop_->monotonic_now();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800318
Brian Smartt796cca02022-04-12 15:07:21 -0700319 VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " start_time "
320 << last_synchronized_time_ << ", took "
Brian Smartt03c00da2022-02-24 10:25:00 -0800321 << chrono::duration<double>(fetch_time - beginning_time).count()
322 << " to fetch, "
323 << chrono::duration<double>(header_time - fetch_time).count()
324 << " to write headers, boot uuid " << event_loop_->boot_uuid();
Austin Schuhb06f03b2021-02-17 22:00:37 -0800325
326 // Force logging up until the start of the log file now, so the messages at
327 // the start are always ordered before the rest of the messages.
328 // Note: this ship may have already sailed, but we don't have to make it
329 // worse.
330 // TODO(austin): Test...
Austin Schuh855f8932021-03-19 22:01:32 -0700331 //
Naman Gupta41d70c22022-11-21 15:29:52 -0800332 // This is safe to call here since we have set last_synchronized_time_ as
333 // the same time as in the header, and all the data before it should be
334 // logged without ordering concerns.
Austin Schuhb06f03b2021-02-17 22:00:37 -0800335 LogUntil(last_synchronized_time_);
336
Philipp Schradera6712522023-07-05 20:25:11 -0700337 timer_handler_->Schedule(event_loop_->monotonic_now() + polling_period_,
338 polling_period_);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800339}
340
Brian Smartt796cca02022-04-12 15:07:21 -0700341std::unique_ptr<LogNamer> Logger::RestartLogging(
Austin Schuh2d612c82023-07-17 13:37:48 -0700342 std::unique_ptr<LogNamer> log_namer, std::optional<UUID> log_start_uuid,
343 std::optional<monotonic_clock::time_point> end_time) {
Brian Smartt03c00da2022-02-24 10:25:00 -0800344 CHECK(log_namer_) << ": Unexpected restart while not logging";
345
346 VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
347
Austin Schuh2d612c82023-07-17 13:37:48 -0700348 // Grab a representative time on both the RT and monotonic clock.
349 // Average a monotonic clock before and after to reduce the error.
350 const aos::monotonic_clock::time_point monotonic_now1 =
351 event_loop_->monotonic_now();
352 const aos::realtime_clock::time_point realtime_now =
353 event_loop_->realtime_now();
354 const aos::monotonic_clock::time_point monotonic_now2 =
355 event_loop_->monotonic_now();
Austin Schuh08dba8f2023-05-01 08:29:30 -0700356
Austin Schuh2d612c82023-07-17 13:37:48 -0700357 // Log until the provided end time.
358 if (end_time) {
359 CHECK_LE(*end_time, monotonic_now1) << ": Can't log into the future.";
360 // DoLogData is a bit fragile.
361 if (*end_time > last_synchronized_time_) {
362 DoLogData(*end_time, false);
Austin Schuh08dba8f2023-05-01 08:29:30 -0700363 }
364 }
365
Austin Schuh2d612c82023-07-17 13:37:48 -0700366 // We are now synchronized up to last_synchronized_time_. We only have record
367 // of messages from before last_synchronized_time_, so it is a safe start
368 // time.
Brian Smartt03c00da2022-02-24 10:25:00 -0800369
370 std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
371 log_namer_ = std::move(log_namer);
372
Austin Schuh2d612c82023-07-17 13:37:48 -0700373 // Our start time is now how far we logged until before.
374 const aos::monotonic_clock::time_point monotonic_start_time =
375 last_synchronized_time_;
Austin Schuh41f8df92022-04-15 11:45:52 -0700376 const aos::realtime_clock::time_point realtime_start_time =
Austin Schuh2d612c82023-07-17 13:37:48 -0700377 (realtime_now + (last_synchronized_time_.time_since_epoch() -
378 ((monotonic_now1.time_since_epoch() +
379 monotonic_now2.time_since_epoch()) /
380 2)));
Brian Smartt03c00da2022-02-24 10:25:00 -0800381
382 auto config_sha256 = WriteConfiguration(log_namer_.get());
383
384 log_event_uuid_ = UUID::Random();
385 log_start_uuid_ = log_start_uuid;
386
387 log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
388
389 // Note that WriteHeader updates last_synchronized_time_ to be the
390 // current time when it is called, which is then the "start time"
391 // of the new (restarted) log. This timestamp will be after
Naman Gupta41d70c22022-11-21 15:29:52 -0800392 // the timestamp of the last message fetched on each channel, but is
393 // carefully picked per the comment above to not violate
394 // max_out_of_order_duration.
Austin Schuh41f8df92022-04-15 11:45:52 -0700395 WriteHeader(monotonic_start_time, realtime_start_time);
Brian Smartt03c00da2022-02-24 10:25:00 -0800396
397 const aos::monotonic_clock::time_point header_time =
398 event_loop_->monotonic_now();
399
Austin Schuh08dba8f2023-05-01 08:29:30 -0700400 // Close out the old writers to free up memory to be used by the new writers.
401 old_log_namer->Close();
402
Brian Smartt03c00da2022-02-24 10:25:00 -0800403 for (FetcherStruct &f : fetchers_) {
Brian Smartt03c00da2022-02-24 10:25:00 -0800404 // Create writers from the new namer
Brian Smartt03c00da2022-02-24 10:25:00 -0800405
406 if (f.wants_writer) {
Austin Schuh08dba8f2023-05-01 08:29:30 -0700407 f.writer = log_namer_->MakeWriter(f.channel);
Brian Smartt03c00da2022-02-24 10:25:00 -0800408 }
409 if (f.wants_timestamp_writer) {
Austin Schuh08dba8f2023-05-01 08:29:30 -0700410 f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
Brian Smartt03c00da2022-02-24 10:25:00 -0800411 }
412 if (f.wants_contents_writer) {
Austin Schuh08dba8f2023-05-01 08:29:30 -0700413 f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
Brian Smartt03c00da2022-02-24 10:25:00 -0800414 f.channel, CHECK_NOTNULL(f.timestamp_node));
415 }
416
Austin Schuh08dba8f2023-05-01 08:29:30 -0700417 // Mark each channel with data as not written. That triggers each channel
418 // to be re-logged.
419 f.written = f.fetcher->context().data == nullptr;
Brian Smartt03c00da2022-02-24 10:25:00 -0800420 }
421
Austin Schuh08dba8f2023-05-01 08:29:30 -0700422 // And now make sure to log everything up to the start time in 1 big go so we
423 // make sure we have it before we let the world start logging normally again.
424 LogUntil(monotonic_start_time);
425
Brian Smartt03c00da2022-02-24 10:25:00 -0800426 const aos::monotonic_clock::time_point channel_time =
427 event_loop_->monotonic_now();
428
Brian Smartt796cca02022-04-12 15:07:21 -0700429 VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " restart_time "
430 << last_synchronized_time_ << ", took "
Austin Schuh2d612c82023-07-17 13:37:48 -0700431 << chrono::duration<double>(header_time - monotonic_now1).count()
Brian Smartt03c00da2022-02-24 10:25:00 -0800432 << " to prepare and write header, "
433 << chrono::duration<double>(channel_time - header_time).count()
Brian Smartt796cca02022-04-12 15:07:21 -0700434 << " to write initial channel messages, boot uuid "
435 << event_loop_->boot_uuid();
Brian Smartt03c00da2022-02-24 10:25:00 -0800436
437 return old_log_namer;
438}
439
Austin Schuhb06f03b2021-02-17 22:00:37 -0800440std::unique_ptr<LogNamer> Logger::StopLogging(
441 aos::monotonic_clock::time_point end_time) {
442 CHECK(log_namer_) << ": Not logging right now";
443
444 if (end_time != aos::monotonic_clock::min_time) {
Austin Schuh30586902021-03-30 22:54:08 -0700445 // Folks like to use the on_logged_period_ callback to trigger stop and
446 // start events. We can't have those then recurse and try to stop again.
447 // Rather than making everything reentrant, let's just instead block the
448 // callback here.
449 DoLogData(end_time, false);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800450 }
451 timer_handler_->Disable();
452
453 for (FetcherStruct &f : fetchers_) {
454 f.writer = nullptr;
455 f.timestamp_writer = nullptr;
456 f.contents_writer = nullptr;
457 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800458
459 log_event_uuid_ = UUID::Zero();
Austin Schuh34f9e482021-03-31 22:54:18 -0700460 log_start_uuid_ = std::nullopt;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800461
Alexei Strotsbc776d52023-04-15 23:46:45 -0700462 log_namer_->Close();
463
Austin Schuhb06f03b2021-02-17 22:00:37 -0800464 return std::move(log_namer_);
465}
466
Austin Schuh41f8df92022-04-15 11:45:52 -0700467void Logger::WriteHeader(aos::monotonic_clock::time_point monotonic_start_time,
468 aos::realtime_clock::time_point realtime_start_time) {
Austin Schuh54ffea42023-08-23 13:27:04 -0700469 if (configuration::NodesCount(configuration_) > 1u) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800470 server_statistics_fetcher_.Fetch();
471 }
472
Austin Schuh41f8df92022-04-15 11:45:52 -0700473 if (monotonic_start_time == aos::monotonic_clock::min_time) {
474 monotonic_start_time = event_loop_->monotonic_now();
475 realtime_start_time = event_loop_->realtime_now();
476 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800477
478 // We need to pick a point in time to declare the log file "started". This
479 // starts here. It needs to be after everything is fetched so that the
480 // fetchers are all pointed at the most recent message before the start
481 // time.
482 last_synchronized_time_ = monotonic_start_time;
483
484 for (const Node *node : log_namer_->nodes()) {
485 const int node_index = configuration::GetNodeIndex(configuration_, node);
486 MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
487 realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800488 }
489}
490
Austin Schuhb06f03b2021-02-17 22:00:37 -0800491void Logger::WriteMissingTimestamps() {
Austin Schuh54ffea42023-08-23 13:27:04 -0700492 if (configuration::NodesCount(configuration_) > 1u) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800493 server_statistics_fetcher_.Fetch();
494 } else {
495 return;
496 }
497
498 if (server_statistics_fetcher_.get() == nullptr) {
499 return;
500 }
501
502 for (const Node *node : log_namer_->nodes()) {
503 const int node_index = configuration::GetNodeIndex(configuration_, node);
504 if (MaybeUpdateTimestamp(
505 node, node_index,
506 server_statistics_fetcher_.context().monotonic_event_time,
507 server_statistics_fetcher_.context().realtime_event_time)) {
Austin Schuh58646e22021-08-23 23:51:46 -0700508 VLOG(1) << "Timestamps changed on " << aos::FlatbufferToJson(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800509 }
510 }
511}
512
Austin Schuhb06f03b2021-02-17 22:00:37 -0800513bool Logger::MaybeUpdateTimestamp(
514 const Node *node, int node_index,
515 aos::monotonic_clock::time_point monotonic_start_time,
516 aos::realtime_clock::time_point realtime_start_time) {
517 // Bail early if the start times are already set.
Austin Schuh58646e22021-08-23 23:51:46 -0700518 if (node_ == node || !configuration::MultiNode(configuration_)) {
519 if (log_namer_->monotonic_start_time(node_index,
520 event_loop_->boot_uuid()) !=
521 monotonic_clock::min_time) {
522 return false;
523 }
Brian Smartt03c00da2022-02-24 10:25:00 -0800524
Austin Schuhb06f03b2021-02-17 22:00:37 -0800525 // There are no offsets to compute for ourself, so always succeed.
Austin Schuh58646e22021-08-23 23:51:46 -0700526 log_namer_->SetStartTimes(node_index, event_loop_->boot_uuid(),
527 monotonic_start_time, realtime_start_time,
528 monotonic_start_time, realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800529 return true;
530 } else if (server_statistics_fetcher_.get() != nullptr) {
531 // We must be a remote node now. Look for the connection and see if it is
532 // connected.
James Kuszmaul17607fb2021-10-15 20:00:32 -0700533 CHECK(server_statistics_fetcher_->has_connections());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800534
535 for (const message_bridge::ServerConnection *connection :
536 *server_statistics_fetcher_->connections()) {
537 if (connection->node()->name()->string_view() !=
538 node->name()->string_view()) {
539 continue;
540 }
541
542 if (connection->state() != message_bridge::State::CONNECTED) {
543 VLOG(1) << node->name()->string_view()
544 << " is not connected, can't start it yet.";
545 break;
546 }
547
Austin Schuhb06f03b2021-02-17 22:00:37 -0800548 if (!connection->has_monotonic_offset()) {
549 VLOG(1) << "Missing monotonic offset for setting start time for node "
550 << aos::FlatbufferToJson(node);
551 break;
552 }
553
James Kuszmaul17607fb2021-10-15 20:00:32 -0700554 CHECK(connection->has_boot_uuid());
Austin Schuh58646e22021-08-23 23:51:46 -0700555 const UUID boot_uuid =
556 UUID::FromString(connection->boot_uuid()->string_view());
557
558 if (log_namer_->monotonic_start_time(node_index, boot_uuid) !=
559 monotonic_clock::min_time) {
560 break;
561 }
562
563 VLOG(1) << "Updating start time for "
564 << aos::FlatbufferToJson(connection);
565
Austin Schuhb06f03b2021-02-17 22:00:37 -0800566 // Found it and it is connected. Compensate and go.
Austin Schuh73340842021-07-30 22:32:06 -0700567 log_namer_->SetStartTimes(
Austin Schuh58646e22021-08-23 23:51:46 -0700568 node_index, boot_uuid,
Austin Schuh73340842021-07-30 22:32:06 -0700569 monotonic_start_time +
570 std::chrono::nanoseconds(connection->monotonic_offset()),
571 realtime_start_time, monotonic_start_time, realtime_start_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800572 return true;
573 }
574 }
575 return false;
576}
577
578aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
Austin Schuh73340842021-07-30 22:32:06 -0700579 std::string_view config_sha256) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800580 flatbuffers::FlatBufferBuilder fbb;
581 fbb.ForceDefaults(true);
582
583 flatbuffers::Offset<aos::Configuration> configuration_offset;
584 if (!separate_config_) {
585 configuration_offset = CopyFlatBuffer(configuration_, &fbb);
586 } else {
587 CHECK(!config_sha256.empty());
588 }
589
590 const flatbuffers::Offset<flatbuffers::String> name_offset =
591 fbb.CreateString(name_);
592
Austin Schuhfa712682022-05-11 16:43:42 -0700593 const flatbuffers::Offset<flatbuffers::String> logger_sha1_offset =
594 logger_sha1_.empty() ? 0 : fbb.CreateString(logger_sha1_);
595 const flatbuffers::Offset<flatbuffers::String> logger_version_offset =
596 logger_version_.empty() ? 0 : fbb.CreateString(logger_version_);
597
Austin Schuhb06f03b2021-02-17 22:00:37 -0800598 CHECK(log_event_uuid_ != UUID::Zero());
599 const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800600 log_event_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800601
602 const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800603 logger_instance_uuid_.PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800604
605 flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
Austin Schuh34f9e482021-03-31 22:54:18 -0700606 if (log_start_uuid_) {
607 log_start_uuid_offset = fbb.CreateString(log_start_uuid_->ToString());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800608 }
609
610 flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
611 if (!config_sha256.empty()) {
612 config_sha256_offset = fbb.CreateString(config_sha256);
613 }
614
615 const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
Austin Schuh5e2bfb82021-03-13 22:46:55 -0800616 event_loop_->boot_uuid().PackString(&fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800617
Austin Schuhb06f03b2021-02-17 22:00:37 -0800618 flatbuffers::Offset<Node> logger_node_offset;
619
620 if (configuration::MultiNode(configuration_)) {
Austin Schuh5b728b72021-06-16 14:57:15 -0700621 logger_node_offset = RecursiveCopyFlatBuffer(node_, &fbb);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800622 }
623
624 aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
625
626 log_file_header_builder.add_name(name_offset);
Austin Schuhfa712682022-05-11 16:43:42 -0700627 if (!logger_sha1_offset.IsNull()) {
628 log_file_header_builder.add_logger_sha1(logger_sha1_offset);
629 }
630 if (!logger_version_offset.IsNull()) {
631 log_file_header_builder.add_logger_version(logger_version_offset);
632 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800633
634 // Only add the node if we are running in a multinode configuration.
Austin Schuh73340842021-07-30 22:32:06 -0700635 if (configuration::MultiNode(configuration_)) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800636 log_file_header_builder.add_logger_node(logger_node_offset);
637 }
638
639 if (!configuration_offset.IsNull()) {
640 log_file_header_builder.add_configuration(configuration_offset);
641 }
642 // The worst case theoretical out of order is the polling period times 2.
643 // One message could get logged right after the boundary, but be for right
644 // before the next boundary. And the reverse could happen for another
645 // message. Report back 3x to be extra safe, and because the cost isn't
646 // huge on the read side.
647 log_file_header_builder.add_max_out_of_order_duration(
648 std::chrono::nanoseconds(3 * polling_period_).count());
649
Austin Schuhb06f03b2021-02-17 22:00:37 -0800650 log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
651 log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
652 if (!log_start_uuid_offset.IsNull()) {
653 log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
654 }
655 log_file_header_builder.add_logger_node_boot_uuid(
656 logger_node_boot_uuid_offset);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800657
658 if (!config_sha256_offset.IsNull()) {
659 log_file_header_builder.add_configuration_sha256(config_sha256_offset);
660 }
661
662 fbb.FinishSizePrefixed(log_file_header_builder.Finish());
663 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
664 fbb.Release());
665
666 CHECK(result.Verify()) << ": Built a corrupted header.";
667
668 return result;
669}
670
671void Logger::ResetStatisics() {
672 max_message_fetch_time_ = std::chrono::nanoseconds::zero();
673 max_message_fetch_time_channel_ = -1;
674 max_message_fetch_time_size_ = -1;
675 total_message_fetch_time_ = std::chrono::nanoseconds::zero();
676 total_message_fetch_count_ = 0;
677 total_message_fetch_bytes_ = 0;
678 total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
679 total_nop_fetch_count_ = 0;
680 max_copy_time_ = std::chrono::nanoseconds::zero();
681 max_copy_time_channel_ = -1;
682 max_copy_time_size_ = -1;
683 total_copy_time_ = std::chrono::nanoseconds::zero();
684 total_copy_count_ = 0;
685 total_copy_bytes_ = 0;
Philipp Schrader78dc1212023-08-16 18:17:47 -0700686 max_log_delay_ = std::chrono::nanoseconds::zero();
687 max_log_delay_channel_ = -1;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800688}
689
690void Logger::Rotate() {
691 for (const Node *node : log_namer_->nodes()) {
Austin Schuh73340842021-07-30 22:32:06 -0700692 log_namer_->Rotate(node);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800693 }
694}
695
Brian Smartt03c00da2022-02-24 10:25:00 -0800696void Logger::WriteData(NewDataWriter *writer, const FetcherStruct &f) {
697 if (writer != nullptr) {
698 const UUID source_node_boot_uuid =
699 static_cast<int>(node_index_) != f.data_node_index
700 ? f.fetcher->context().source_boot_uuid
701 : event_loop_->boot_uuid();
702 // Write!
703 const auto start = event_loop_->monotonic_now();
Brian Smartt03c00da2022-02-24 10:25:00 -0800704
Naman Gupta41d70c22022-11-21 15:29:52 -0800705 ContextDataCopier coppier(f.fetcher->context(), f.channel_index, f.log_type,
706 event_loop_);
Brian Smartt03c00da2022-02-24 10:25:00 -0800707
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700708 aos::monotonic_clock::time_point message_time =
709 static_cast<int>(node_index_) != f.data_node_index
710 ? f.fetcher->context().monotonic_remote_time
711 : f.fetcher->context().monotonic_event_time;
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700712 writer->CopyDataMessage(&coppier, source_node_boot_uuid, start,
713 message_time);
Austin Schuh48d10d62022-10-16 22:19:23 -0700714 RecordCreateMessageTime(start, coppier.end_time(), f);
Brian Smartt03c00da2022-02-24 10:25:00 -0800715
Brian Smartt796cca02022-04-12 15:07:21 -0700716 VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
717 << " for channel "
Brian Smartt03c00da2022-02-24 10:25:00 -0800718 << configuration::CleanedChannelToString(f.fetcher->channel())
Alexei Strotsbc082d82023-05-03 08:43:42 -0700719 << " to " << writer->name();
Brian Smartt03c00da2022-02-24 10:25:00 -0800720 }
721}
722
Brian Smartt796cca02022-04-12 15:07:21 -0700723void Logger::WriteTimestamps(NewDataWriter *timestamp_writer,
724 const FetcherStruct &f) {
Brian Smartt03c00da2022-02-24 10:25:00 -0800725 if (timestamp_writer != nullptr) {
726 // And now handle timestamps.
Brian Smartt03c00da2022-02-24 10:25:00 -0800727
728 // Tell our writer that we know something about the remote boot.
729 timestamp_writer->UpdateRemote(
730 f.data_node_index, f.fetcher->context().source_boot_uuid,
731 f.fetcher->context().monotonic_remote_time,
732 f.fetcher->context().monotonic_event_time, f.reliable_forwarding);
Austin Schuh48d10d62022-10-16 22:19:23 -0700733
734 const auto start = event_loop_->monotonic_now();
735 ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
Naman Gupta41d70c22022-11-21 15:29:52 -0800736 LogType::kLogDeliveryTimeOnly, event_loop_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700737
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700738 timestamp_writer->CopyTimestampMessage(
739 &coppier, event_loop_->boot_uuid(), start,
740 f.fetcher->context().monotonic_event_time);
Austin Schuh48d10d62022-10-16 22:19:23 -0700741 RecordCreateMessageTime(start, coppier.end_time(), f);
Brian Smartt03c00da2022-02-24 10:25:00 -0800742
Brian Smartt796cca02022-04-12 15:07:21 -0700743 VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
744 << " for channel "
Brian Smartt03c00da2022-02-24 10:25:00 -0800745 << configuration::CleanedChannelToString(f.fetcher->channel())
Alexei Strotsbc082d82023-05-03 08:43:42 -0700746 << " to " << timestamp_writer->name() << " timestamp";
Brian Smartt03c00da2022-02-24 10:25:00 -0800747 }
748}
749
Brian Smartt796cca02022-04-12 15:07:21 -0700750void Logger::WriteContent(NewDataWriter *contents_writer,
751 const FetcherStruct &f) {
Brian Smartt03c00da2022-02-24 10:25:00 -0800752 if (contents_writer != nullptr) {
753 const auto start = event_loop_->monotonic_now();
754 // And now handle the special message contents channel. Copy the
755 // message into a FlatBufferBuilder and save it to disk.
Brian Smartt03c00da2022-02-24 10:25:00 -0800756 const RemoteMessage *msg =
757 flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
758
759 CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
Brian Smartt03c00da2022-02-24 10:25:00 -0800760 // Translate from the channel index that the event loop uses to the
761 // channel index in the log file.
Austin Schuhf2d0e682022-10-16 14:20:58 -0700762 const int channel_index =
763 event_loop_to_logged_channel_index_[msg->channel_index()];
Brian Smartt03c00da2022-02-24 10:25:00 -0800764
765 const aos::monotonic_clock::time_point monotonic_timestamp_time =
766 f.fetcher->context().monotonic_event_time;
Brian Smartt03c00da2022-02-24 10:25:00 -0800767
Brian Smartt03c00da2022-02-24 10:25:00 -0800768 // Timestamps tell us information about what happened too!
769 // Capture any reboots so UpdateRemote is properly recorded.
770 contents_writer->UpdateBoot(UUID::FromVector(msg->boot_uuid()));
771
772 // Start with recording info about the data flowing from our node to the
773 // remote.
774 const bool reliable =
775 f.channel_reliable_contents.size() != 0u
776 ? f.channel_reliable_contents[msg->channel_index()]
777 : f.reliable_contents;
778
Brian Smartt796cca02022-04-12 15:07:21 -0700779 contents_writer->UpdateRemote(
780 node_index_, event_loop_->boot_uuid(),
Brian Smartt03c00da2022-02-24 10:25:00 -0800781 monotonic_clock::time_point(
782 chrono::nanoseconds(msg->monotonic_remote_time())),
783 monotonic_clock::time_point(
784 chrono::nanoseconds(msg->monotonic_sent_time())),
785 reliable, monotonic_timestamp_time);
786
Austin Schuh48d10d62022-10-16 22:19:23 -0700787 RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
Naman Gupta41d70c22022-11-21 15:29:52 -0800788 event_loop_);
Austin Schuh48d10d62022-10-16 22:19:23 -0700789
Mithun Bharadwaja5f9d482023-08-02 16:10:40 -0700790 contents_writer->CopyRemoteTimestampMessage(
Mithun Bharadwaja5cb8e02023-08-02 16:10:40 -0700791 &coppier, UUID::FromVector(msg->boot_uuid()), start,
792 monotonic_clock::time_point(
793 chrono::nanoseconds(msg->monotonic_sent_time())));
Austin Schuh48d10d62022-10-16 22:19:23 -0700794
795 RecordCreateMessageTime(start, coppier.end_time(), f);
Brian Smartt03c00da2022-02-24 10:25:00 -0800796 }
797}
798
799void Logger::WriteFetchedRecord(FetcherStruct &f) {
800 WriteData(f.writer, f);
801 WriteTimestamps(f.timestamp_writer, f);
802 WriteContent(f.contents_writer, f);
803}
804
Austin Schuh08dba8f2023-05-01 08:29:30 -0700805std::pair<bool, monotonic_clock::time_point> Logger::LogUntil(
806 monotonic_clock::time_point t) {
807 bool wrote_messages = false;
808 monotonic_clock::time_point newest_record = monotonic_clock::min_time;
Brian Smartt03c00da2022-02-24 10:25:00 -0800809
Austin Schuh2d612c82023-07-17 13:37:48 -0700810 log_until_time_ = t;
811
Austin Schuhb06f03b2021-02-17 22:00:37 -0800812 // Grab the latest ServerStatistics message. This will always have the
813 // oppertunity to be >= to the current time, so it will always represent any
814 // reboots which may have happened.
815 WriteMissingTimestamps();
816
817 // Write each channel to disk, one at a time.
818 for (FetcherStruct &f : fetchers_) {
Austin Schuh08dba8f2023-05-01 08:29:30 -0700819 if (f.fetcher->context().data != nullptr) {
820 newest_record =
821 std::max(newest_record, f.fetcher->context().monotonic_event_time);
822 }
823
Austin Schuhb06f03b2021-02-17 22:00:37 -0800824 while (true) {
825 if (f.written) {
826 const auto start = event_loop_->monotonic_now();
Austin Schuh2d612c82023-07-17 13:37:48 -0700827 const bool got_new =
828 f.fetcher->FetchNextIf(std::ref(fetch_next_if_fn_));
Austin Schuhb06f03b2021-02-17 22:00:37 -0800829 const auto end = event_loop_->monotonic_now();
830 RecordFetchResult(start, end, got_new, &f);
831 if (!got_new) {
832 VLOG(2) << "No new data on "
833 << configuration::CleanedChannelToString(
834 f.fetcher->channel());
835 break;
836 }
Austin Schuh08dba8f2023-05-01 08:29:30 -0700837 newest_record =
838 std::max(newest_record, f.fetcher->context().monotonic_event_time);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800839 f.written = false;
840 }
841
842 // TODO(james): Write tests to exercise this logic.
Austin Schuh2d612c82023-07-17 13:37:48 -0700843 CHECK_LE(f.fetcher->context().monotonic_event_time, t);
844
845 // At startup, we can end up grabbing a message at the current time.
846 // Ignore it.
847 if (f.fetcher->context().monotonic_event_time == t) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800848 break;
849 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800850
Brian Smartt03c00da2022-02-24 10:25:00 -0800851 WriteFetchedRecord(f);
Austin Schuh08dba8f2023-05-01 08:29:30 -0700852 wrote_messages = true;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800853
854 f.written = true;
855 }
856 }
857 last_synchronized_time_ = t;
Brian Smartt03c00da2022-02-24 10:25:00 -0800858
Austin Schuh08dba8f2023-05-01 08:29:30 -0700859 return std::make_pair(wrote_messages, newest_record);
Austin Schuhb06f03b2021-02-17 22:00:37 -0800860}
861
Austin Schuh30586902021-03-30 22:54:08 -0700862void Logger::DoLogData(const monotonic_clock::time_point end_time,
863 bool run_on_logged) {
Austin Schuh2d612c82023-07-17 13:37:48 -0700864 if (end_time < last_synchronized_time_) return;
865
866 DCHECK(is_started());
Austin Schuhb06f03b2021-02-17 22:00:37 -0800867 // We want to guarantee that messages aren't out of order by more than
868 // max_out_of_order_duration. To do this, we need sync points. Every write
869 // cycle should be a sync point.
870
871 do {
872 // Move the sync point up by at most polling_period. This forces one sync
873 // per iteration, even if it is small.
874 LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
875
Austin Schuh30586902021-03-30 22:54:08 -0700876 if (run_on_logged) {
Austin Schuh2f864452023-07-17 14:53:08 -0700877 on_logged_period_(last_synchronized_time_);
Austin Schuh30586902021-03-30 22:54:08 -0700878 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800879
880 // If we missed cycles, we could be pretty far behind. Spin until we are
881 // caught up.
882 } while (last_synchronized_time_ + polling_period_ < end_time);
883}
884
885void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
886 aos::monotonic_clock::time_point end,
887 bool got_new, FetcherStruct *fetcher) {
888 const auto duration = end - start;
889 if (!got_new) {
890 ++total_nop_fetch_count_;
891 total_nop_fetch_time_ += duration;
892 return;
893 }
894 ++total_message_fetch_count_;
895 total_message_fetch_bytes_ += fetcher->fetcher->context().size;
896 total_message_fetch_time_ += duration;
897 if (duration > max_message_fetch_time_) {
898 max_message_fetch_time_ = duration;
899 max_message_fetch_time_channel_ = fetcher->channel_index;
900 max_message_fetch_time_size_ = fetcher->fetcher->context().size;
901 }
902}
903
904void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
905 aos::monotonic_clock::time_point end,
Brian Smartt03c00da2022-02-24 10:25:00 -0800906 const FetcherStruct &fetcher) {
Austin Schuhb06f03b2021-02-17 22:00:37 -0800907 const auto duration = end - start;
908 total_copy_time_ += duration;
909 ++total_copy_count_;
Brian Smartt03c00da2022-02-24 10:25:00 -0800910 total_copy_bytes_ += fetcher.fetcher->context().size;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800911 if (duration > max_copy_time_) {
912 max_copy_time_ = duration;
Brian Smartt03c00da2022-02-24 10:25:00 -0800913 max_copy_time_channel_ = fetcher.channel_index;
914 max_copy_time_size_ = fetcher.fetcher->context().size;
Austin Schuhb06f03b2021-02-17 22:00:37 -0800915 }
Philipp Schrader78dc1212023-08-16 18:17:47 -0700916 const auto log_delay = end - fetcher.fetcher->context().monotonic_event_time;
917 if (log_delay > max_log_delay_) {
918 max_log_delay_ = log_delay;
919 max_log_delay_channel_ = fetcher.channel_index;
920 }
Austin Schuhb06f03b2021-02-17 22:00:37 -0800921}
922
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800923} // namespace aos::logger