blob: c111a739acf63456c48c721fc40fc747846aa8b3 [file] [log] [blame]
Austin Schuhb06f03b2021-02-17 22:00:37 -08001#include "aos/events/logging/log_reader.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -08002
3#include <fcntl.h>
Austin Schuh4c4e0092019-12-22 16:18:03 -08004#include <limits.h>
Austin Schuhe309d2a2019-11-29 13:25:21 -08005#include <sys/stat.h>
6#include <sys/types.h>
7#include <sys/uio.h>
Brian Silverman8ff74aa2021-02-05 16:37:15 -08008
Austin Schuhe309d2a2019-11-29 13:25:21 -08009#include <vector>
10
Austin Schuh2f8fd752020-09-01 22:38:28 -070011#include "absl/strings/escaping.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080012#include "absl/types/span.h"
13#include "aos/events/event_loop.h"
Austin Schuhf6f9bf32020-10-11 14:37:43 -070014#include "aos/events/logging/logfile_sorting.h"
James Kuszmaul38735e82019-12-07 16:42:06 -080015#include "aos/events/logging/logger_generated.h"
Austin Schuh64fab802020-09-09 22:47:47 -070016#include "aos/events/logging/uuid.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080017#include "aos/flatbuffer_merge.h"
Austin Schuh0ca1fd32020-12-18 22:53:05 -080018#include "aos/network/multinode_timestamp_filter.h"
Austin Schuh0de30f32020-12-06 12:44:28 -080019#include "aos/network/remote_message_generated.h"
20#include "aos/network/remote_message_schema.h"
Austin Schuh288479d2019-12-18 19:47:52 -080021#include "aos/network/team_number.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080022#include "aos/time/time.h"
Brian Silvermanae7c0332020-09-30 16:58:23 -070023#include "aos/util/file.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080024#include "flatbuffers/flatbuffers.h"
Austin Schuh8c399962020-12-25 21:51:45 -080025#include "openssl/sha.h"
Austin Schuhe309d2a2019-11-29 13:25:21 -080026
Austin Schuh15649d62019-12-28 16:36:38 -080027DEFINE_bool(skip_missing_forwarding_entries, false,
28 "If true, drop any forwarding entries with missing data. If "
29 "false, CHECK.");
Austin Schuhe309d2a2019-11-29 13:25:21 -080030
Austin Schuh0ca1fd32020-12-18 22:53:05 -080031DECLARE_bool(timestamps_to_csv);
Austin Schuh8bd96322020-02-13 21:18:22 -080032
Austin Schuh2f8fd752020-09-01 22:38:28 -070033DEFINE_bool(skip_order_validation, false,
34 "If true, ignore any out of orderness in replay");
35
Austin Schuhf0688662020-12-19 15:37:45 -080036DEFINE_double(
37 time_estimation_buffer_seconds, 2.0,
38 "The time to buffer ahead in the log file to accurately reconstruct time.");
39
Austin Schuhe309d2a2019-11-29 13:25:21 -080040namespace aos {
41namespace logger {
Austin Schuh0afc4d12020-10-19 11:42:04 -070042namespace {
Austin Schuh8c399962020-12-25 21:51:45 -080043
Austin Schuh315b96b2020-12-11 21:21:12 -080044std::string LogFileVectorToString(std::vector<LogFile> log_files) {
45 std::stringstream ss;
Austin Schuh297d2352021-01-21 19:02:17 -080046 for (const auto &f : log_files) {
Austin Schuh315b96b2020-12-11 21:21:12 -080047 ss << f << "\n";
48 }
49 return ss.str();
50}
51
Austin Schuh0de30f32020-12-06 12:44:28 -080052// Copies the channel, removing the schema as we go. If new_name is provided,
53// it is used instead of the name inside the channel. If new_type is provided,
54// it is used instead of the type in the channel.
55flatbuffers::Offset<Channel> CopyChannel(const Channel *c,
56 std::string_view new_name,
57 std::string_view new_type,
58 flatbuffers::FlatBufferBuilder *fbb) {
59 flatbuffers::Offset<flatbuffers::String> name_offset =
60 fbb->CreateSharedString(new_name.empty() ? c->name()->string_view()
61 : new_name);
62 flatbuffers::Offset<flatbuffers::String> type_offset =
63 fbb->CreateSharedString(new_type.empty() ? c->type()->str() : new_type);
64 flatbuffers::Offset<flatbuffers::String> source_node_offset =
65 c->has_source_node() ? fbb->CreateSharedString(c->source_node()->str())
66 : 0;
67
68 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
69 destination_nodes_offset =
70 aos::RecursiveCopyVectorTable(c->destination_nodes(), fbb);
71
72 flatbuffers::Offset<
73 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
74 logger_nodes_offset = aos::CopyVectorSharedString(c->logger_nodes(), fbb);
75
76 Channel::Builder channel_builder(*fbb);
77 channel_builder.add_name(name_offset);
78 channel_builder.add_type(type_offset);
79 if (c->has_frequency()) {
80 channel_builder.add_frequency(c->frequency());
81 }
82 if (c->has_max_size()) {
83 channel_builder.add_max_size(c->max_size());
84 }
85 if (c->has_num_senders()) {
86 channel_builder.add_num_senders(c->num_senders());
87 }
88 if (c->has_num_watchers()) {
89 channel_builder.add_num_watchers(c->num_watchers());
90 }
91 if (!source_node_offset.IsNull()) {
92 channel_builder.add_source_node(source_node_offset);
93 }
94 if (!destination_nodes_offset.IsNull()) {
95 channel_builder.add_destination_nodes(destination_nodes_offset);
96 }
97 if (c->has_logger()) {
98 channel_builder.add_logger(c->logger());
99 }
100 if (!logger_nodes_offset.IsNull()) {
101 channel_builder.add_logger_nodes(logger_nodes_offset);
102 }
103 if (c->has_read_method()) {
104 channel_builder.add_read_method(c->read_method());
105 }
106 if (c->has_num_readers()) {
107 channel_builder.add_num_readers(c->num_readers());
108 }
109 return channel_builder.Finish();
110}
111
Austin Schuhe309d2a2019-11-29 13:25:21 -0800112namespace chrono = std::chrono;
Austin Schuh0de30f32020-12-06 12:44:28 -0800113using message_bridge::RemoteMessage;
Austin Schuh0afc4d12020-10-19 11:42:04 -0700114} // namespace
Austin Schuhe309d2a2019-11-29 13:25:21 -0800115
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800116LogReader::LogReader(std::string_view filename,
117 const Configuration *replay_configuration)
Austin Schuh287d43d2020-12-04 20:19:33 -0800118 : LogReader(SortParts({std::string(filename)}), replay_configuration) {}
Austin Schuhfa895892020-01-07 20:07:41 -0800119
Austin Schuh287d43d2020-12-04 20:19:33 -0800120LogReader::LogReader(std::vector<LogFile> log_files,
Austin Schuhfa895892020-01-07 20:07:41 -0800121 const Configuration *replay_configuration)
Austin Schuh287d43d2020-12-04 20:19:33 -0800122 : log_files_(std::move(log_files)),
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800123 replay_configuration_(replay_configuration) {
Austin Schuh0ca51f32020-12-25 21:51:45 -0800124 CHECK_GT(log_files_.size(), 0u);
125 {
126 // Validate that we have the same config everwhere. This will be true if
127 // all the parts were sorted together and the configs match.
128 const Configuration *config = nullptr;
Austin Schuh297d2352021-01-21 19:02:17 -0800129 for (const LogFile &log_file : log_files_) {
130 if (log_file.config.get() == nullptr) {
131 LOG(FATAL) << "Couldn't find a config in " << log_file;
132 }
Austin Schuh0ca51f32020-12-25 21:51:45 -0800133 if (config == nullptr) {
134 config = log_file.config.get();
135 } else {
136 CHECK_EQ(config, log_file.config.get());
137 }
138 }
139 }
Austin Schuhdda74ec2021-01-03 19:30:37 -0800140
Austin Schuh6331ef92020-01-07 18:28:09 -0800141 MakeRemappedConfig();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800142
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700143 // Remap all existing remote timestamp channels. They will be recreated, and
144 // the data logged isn't relevant anymore.
Austin Schuh3c5dae52020-10-06 18:55:18 -0700145 for (const Node *node : configuration::GetNodes(logged_configuration())) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700146 std::vector<const Node *> timestamp_logger_nodes =
147 configuration::TimestampNodes(logged_configuration(), node);
148 for (const Node *remote_node : timestamp_logger_nodes) {
149 const std::string channel = absl::StrCat(
150 "/aos/remote_timestamps/", remote_node->name()->string_view());
Austin Schuh0de30f32020-12-06 12:44:28 -0800151 // See if the log file is an old log with MessageHeader channels in it, or
152 // a newer log with RemoteMessage. If we find an older log, rename the
153 // type too along with the name.
154 if (HasChannel<MessageHeader>(channel, node)) {
155 CHECK(!HasChannel<RemoteMessage>(channel, node))
156 << ": Can't have both a MessageHeader and RemoteMessage remote "
157 "timestamp channel.";
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800158 // In theory, we should check NOT_LOGGED like RemoteMessage and be more
159 // careful about updating the config, but there are fewer and fewer logs
160 // with MessageHeader remote messages, so it isn't worth the effort.
Austin Schuh0de30f32020-12-06 12:44:28 -0800161 RemapLoggedChannel<MessageHeader>(channel, node, "/original",
162 "aos.message_bridge.RemoteMessage");
163 } else {
164 CHECK(HasChannel<RemoteMessage>(channel, node))
165 << ": Failed to find {\"name\": \"" << channel << "\", \"type\": \""
166 << RemoteMessage::GetFullyQualifiedName() << "\"} for node "
167 << node->name()->string_view();
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800168 // Only bother to remap if there's something on the channel. We can
169 // tell if the channel was marked NOT_LOGGED or not. This makes the
170 // config not change un-necesarily when we replay a log with NOT_LOGGED
171 // messages.
172 if (HasLoggedChannel<RemoteMessage>(channel, node)) {
173 RemapLoggedChannel<RemoteMessage>(channel, node);
174 }
Austin Schuh0de30f32020-12-06 12:44:28 -0800175 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700176 }
177 }
178
Austin Schuh6aa77be2020-02-22 21:06:40 -0800179 if (replay_configuration) {
180 CHECK_EQ(configuration::MultiNode(configuration()),
181 configuration::MultiNode(replay_configuration))
Austin Schuh2f8fd752020-09-01 22:38:28 -0700182 << ": Log file and replay config need to both be multi or single "
183 "node.";
Austin Schuh6aa77be2020-02-22 21:06:40 -0800184 }
185
Austin Schuh6f3babe2020-01-26 20:34:50 -0800186 if (!configuration::MultiNode(configuration())) {
Austin Schuh287d43d2020-12-04 20:19:33 -0800187 states_.emplace_back(std::make_unique<State>(
188 std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, ""))));
Austin Schuh8bd96322020-02-13 21:18:22 -0800189 } else {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800190 if (replay_configuration) {
James Kuszmaul46d82582020-05-09 19:50:09 -0700191 CHECK_EQ(logged_configuration()->nodes()->size(),
Austin Schuh6aa77be2020-02-22 21:06:40 -0800192 replay_configuration->nodes()->size())
Austin Schuh2f8fd752020-09-01 22:38:28 -0700193 << ": Log file and replay config need to have matching nodes "
194 "lists.";
James Kuszmaul46d82582020-05-09 19:50:09 -0700195 for (const Node *node : *logged_configuration()->nodes()) {
196 if (configuration::GetNode(replay_configuration, node) == nullptr) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700197 LOG(FATAL) << "Found node " << FlatbufferToJson(node)
198 << " in logged config that is not present in the replay "
199 "config.";
James Kuszmaul46d82582020-05-09 19:50:09 -0700200 }
201 }
Austin Schuh6aa77be2020-02-22 21:06:40 -0800202 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800203 states_.resize(configuration()->nodes()->size());
Austin Schuh6f3babe2020-01-26 20:34:50 -0800204 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800205}
206
Austin Schuh6aa77be2020-02-22 21:06:40 -0800207LogReader::~LogReader() {
Austin Schuh39580f12020-08-01 14:44:08 -0700208 if (event_loop_factory_unique_ptr_) {
209 Deregister();
210 } else if (event_loop_factory_ != nullptr) {
211 LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
212 "is destroyed";
213 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700214 // Zero out some buffers. It's easy to do use-after-frees on these, so make
215 // it more obvious.
Austin Schuh39580f12020-08-01 14:44:08 -0700216 if (remapped_configuration_buffer_) {
217 remapped_configuration_buffer_->Wipe();
218 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800219}
Austin Schuhe309d2a2019-11-29 13:25:21 -0800220
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800221const Configuration *LogReader::logged_configuration() const {
Austin Schuh0ca51f32020-12-25 21:51:45 -0800222 return log_files_[0].config.get();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800223}
224
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800225const Configuration *LogReader::configuration() const {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800226 return remapped_configuration_;
227}
228
Austin Schuh07676622021-01-21 18:59:17 -0800229std::vector<const Node *> LogReader::LoggedNodes() const {
230 return configuration::GetNodes(logged_configuration());
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800231}
Austin Schuh15649d62019-12-28 16:36:38 -0800232
Austin Schuh11d43732020-09-21 17:28:30 -0700233monotonic_clock::time_point LogReader::monotonic_start_time(
234 const Node *node) const {
Austin Schuh8bd96322020-02-13 21:18:22 -0800235 State *state =
236 states_[configuration::GetNodeIndex(configuration(), node)].get();
237 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
238
Austin Schuh858c9f32020-08-31 16:56:12 -0700239 return state->monotonic_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800240}
241
Austin Schuh11d43732020-09-21 17:28:30 -0700242realtime_clock::time_point LogReader::realtime_start_time(
243 const Node *node) const {
Austin Schuh8bd96322020-02-13 21:18:22 -0800244 State *state =
245 states_[configuration::GetNodeIndex(configuration(), node)].get();
246 CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
247
Austin Schuh858c9f32020-08-31 16:56:12 -0700248 return state->realtime_start_time();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800249}
250
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800251void LogReader::Register() {
252 event_loop_factory_unique_ptr_ =
Austin Schuhac0771c2020-01-07 18:36:30 -0800253 std::make_unique<SimulatedEventLoopFactory>(configuration());
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800254 Register(event_loop_factory_unique_ptr_.get());
255}
256
Austin Schuh92547522019-12-28 14:33:43 -0800257void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
Austin Schuh92547522019-12-28 14:33:43 -0800258 event_loop_factory_ = event_loop_factory;
Austin Schuhe5bbd9e2020-09-21 17:29:20 -0700259 remapped_configuration_ = event_loop_factory_->configuration();
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800260 filters_ =
261 std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
Austin Schuhba20ea72021-01-21 16:47:01 -0800262 event_loop_factory_->configuration(), logged_configuration(),
Austin Schuhfe3fb342021-01-16 18:50:37 -0800263 FLAGS_skip_order_validation,
264 chrono::duration_cast<chrono::nanoseconds>(
265 chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
Austin Schuh92547522019-12-28 14:33:43 -0800266
Austin Schuhe639ea12021-01-25 13:00:22 -0800267 std::vector<TimestampMapper *> timestamp_mappers;
Brian Silvermand90905f2020-09-23 14:42:56 -0700268 for (const Node *node : configuration::GetNodes(configuration())) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800269 const size_t node_index =
270 configuration::GetNodeIndex(configuration(), node);
Austin Schuh287d43d2020-12-04 20:19:33 -0800271 std::vector<LogParts> filtered_parts = FilterPartsForNode(
272 log_files_, node != nullptr ? node->name()->string_view() : "");
Austin Schuh315b96b2020-12-11 21:21:12 -0800273
274 // Confirm that all the parts are from the same boot if there are enough
275 // parts to not be from the same boot.
276 if (filtered_parts.size() > 1u) {
277 for (size_t i = 1; i < filtered_parts.size(); ++i) {
278 CHECK_EQ(filtered_parts[i].source_boot_uuid,
279 filtered_parts[0].source_boot_uuid)
280 << ": Found parts from different boots "
281 << LogFileVectorToString(log_files_);
282 }
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800283 if (!filtered_parts[0].source_boot_uuid.empty()) {
284 event_loop_factory_->GetNodeEventLoopFactory(node)->set_boot_uuid(
285 filtered_parts[0].source_boot_uuid);
286 }
Austin Schuh315b96b2020-12-11 21:21:12 -0800287 }
288
Austin Schuh287d43d2020-12-04 20:19:33 -0800289 states_[node_index] = std::make_unique<State>(
290 filtered_parts.size() == 0u
291 ? nullptr
292 : std::make_unique<TimestampMapper>(std::move(filtered_parts)));
Austin Schuh8bd96322020-02-13 21:18:22 -0800293 State *state = states_[node_index].get();
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700294 state->set_event_loop(state->SetNodeEventLoopFactory(
Austin Schuh858c9f32020-08-31 16:56:12 -0700295 event_loop_factory_->GetNodeEventLoopFactory(node)));
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700296
297 state->SetChannelCount(logged_configuration()->channels()->size());
Austin Schuhe639ea12021-01-25 13:00:22 -0800298 timestamp_mappers.emplace_back(state->timestamp_mapper());
Austin Schuhcde938c2020-02-02 17:30:07 -0800299 }
Austin Schuhe639ea12021-01-25 13:00:22 -0800300 filters_->SetTimestampMappers(std::move(timestamp_mappers));
301
302 // Note: this needs to be set before any times are pulled, or we won't observe
303 // the timestamps.
Austin Schuh87dd3832021-01-01 23:07:31 -0800304 event_loop_factory_->SetTimeConverter(filters_.get());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700305
Austin Schuh287d43d2020-12-04 20:19:33 -0800306 for (const Node *node : configuration::GetNodes(configuration())) {
307 const size_t node_index =
308 configuration::GetNodeIndex(configuration(), node);
309 State *state = states_[node_index].get();
310 for (const Node *other_node : configuration::GetNodes(configuration())) {
311 const size_t other_node_index =
312 configuration::GetNodeIndex(configuration(), other_node);
313 State *other_state = states_[other_node_index].get();
314 if (other_state != state) {
315 state->AddPeer(other_state);
316 }
317 }
318 }
319
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700320 // Register after making all the State objects so we can build references
321 // between them.
322 for (const Node *node : configuration::GetNodes(configuration())) {
323 const size_t node_index =
324 configuration::GetNodeIndex(configuration(), node);
325 State *state = states_[node_index].get();
326
327 Register(state->event_loop());
328 }
329
James Kuszmaul46d82582020-05-09 19:50:09 -0700330 if (live_nodes_ == 0) {
331 LOG(FATAL)
332 << "Don't have logs from any of the nodes in the replay config--are "
333 "you sure that the replay config matches the original config?";
334 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800335
Austin Schuh87dd3832021-01-01 23:07:31 -0800336 filters_->CheckGraph();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800337
Austin Schuh858c9f32020-08-31 16:56:12 -0700338 for (std::unique_ptr<State> &state : states_) {
339 state->SeedSortedMessages();
340 }
341
Austin Schuh2f8fd752020-09-01 22:38:28 -0700342 // We want to start the log file at the last start time of the log files
343 // from all the nodes. Compute how long each node's simulation needs to run
344 // to move time to this point.
Austin Schuh8bd96322020-02-13 21:18:22 -0800345 distributed_clock::time_point start_time = distributed_clock::min_time;
Austin Schuhcde938c2020-02-02 17:30:07 -0800346
Austin Schuh2f8fd752020-09-01 22:38:28 -0700347 // TODO(austin): We want an "OnStart" callback for each node rather than
348 // running until the last node.
349
Austin Schuh8bd96322020-02-13 21:18:22 -0800350 for (std::unique_ptr<State> &state : states_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700351 VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
352 << MaybeNodeName(state->event_loop()->node()) << "now "
353 << state->monotonic_now();
Austin Schuh287d43d2020-12-04 20:19:33 -0800354 if (state->monotonic_start_time() == monotonic_clock::min_time) {
355 continue;
356 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700357 // And start computing the start time on the distributed clock now that
358 // that works.
Austin Schuh858c9f32020-08-31 16:56:12 -0700359 start_time = std::max(
360 start_time, state->ToDistributedClock(state->monotonic_start_time()));
Austin Schuhcde938c2020-02-02 17:30:07 -0800361 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700362
Austin Schuh87dd3832021-01-01 23:07:31 -0800363 // TODO(austin): If a node doesn't have a start time, we might not queue
364 // enough. If this happens, we'll explode with a frozen error eventually.
365
Austin Schuh2f8fd752020-09-01 22:38:28 -0700366 CHECK_GE(start_time, distributed_clock::epoch())
367 << ": Hmm, we have a node starting before the start of time. Offset "
368 "everything.";
Austin Schuhcde938c2020-02-02 17:30:07 -0800369
Austin Schuh6f3babe2020-01-26 20:34:50 -0800370 // Forwarding is tracked per channel. If it is enabled, we want to turn it
371 // off. Otherwise messages replayed will get forwarded across to the other
Austin Schuh2f8fd752020-09-01 22:38:28 -0700372 // nodes, and also replayed on the other nodes. This may not satisfy all
373 // our users, but it'll start the discussion.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800374 if (configuration::MultiNode(event_loop_factory_->configuration())) {
375 for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
376 const Channel *channel = logged_configuration()->channels()->Get(i);
377 const Node *node = configuration::GetNode(
378 configuration(), channel->source_node()->string_view());
379
Austin Schuh8bd96322020-02-13 21:18:22 -0800380 State *state =
381 states_[configuration::GetNodeIndex(configuration(), node)].get();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800382
383 const Channel *remapped_channel =
Austin Schuh858c9f32020-08-31 16:56:12 -0700384 RemapChannel(state->event_loop(), channel);
Austin Schuh6f3babe2020-01-26 20:34:50 -0800385
386 event_loop_factory_->DisableForwarding(remapped_channel);
387 }
Austin Schuh4c3b9702020-08-30 11:34:55 -0700388
389 // If we are replaying a log, we don't want a bunch of redundant messages
390 // from both the real message bridge and simulated message bridge.
391 event_loop_factory_->DisableStatistics();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800392 }
393
Austin Schuhcde938c2020-02-02 17:30:07 -0800394 // While we are starting the system up, we might be relying on matching data
395 // to timestamps on log files where the timestamp log file starts before the
396 // data. In this case, it is reasonable to expect missing data.
Austin Schuhdda74ec2021-01-03 19:30:37 -0800397 {
398 const bool prior_ignore_missing_data = ignore_missing_data_;
399 ignore_missing_data_ = true;
400 VLOG(1) << "Running until " << start_time << " in Register";
401 event_loop_factory_->RunFor(start_time.time_since_epoch());
402 VLOG(1) << "At start time";
403 // Now that we are running for real, missing data means that the log file is
404 // corrupted or went wrong.
405 ignore_missing_data_ = prior_ignore_missing_data;
406 }
Austin Schuh92547522019-12-28 14:33:43 -0800407
Austin Schuh8bd96322020-02-13 21:18:22 -0800408 for (std::unique_ptr<State> &state : states_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700409 // Make the RT clock be correct before handing it to the user.
410 if (state->realtime_start_time() != realtime_clock::min_time) {
411 state->SetRealtimeOffset(state->monotonic_start_time(),
412 state->realtime_start_time());
413 }
414 VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
415 << MaybeNodeName(state->event_loop()->node()) << "now "
416 << state->monotonic_now();
417 }
418
419 if (FLAGS_timestamps_to_csv) {
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800420 filters_->Start(event_loop_factory);
Austin Schuh8bd96322020-02-13 21:18:22 -0800421 }
422}
423
Austin Schuh2f8fd752020-09-01 22:38:28 -0700424message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
Austin Schuh8bd96322020-02-13 21:18:22 -0800425 const Node *node_a, const Node *node_b) {
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800426 if (filters_) {
427 return filters_->GetFilter(node_a, node_b);
Austin Schuh8bd96322020-02-13 21:18:22 -0800428 }
Austin Schuh0ca1fd32020-12-18 22:53:05 -0800429 return nullptr;
Austin Schuh8bd96322020-02-13 21:18:22 -0800430}
431
Austin Schuhe309d2a2019-11-29 13:25:21 -0800432void LogReader::Register(EventLoop *event_loop) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800433 State *state =
434 states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
435 .get();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800436
Austin Schuh858c9f32020-08-31 16:56:12 -0700437 state->set_event_loop(event_loop);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800438
Tyler Chatow67ddb032020-01-12 14:30:04 -0800439 // We don't run timing reports when trying to print out logged data, because
440 // otherwise we would end up printing out the timing reports themselves...
441 // This is only really relevant when we are replaying into a simulation.
Austin Schuh6f3babe2020-01-26 20:34:50 -0800442 event_loop->SkipTimingReport();
443 event_loop->SkipAosLog();
Austin Schuh39788ff2019-12-01 18:22:57 -0800444
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700445 for (size_t logged_channel_index = 0;
446 logged_channel_index < logged_configuration()->channels()->size();
447 ++logged_channel_index) {
448 const Channel *channel = RemapChannel(
449 event_loop,
450 logged_configuration()->channels()->Get(logged_channel_index));
Austin Schuh8bd96322020-02-13 21:18:22 -0800451
Austin Schuh532656d2021-01-11 10:17:18 -0800452 if (channel->logger() == LoggerConfig::NOT_LOGGED) {
453 continue;
454 }
455
Austin Schuh2f8fd752020-09-01 22:38:28 -0700456 message_bridge::NoncausalOffsetEstimator *filter = nullptr;
Austin Schuh969cd602021-01-03 00:09:45 -0800457 RemoteMessageSender *remote_timestamp_sender = nullptr;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700458
459 State *source_state = nullptr;
Austin Schuh8bd96322020-02-13 21:18:22 -0800460
461 if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
462 configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700463 // We've got a message which is being forwarded to this node.
464 const Node *source_node = configuration::GetNode(
Austin Schuh8bd96322020-02-13 21:18:22 -0800465 event_loop->configuration(), channel->source_node()->string_view());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700466 filter = GetFilter(event_loop->node(), source_node);
Austin Schuh8bd96322020-02-13 21:18:22 -0800467
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700468 // Delivery timestamps are supposed to be logged back on the source node.
469 // Configure remote timestamps to be sent.
470 const bool delivery_time_is_logged =
471 configuration::ConnectionDeliveryTimeIsLoggedOnNode(
472 channel, event_loop->node(), source_node);
473
474 source_state =
475 states_[configuration::GetNodeIndex(configuration(), source_node)]
476 .get();
477
478 if (delivery_time_is_logged) {
479 remote_timestamp_sender =
480 source_state->RemoteTimestampSender(event_loop->node());
Austin Schuh8bd96322020-02-13 21:18:22 -0800481 }
482 }
Austin Schuh858c9f32020-08-31 16:56:12 -0700483
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700484 state->SetChannel(
485 logged_channel_index,
486 configuration::ChannelIndex(event_loop->configuration(), channel),
487 event_loop->MakeRawSender(channel), filter, remote_timestamp_sender,
488 source_state);
Austin Schuhe309d2a2019-11-29 13:25:21 -0800489 }
490
Austin Schuh6aa77be2020-02-22 21:06:40 -0800491 // If we didn't find any log files with data in them, we won't ever get a
492 // callback or be live. So skip the rest of the setup.
Austin Schuh287d43d2020-12-04 20:19:33 -0800493 if (state->OldestMessageTime() == monotonic_clock::max_time) {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800494 return;
495 }
496
Austin Schuh858c9f32020-08-31 16:56:12 -0700497 state->set_timer_handler(event_loop->AddTimer([this, state]() {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700498 VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
499 << "at " << state->event_loop()->context().monotonic_event_time
500 << " now " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -0700501 if (state->OldestMessageTime() == monotonic_clock::max_time) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800502 --live_nodes_;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700503 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
James Kuszmaul71a81932020-12-15 21:08:01 -0800504 if (exit_on_finish_ && live_nodes_ == 0) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800505 event_loop_factory_->Exit();
506 }
James Kuszmaul314f1672020-01-03 20:02:08 -0800507 return;
508 }
Austin Schuh2f8fd752020-09-01 22:38:28 -0700509
Austin Schuhdda74ec2021-01-03 19:30:37 -0800510 TimestampedMessage timestamped_message = state->PopOldest();
Austin Schuh05b70472020-01-01 17:11:17 -0800511
Austin Schuhe309d2a2019-11-29 13:25:21 -0800512 const monotonic_clock::time_point monotonic_now =
Austin Schuh858c9f32020-08-31 16:56:12 -0700513 state->event_loop()->context().monotonic_event_time;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700514 if (!FLAGS_skip_order_validation) {
Austin Schuh287d43d2020-12-04 20:19:33 -0800515 CHECK(monotonic_now == timestamped_message.monotonic_event_time)
Austin Schuh2f8fd752020-09-01 22:38:28 -0700516 << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
517 << monotonic_now << " trying to send "
Austin Schuh287d43d2020-12-04 20:19:33 -0800518 << timestamped_message.monotonic_event_time << " failure "
Austin Schuh2f8fd752020-09-01 22:38:28 -0700519 << state->DebugString();
Austin Schuh287d43d2020-12-04 20:19:33 -0800520 } else if (monotonic_now != timestamped_message.monotonic_event_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700521 LOG(WARNING) << "Check failed: monotonic_now == "
Austin Schuh287d43d2020-12-04 20:19:33 -0800522 "timestamped_message.monotonic_event_time) ("
Austin Schuh2f8fd752020-09-01 22:38:28 -0700523 << monotonic_now << " vs. "
Austin Schuh287d43d2020-12-04 20:19:33 -0800524 << timestamped_message.monotonic_event_time
Austin Schuh2f8fd752020-09-01 22:38:28 -0700525 << "): " << FlatbufferToJson(state->event_loop()->node())
526 << " Now " << monotonic_now << " trying to send "
Austin Schuh287d43d2020-12-04 20:19:33 -0800527 << timestamped_message.monotonic_event_time << " failure "
Austin Schuh2f8fd752020-09-01 22:38:28 -0700528 << state->DebugString();
529 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800530
Austin Schuh287d43d2020-12-04 20:19:33 -0800531 if (timestamped_message.monotonic_event_time >
Austin Schuh858c9f32020-08-31 16:56:12 -0700532 state->monotonic_start_time() ||
Austin Schuh15649d62019-12-28 16:36:38 -0800533 event_loop_factory_ != nullptr) {
Austin Schuhdda74ec2021-01-03 19:30:37 -0800534 if (timestamped_message.data.span().size() != 0u) {
535 if (timestamped_message.monotonic_remote_time !=
536 monotonic_clock::min_time) {
Austin Schuh8bd96322020-02-13 21:18:22 -0800537 // Confirm that the message was sent on the sending node before the
538 // destination node (this node). As a proxy, do this by making sure
539 // that time on the source node is past when the message was sent.
Austin Schuh87dd3832021-01-01 23:07:31 -0800540 //
541 // TODO(austin): <= means that the cause message (which we know) could
542 // happen after the effect even though we know they are at the same
543 // time. I doubt anyone will notice for a bit, but we should really
544 // fix that.
Austin Schuh2f8fd752020-09-01 22:38:28 -0700545 if (!FLAGS_skip_order_validation) {
Austin Schuh87dd3832021-01-01 23:07:31 -0800546 CHECK_LE(
Austin Schuh287d43d2020-12-04 20:19:33 -0800547 timestamped_message.monotonic_remote_time,
548 state->monotonic_remote_now(timestamped_message.channel_index))
Austin Schuh2f8fd752020-09-01 22:38:28 -0700549 << state->event_loop()->node()->name()->string_view() << " to "
Austin Schuh287d43d2020-12-04 20:19:33 -0800550 << state->remote_node(timestamped_message.channel_index)
551 ->name()
552 ->string_view()
Austin Schuh315b96b2020-12-11 21:21:12 -0800553 << " while trying to send a message on "
554 << configuration::CleanedChannelToString(
555 logged_configuration()->channels()->Get(
556 timestamped_message.channel_index))
Austin Schuh2f8fd752020-09-01 22:38:28 -0700557 << " " << state->DebugString();
Austin Schuh87dd3832021-01-01 23:07:31 -0800558 } else if (timestamped_message.monotonic_remote_time >
Austin Schuh287d43d2020-12-04 20:19:33 -0800559 state->monotonic_remote_now(
560 timestamped_message.channel_index)) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700561 LOG(WARNING)
Austin Schuh287d43d2020-12-04 20:19:33 -0800562 << "Check failed: timestamped_message.monotonic_remote_time < "
563 "state->monotonic_remote_now(timestamped_message.channel_"
564 "index) ("
565 << timestamped_message.monotonic_remote_time << " vs. "
566 << state->monotonic_remote_now(
567 timestamped_message.channel_index)
568 << ") " << state->event_loop()->node()->name()->string_view()
569 << " to "
570 << state->remote_node(timestamped_message.channel_index)
571 ->name()
572 ->string_view()
573 << " currently " << timestamped_message.monotonic_event_time
Austin Schuh2f8fd752020-09-01 22:38:28 -0700574 << " ("
575 << state->ToDistributedClock(
Austin Schuh287d43d2020-12-04 20:19:33 -0800576 timestamped_message.monotonic_event_time)
Austin Schuh2f8fd752020-09-01 22:38:28 -0700577 << ") remote event time "
Austin Schuh287d43d2020-12-04 20:19:33 -0800578 << timestamped_message.monotonic_remote_time << " ("
Austin Schuh2f8fd752020-09-01 22:38:28 -0700579 << state->RemoteToDistributedClock(
Austin Schuh287d43d2020-12-04 20:19:33 -0800580 timestamped_message.channel_index,
581 timestamped_message.monotonic_remote_time)
Austin Schuh2f8fd752020-09-01 22:38:28 -0700582 << ") " << state->DebugString();
583 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800584 }
585
Austin Schuh15649d62019-12-28 16:36:38 -0800586 // If we have access to the factory, use it to fix the realtime time.
Austin Schuh287d43d2020-12-04 20:19:33 -0800587 state->SetRealtimeOffset(timestamped_message.monotonic_event_time,
588 timestamped_message.realtime_event_time);
Austin Schuh15649d62019-12-28 16:36:38 -0800589
Austin Schuh2f8fd752020-09-01 22:38:28 -0700590 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
Austin Schuh287d43d2020-12-04 20:19:33 -0800591 << timestamped_message.monotonic_event_time;
Austin Schuh2f8fd752020-09-01 22:38:28 -0700592 // TODO(austin): std::move channel_data in and make that efficient in
593 // simulation.
Austin Schuh287d43d2020-12-04 20:19:33 -0800594 state->Send(std::move(timestamped_message));
Austin Schuhdda74ec2021-01-03 19:30:37 -0800595 } else if (!ignore_missing_data_ &&
Austin Schuh5ee56872021-01-30 16:53:34 -0800596 // When starting up, we can have data which was sent before the
597 // log starts, but the timestamp was after the log starts. This
598 // is unreasonable to avoid, so ignore the missing data.
599 timestamped_message.monotonic_remote_time >=
600 state->monotonic_remote_start_time(
601 timestamped_message.channel_index) &&
Austin Schuhdda74ec2021-01-03 19:30:37 -0800602 !FLAGS_skip_missing_forwarding_entries) {
Austin Schuh5ee56872021-01-30 16:53:34 -0800603 // We've found a timestamp without data that we expect to have data for.
604 // This likely means that we are at the end of the log file. Record it
605 // and CHECK that in the rest of the log file, we don't find any more
606 // data on that channel. Not all channels will end at the same point in
607 // time since they can be in different files.
Austin Schuhdda74ec2021-01-03 19:30:37 -0800608 VLOG(1) << "Found the last message on channel "
609 << timestamped_message.channel_index;
610
611 // Vector storing if we've seen a nullptr message or not per channel.
612 std::vector<bool> last_message;
613 last_message.resize(logged_configuration()->channels()->size(), false);
614
615 last_message[timestamped_message.channel_index] = true;
616
617 // Now that we found the end of one channel, artificially stop the
618 // rest. It is confusing when part of your data gets replayed but not
Austin Schuh5ee56872021-01-30 16:53:34 -0800619 // all. Read the rest of the messages and drop them on the floor while
620 // doing some basic validation.
Austin Schuh858c9f32020-08-31 16:56:12 -0700621 while (state->OldestMessageTime() != monotonic_clock::max_time) {
Austin Schuhdda74ec2021-01-03 19:30:37 -0800622 TimestampedMessage next = state->PopOldest();
623 // Make sure that once we have seen the last message on a channel,
624 // data doesn't start back up again. If the user wants to play
625 // through events like this, they can set
626 // --skip_missing_forwarding_entries or ignore_missing_data_.
627 CHECK_LT(next.channel_index, last_message.size());
628 if (next.data.span().size() == 0u) {
629 last_message[next.channel_index] = true;
630 } else {
631 if (last_message[next.channel_index]) {
632 LOG(FATAL)
633 << "Found missing data in the middle of the log file on "
634 "channel "
635 << next.channel_index << " Last "
636 << last_message[next.channel_index] << state->DebugString();
637 }
638 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800639 }
Austin Schuh92547522019-12-28 14:33:43 -0800640 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800641 } else {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800642 LOG(WARNING)
643 << "Not sending data from before the start of the log file. "
Austin Schuh287d43d2020-12-04 20:19:33 -0800644 << timestamped_message.monotonic_event_time.time_since_epoch().count()
Austin Schuh6f3babe2020-01-26 20:34:50 -0800645 << " start " << monotonic_start_time().time_since_epoch().count()
Austin Schuhd85baf82020-10-19 11:50:12 -0700646 << " "
Austin Schuh287d43d2020-12-04 20:19:33 -0800647 << FlatbufferToJson(timestamped_message.data,
Austin Schuhd85baf82020-10-19 11:50:12 -0700648 {.multi_line = false, .max_vector_size = 100});
Austin Schuhe309d2a2019-11-29 13:25:21 -0800649 }
650
Austin Schuh858c9f32020-08-31 16:56:12 -0700651 const monotonic_clock::time_point next_time = state->OldestMessageTime();
Austin Schuh6f3babe2020-01-26 20:34:50 -0800652 if (next_time != monotonic_clock::max_time) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700653 VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
654 << "wakeup for " << next_time << "("
655 << state->ToDistributedClock(next_time)
656 << " distributed), now is " << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -0700657 state->Setup(next_time);
James Kuszmaul314f1672020-01-03 20:02:08 -0800658 } else {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700659 VLOG(1) << MaybeNodeName(state->event_loop()->node())
660 << "No next message, scheduling shutdown";
661 // Set a timer up immediately after now to die. If we don't do this,
662 // then the senders waiting on the message we just read will never get
663 // called.
Austin Schuheecb9282020-01-08 17:43:30 -0800664 if (event_loop_factory_ != nullptr) {
Austin Schuh858c9f32020-08-31 16:56:12 -0700665 state->Setup(monotonic_now + event_loop_factory_->send_delay() +
666 std::chrono::nanoseconds(1));
Austin Schuheecb9282020-01-08 17:43:30 -0800667 }
Austin Schuhe309d2a2019-11-29 13:25:21 -0800668 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800669
Austin Schuh2f8fd752020-09-01 22:38:28 -0700670 VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
671 << state->event_loop()->context().monotonic_event_time << " now "
672 << state->monotonic_now();
Austin Schuh858c9f32020-08-31 16:56:12 -0700673 }));
Austin Schuhe309d2a2019-11-29 13:25:21 -0800674
Austin Schuh6f3babe2020-01-26 20:34:50 -0800675 ++live_nodes_;
676
Austin Schuh858c9f32020-08-31 16:56:12 -0700677 if (state->OldestMessageTime() != monotonic_clock::max_time) {
678 event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
Austin Schuhe309d2a2019-11-29 13:25:21 -0800679 }
680}
681
682void LogReader::Deregister() {
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800683 // Make sure that things get destroyed in the correct order, rather than
684 // relying on getting the order correct in the class definition.
Austin Schuh8bd96322020-02-13 21:18:22 -0800685 for (std::unique_ptr<State> &state : states_) {
Austin Schuh858c9f32020-08-31 16:56:12 -0700686 state->Deregister();
Austin Schuhe309d2a2019-11-29 13:25:21 -0800687 }
Austin Schuh92547522019-12-28 14:33:43 -0800688
James Kuszmaul84ff3e52020-01-03 19:48:53 -0800689 event_loop_factory_unique_ptr_.reset();
690 event_loop_factory_ = nullptr;
Austin Schuhe309d2a2019-11-29 13:25:21 -0800691}
692
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800693void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
Austin Schuh0de30f32020-12-06 12:44:28 -0800694 std::string_view add_prefix,
695 std::string_view new_type) {
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800696 for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
697 const Channel *const channel = logged_configuration()->channels()->Get(ii);
698 if (channel->name()->str() == name &&
699 channel->type()->string_view() == type) {
700 CHECK_EQ(0u, remapped_channels_.count(ii))
701 << "Already remapped channel "
702 << configuration::CleanedChannelToString(channel);
Austin Schuh0de30f32020-12-06 12:44:28 -0800703 RemappedChannel remapped_channel;
704 remapped_channel.remapped_name =
705 std::string(add_prefix) + std::string(name);
706 remapped_channel.new_type = new_type;
707 remapped_channels_[ii] = std::move(remapped_channel);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800708 VLOG(1) << "Remapping channel "
709 << configuration::CleanedChannelToString(channel)
Austin Schuh0de30f32020-12-06 12:44:28 -0800710 << " to have name " << remapped_channels_[ii].remapped_name;
Austin Schuh6331ef92020-01-07 18:28:09 -0800711 MakeRemappedConfig();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800712 return;
713 }
714 }
715 LOG(FATAL) << "Unabled to locate channel with name " << name << " and type "
716 << type;
717}
718
Austin Schuh01b4c352020-09-21 23:09:39 -0700719void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
720 const Node *node,
Austin Schuh0de30f32020-12-06 12:44:28 -0800721 std::string_view add_prefix,
722 std::string_view new_type) {
Austin Schuh01b4c352020-09-21 23:09:39 -0700723 VLOG(1) << "Node is " << aos::FlatbufferToJson(node);
724 const Channel *remapped_channel =
725 configuration::GetChannel(logged_configuration(), name, type, "", node);
726 CHECK(remapped_channel != nullptr) << ": Failed to find {\"name\": \"" << name
727 << "\", \"type\": \"" << type << "\"}";
728 VLOG(1) << "Original {\"name\": \"" << name << "\", \"type\": \"" << type
729 << "\"}";
730 VLOG(1) << "Remapped "
731 << aos::configuration::StrippedChannelToString(remapped_channel);
732
733 // We want to make /spray on node 0 go to /0/spray by snooping the maps. And
734 // we want it to degrade if the heuristics fail to just work.
735 //
736 // The easiest way to do this is going to be incredibly specific and verbose.
737 // Look up /spray, to /0/spray. Then, prefix the result with /original to get
738 // /original/0/spray. Then, create a map from /original/spray to
739 // /original/0/spray for just the type we were asked for.
740 if (name != remapped_channel->name()->string_view()) {
741 MapT new_map;
742 new_map.match = std::make_unique<ChannelT>();
743 new_map.match->name = absl::StrCat(add_prefix, name);
744 new_map.match->type = type;
745 if (node != nullptr) {
746 new_map.match->source_node = node->name()->str();
747 }
748 new_map.rename = std::make_unique<ChannelT>();
749 new_map.rename->name =
750 absl::StrCat(add_prefix, remapped_channel->name()->string_view());
751 maps_.emplace_back(std::move(new_map));
752 }
753
754 const size_t channel_index =
755 configuration::ChannelIndex(logged_configuration(), remapped_channel);
756 CHECK_EQ(0u, remapped_channels_.count(channel_index))
757 << "Already remapped channel "
758 << configuration::CleanedChannelToString(remapped_channel);
Austin Schuh0de30f32020-12-06 12:44:28 -0800759
760 RemappedChannel remapped_channel_struct;
761 remapped_channel_struct.remapped_name =
762 std::string(add_prefix) +
763 std::string(remapped_channel->name()->string_view());
764 remapped_channel_struct.new_type = new_type;
765 remapped_channels_[channel_index] = std::move(remapped_channel_struct);
Austin Schuh01b4c352020-09-21 23:09:39 -0700766 MakeRemappedConfig();
767}
768
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800769void LogReader::MakeRemappedConfig() {
Austin Schuh8bd96322020-02-13 21:18:22 -0800770 for (std::unique_ptr<State> &state : states_) {
Austin Schuh6aa77be2020-02-22 21:06:40 -0800771 if (state) {
Austin Schuh858c9f32020-08-31 16:56:12 -0700772 CHECK(!state->event_loop())
Austin Schuh6aa77be2020-02-22 21:06:40 -0800773 << ": Can't change the mapping after the events are scheduled.";
774 }
Austin Schuh6f3babe2020-01-26 20:34:50 -0800775 }
Austin Schuhac0771c2020-01-07 18:36:30 -0800776
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800777 // If no remapping occurred and we are using the original config, then there
778 // is nothing interesting to do here.
779 if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
Austin Schuh6f3babe2020-01-26 20:34:50 -0800780 remapped_configuration_ = logged_configuration();
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800781 return;
782 }
783 // Config to copy Channel definitions from. Use the specified
784 // replay_configuration_ if it has been provided.
785 const Configuration *const base_config = replay_configuration_ == nullptr
786 ? logged_configuration()
787 : replay_configuration_;
Austin Schuh0de30f32020-12-06 12:44:28 -0800788
789 // Create a config with all the channels, but un-sorted/merged. Collect up
790 // the schemas while we do this. Call MergeConfiguration to sort everything,
791 // and then merge it all in together.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800792
793 // This is the builder that we use for the config containing all the new
794 // channels.
Austin Schuh0de30f32020-12-06 12:44:28 -0800795 flatbuffers::FlatBufferBuilder fbb;
796 fbb.ForceDefaults(true);
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800797 std::vector<flatbuffers::Offset<Channel>> channel_offsets;
Austin Schuh0de30f32020-12-06 12:44:28 -0800798
799 CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 13u)
800 << ": Merging logic needs to be updated when the number of channel "
801 "fields changes.";
802
803 // List of schemas.
804 std::map<std::string_view, FlatbufferVector<reflection::Schema>> schema_map;
805 // Make sure our new RemoteMessage schema is in there for old logs without it.
806 schema_map.insert(std::make_pair(
807 RemoteMessage::GetFullyQualifiedName(),
808 FlatbufferVector<reflection::Schema>(FlatbufferSpan<reflection::Schema>(
809 message_bridge::RemoteMessageSchema()))));
810
811 // Reconstruct the remapped channels.
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800812 for (auto &pair : remapped_channels_) {
Austin Schuh0de30f32020-12-06 12:44:28 -0800813 const Channel *const c = CHECK_NOTNULL(configuration::GetChannel(
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800814 base_config, logged_configuration()->channels()->Get(pair.first), "",
815 nullptr));
Austin Schuh0de30f32020-12-06 12:44:28 -0800816 channel_offsets.emplace_back(
817 CopyChannel(c, pair.second.remapped_name, "", &fbb));
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800818 }
Austin Schuh01b4c352020-09-21 23:09:39 -0700819
Austin Schuh0de30f32020-12-06 12:44:28 -0800820 // Now reconstruct the original channels, translating types as needed
821 for (const Channel *c : *base_config->channels()) {
822 // Search for a mapping channel.
823 std::string_view new_type = "";
824 for (auto &pair : remapped_channels_) {
825 const Channel *const remapped_channel =
826 logged_configuration()->channels()->Get(pair.first);
827 if (remapped_channel->name()->string_view() == c->name()->string_view() &&
828 remapped_channel->type()->string_view() == c->type()->string_view()) {
829 new_type = pair.second.new_type;
830 break;
831 }
832 }
833
834 // Copy everything over.
835 channel_offsets.emplace_back(CopyChannel(c, "", new_type, &fbb));
836
837 // Add the schema if it doesn't exist.
838 if (schema_map.find(c->type()->string_view()) == schema_map.end()) {
839 CHECK(c->has_schema());
840 schema_map.insert(std::make_pair(c->type()->string_view(),
841 RecursiveCopyFlatBuffer(c->schema())));
842 }
843 }
844
845 // The MergeConfiguration API takes a vector, not a map. Convert.
846 std::vector<FlatbufferVector<reflection::Schema>> schemas;
847 while (!schema_map.empty()) {
848 schemas.emplace_back(std::move(schema_map.begin()->second));
849 schema_map.erase(schema_map.begin());
850 }
851
852 // Create the Configuration containing the new channels that we want to add.
853 const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
854 channels_offset =
855 channel_offsets.empty() ? 0 : fbb.CreateVector(channel_offsets);
856
857 // Copy over the old maps.
Austin Schuh01b4c352020-09-21 23:09:39 -0700858 std::vector<flatbuffers::Offset<Map>> map_offsets;
Austin Schuh0de30f32020-12-06 12:44:28 -0800859 if (base_config->maps()) {
860 for (const Map *map : *base_config->maps()) {
861 map_offsets.emplace_back(aos::RecursiveCopyFlatBuffer(map, &fbb));
862 }
863 }
864
865 // Now create the new maps. These are second so they take effect first.
Austin Schuh01b4c352020-09-21 23:09:39 -0700866 for (const MapT &map : maps_) {
867 const flatbuffers::Offset<flatbuffers::String> match_name_offset =
Austin Schuh0de30f32020-12-06 12:44:28 -0800868 fbb.CreateString(map.match->name);
Austin Schuh01b4c352020-09-21 23:09:39 -0700869 const flatbuffers::Offset<flatbuffers::String> match_type_offset =
Austin Schuh0de30f32020-12-06 12:44:28 -0800870 fbb.CreateString(map.match->type);
Austin Schuh01b4c352020-09-21 23:09:39 -0700871 const flatbuffers::Offset<flatbuffers::String> rename_name_offset =
Austin Schuh0de30f32020-12-06 12:44:28 -0800872 fbb.CreateString(map.rename->name);
Austin Schuh01b4c352020-09-21 23:09:39 -0700873 flatbuffers::Offset<flatbuffers::String> match_source_node_offset;
874 if (!map.match->source_node.empty()) {
Austin Schuh0de30f32020-12-06 12:44:28 -0800875 match_source_node_offset = fbb.CreateString(map.match->source_node);
Austin Schuh01b4c352020-09-21 23:09:39 -0700876 }
Austin Schuh0de30f32020-12-06 12:44:28 -0800877 Channel::Builder match_builder(fbb);
Austin Schuh01b4c352020-09-21 23:09:39 -0700878 match_builder.add_name(match_name_offset);
879 match_builder.add_type(match_type_offset);
880 if (!map.match->source_node.empty()) {
881 match_builder.add_source_node(match_source_node_offset);
882 }
883 const flatbuffers::Offset<Channel> match_offset = match_builder.Finish();
884
Austin Schuh0de30f32020-12-06 12:44:28 -0800885 Channel::Builder rename_builder(fbb);
Austin Schuh01b4c352020-09-21 23:09:39 -0700886 rename_builder.add_name(rename_name_offset);
887 const flatbuffers::Offset<Channel> rename_offset = rename_builder.Finish();
888
Austin Schuh0de30f32020-12-06 12:44:28 -0800889 Map::Builder map_builder(fbb);
Austin Schuh01b4c352020-09-21 23:09:39 -0700890 map_builder.add_match(match_offset);
891 map_builder.add_rename(rename_offset);
892 map_offsets.emplace_back(map_builder.Finish());
893 }
894
Austin Schuh0de30f32020-12-06 12:44:28 -0800895 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
896 maps_offsets = map_offsets.empty() ? 0 : fbb.CreateVector(map_offsets);
Austin Schuh01b4c352020-09-21 23:09:39 -0700897
Austin Schuh0de30f32020-12-06 12:44:28 -0800898 // And copy everything else over.
899 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
900 nodes_offset = aos::RecursiveCopyVectorTable(base_config->nodes(), &fbb);
901
902 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
903 applications_offset =
904 aos::RecursiveCopyVectorTable(base_config->applications(), &fbb);
905
906 // Now insert everything else in unmodified.
907 ConfigurationBuilder configuration_builder(fbb);
908 if (!channels_offset.IsNull()) {
909 configuration_builder.add_channels(channels_offset);
910 }
911 if (!maps_offsets.IsNull()) {
912 configuration_builder.add_maps(maps_offsets);
913 }
914 if (!nodes_offset.IsNull()) {
915 configuration_builder.add_nodes(nodes_offset);
916 }
917 if (!applications_offset.IsNull()) {
918 configuration_builder.add_applications(applications_offset);
919 }
920
921 if (base_config->has_channel_storage_duration()) {
922 configuration_builder.add_channel_storage_duration(
923 base_config->channel_storage_duration());
924 }
925
926 CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
927 << ": Merging logic needs to be updated when the number of configuration "
928 "fields changes.";
929
930 fbb.Finish(configuration_builder.Finish());
931
932 // Clean it up and return it! By using MergeConfiguration here, we'll
933 // actually get a deduplicated config for free too.
934 FlatbufferDetachedBuffer<Configuration> new_merged_config =
935 configuration::MergeConfiguration(
936 FlatbufferDetachedBuffer<Configuration>(fbb.Release()));
937
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800938 remapped_configuration_buffer_ =
939 std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
Austin Schuh0de30f32020-12-06 12:44:28 -0800940 configuration::MergeConfiguration(new_merged_config, schemas));
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800941
942 remapped_configuration_ = &remapped_configuration_buffer_->message();
Austin Schuh0de30f32020-12-06 12:44:28 -0800943
944 // TODO(austin): Lazily re-build to save CPU?
James Kuszmaulc7bbb3e2020-01-03 20:01:00 -0800945}
946
Austin Schuh6f3babe2020-01-26 20:34:50 -0800947const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
948 const Channel *channel) {
949 std::string_view channel_name = channel->name()->string_view();
950 std::string_view channel_type = channel->type()->string_view();
951 const int channel_index =
952 configuration::ChannelIndex(logged_configuration(), channel);
953 // If the channel is remapped, find the correct channel name to use.
954 if (remapped_channels_.count(channel_index) > 0) {
Austin Schuhee711052020-08-24 16:06:09 -0700955 VLOG(3) << "Got remapped channel on "
Austin Schuh6f3babe2020-01-26 20:34:50 -0800956 << configuration::CleanedChannelToString(channel);
Austin Schuh0de30f32020-12-06 12:44:28 -0800957 channel_name = remapped_channels_[channel_index].remapped_name;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800958 }
959
Austin Schuhee711052020-08-24 16:06:09 -0700960 VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
Austin Schuh6f3babe2020-01-26 20:34:50 -0800961 const Channel *remapped_channel = configuration::GetChannel(
962 event_loop->configuration(), channel_name, channel_type,
963 event_loop->name(), event_loop->node());
964
965 CHECK(remapped_channel != nullptr)
966 << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
967 << channel_type << "\"} because it is not in the provided configuration.";
968
969 return remapped_channel;
970}
971
Austin Schuh287d43d2020-12-04 20:19:33 -0800972LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper)
973 : timestamp_mapper_(std::move(timestamp_mapper)) {}
974
975void LogReader::State::AddPeer(State *peer) {
976 if (timestamp_mapper_ && peer->timestamp_mapper_) {
977 timestamp_mapper_->AddPeer(peer->timestamp_mapper_.get());
978 }
979}
Austin Schuh858c9f32020-08-31 16:56:12 -0700980
981EventLoop *LogReader::State::SetNodeEventLoopFactory(
982 NodeEventLoopFactory *node_event_loop_factory) {
983 node_event_loop_factory_ = node_event_loop_factory;
984 event_loop_unique_ptr_ =
985 node_event_loop_factory_->MakeEventLoop("log_reader");
986 return event_loop_unique_ptr_.get();
987}
988
989void LogReader::State::SetChannelCount(size_t count) {
990 channels_.resize(count);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700991 remote_timestamp_senders_.resize(count);
Austin Schuh858c9f32020-08-31 16:56:12 -0700992 filters_.resize(count);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700993 channel_source_state_.resize(count);
994 factory_channel_index_.resize(count);
995 queue_index_map_.resize(count);
Austin Schuh858c9f32020-08-31 16:56:12 -0700996}
997
998void LogReader::State::SetChannel(
Austin Schuh8d7e0bb2020-10-02 17:57:00 -0700999 size_t logged_channel_index, size_t factory_channel_index,
1000 std::unique_ptr<RawSender> sender,
Austin Schuh2f8fd752020-09-01 22:38:28 -07001001 message_bridge::NoncausalOffsetEstimator *filter,
Austin Schuh969cd602021-01-03 00:09:45 -08001002 RemoteMessageSender *remote_timestamp_sender, State *source_state) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001003 channels_[logged_channel_index] = std::move(sender);
1004 filters_[logged_channel_index] = filter;
1005 remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
1006
1007 if (source_state) {
1008 channel_source_state_[logged_channel_index] = source_state;
1009
1010 if (remote_timestamp_sender != nullptr) {
1011 source_state->queue_index_map_[logged_channel_index] =
Austin Schuh9942bae2021-01-07 22:06:44 -08001012 std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001013 }
1014 }
1015
1016 factory_channel_index_[logged_channel_index] = factory_channel_index;
1017}
1018
Austin Schuh287d43d2020-12-04 20:19:33 -08001019bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
1020 aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001021 uint32_t remote_queue_index = 0xffffffff;
1022
Austin Schuh287d43d2020-12-04 20:19:33 -08001023 if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
Austin Schuh9942bae2021-01-07 22:06:44 -08001024 std::vector<ContiguousSentTimestamp> *queue_index_map = CHECK_NOTNULL(
Austin Schuh287d43d2020-12-04 20:19:33 -08001025 CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
1026 ->queue_index_map_[timestamped_message.channel_index]
1027 .get());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001028
Austin Schuh9942bae2021-01-07 22:06:44 -08001029 struct SentTimestamp {
1030 monotonic_clock::time_point monotonic_event_time;
1031 uint32_t queue_index;
1032 } search;
1033
Austin Schuh287d43d2020-12-04 20:19:33 -08001034 search.monotonic_event_time = timestamped_message.monotonic_remote_time;
Austin Schuh287d43d2020-12-04 20:19:33 -08001035 search.queue_index = timestamped_message.remote_queue_index;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001036
1037 // Find the sent time if available.
1038 auto element = std::lower_bound(
1039 queue_index_map->begin(), queue_index_map->end(), search,
Austin Schuh9942bae2021-01-07 22:06:44 -08001040 [](ContiguousSentTimestamp a, SentTimestamp b) {
1041 if (a.ending_monotonic_event_time < b.monotonic_event_time) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001042 return true;
1043 }
Austin Schuh9942bae2021-01-07 22:06:44 -08001044 if (a.starting_monotonic_event_time > b.monotonic_event_time) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001045 return false;
1046 }
Austin Schuh9942bae2021-01-07 22:06:44 -08001047
1048 if (a.ending_queue_index < b.queue_index) {
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001049 return true;
1050 }
Austin Schuh9942bae2021-01-07 22:06:44 -08001051 if (a.starting_queue_index >= b.queue_index) {
1052 return false;
1053 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001054
Austin Schuh9942bae2021-01-07 22:06:44 -08001055 // If it isn't clearly below or above, it is below. Since we return
1056 // the last element <, this will return a match.
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001057 return false;
1058 });
1059
1060 // TODO(austin): Be a bit more principled here, but we will want to do that
1061 // after the logger rewrite. We hit this when one node finishes, but the
1062 // other node isn't done yet. So there is no send time, but there is a
1063 // receive time.
1064 if (element != queue_index_map->end()) {
Austin Schuh9942bae2021-01-07 22:06:44 -08001065 CHECK_GE(timestamped_message.monotonic_remote_time,
1066 element->starting_monotonic_event_time);
1067 CHECK_LE(timestamped_message.monotonic_remote_time,
1068 element->ending_monotonic_event_time);
1069 CHECK_GE(timestamped_message.remote_queue_index,
1070 element->starting_queue_index);
1071 CHECK_LE(timestamped_message.remote_queue_index,
1072 element->ending_queue_index);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001073
Austin Schuh9942bae2021-01-07 22:06:44 -08001074 remote_queue_index = timestamped_message.remote_queue_index +
1075 element->actual_queue_index -
1076 element->starting_queue_index;
1077 } else {
1078 VLOG(1) << "No timestamp match in the map.";
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001079 }
1080 }
1081
1082 // Send! Use the replayed queue index here instead of the logged queue index
1083 // for the remote queue index. This makes re-logging work.
Austin Schuh287d43d2020-12-04 20:19:33 -08001084 const bool sent = sender->Send(
1085 timestamped_message.data.message().data()->Data(),
1086 timestamped_message.data.message().data()->size(),
1087 timestamped_message.monotonic_remote_time,
1088 timestamped_message.realtime_remote_time, remote_queue_index);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001089 if (!sent) return false;
1090
Austin Schuh287d43d2020-12-04 20:19:33 -08001091 if (queue_index_map_[timestamped_message.channel_index]) {
Austin Schuh9942bae2021-01-07 22:06:44 -08001092 if (queue_index_map_[timestamped_message.channel_index]->empty()) {
1093 // Nothing here, start a range with 0 length.
1094 ContiguousSentTimestamp timestamp;
1095 timestamp.starting_monotonic_event_time =
1096 timestamp.ending_monotonic_event_time =
1097 timestamped_message.monotonic_event_time;
1098 timestamp.starting_queue_index = timestamp.ending_queue_index =
1099 timestamped_message.queue_index;
1100 timestamp.actual_queue_index = sender->sent_queue_index();
1101 queue_index_map_[timestamped_message.channel_index]->emplace_back(
1102 timestamp);
1103 } else {
1104 // We've got something. See if the next timestamp is still contiguous. If
1105 // so, grow it.
1106 ContiguousSentTimestamp *back =
1107 &queue_index_map_[timestamped_message.channel_index]->back();
1108 if ((back->starting_queue_index - back->actual_queue_index) ==
1109 (timestamped_message.queue_index - sender->sent_queue_index())) {
1110 back->ending_queue_index = timestamped_message.queue_index;
1111 back->ending_monotonic_event_time =
1112 timestamped_message.monotonic_event_time;
1113 } else {
1114 // Otherwise, make a new one.
1115 ContiguousSentTimestamp timestamp;
1116 timestamp.starting_monotonic_event_time =
1117 timestamp.ending_monotonic_event_time =
1118 timestamped_message.monotonic_event_time;
1119 timestamp.starting_queue_index = timestamp.ending_queue_index =
1120 timestamped_message.queue_index;
1121 timestamp.actual_queue_index = sender->sent_queue_index();
1122 queue_index_map_[timestamped_message.channel_index]->emplace_back(
1123 timestamp);
1124 }
1125 }
1126
1127 // TODO(austin): Should we prune the map? On a many day log, I only saw the
1128 // queue index diverge a couple of elements, which would be a very small
1129 // map.
Austin Schuh287d43d2020-12-04 20:19:33 -08001130 } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
1131 nullptr) {
Austin Schuh969cd602021-01-03 00:09:45 -08001132 flatbuffers::FlatBufferBuilder fbb;
1133 fbb.ForceDefaults(true);
Austin Schuh315b96b2020-12-11 21:21:12 -08001134 flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
Austin Schuh969cd602021-01-03 00:09:45 -08001135 fbb.CreateString(event_loop_->boot_uuid().string_view());
Austin Schuh315b96b2020-12-11 21:21:12 -08001136
Austin Schuh969cd602021-01-03 00:09:45 -08001137 RemoteMessage::Builder message_header_builder(fbb);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001138
1139 message_header_builder.add_channel_index(
Austin Schuh287d43d2020-12-04 20:19:33 -08001140 factory_channel_index_[timestamped_message.channel_index]);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001141
1142 // Swap the remote and sent metrics. They are from the sender's
1143 // perspective, not the receiver's perspective.
1144 message_header_builder.add_monotonic_sent_time(
1145 sender->monotonic_sent_time().time_since_epoch().count());
1146 message_header_builder.add_realtime_sent_time(
1147 sender->realtime_sent_time().time_since_epoch().count());
1148 message_header_builder.add_queue_index(sender->sent_queue_index());
1149
1150 message_header_builder.add_monotonic_remote_time(
Austin Schuh287d43d2020-12-04 20:19:33 -08001151 timestamped_message.monotonic_remote_time.time_since_epoch().count());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001152 message_header_builder.add_realtime_remote_time(
Austin Schuh287d43d2020-12-04 20:19:33 -08001153 timestamped_message.realtime_remote_time.time_since_epoch().count());
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001154
1155 message_header_builder.add_remote_queue_index(remote_queue_index);
Austin Schuh315b96b2020-12-11 21:21:12 -08001156 message_header_builder.add_boot_uuid(boot_uuid_offset);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001157
Austin Schuh969cd602021-01-03 00:09:45 -08001158 fbb.Finish(message_header_builder.Finish());
1159
1160 remote_timestamp_senders_[timestamped_message.channel_index]->Send(
1161 FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
1162 timestamped_message.monotonic_timestamp_time);
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001163 }
1164
1165 return true;
1166}
1167
Austin Schuh969cd602021-01-03 00:09:45 -08001168LogReader::RemoteMessageSender::RemoteMessageSender(
1169 aos::Sender<message_bridge::RemoteMessage> sender, EventLoop *event_loop)
1170 : event_loop_(event_loop),
1171 sender_(std::move(sender)),
1172 timer_(event_loop->AddTimer([this]() { SendTimestamp(); })) {}
1173
1174void LogReader::RemoteMessageSender::ScheduleTimestamp() {
1175 if (remote_timestamps_.empty()) {
1176 CHECK_NOTNULL(timer_);
1177 timer_->Disable();
1178 scheduled_time_ = monotonic_clock::min_time;
1179 return;
1180 }
1181
1182 if (scheduled_time_ != remote_timestamps_.front().monotonic_timestamp_time) {
1183 CHECK_NOTNULL(timer_);
Austin Schuh816e5d62021-01-05 23:42:20 -08001184 timer_->Setup(remote_timestamps_.front().monotonic_timestamp_time);
Austin Schuh969cd602021-01-03 00:09:45 -08001185 scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
Austin Schuh3d94be02021-02-12 23:15:20 -08001186 CHECK_GE(scheduled_time_, event_loop_->monotonic_now())
1187 << event_loop_->node()->name()->string_view();
Austin Schuh969cd602021-01-03 00:09:45 -08001188 }
1189}
1190
1191void LogReader::RemoteMessageSender::Send(
1192 FlatbufferDetachedBuffer<RemoteMessage> remote_message,
1193 monotonic_clock::time_point monotonic_timestamp_time) {
1194 // There are 2 cases. Either we have a monotonic_timestamp_time and need to
1195 // resend the timestamp at the correct time, or we don't and can send it
1196 // immediately.
1197 if (monotonic_timestamp_time == monotonic_clock::min_time) {
1198 CHECK(remote_timestamps_.empty())
1199 << ": Unsupported mix of timestamps and no timestamps.";
1200 sender_.Send(std::move(remote_message));
1201 } else {
Austin Schuhb22ae422021-01-31 17:57:06 -08001202 remote_timestamps_.emplace(
1203 std::upper_bound(
1204 remote_timestamps_.begin(), remote_timestamps_.end(),
1205 monotonic_timestamp_time,
1206 [](const aos::monotonic_clock::time_point monotonic_timestamp_time,
1207 const Timestamp &timestamp) {
1208 return monotonic_timestamp_time <
1209 timestamp.monotonic_timestamp_time;
1210 }),
1211 std::move(remote_message), monotonic_timestamp_time);
Austin Schuh969cd602021-01-03 00:09:45 -08001212 ScheduleTimestamp();
1213 }
1214}
1215
1216void LogReader::RemoteMessageSender::SendTimestamp() {
Austin Schuh3d94be02021-02-12 23:15:20 -08001217 CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_)
1218 << event_loop_->node()->name()->string_view();
Austin Schuh969cd602021-01-03 00:09:45 -08001219 CHECK(!remote_timestamps_.empty());
1220
1221 // Send out all timestamps at the currently scheduled time.
1222 while (remote_timestamps_.front().monotonic_timestamp_time ==
1223 scheduled_time_) {
1224 sender_.Send(std::move(remote_timestamps_.front().remote_message));
1225 remote_timestamps_.pop_front();
1226 if (remote_timestamps_.empty()) {
1227 break;
1228 }
1229 }
1230 scheduled_time_ = monotonic_clock::min_time;
1231
1232 ScheduleTimestamp();
1233}
1234
1235LogReader::RemoteMessageSender *LogReader::State::RemoteTimestampSender(
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001236 const Node *delivered_node) {
1237 auto sender = remote_timestamp_senders_map_.find(delivered_node);
1238
1239 if (sender == remote_timestamp_senders_map_.end()) {
Austin Schuh969cd602021-01-03 00:09:45 -08001240 sender =
1241 remote_timestamp_senders_map_
1242 .emplace(delivered_node,
1243 std::make_unique<RemoteMessageSender>(
1244 event_loop()->MakeSender<RemoteMessage>(absl::StrCat(
1245 "/aos/remote_timestamps/",
1246 delivered_node->name()->string_view())),
1247 event_loop()))
1248 .first;
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001249 }
1250
Austin Schuh969cd602021-01-03 00:09:45 -08001251 return sender->second.get();
Austin Schuh858c9f32020-08-31 16:56:12 -07001252}
1253
Austin Schuhdda74ec2021-01-03 19:30:37 -08001254TimestampedMessage LogReader::State::PopOldest() {
Austin Schuhe639ea12021-01-25 13:00:22 -08001255 CHECK(timestamp_mapper_ != nullptr);
1256 TimestampedMessage *result_ptr = timestamp_mapper_->Front();
1257 CHECK(result_ptr != nullptr);
Austin Schuh858c9f32020-08-31 16:56:12 -07001258
Austin Schuhe639ea12021-01-25 13:00:22 -08001259 TimestampedMessage result = std::move(*result_ptr);
1260
Austin Schuh2f8fd752020-09-01 22:38:28 -07001261 VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
Austin Schuhe639ea12021-01-25 13:00:22 -08001262 << result.monotonic_event_time;
1263 timestamp_mapper_->PopFront();
Austin Schuh858c9f32020-08-31 16:56:12 -07001264 SeedSortedMessages();
1265
Austin Schuhe639ea12021-01-25 13:00:22 -08001266 if (result.monotonic_remote_time != monotonic_clock::min_time) {
1267 message_bridge::NoncausalOffsetEstimator *filter =
1268 filters_[result.channel_index];
1269 CHECK(filter != nullptr);
1270
1271 // TODO(austin): We probably want to push this down into the timestamp
1272 // mapper directly.
Austin Schuh3d94be02021-02-12 23:15:20 -08001273 filter->Pop(event_loop_->node(), event_loop_->monotonic_now());
Austin Schuh2f8fd752020-09-01 22:38:28 -07001274 }
Austin Schuh5ee56872021-01-30 16:53:34 -08001275 VLOG(1) << "Popped " << result
1276 << configuration::CleanedChannelToString(
1277 event_loop_->configuration()->channels()->Get(
1278 factory_channel_index_[result.channel_index]));
Austin Schuhe639ea12021-01-25 13:00:22 -08001279 return result;
Austin Schuh858c9f32020-08-31 16:56:12 -07001280}
1281
1282monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
Austin Schuhe639ea12021-01-25 13:00:22 -08001283 if (timestamp_mapper_ == nullptr) {
Austin Schuh287d43d2020-12-04 20:19:33 -08001284 return monotonic_clock::max_time;
1285 }
Austin Schuhe639ea12021-01-25 13:00:22 -08001286 TimestampedMessage *result_ptr = timestamp_mapper_->Front();
1287 if (result_ptr == nullptr) {
1288 return monotonic_clock::max_time;
1289 }
1290 VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
1291 << result_ptr->monotonic_event_time;
1292 return result_ptr->monotonic_event_time;
Austin Schuh858c9f32020-08-31 16:56:12 -07001293}
1294
1295void LogReader::State::SeedSortedMessages() {
Austin Schuh287d43d2020-12-04 20:19:33 -08001296 if (!timestamp_mapper_) return;
Austin Schuh858c9f32020-08-31 16:56:12 -07001297
Austin Schuhe639ea12021-01-25 13:00:22 -08001298 timestamp_mapper_->QueueFor(chrono::duration_cast<chrono::seconds>(
1299 chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
Austin Schuh858c9f32020-08-31 16:56:12 -07001300}
1301
1302void LogReader::State::Deregister() {
1303 for (size_t i = 0; i < channels_.size(); ++i) {
1304 channels_[i].reset();
1305 }
Austin Schuh8d7e0bb2020-10-02 17:57:00 -07001306 remote_timestamp_senders_map_.clear();
Austin Schuh858c9f32020-08-31 16:56:12 -07001307 event_loop_unique_ptr_.reset();
1308 event_loop_ = nullptr;
1309 timer_handler_ = nullptr;
1310 node_event_loop_factory_ = nullptr;
1311}
1312
Austin Schuhe309d2a2019-11-29 13:25:21 -08001313} // namespace logger
1314} // namespace aos