Support nodes rebooting
We have log files which span a reboot. We want to be able to replay the
timeline across that reboot so we can run simulations and everything
else interesting.
This requires a bunch of stuff, unfortunately.
The most visible one is that we need to be able to "reboot" a node.
This means we need a way of starting it up and stopping it. There are
now OnStartup and OnShutdown handlers in NodeEventLoopFactory to serve
this purpose, and better application context tracking to make it easier
to start and stop applications through a virtual starter.
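
Roughly, the hook-up looks like this (a minimal sketch based on how
LogReader uses the new handlers further down; the lambda bodies are
placeholders):

    NodeEventLoopFactory *node_factory =
        event_loop_factory->GetNodeEventLoopFactory(node);
    // Runs every time the node boots, including the initial boot.
    node_factory->OnStartup([]() {
      // Recreate event loops and applications for the new boot here.
    });
    // Runs when the node shuts down ahead of a reboot.
    node_factory->OnShutdown([]() {
      // Tear down anything tied to the boot that is ending here.
    });
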
This requires LogReader and the simulated network bridge to be
refactored to support nodes coming and going while the main application
continues to run.
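
On the replay side, that shows up as per-node OnStart/OnEnd callbacks on
LogReader. A hypothetical usage sketch, assuming a reader already built
from the log files and a node named "pi1" in the config:

    SimulatedEventLoopFactory factory(reader.configuration());
    reader.Register(&factory);
    const Node *node = configuration::GetNode(reader.configuration(), "pi1");
    // Fires every time this node's replay starts, once per boot.
    reader.OnStart(node, []() { LOG(INFO) << "node booted"; });
    // Fires when this node's replay stops, e.g. just before a reboot.
    reader.OnEnd(node, []() { LOG(INFO) << "node shut down"; });
    factory.Run();
    reader.Deregister();
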
From there, everything else is a massive amount of plumbing BootTimestamp
through everything just short of the user. Boot UUIDs were put in
TimeConverter so everything related to rebooting lives nicely in one place.
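
The shape being plumbed is roughly this (a simplified sketch; the real
BootTimestamp in AOS also has comparison operators, min_time()/max_time(),
and friends):

    // A monotonic time qualified by which boot of the node it belongs to.
    struct BootTimestamp {
      size_t boot;                       // boot index, 0 for the first boot
      monotonic_clock::time_point time;  // time on that boot's monotonic clock
    };

    // Replay then checks boots before comparing times, e.g.:
    CHECK_EQ(timestamped_message.monotonic_event_time.boot, state->boot_count());
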
Change-Id: I2cfb659c5764c1dd80dc66f33cfab3937159e324
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index b320474..a1269e1 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -49,14 +49,6 @@
namespace logger {
namespace {
-std::string LogFileVectorToString(std::vector<LogFile> log_files) {
- std::stringstream ss;
- for (const auto &f : log_files) {
- ss << f << "\n";
- }
- return ss.str();
-}
-
// Copies the channel, removing the schema as we go. If new_name is provided,
// it is used instead of the name inside the channel. If new_type is provided,
// it is used instead of the type in the channel.
@@ -230,7 +222,8 @@
if (!configuration::MultiNode(configuration())) {
states_.emplace_back(std::make_unique<State>(
- std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, ""))));
+ std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, "")),
+ nullptr));
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -295,19 +288,82 @@
return state->realtime_start_time(0);
}
+void LogReader::OnStart(std::function<void()> fn) {
+ CHECK(!configuration::MultiNode(configuration()));
+ OnStart(nullptr, std::move(fn));
+}
+
+void LogReader::OnStart(const Node *node, std::function<void()> fn) {
+ const int node_index = configuration::GetNodeIndex(configuration(), node);
+ CHECK_GE(node_index, 0);
+ CHECK_LT(node_index, static_cast<int>(states_.size()));
+ State *state = states_[node_index].get();
+ CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+ state->OnStart(std::move(fn));
+}
+
+void LogReader::State::OnStart(std::function<void()> fn) {
+ on_starts_.emplace_back(std::move(fn));
+}
+
+void LogReader::State::RunOnStart() {
+ SetRealtimeOffset(monotonic_start_time(boot_count()),
+ realtime_start_time(boot_count()));
+
+ VLOG(1) << "Starting " << MaybeNodeName(node()) << "at time "
+ << monotonic_start_time(boot_count());
+ for (size_t i = 0; i < on_starts_.size(); ++i) {
+ on_starts_[i]();
+ }
+ stopped_ = false;
+ started_ = true;
+}
+
+void LogReader::OnEnd(std::function<void()> fn) {
+ CHECK(!configuration::MultiNode(configuration()));
+ OnEnd(nullptr, std::move(fn));
+}
+
+void LogReader::OnEnd(const Node *node, std::function<void()> fn) {
+ const int node_index = configuration::GetNodeIndex(configuration(), node);
+ CHECK_GE(node_index, 0);
+ CHECK_LT(node_index, static_cast<int>(states_.size()));
+ State *state = states_[node_index].get();
+ CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
+
+ state->OnEnd(std::move(fn));
+}
+
+void LogReader::State::OnEnd(std::function<void()> fn) {
+ on_ends_.emplace_back(std::move(fn));
+}
+
+void LogReader::State::RunOnEnd() {
+ VLOG(1) << "Ending " << MaybeNodeName(node()) << "at time "
+ << monotonic_start_time(boot_count());
+ for (size_t i = 0; i < on_ends_.size(); ++i) {
+ on_ends_[i]();
+ }
+
+ stopped_ = true;
+ started_ = false;
+}
+
void LogReader::Register() {
event_loop_factory_unique_ptr_ =
std::make_unique<SimulatedEventLoopFactory>(configuration());
Register(event_loop_factory_unique_ptr_.get());
}
-void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
+void LogReader::RegisterWithoutStarting(
+ SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
remapped_configuration_ = event_loop_factory_->configuration();
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop_factory_->configuration(), logged_configuration(),
- FLAGS_skip_order_validation,
+ log_files_[0].boots, FLAGS_skip_order_validation,
chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
@@ -318,29 +374,14 @@
std::vector<LogParts> filtered_parts = FilterPartsForNode(
log_files_, node != nullptr ? node->name()->string_view() : "");
- // Confirm that all the parts are from the same boot if there are enough
- // parts to not be from the same boot.
- if (filtered_parts.size() > 1u) {
- for (size_t i = 1; i < filtered_parts.size(); ++i) {
- CHECK_EQ(filtered_parts[i].source_boot_uuid,
- filtered_parts[0].source_boot_uuid)
- << ": Found parts from different boots for node "
- << node->name()->string_view() << " "
- << LogFileVectorToString(log_files_);
- }
- if (!filtered_parts[0].source_boot_uuid.empty()) {
- event_loop_factory_->GetNodeEventLoopFactory(node)->set_boot_uuid(
- filtered_parts[0].source_boot_uuid);
- }
- }
-
states_[node_index] = std::make_unique<State>(
filtered_parts.size() == 0u
? nullptr
- : std::make_unique<TimestampMapper>(std::move(filtered_parts)));
+ : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+ node);
State *state = states_[node_index].get();
- state->set_event_loop(state->SetNodeEventLoopFactory(
- event_loop_factory_->GetNodeEventLoopFactory(node)));
+ state->SetNodeEventLoopFactory(
+ event_loop_factory_->GetNodeEventLoopFactory(node));
state->SetChannelCount(logged_configuration()->channels()->size());
timestamp_mappers.emplace_back(state->timestamp_mapper());
@@ -372,7 +413,22 @@
configuration::GetNodeIndex(configuration(), node);
State *state = states_[node_index].get();
- Register(state->event_loop());
+ // If we didn't find any log files with data in them, we won't ever get a
+ // callback or be live. So skip the rest of the setup.
+ if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ continue;
+ }
+ ++live_nodes_;
+
+ NodeEventLoopFactory *node_factory =
+ event_loop_factory_->GetNodeEventLoopFactory(node);
+ node_factory->OnStartup([this, state, node]() {
+ RegisterDuringStartup(state->MakeEventLoop(), node);
+ });
+ node_factory->OnShutdown([this, state, node]() {
+ RegisterDuringStartup(nullptr, node);
+ state->DestroyEventLoop();
+ });
}
if (live_nodes_ == 0) {
@@ -387,34 +443,6 @@
state->SeedSortedMessages();
}
- // We want to start the log file at the last start time of the log files
- // from all the nodes. Compute how long each node's simulation needs to run
- // to move time to this point.
- distributed_clock::time_point start_time = distributed_clock::min_time;
-
- // TODO(austin): We want an "OnStart" callback for each node rather than
- // running until the last node.
-
- for (std::unique_ptr<State> &state : states_) {
- VLOG(1) << "Start time is " << state->monotonic_start_time(0)
- << " for node " << MaybeNodeName(state->event_loop()->node())
- << "now " << state->monotonic_now();
- if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
- continue;
- }
- // And start computing the start time on the distributed clock now that
- // that works.
- start_time = std::max(
- start_time, state->ToDistributedClock(state->monotonic_start_time(0)));
- }
-
- // TODO(austin): If a node doesn't have a start time, we might not queue
- // enough. If this happens, we'll explode with a frozen error eventually.
-
- CHECK_GE(start_time, distributed_clock::epoch())
- << ": Hmm, we have a node starting before the start of time. Offset "
- "everything.";
-
// Forwarding is tracked per channel. If it is enabled, we want to turn it
// off. Otherwise messages replayed will get forwarded across to the other
// nodes, and also replayed on the other nodes. This may not satisfy all
@@ -429,7 +457,7 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
const Channel *remapped_channel =
- RemapChannel(state->event_loop(), channel);
+ RemapChannel(state->event_loop(), node, channel);
event_loop_factory_->DisableForwarding(remapped_channel);
}
@@ -438,6 +466,37 @@
// from both the real message bridge and simulated message bridge.
event_loop_factory_->DisableStatistics();
}
+}
+
+void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
+ RegisterWithoutStarting(event_loop_factory);
+ // We want to start the log file at the last start time of the log files
+ // from all the nodes. Compute how long each node's simulation needs to run
+ // to move time to this point.
+ distributed_clock::time_point start_time = distributed_clock::min_time;
+
+ // TODO(austin): We want an "OnStart" callback for each node rather than
+ // running until the last node.
+
+ for (std::unique_ptr<State> &state : states_) {
+ VLOG(1) << "Start time is " << state->monotonic_start_time(0)
+ << " for node " << MaybeNodeName(state->node()) << "now "
+ << state->monotonic_now();
+ if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
+ continue;
+ }
+ // And start computing the start time on the distributed clock now that
+ // that works.
+ start_time = std::max(
+ start_time, state->ToDistributedClock(state->monotonic_start_time(0)));
+ }
+
+ // TODO(austin): If a node doesn't have a start time, we might not queue
+ // enough. If this happens, we'll explode with a frozen error eventually.
+
+ CHECK_GE(start_time, distributed_clock::epoch())
+ << ": Hmm, we have a node starting before the start of time. Offset "
+ "everything.";
// While we are starting the system up, we might be relying on matching data
// to timestamps on log files where the timestamp log file starts before the
@@ -478,23 +537,53 @@
}
void LogReader::Register(EventLoop *event_loop) {
+ Register(event_loop, event_loop->node());
+}
+
+void LogReader::Register(EventLoop *event_loop, const Node *node) {
State *state =
- states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
- .get();
+ states_[configuration::GetNodeIndex(configuration(), node)].get();
+
+ // If we didn't find any log files with data in them, we won't ever get a
+ // callback or be live. So skip the rest of the setup.
+ if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ return;
+ }
+ ++live_nodes_;
+
+ if (event_loop_factory_ != nullptr) {
+ event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
+ [this, event_loop, node]() {
+ RegisterDuringStartup(event_loop, node);
+ });
+ } else {
+ RegisterDuringStartup(event_loop, node);
+ }
+}
+
+void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
+ if (event_loop) {
+ CHECK(event_loop->configuration() == configuration());
+ }
+
+ State *state =
+ states_[configuration::GetNodeIndex(configuration(), node)].get();
state->set_event_loop(event_loop);
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
- event_loop->SkipTimingReport();
- event_loop->SkipAosLog();
+ if (event_loop) {
+ event_loop->SkipTimingReport();
+ event_loop->SkipAosLog();
+ }
for (size_t logged_channel_index = 0;
logged_channel_index < logged_configuration()->channels()->size();
++logged_channel_index) {
const Channel *channel = RemapChannel(
- event_loop,
+ event_loop, node,
logged_configuration()->channels()->Get(logged_channel_index));
if (channel->logger() == LoggerConfig::NOT_LOGGED) {
@@ -502,45 +591,58 @@
}
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- RemoteMessageSender *remote_timestamp_sender = nullptr;
State *source_state = nullptr;
-
- if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
- configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
- // We've got a message which is being forwarded to this node.
+ if (!configuration::ChannelIsSendableOnNode(channel, node) &&
+ configuration::ChannelIsReadableOnNode(channel, node)) {
const Node *source_node = configuration::GetNode(
- event_loop->configuration(), channel->source_node()->string_view());
- filter = GetFilter(event_loop->node(), source_node);
+ configuration(), channel->source_node()->string_view());
- // Delivery timestamps are supposed to be logged back on the source node.
- // Configure remote timestamps to be sent.
- const Connection *connection =
- configuration::ConnectionToNode(channel, event_loop->node());
- const bool delivery_time_is_logged =
- configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
- source_node);
+ // We've got a message which is being forwarded to this node.
+ filter = GetFilter(node, source_node);
source_state =
states_[configuration::GetNodeIndex(configuration(), source_node)]
.get();
-
- if (delivery_time_is_logged) {
- remote_timestamp_sender =
- source_state->RemoteTimestampSender(channel, connection);
- }
}
- state->SetChannel(
- logged_channel_index,
- configuration::ChannelIndex(event_loop->configuration(), channel),
- event_loop->MakeRawSender(channel), filter, remote_timestamp_sender,
- source_state);
+ // We are the source, and it is forwarded.
+ const bool is_forwarded =
+ configuration::ChannelIsSendableOnNode(channel, node) &&
+ configuration::ConnectionCount(channel);
+
+ state->SetChannel(logged_channel_index,
+ configuration::ChannelIndex(configuration(), channel),
+ event_loop ? event_loop->MakeRawSender(channel) : nullptr,
+ filter, is_forwarded, source_state);
+
+ if (is_forwarded) {
+ const Node *source_node = configuration::GetNode(
+ configuration(), channel->source_node()->string_view());
+
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const bool delivery_time_is_logged =
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+ source_node);
+
+ if (delivery_time_is_logged) {
+ State *destination_state =
+ states_[configuration::GetNodeIndex(
+ configuration(), connection->name()->string_view())]
+ .get();
+ destination_state->SetRemoteTimestampSender(
+ logged_channel_index,
+ event_loop ? state->RemoteTimestampSender(channel, connection)
+ : nullptr);
+ }
+ }
+ }
}
- // If we didn't find any log files with data in them, we won't ever get a
- // callback or be live. So skip the rest of the setup.
- if (state->OldestMessageTime() == monotonic_clock::max_time) {
+ if (!event_loop) {
+ state->ClearRemoteTimestampSenders();
+ state->set_timer_handler(nullptr);
+ state->set_startup_timer(nullptr);
return;
}
@@ -548,7 +650,7 @@
VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
<< "at " << state->event_loop()->context().monotonic_event_time
<< " now " << state->monotonic_now();
- if (state->OldestMessageTime() == monotonic_clock::max_time) {
+ if (state->OldestMessageTime() == BootTimestamp::max_time()) {
--live_nodes_;
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
if (exit_on_finish_ && live_nodes_ == 0) {
@@ -558,9 +660,9 @@
}
TimestampedMessage timestamped_message = state->PopOldest();
- CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
- CHECK_EQ(timestamped_message.monotonic_timestamp_time.boot, 0u);
+
+ CHECK_EQ(timestamped_message.monotonic_event_time.boot,
+ state->boot_count());
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
@@ -570,7 +672,8 @@
<< monotonic_now << " trying to send "
<< timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
- } else if (BootTimestamp{.boot = 0u, .time = monotonic_now} !=
+ } else if (BootTimestamp{.boot = state->boot_count(),
+ .time = monotonic_now} !=
timestamped_message.monotonic_event_time) {
LOG(WARNING) << "Check failed: monotonic_now == "
"timestamped_message.monotonic_event_time) ("
@@ -597,10 +700,13 @@
// happen after the effect even though we know they are at the same
// time. I doubt anyone will notice for a bit, but we should really
// fix that.
+ BootTimestamp monotonic_remote_now =
+ state->monotonic_remote_now(timestamped_message.channel_index);
if (!FLAGS_skip_order_validation) {
- CHECK_LE(
- timestamped_message.monotonic_remote_time.time,
- state->monotonic_remote_now(timestamped_message.channel_index))
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ monotonic_remote_now.boot);
+ CHECK_LE(timestamped_message.monotonic_remote_time,
+ monotonic_remote_now)
<< state->event_loop()->node()->name()->string_view() << " to "
<< state->remote_node(timestamped_message.channel_index)
->name()
@@ -610,9 +716,13 @@
logged_configuration()->channels()->Get(
timestamped_message.channel_index))
<< " " << state->DebugString();
- } else if (timestamped_message.monotonic_remote_time.time >
- state->monotonic_remote_now(
- timestamped_message.channel_index)) {
+ } else if (monotonic_remote_now.boot !=
+ timestamped_message.monotonic_remote_time.boot) {
+        LOG(WARNING) << "Mismatched boots, " << monotonic_remote_now.boot
+ << " vs "
+ << timestamped_message.monotonic_remote_time.boot;
+ } else if (timestamped_message.monotonic_remote_time >
+ monotonic_remote_now) {
LOG(WARNING)
<< "Check failed: timestamped_message.monotonic_remote_time < "
"state->monotonic_remote_now(timestamped_message.channel_"
@@ -665,7 +775,8 @@
<< timestamped_message.channel_index << ", "
<< configuration::CleanedChannelToString(
logged_configuration()->channels()->Get(
- timestamped_message.channel_index));
+ timestamped_message.channel_index))
+ << " " << timestamped_message;
// The user might be working with log files from 1 node but forgot to
// configure the infrastructure to log data for a remote channel on that
@@ -713,7 +824,7 @@
// rest. It is confusing when part of your data gets replayed but not
// all. Read the rest of the messages and drop them on the floor while
// doing some basic validation.
- while (state->OldestMessageTime() != monotonic_clock::max_time) {
+ while (state->OldestMessageTime() != BootTimestamp::max_time()) {
TimestampedMessage next = state->PopOldest();
// Make sure that once we have seen the last message on a channel,
// data doesn't start back up again. If the user wants to play
@@ -727,8 +838,9 @@
LOG(FATAL)
<< "Found missing data in the middle of the log file on "
"channel "
- << next.channel_index << " Last "
- << last_message[next.channel_index] << state->DebugString();
+ << next.channel_index << " " << next << " Last "
+ << last_message[next.channel_index] << " "
+ << state->DebugString();
}
}
}
@@ -745,16 +857,26 @@
{.multi_line = false, .max_vector_size = 100});
}
- const monotonic_clock::time_point next_time = state->OldestMessageTime();
- if (next_time != monotonic_clock::max_time) {
+ const BootTimestamp next_time = state->OldestMessageTime();
+ if (next_time != BootTimestamp::max_time()) {
+ if (next_time.boot != state->boot_count()) {
+ VLOG(1) << "Next message for "
+ << MaybeNodeName(state->event_loop()->node())
+ << "is on the next boot, " << next_time << " now is "
+ << state->monotonic_now();
+ CHECK(event_loop_factory_);
+ state->RunOnEnd();
+ return;
+ }
VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
- << "wakeup for " << next_time << "("
- << state->ToDistributedClock(next_time)
+ << "wakeup for " << next_time.time << "("
+ << state->ToDistributedClock(next_time.time)
<< " distributed), now is " << state->monotonic_now();
- state->Setup(next_time);
+ state->Setup(next_time.time);
} else {
VLOG(1) << MaybeNodeName(state->event_loop()->node())
<< "No next message, scheduling shutdown";
+ state->RunOnEnd();
// Set a timer up immediately after now to die. If we don't do this,
// then the senders waiting on the message we just read will never get
// called.
@@ -769,10 +891,15 @@
<< state->monotonic_now();
}));
- ++live_nodes_;
-
- if (state->OldestMessageTime() != monotonic_clock::max_time) {
- event_loop->OnRun([state]() { state->Setup(state->OldestMessageTime()); });
+ if (state->OldestMessageTime() != BootTimestamp::max_time()) {
+ state->set_startup_timer(
+ event_loop->AddTimer([state]() { state->RunOnStart(); }));
+ event_loop->OnRun([state]() {
+ BootTimestamp next_time = state->OldestMessageTime();
+ CHECK_EQ(next_time.boot, state->boot_count());
+ state->Setup(next_time.time);
+ state->SetupStartupTimer();
+ });
}
}
@@ -1101,6 +1228,7 @@
}
const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
+ const Node *node,
const Channel *channel) {
std::string_view channel_name = channel->name()->string_view();
std::string_view channel_type = channel->type()->string_view();
@@ -1115,8 +1243,8 @@
VLOG(2) << "Going to remap channel " << channel_name << " " << channel_type;
const Channel *remapped_channel = configuration::GetChannel(
- event_loop->configuration(), channel_name, channel_type,
- event_loop->name(), event_loop->node());
+ configuration(), channel_name, channel_type,
+ event_loop ? event_loop->name() : "log_reader", node);
CHECK(remapped_channel != nullptr)
<< ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
@@ -1125,8 +1253,9 @@
return remapped_channel;
}
-LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper)
- : timestamp_mapper_(std::move(timestamp_mapper)) {}
+LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+ const Node *node)
+ : timestamp_mapper_(std::move(timestamp_mapper)), node_(node) {}
void LogReader::State::AddPeer(State *peer) {
if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1134,12 +1263,9 @@
}
}
-EventLoop *LogReader::State::SetNodeEventLoopFactory(
+void LogReader::State::SetNodeEventLoopFactory(
NodeEventLoopFactory *node_event_loop_factory) {
node_event_loop_factory_ = node_event_loop_factory;
- event_loop_unique_ptr_ =
- node_event_loop_factory_->MakeEventLoop("log_reader");
- return event_loop_unique_ptr_.get();
}
void LogReader::State::SetChannelCount(size_t count) {
@@ -1151,22 +1277,23 @@
queue_index_map_.resize(count);
}
+void LogReader::State::SetRemoteTimestampSender(
+ size_t logged_channel_index, RemoteMessageSender *remote_timestamp_sender) {
+ remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
+}
+
void LogReader::State::SetChannel(
size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
- message_bridge::NoncausalOffsetEstimator *filter,
- RemoteMessageSender *remote_timestamp_sender, State *source_state) {
+ message_bridge::NoncausalOffsetEstimator *filter, bool is_forwarded,
+ State *source_state) {
channels_[logged_channel_index] = std::move(sender);
filters_[logged_channel_index] = filter;
- remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
+ channel_source_state_[logged_channel_index] = source_state;
- if (source_state) {
- channel_source_state_[logged_channel_index] = source_state;
-
- if (remote_timestamp_sender != nullptr) {
- source_state->queue_index_map_[logged_channel_index] =
- std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
- }
+ if (is_forwarded) {
+ queue_index_map_[logged_channel_index] =
+ std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
}
factory_channel_index_[logged_channel_index] = factory_channel_index;
@@ -1174,12 +1301,14 @@
bool LogReader::State::Send(const TimestampedMessage ×tamped_message) {
aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
+ CHECK(sender);
uint32_t remote_queue_index = 0xffffffff;
if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
+ State *source_state =
+ CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
std::vector<ContiguousSentTimestamp> *queue_index_map = CHECK_NOTNULL(
- CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
- ->queue_index_map_[timestamped_message.channel_index]
+ source_state->queue_index_map_[timestamped_message.channel_index]
.get());
struct SentTimestamp {
@@ -1187,10 +1316,11 @@
uint32_t queue_index;
} search;
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ source_state->boot_count());
search.monotonic_event_time =
timestamped_message.monotonic_remote_time.time;
- search.queue_index = timestamped_message.remote_queue_index;
+ search.queue_index = timestamped_message.remote_queue_index.index;
// Find the sent time if available.
auto element = std::lower_bound(
@@ -1220,28 +1350,30 @@
// other node isn't done yet. So there is no send time, but there is a
// receive time.
if (element != queue_index_map->end()) {
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ source_state->boot_count());
CHECK_GE(timestamped_message.monotonic_remote_time.time,
element->starting_monotonic_event_time);
CHECK_LE(timestamped_message.monotonic_remote_time.time,
element->ending_monotonic_event_time);
- CHECK_GE(timestamped_message.remote_queue_index,
+ CHECK_GE(timestamped_message.remote_queue_index.index,
element->starting_queue_index);
- CHECK_LE(timestamped_message.remote_queue_index,
+ CHECK_LE(timestamped_message.remote_queue_index.index,
element->ending_queue_index);
- remote_queue_index = timestamped_message.remote_queue_index +
+ remote_queue_index = timestamped_message.remote_queue_index.index +
element->actual_queue_index -
element->starting_queue_index;
} else {
VLOG(1) << "No timestamp match in the map.";
}
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ source_state->boot_count());
}
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
const bool sent = sender->Send(
timestamped_message.data.message().data()->Data(),
timestamped_message.data.message().data()->size(),
@@ -1255,15 +1387,15 @@
if (!sent) return false;
if (queue_index_map_[timestamped_message.channel_index]) {
+ CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
if (queue_index_map_[timestamped_message.channel_index]->empty()) {
// Nothing here, start a range with 0 length.
ContiguousSentTimestamp timestamp;
- CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
timestamp.starting_monotonic_event_time =
timestamp.ending_monotonic_event_time =
timestamped_message.monotonic_event_time.time;
timestamp.starting_queue_index = timestamp.ending_queue_index =
- timestamped_message.queue_index;
+ timestamped_message.queue_index.index;
timestamp.actual_queue_index = sender->sent_queue_index();
queue_index_map_[timestamped_message.channel_index]->emplace_back(
timestamp);
@@ -1273,20 +1405,18 @@
ContiguousSentTimestamp *back =
&queue_index_map_[timestamped_message.channel_index]->back();
if ((back->starting_queue_index - back->actual_queue_index) ==
- (timestamped_message.queue_index - sender->sent_queue_index())) {
- back->ending_queue_index = timestamped_message.queue_index;
- CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
+ (timestamped_message.queue_index.index - sender->sent_queue_index())) {
+ back->ending_queue_index = timestamped_message.queue_index.index;
back->ending_monotonic_event_time =
timestamped_message.monotonic_event_time.time;
} else {
// Otherwise, make a new one.
- CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
ContiguousSentTimestamp timestamp;
timestamp.starting_monotonic_event_time =
timestamp.ending_monotonic_event_time =
timestamped_message.monotonic_event_time.time;
timestamp.starting_queue_index = timestamp.ending_queue_index =
- timestamped_message.queue_index;
+ timestamped_message.queue_index.index;
timestamp.actual_queue_index = sender->sent_queue_index();
queue_index_map_[timestamped_message.channel_index]->emplace_back(
timestamp);
@@ -1298,6 +1428,9 @@
// map.
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
+ State *source_state =
+ CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
+
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
@@ -1316,7 +1449,8 @@
sender->realtime_sent_time().time_since_epoch().count());
message_header_builder.add_queue_index(sender->sent_queue_index());
- CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
+ source_state->boot_count());
message_header_builder.add_monotonic_remote_time(
timestamped_message.monotonic_remote_time.time.time_since_epoch()
.count());
@@ -1328,10 +1462,10 @@
fbb.Finish(message_header_builder.Finish());
- CHECK_EQ(timestamped_message.monotonic_timestamp_time.boot, 0u);
remote_timestamp_senders_[timestamped_message.channel_index]->Send(
FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
- timestamped_message.monotonic_timestamp_time.time);
+ timestamped_message.monotonic_timestamp_time,
+ source_state->boot_count());
}
return true;
@@ -1362,7 +1496,7 @@
void LogReader::RemoteMessageSender::Send(
FlatbufferDetachedBuffer<RemoteMessage> remote_message,
- monotonic_clock::time_point monotonic_timestamp_time) {
+ BootTimestamp monotonic_timestamp_time, size_t source_boot_count) {
// There are 2 variants of logs.
// 1) Logs without monotonic_timestamp_time
// 2) Logs with monotonic_timestamp_time
@@ -1386,20 +1520,22 @@
// timestamp to distinguish 2 and 3, and ignore 1. If we don't have a
// monotonic_timestamp_time, this means the message was logged locally and
// remote timestamps can be ignored.
- if (monotonic_timestamp_time == monotonic_clock::min_time) {
+ if (monotonic_timestamp_time == BootTimestamp::min_time()) {
return;
}
+ CHECK_EQ(monotonic_timestamp_time.boot, source_boot_count);
+
remote_timestamps_.emplace(
std::upper_bound(
remote_timestamps_.begin(), remote_timestamps_.end(),
- monotonic_timestamp_time,
+ monotonic_timestamp_time.time,
[](const aos::monotonic_clock::time_point monotonic_timestamp_time,
const Timestamp ×tamp) {
return monotonic_timestamp_time <
timestamp.monotonic_timestamp_time;
}),
- std::move(remote_message), monotonic_timestamp_time);
+ std::move(remote_message), monotonic_timestamp_time.time);
ScheduleTimestamp();
}
@@ -1475,18 +1611,8 @@
timestamp_mapper_->PopFront();
SeedSortedMessages();
- CHECK_EQ(result.monotonic_remote_time.boot, 0u);
+ CHECK_EQ(result.monotonic_event_time.boot, boot_count());
- if (result.monotonic_remote_time.time != monotonic_clock::min_time) {
- message_bridge::NoncausalOffsetEstimator *filter =
- filters_[result.channel_index];
- CHECK(filter != nullptr);
-
- // TODO(austin): We probably want to push this down into the timestamp
- // mapper directly.
- // TODO(austin): This hard-codes the boot to 0. We need to fix that.
- filter->Pop(event_loop_->node(), {0, event_loop_->monotonic_now()});
- }
VLOG(1) << "Popped " << result
<< configuration::CleanedChannelToString(
event_loop_->configuration()->channels()->Get(
@@ -1494,18 +1620,17 @@
return result;
}
-monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
+BootTimestamp LogReader::State::OldestMessageTime() const {
if (timestamp_mapper_ == nullptr) {
- return monotonic_clock::max_time;
+ return BootTimestamp::max_time();
}
TimestampedMessage *result_ptr = timestamp_mapper_->Front();
if (result_ptr == nullptr) {
- return monotonic_clock::max_time;
+ return BootTimestamp::max_time();
}
- CHECK_EQ(result_ptr->monotonic_event_time.boot, 0u);
VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
<< result_ptr->monotonic_event_time.time;
- return result_ptr->monotonic_event_time.time;
+ return result_ptr->monotonic_event_time;
}
void LogReader::State::SeedSortedMessages() {
@@ -1516,6 +1641,9 @@
}
void LogReader::State::Deregister() {
+ if (started_ && !stopped_) {
+ RunOnEnd();
+ }
for (size_t i = 0; i < channels_.size(); ++i) {
channels_[i].reset();
}