#include "aos/events/logging/logger.h"

#include <fcntl.h>
#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <vector>

#include "Eigen/Dense"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"

DEFINE_bool(skip_missing_forwarding_entries, false,
            "If true, drop any forwarding entries with missing data. If "
            "false, CHECK.");

DEFINE_bool(timestamps_to_csv, false,
            "If true, write all the time synchronization information to a set "
            "of CSV files in /tmp/. This should only be needed when debugging "
            "time synchronization.");

namespace aos {
namespace logger {

namespace chrono = std::chrono;

Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
             event_loop, polling_period) {}

Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : event_loop_(event_loop),
      log_namer_(std::move(log_namer)),
      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
      polling_period_(polling_period) {
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
  int channel_index = 0;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    FetcherStruct fs;
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
                                 channel, event_loop_->node()) &&
                             is_readable;

    const bool log_delivery_times =
        (event_loop_->node() == nullptr)
            ? false
            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                  channel, event_loop_->node(), event_loop_->node());

    if (log_message || log_delivery_times) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.writer = log_namer_->MakeWriter(channel);
        if (!is_local) {
          fs.log_type = LogType::kLogRemoteMessage;
        }
      }
      fs.channel_index = channel_index;
      fs.written = false;
      fetchers_.emplace_back(std::move(fs));
    }
    ++channel_index;
  }

  // When things start, we want to log the header, then the most recent messages
  // available on each fetcher to capture the previous state, then start
  // polling.
  event_loop_->OnRun([this, polling_period]() {
    // Grab data from each channel right before we declare the log file started
    // so we can capture the latest message on each channel. This lets us have
    // non periodic messages with configuration that now get logged.
    for (FetcherStruct &f : fetchers_) {
      f.written = !f.fetcher->Fetch();
    }

    // We need to pick a point in time to declare the log file "started". This
    // starts here. It needs to be after everything is fetched so that the
    // fetchers are all pointed at the most recent message before the start
    // time.
    monotonic_start_time_ = event_loop_->monotonic_now();
    realtime_start_time_ = event_loop_->realtime_now();
    last_synchronized_time_ = monotonic_start_time_;

    LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
              << " start_time " << monotonic_start_time_;

    WriteHeader();

    timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
                          polling_period);
  });
}

// TODO(austin): Set the remote start time to the first time we see a remote
// message when we are logging those messages separate? Need to signal what to
// do, or how to get a good timestamp.
void Logger::WriteHeader() {
  for (const Node *node : log_namer_->nodes()) {
    WriteHeader(node);
  }
}

void Logger::WriteHeader(const Node *node) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  flatbuffers::Offset<aos::Configuration> configuration_offset =
      CopyFlatBuffer(event_loop_->configuration(), &fbb);

  flatbuffers::Offset<flatbuffers::String> string_offset =
      fbb.CreateString(network::GetHostname());

  flatbuffers::Offset<Node> node_offset;
  if (event_loop_->node() != nullptr) {
    node_offset = CopyFlatBuffer(node, &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(string_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
  }

  log_file_header_builder.add_configuration(configuration_offset);
  // The worst case theoretical out of order is the polling period times 2.
  // One message could get logged right after the boundary, but be for right
  // before the next boundary. And the reverse could happen for another
  // message. Report back 3x to be extra safe, and because the cost isn't
  // huge on the read side.
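  // For example, with a 100 ms polling period the theoretical worst case
  // reordering is 200 ms, and the header reports 300 ms.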
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
          .count());

  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_start_time_.time_since_epoch())
          .count());
  log_file_header_builder.add_realtime_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          realtime_start_time_.time_since_epoch())
          .count());

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  log_namer_->WriteHeader(&fbb, node);
}

void Logger::Rotate(DetachedBufferWriter *writer) {
  Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
}

void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
  // Force data up until now to be written.
  DoLogData();

  // Swap the writer out, and re-write the header.
  log_namer_ = std::move(log_namer);

  // And then update the writers.
  for (FetcherStruct &f : fetchers_) {
    const Channel *channel =
        event_loop_->configuration()->channels()->Get(f.channel_index);
    if (f.timestamp_writer != nullptr) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
    }
    if (f.writer != nullptr) {
      f.writer = log_namer_->MakeWriter(channel);
    }
  }

  WriteHeader();
}

void Logger::DoLogData() {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration. To do this, we need sync points. Every write
  // cycle should be a sync point.
  const monotonic_clock::time_point monotonic_now =
      event_loop_->monotonic_now();

  do {
    // Move the sync point up by at most polling_period. This forces one sync
    // per iteration, even if it is small.
    last_synchronized_time_ =
        std::min(last_synchronized_time_ + polling_period_, monotonic_now);
    // Write each channel to disk, one at a time.
    for (FetcherStruct &f : fetchers_) {
      while (true) {
        if (f.written) {
          if (!f.fetcher->FetchNext()) {
            VLOG(2) << "No new data on "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel());
            break;
          } else {
            f.written = false;
          }
        }

        CHECK(!f.written);

        // TODO(james): Write tests to exercise this logic.
        if (f.fetcher->context().monotonic_event_time <
            last_synchronized_time_) {
          if (f.writer != nullptr) {
            // Write!
            flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                               max_header_size_);
            fbb.ForceDefaults(true);

            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                               f.channel_index, f.log_type));

            VLOG(2) << "Writing data as node "
                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel())
                    << " to " << f.writer->filename() << " data "
                    << FlatbufferToJson(
                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                               fbb.GetBufferPointer()));

            max_header_size_ = std::max(
                max_header_size_, fbb.GetSize() - f.fetcher->context().size);
            f.writer->QueueSizedFlatbuffer(&fbb);
          }

          if (f.timestamp_writer != nullptr) {
            // And now handle timestamps.
            flatbuffers::FlatBufferBuilder fbb;
            fbb.ForceDefaults(true);

            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                               f.channel_index,
                                               LogType::kLogDeliveryTimeOnly));

            VLOG(2) << "Writing timestamps as node "
                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel())
                    << " to " << f.timestamp_writer->filename() << " timestamp "
                    << FlatbufferToJson(
                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                               fbb.GetBufferPointer()));

            f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
          }

          f.written = true;
        } else {
          break;
        }
      }
    }

    // If we missed cycles, we could be pretty far behind. Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
}

LogReader::LogReader(std::string_view filename,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::string>{std::string(filename)},
                replay_configuration) {}

LogReader::LogReader(const std::vector<std::string> &filenames,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::vector<std::string>>{filenames},
                replay_configuration) {}

LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                     const Configuration *replay_configuration)
    : filenames_(filenames),
      log_file_header_(ReadHeader(filenames[0][0])),
      replay_configuration_(replay_configuration) {
  MakeRemappedConfig();

  if (replay_configuration) {
    CHECK_EQ(configuration::MultiNode(configuration()),
             configuration::MultiNode(replay_configuration))
        << ": Log file and replay config need to both be multi or single node.";
  }

  if (!configuration::MultiNode(configuration())) {
    states_.emplace_back(
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
  } else {
    if (replay_configuration) {
      CHECK_EQ(logged_configuration()->nodes()->size(),
               replay_configuration->nodes()->size())
          << ": Log file and replay config need to have matching nodes lists.";
      for (const Node *node : *logged_configuration()->nodes()) {
        if (configuration::GetNode(replay_configuration, node) == nullptr) {
          LOG(FATAL)
              << "Found node " << FlatbufferToJson(node)
              << " in logged config that is not present in the replay config.";
        }
      }
    }
    states_.resize(configuration()->nodes()->size());
  }
}

LogReader::~LogReader() {
  if (event_loop_factory_unique_ptr_) {
    Deregister();
  } else if (event_loop_factory_ != nullptr) {
    LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
                  "is destroyed";
  }
  if (offset_fp_ != nullptr) {
    fclose(offset_fp_);
  }
  // Zero out some buffers. It's easy to do use-after-frees on these, so make it
  // more obvious.
  if (remapped_configuration_buffer_) {
    remapped_configuration_buffer_->Wipe();
  }
  log_file_header_.Wipe();
}

const Configuration *LogReader::logged_configuration() const {
  return log_file_header_.message().configuration();
}

const Configuration *LogReader::configuration() const {
  return remapped_configuration_;
}

std::vector<const Node *> LogReader::Nodes() const {
  // Because the Node pointer will only be valid if it actually points to memory
  // owned by remapped_configuration_, we need to wait for the
  // remapped_configuration_ to be populated before accessing it.
  //
  // Also note that whenever a map is changed, the nodes in here are
  // invalidated.
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register before the node() pointer will be valid.";
  return configuration::GetNodes(remapped_configuration_);
}

monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
  State *state =
      states_[configuration::GetNodeIndex(configuration(), node)].get();
  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);

  return state->monotonic_start_time();
}

realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
  State *state =
      states_[configuration::GetNodeIndex(configuration(), node)].get();
  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);

  return state->realtime_start_time();
}

void LogReader::Register() {
  event_loop_factory_unique_ptr_ =
      std::make_unique<SimulatedEventLoopFactory>(configuration());
  Register(event_loop_factory_unique_ptr_.get());
}

void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;

  for (const Node *node : configuration::GetNodes(configuration())) {
    const size_t node_index =
        configuration::GetNodeIndex(configuration(), node);
    states_[node_index] =
        std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
    State *state = states_[node_index].get();

    Register(state->SetNodeEventLoopFactory(
        event_loop_factory_->GetNodeEventLoopFactory(node)));
  }
  if (live_nodes_ == 0) {
    LOG(FATAL)
        << "Don't have logs from any of the nodes in the replay config--are "
           "you sure that the replay config matches the original config?";
  }

  // We need to now seed our per-node time offsets and get everything set up to
  // run.
  const size_t num_nodes = !configuration::MultiNode(logged_configuration())
                               ? 1u
                               : logged_configuration()->nodes()->size();

  // It is easiest to solve for per node offsets with a matrix rather than
  // trying to solve the equations by hand. So let's get after it.
  //
  // Now, build up the map matrix.
  //
  // sample_matrix_ = map_matrix_ * offset_matrix_
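  //
  // As a purely illustrative sketch (not from any particular config): with
  // three nodes and timestamp filters between nodes (0, 1) and (1, 2), the
  // system being solved is
  //
  //   [ 1  1  1 ]   [ offset_0 ]   [ 0          ]
  //   [ 1 -1  0 ] * [ offset_1 ] = [ sample_0_1 ]
  //   [ 0  1 -1 ]   [ offset_2 ]   [ sample_1_2 ]
  //
  // where row 0 is the sum-to-zero constraint added below and each remaining
  // row measures the clock difference between one pair of nodes.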
  map_matrix_ = Eigen::MatrixXd::Zero(filters_.size() + 1, num_nodes);

  sample_matrix_ = Eigen::VectorXd::Zero(filters_.size() + 1);
  offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);

  // And the base offset matrix, which will be a copy of the initial offset
  // matrix.
  base_offset_matrix_ =
      Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>::Zero(
          num_nodes);

  // All offsets should sum to 0. Add that as the first constraint in our least
  // squares.
  map_matrix_.row(0).setOnes();

  {
    // Now, add the a - b -> sample elements.
    size_t i = 1;
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   message_bridge::ClippedAverageFilter> &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      const size_t node_a_index =
          configuration::GetNodeIndex(configuration(), node_a);
      const size_t node_b_index =
          configuration::GetNodeIndex(configuration(), node_b);

      // +a
      map_matrix_(i, node_a_index) = 1.0;
      // -b
      map_matrix_(i, node_b_index) = -1.0;

      // -> sample
      filter.second.set_sample_pointer(&sample_matrix_(i, 0));

      ++i;
    }
  }

  // Rank of the map matrix tells you if all the nodes are in communication with
  // each other, which tells you if the offsets are observable.
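  // For instance, with three nodes where the (0, 1) and (1, 2) pairs exchange
  // timestamps, the matrix above has full rank 3. If one node never exchanges
  // timestamps with anyone, the rank drops below the node count and the check
  // below fires.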
  const size_t connected_nodes =
      Eigen::FullPivLU<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>>(
          map_matrix_)
          .rank();

  // We don't need to support isolated nodes until someone has a real use case.
  CHECK_EQ(connected_nodes, num_nodes)
      << ": There is a node which isn't communicating with the rest.";

  // Now, iterate through all the timestamps from all the nodes and seed
  // everything.
  for (std::unique_ptr<State> &state : states_) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      TimestampMerger::DeliveryTimestamp timestamp =
          state->OldestTimestampForChannel(i);
      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
      }
    }
  }

  // Make sure all the samples have been seeded.
  for (int i = 1; i < sample_matrix_.cols(); ++i) {
    // The seeding logic is pretty basic right now because we don't have great
    // use cases yet. It wants to see data from every node. Blow up for now,
    // and once we have a reason to do something different, update this logic.
    // Maybe read further in the log file? Or seed off the realtime time?
    CHECK_NE(sample_matrix_(i, 0), 0.0)
        << ": Sample " << i << " is not seeded.";
  }

  // And solve.
  offset_matrix_ = SolveOffsets();

  // Save off the base offsets so we can work in deltas from here out. That
  // will significantly simplify the numerical precision problems.
  for (size_t i = 0; i < num_nodes; ++i) {
    base_offset_matrix_(i, 0) =
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::duration<double>(offset_matrix_(i, 0)));
  }

  {
    // Shift everything so we never could (reasonably) require the distributed
    // clock to have a large backwards jump in time. This makes it so the boot
    // time on the node up the longest will essentially start matching the
    // distributed clock.
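    // For example, if the solved base offsets were {5 ms, -2 ms, 0 ms}, then
    // 5 ms is subtracted from each, leaving {0 ms, -7 ms, -5 ms}, so the node
    // with the largest offset lands exactly at zero.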
    const chrono::nanoseconds offset = -base_offset_matrix_.maxCoeff();
    for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
      base_offset_matrix_(i, 0) += offset;
    }
  }

  {
    // Re-compute the samples and setup all the filters so that they
    // subtract this base offset.

    size_t i = 1;
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   message_bridge::ClippedAverageFilter> &filter : filters_) {
      CHECK(filter.second.sample_pointer() == &sample_matrix_(i, 0));

      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      const size_t node_a_index =
          configuration::GetNodeIndex(configuration(), node_a);
      const size_t node_b_index =
          configuration::GetNodeIndex(configuration(), node_b);

      filter.second.set_base_offset(base_offset_matrix_(node_a_index) -
                                    base_offset_matrix_(node_b_index));

      ++i;
    }
  }

  // Now, iterate again through all the offsets now that we have set the base
  // offset to something sane. This will seed everything with an accurate
  // initial offset.
  for (std::unique_ptr<State> &state : states_) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      TimestampMerger::DeliveryTimestamp timestamp =
          state->OldestTimestampForChannel(i);
      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
      }
    }
  }

  for (std::unique_ptr<State> &state : states_) {
    state->SeedSortedMessages();
  }

  UpdateOffsets();

  // We want to start the log file at the last start time of the log files from
  // all the nodes. Compute how long each node's simulation needs to run to
  // move time to this point.
  distributed_clock::time_point start_time = distributed_clock::min_time;

  for (std::unique_ptr<State> &state : states_) {
    // Setup the realtime clock to have something sane in it now.
    state->SetRealtimeOffset(state->monotonic_start_time(),
                             state->realtime_start_time());
    // And start computing the start time on the distributed clock now that that
    // works.
    start_time = std::max(
        start_time, state->ToDistributedClock(state->monotonic_start_time()));
  }
  CHECK_GE(start_time, distributed_clock::epoch());

  // Forwarding is tracked per channel. If it is enabled, we want to turn it
  // off. Otherwise messages replayed will get forwarded across to the other
  // nodes, and also replayed on the other nodes. This may not satisfy all our
  // users, but it'll start the discussion.
  if (configuration::MultiNode(event_loop_factory_->configuration())) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      const Channel *channel = logged_configuration()->channels()->Get(i);
      const Node *node = configuration::GetNode(
          configuration(), channel->source_node()->string_view());

      State *state =
          states_[configuration::GetNodeIndex(configuration(), node)].get();

      const Channel *remapped_channel =
          RemapChannel(state->event_loop(), channel);

      event_loop_factory_->DisableForwarding(remapped_channel);
    }

    // If we are replaying a log, we don't want a bunch of redundant messages
    // from both the real message bridge and simulated message bridge.
    event_loop_factory_->DisableStatistics();
  }

  // While we are starting the system up, we might be relying on matching data
  // to timestamps on log files where the timestamp log file starts before the
  // data. In this case, it is reasonable to expect missing data.
  ignore_missing_data_ = true;
  VLOG(1) << "Running until start time: " << start_time;
  event_loop_factory_->RunFor(start_time.time_since_epoch());
  VLOG(1) << "At start time";
  // Now that we are running for real, missing data means that the log file is
  // corrupted or went wrong.
  ignore_missing_data_ = false;
}

void LogReader::UpdateOffsets() {
  // TODO(austin): Evaluate less accurate inverses. We might be able to
  // do some tricks to keep the accuracy up.
  offset_matrix_ = SolveOffsets();

  size_t node_index = 0;
  for (std::unique_ptr<State> &state : states_) {
    state->SetDistributedOffset(-offset(node_index), 1.0);
    ++node_index;
  }
}

std::tuple<message_bridge::ClippedAverageFilter *, bool> LogReader::GetFilter(
    const Node *node_a, const Node *node_b) {
  CHECK_NE(node_a, node_b);
  CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
  CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);

  if (node_a > node_b) {
    return std::make_pair(std::get<0>(GetFilter(node_b, node_a)), false);
  }

  auto tuple = std::make_tuple(node_a, node_b);

  auto it = filters_.find(tuple);

  if (it == filters_.end()) {
    auto &x = filters_
                  .insert(std::make_pair(
                      tuple, message_bridge::ClippedAverageFilter()))
                  .first->second;
    if (FLAGS_timestamps_to_csv) {
      std::string fwd_name =
          absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
                       node_b->name()->string_view());
      x.SetFwdCsvFileName(fwd_name);
      std::string rev_name =
          absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
                       node_a->name()->string_view());
      x.SetRevCsvFileName(rev_name);
    }

    return std::make_tuple(&x, true);
  } else {
    return std::make_tuple(&(it->second), true);
  }
}

bool LogReader::State::MaybeUpdateTimestamp(
    const TimestampMerger::DeliveryTimestamp &channel_timestamp,
    int channel_index) {
  if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
    CHECK(std::get<0>(filters_[channel_index]) == nullptr);
    return false;
  }

  // Got a forwarding timestamp!
  CHECK(std::get<0>(filters_[channel_index]) != nullptr);

  // Call the correct method depending on if we are the forward or reverse
  // direction here.
  if (std::get<1>(filters_[channel_index])) {
    std::get<0>(filters_[channel_index])
        ->FwdSample(channel_timestamp.monotonic_event_time,
                    channel_timestamp.monotonic_event_time -
                        channel_timestamp.monotonic_remote_time);
  } else {
    std::get<0>(filters_[channel_index])
        ->RevSample(channel_timestamp.monotonic_event_time,
                    channel_timestamp.monotonic_event_time -
                        channel_timestamp.monotonic_remote_time);
  }
  return true;
}

void LogReader::Register(EventLoop *event_loop) {
  State *state =
      states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
          .get();

  state->set_event_loop(event_loop);

  // We don't run timing reports when trying to print out logged data, because
  // otherwise we would end up printing out the timing reports themselves...
  // This is only really relevant when we are replaying into a simulation.
  event_loop->SkipTimingReport();
  event_loop->SkipAosLog();

  const bool has_data = state->SetNode();

  state->SetChannelCount(logged_configuration()->channels()->size());

  for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
    const Channel *channel =
        RemapChannel(event_loop, logged_configuration()->channels()->Get(i));

    std::tuple<message_bridge::ClippedAverageFilter *, bool> filter =
        std::make_tuple(nullptr, false);

    NodeEventLoopFactory *channel_target_event_loop_factory = nullptr;

    if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
        configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
      const Node *target_node = configuration::GetNode(
          event_loop->configuration(), channel->source_node()->string_view());
      filter = GetFilter(event_loop->node(), target_node);

      if (event_loop_factory_ != nullptr) {
        channel_target_event_loop_factory =
            event_loop_factory_->GetNodeEventLoopFactory(target_node);
      }
    }

    state->SetChannel(i, event_loop->MakeRawSender(channel), filter,
                      channel_target_event_loop_factory);
  }

  // If we didn't find any log files with data in them, we won't ever get a
  // callback or be live. So skip the rest of the setup.
  if (!has_data) {
    return;
  }

  state->set_timer_handler(event_loop->AddTimer([this, state]() {
    if (state->OldestMessageTime() == monotonic_clock::max_time) {
      --live_nodes_;
      VLOG(1) << "Node down!";
      if (live_nodes_ == 0) {
        event_loop_factory_->Exit();
      }
      return;
    }
    bool update_offsets = false;
    TimestampMerger::DeliveryTimestamp channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    bool dummy_update_time = false;
    std::tie(channel_timestamp, channel_index, channel_data) =
        state->PopOldest(&dummy_update_time);

    const monotonic_clock::time_point monotonic_now =
        state->event_loop()->context().monotonic_event_time;
    CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
        << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
        << monotonic_now << " trying to send "
        << channel_timestamp.monotonic_event_time << " failure "
        << state->DebugString();

    if (channel_timestamp.monotonic_event_time >
            state->monotonic_start_time() ||
        event_loop_factory_ != nullptr) {
      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
           !state->at_end()) ||
          channel_data.message().data() != nullptr) {
        CHECK(channel_data.message().data() != nullptr)
            << ": Got a message without data. Forwarding entry which was "
               "not matched? Use --skip_missing_forwarding_entries to ignore "
               "this.";

        if (state->MaybeUpdateTimestamp(channel_timestamp, channel_index)) {
          // Confirm that the message was sent on the sending node before the
          // destination node (this node). As a proxy, do this by making sure
          // that time on the source node is past when the message was sent.
          CHECK_LT(channel_timestamp.monotonic_remote_time,
                   state->monotonic_remote_now(channel_index));

          update_offsets = true;

          if (FLAGS_timestamps_to_csv) {
            if (offset_fp_ == nullptr) {
              offset_fp_ = fopen("/tmp/offsets.csv", "w");
              fprintf(
                  offset_fp_,
                  "# time_since_start, offset node 0, offset node 1, ...\n");
              first_time_ = channel_timestamp.realtime_event_time;
            }

            fprintf(offset_fp_, "%.9f",
                    std::chrono::duration_cast<std::chrono::duration<double>>(
                        channel_timestamp.realtime_event_time - first_time_)
                        .count());
            for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
              fprintf(
                  offset_fp_, ", %.9f",
                  offset_matrix_(i, 0) +
                      std::chrono::duration_cast<std::chrono::duration<double>>(
                          base_offset_matrix_(i, 0))
                          .count());
            }
            fprintf(offset_fp_, "\n");
          }
        }

        // If we have access to the factory, use it to fix the realtime time.
        state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
                                 channel_timestamp.realtime_event_time);

        state->Send(channel_index, channel_data.message().data()->Data(),
                    channel_data.message().data()->size(),
                    channel_timestamp.monotonic_remote_time,
                    channel_timestamp.realtime_remote_time,
                    channel_timestamp.remote_queue_index);
      } else if (state->at_end()) {
        // We are at the end of the log file and found missing data. Finish
        // reading the rest of the log file and call it quits. We don't want to
        // replay partial data.
        while (state->OldestMessageTime() != monotonic_clock::max_time) {
          bool update_time_dummy;
          state->PopOldest(&update_time_dummy);
        }
      }

    } else {
      LOG(WARNING)
          << "Not sending data from before the start of the log file. "
          << channel_timestamp.monotonic_event_time.time_since_epoch().count()
          << " start " << monotonic_start_time().time_since_epoch().count()
          << " " << FlatbufferToJson(channel_data);
    }

    const monotonic_clock::time_point next_time = state->OldestMessageTime();
    if (next_time != monotonic_clock::max_time) {
      state->Setup(next_time);
    } else {
      // Set a timer up immediately after now to die. If we don't do this, then
      // the senders waiting on the message we just read will never get called.
      if (event_loop_factory_ != nullptr) {
        state->Setup(monotonic_now + event_loop_factory_->send_delay() +
                     std::chrono::nanoseconds(1));
      }
    }

    // Once we make this call, the current time changes. So do everything which
    // involves time before changing it. That especially includes sending the
    // message.
    if (update_offsets) {
      UpdateOffsets();
    }
  }));

  ++live_nodes_;

  if (state->OldestMessageTime() != monotonic_clock::max_time) {
    event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
  }
}

void LogReader::Deregister() {
  // Make sure that things get destroyed in the correct order, rather than
  // relying on getting the order correct in the class definition.
  for (std::unique_ptr<State> &state : states_) {
    state->Deregister();
  }

  event_loop_factory_unique_ptr_.reset();
  event_loop_factory_ = nullptr;
}

void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
                                   std::string_view add_prefix) {
  for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
    const Channel *const channel = logged_configuration()->channels()->Get(ii);
    if (channel->name()->str() == name &&
        channel->type()->string_view() == type) {
      CHECK_EQ(0u, remapped_channels_.count(ii))
          << "Already remapped channel "
          << configuration::CleanedChannelToString(channel);
      remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
      VLOG(1) << "Remapping channel "
              << configuration::CleanedChannelToString(channel)
              << " to have name " << remapped_channels_[ii];
      MakeRemappedConfig();
      return;
    }
  }
  LOG(FATAL) << "Unable to locate channel with name " << name << " and type "
             << type;
}

void LogReader::MakeRemappedConfig() {
  for (std::unique_ptr<State> &state : states_) {
    if (state) {
      CHECK(!state->event_loop())
          << ": Can't change the mapping after the events are scheduled.";
    }
  }

  // If no remapping occurred and we are using the original config, then there
  // is nothing interesting to do here.
  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
    remapped_configuration_ = logged_configuration();
    return;
  }
  // Config to copy Channel definitions from. Use the specified
  // replay_configuration_ if it has been provided.
  const Configuration *const base_config = replay_configuration_ == nullptr
                                               ? logged_configuration()
                                               : replay_configuration_;
  // The remapped config will be identical to the base_config, except that it
  // will have a bunch of extra channels in the channel list, which are exact
  // copies of the remapped channels, but with different names.
  // Because the flatbuffers API is a pain to work with, this requires a bit of
  // a song-and-dance to get copied over.
  // The order of operations is to:
  // 1) Make a flatbuffer builder for a config that will just contain a list of
  //    the new channels that we want to add.
  // 2) For each channel that we are remapping:
  //    a) Make a buffer/builder and construct into it a Channel table that only
  //       contains the new name for the channel.
  //    b) Merge the new channel with just the name into the channel that we are
  //       trying to copy, built in the flatbuffer builder made in 1. This gives
  //       us the new channel definition that we need.
  // 3) Using this list of offsets, build the Configuration of just new
  //    Channels.
  // 4) Merge the Configuration with the new Channels into the base_config.
  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
  //    chance to sanitize the config.
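  //
  // As a purely illustrative example: remapping a logged channel
  // {"name": "/camera", "type": "aos.examples.Ping"} with an add_prefix of
  // "/original" adds a copy of that channel named "/original/camera" to the
  // replay configuration, and the logged data is then replayed on the renamed
  // channel.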

  // This is the builder that we use for the config containing all the new
  // channels.
  flatbuffers::FlatBufferBuilder new_config_fbb;
  new_config_fbb.ForceDefaults(true);
  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
  for (auto &pair : remapped_channels_) {
    // This is the builder that we use for creating the Channel with just the
    // new name.
    flatbuffers::FlatBufferBuilder new_name_fbb;
    new_name_fbb.ForceDefaults(true);
    const flatbuffers::Offset<flatbuffers::String> name_offset =
        new_name_fbb.CreateString(pair.second);
    ChannelBuilder new_name_builder(new_name_fbb);
    new_name_builder.add_name(name_offset);
    new_name_fbb.Finish(new_name_builder.Finish());
    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
    // Retrieve the channel that we want to copy, confirming that it is actually
    // present in base_config.
    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
        base_config, logged_configuration()->channels()->Get(pair.first), "",
        nullptr));
    // Actually create the new channel and put it into the vector of Offsets
    // that we will use to create the new Configuration.
    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
        reinterpret_cast<const flatbuffers::Table *>(base_channel),
        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
        &new_config_fbb));
  }
  // Create the Configuration containing the new channels that we want to add.
  const auto new_name_vector_offsets =
      new_config_fbb.CreateVector(channel_offsets);
  ConfigurationBuilder new_config_builder(new_config_fbb);
  new_config_builder.add_channels(new_name_vector_offsets);
  new_config_fbb.Finish(new_config_builder.Finish());
  const FlatbufferDetachedBuffer<Configuration> new_name_config =
      new_config_fbb.Release();
  // Merge the new channels configuration into the base_config, giving us the
  // remapped configuration.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          MergeFlatBuffers<Configuration>(base_config,
                                          &new_name_config.message()));
  // Call MergeConfiguration to deal with sanitizing the config.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          configuration::MergeConfiguration(*remapped_configuration_buffer_));

  remapped_configuration_ = &remapped_configuration_buffer_->message();
}

const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
                                       const Channel *channel) {
  std::string_view channel_name = channel->name()->string_view();
  std::string_view channel_type = channel->type()->string_view();
  const int channel_index =
      configuration::ChannelIndex(logged_configuration(), channel);
  // If the channel is remapped, find the correct channel name to use.
  if (remapped_channels_.count(channel_index) > 0) {
    VLOG(3) << "Got remapped channel on "
            << configuration::CleanedChannelToString(channel);
    channel_name = remapped_channels_[channel_index];
  }

  VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
  const Channel *remapped_channel = configuration::GetChannel(
      event_loop->configuration(), channel_name, channel_type,
      event_loop->name(), event_loop->node());

  CHECK(remapped_channel != nullptr)
      << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
      << channel_type << "\"} because it is not in the provided configuration.";

  return remapped_channel;
}

LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
    : channel_merger_(std::move(channel_merger)) {}

EventLoop *LogReader::State::SetNodeEventLoopFactory(
    NodeEventLoopFactory *node_event_loop_factory) {
  node_event_loop_factory_ = node_event_loop_factory;
  event_loop_unique_ptr_ =
      node_event_loop_factory_->MakeEventLoop("log_reader");
  return event_loop_unique_ptr_.get();
}

void LogReader::State::SetChannelCount(size_t count) {
  channels_.resize(count);
  filters_.resize(count);
  channel_target_event_loop_factory_.resize(count);
}

void LogReader::State::SetChannel(
    size_t channel, std::unique_ptr<RawSender> sender,
    std::tuple<message_bridge::ClippedAverageFilter *, bool> filter,
    NodeEventLoopFactory *channel_target_event_loop_factory) {
  channels_[channel] = std::move(sender);
  filters_[channel] = filter;
  channel_target_event_loop_factory_[channel] =
      channel_target_event_loop_factory;
}

std::tuple<TimestampMerger::DeliveryTimestamp, int,
           FlatbufferVector<MessageHeader>>
LogReader::State::PopOldest(bool *update_time) {
  CHECK_GT(sorted_messages_.size(), 0u);

  std::tuple<TimestampMerger::DeliveryTimestamp, int,
             FlatbufferVector<MessageHeader>>
      result = std::move(sorted_messages_.front());
  VLOG(1) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
          << std::get<0>(result).monotonic_event_time;
  sorted_messages_.pop_front();
  SeedSortedMessages();

  *update_time = false;
  return std::make_tuple(std::get<0>(result), std::get<1>(result),
                         std::move(std::get<2>(result)));
}

monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
  if (sorted_messages_.size() > 0) {
    VLOG(1) << MaybeNodeName(event_loop_->node()) << "oldest message at "
            << std::get<0>(sorted_messages_.front()).monotonic_event_time;
    return std::get<0>(sorted_messages_.front()).monotonic_event_time;
  }

  return channel_merger_->OldestMessageTime();
}

void LogReader::State::SeedSortedMessages() {
  const aos::monotonic_clock::time_point end_queue_time =
      (sorted_messages_.size() > 0
           ? std::get<0>(sorted_messages_.front()).monotonic_event_time
           : channel_merger_->monotonic_start_time()) +
      std::chrono::seconds(2);

  while (true) {
    if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
      return;
    }
    if (sorted_messages_.size() > 0) {
      // Stop placing sorted messages on the list once we have 2 seconds
      // queued up (but queue at least until the log starts).
      if (end_queue_time <
          std::get<0>(sorted_messages_.back()).monotonic_event_time) {
        return;
      }
    }

    TimestampMerger::DeliveryTimestamp channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    std::tie(channel_timestamp, channel_index, channel_data) =
        channel_merger_->PopOldest();

    sorted_messages_.emplace_back(channel_timestamp, channel_index,
                                  std::move(channel_data));
  }
}

void LogReader::State::Deregister() {
  for (size_t i = 0; i < channels_.size(); ++i) {
    channels_[i].reset();
  }
  event_loop_unique_ptr_.reset();
  event_loop_ = nullptr;
  timer_handler_ = nullptr;
  node_event_loop_factory_ = nullptr;
}

}  // namespace logger
}  // namespace aos