blob: 4a54f712c9fa04420351e3acfa585f69e865f9c7 [file] [log] [blame]
#include "aos/starter/starter_rpc_lib.h"
#include <algorithm>
#include <ostream>
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "flatbuffers/buffer.h"
#include "flatbuffers/flatbuffer_builder.h"
#include "flatbuffers/string.h"
#include "flatbuffers/vector.h"
#include "aos/events/context.h"
#include "aos/events/shm_event_loop.h"
#include "aos/flatbuffer_merge.h"
#include "aos/starter/starterd_lib.h"
namespace aos::starter {
namespace {
State ExpectedStateForCommand(Command command) {
switch (command) {
case Command::START:
case Command::RESTART:
return State::RUNNING;
case Command::STOP:
return State::STOPPED;
}
return State::STOPPED;
}
} // namespace
const aos::starter::ApplicationStatus *FindApplicationStatus(
const aos::starter::Status &status, std::string_view name) {
if (!status.has_statuses()) {
return nullptr;
}
auto statuses = status.statuses();
auto search =
std::find_if(statuses->begin(), statuses->end(),
[name](const aos::starter::ApplicationStatus *app_status) {
return app_status->has_name() &&
app_status->name()->string_view() == name;
});
if (search == statuses->end()) {
return nullptr;
}
return *search;
}
std::string_view FindApplication(const std::string_view name,
const aos::Configuration *config) {
std::string_view app_name = name;
for (const auto app : *config->applications()) {
if (app->name()->string_view() == name) {
return name;
}
}
for (const auto app : *config->applications()) {
if (app->has_executable_name() &&
app->executable_name()->string_view() == name) {
app_name = app->name()->string_view();
break;
}
}
return app_name;
}
StarterClient::StarterClient(EventLoop *event_loop)
: event_loop_(event_loop),
timeout_timer_(event_loop_->AddTimer([this]() { Timeout(); })),
cmd_sender_(event_loop_->MakeSender<StarterRpc>("/aos")) {
timeout_timer_->set_name("rpc_timeout");
if (configuration::MultiNode(event_loop_->configuration())) {
for (const aos::Node *node :
configuration::GetNodes(event_loop_->configuration())) {
const Channel *channel =
StatusChannelForNode(event_loop_->configuration(), node);
CHECK(channel != nullptr) << ": Failed to find channel /aos for "
<< Status::GetFullyQualifiedName() << " on "
<< node->name()->string_view();
if (!configuration::ChannelIsReadableOnNode(channel,
event_loop_->node())) {
VLOG(1) << "Status channel "
<< configuration::StrippedChannelToString(channel)
<< " is not readable on "
<< event_loop_->node()->name()->string_view();
} else if (!configuration::ChannelIsReadableOnNode(
StarterRpcChannelForNode(event_loop_->configuration(),
event_loop_->node()),
node)) {
// Don't attempt to construct a status fetcher if the other node won't
// even be able to receive our commands.
VLOG(1) << "StarterRpc channel for "
<< event_loop_->node()->name()->string_view()
<< " is not readable on " << node->name()->string_view();
} else {
status_fetchers_[node->name()->str()] =
event_loop_->MakeFetcher<Status>(channel->name()->string_view());
event_loop_->MakeNoArgWatcher<Status>(channel->name()->string_view(),
[this]() {
if (CheckCommandsSucceeded()) {
Succeed();
}
});
}
}
} else {
status_fetchers_[""] = event_loop_->MakeFetcher<Status>("/aos");
event_loop_->MakeNoArgWatcher<Status>("/aos", [this]() {
if (CheckCommandsSucceeded()) {
Succeed();
}
});
}
}
void StarterClient::SendCommands(
const std::vector<ApplicationCommand> &commands,
monotonic_clock::duration timeout) {
CHECK(current_commands_.empty());
for (auto &pair : status_fetchers_) {
pair.second.Fetch();
}
const bool is_multi_node =
aos::configuration::MultiNode(event_loop_->configuration());
for (const auto &command : commands) {
auto builder = cmd_sender_.MakeBuilder();
const auto application_offset =
builder.fbb()->CreateString(command.application);
std::vector<flatbuffers::Offset<flatbuffers::String>> node_offsets;
CHECK(!command.nodes.empty())
<< "At least one node must be specified for application "
<< command.application;
for (const aos::Node *node : command.nodes) {
const std::string node_name((node == nullptr) ? "" : node->name()->str());
if (status_fetchers_.count(node_name) == 0) {
if (is_multi_node) {
LOG(FATAL) << "Node \"" << node_name
<< "\" must be configured to both receive StarterRpc "
"messages from \""
<< event_loop_->node()->name()->string_view()
<< "\" as well as to send starter Status messages back.";
} else {
LOG(FATAL) << "On single-node configs, use an empty string for the "
"node name.";
}
}
if (status_fetchers_[node_name].get() == nullptr) {
LOG(WARNING) << ": No status available for node " << node_name
<< "; not executing commands for that node.";
continue;
}
if (is_multi_node) {
node_offsets.push_back(builder.fbb()->CreateString(node_name));
}
const ApplicationStatus *last_status = FindApplicationStatus(
*status_fetchers_[node_name], command.application);
CHECK(last_status != nullptr);
current_commands_[node_name].push_back(CommandStatus{
.expected_state = ExpectedStateForCommand(command.command),
.application = std::string(command.application),
.old_id = std::nullopt});
// If we are restarting, then we need to track what the current ID of the
// process is to ensure that it actually got restarted. For just starting,
// we leave the application running and so don't care.
if (command.command == Command::RESTART && last_status->has_id()) {
current_commands_[node_name].back().old_id = last_status->id();
}
}
flatbuffers::Offset<
flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
nodes_offset;
if (is_multi_node) {
nodes_offset = builder.fbb()->CreateVector(node_offsets);
}
auto command_builder = builder.MakeBuilder<StarterRpc>();
command_builder.add_command(command.command);
command_builder.add_name(application_offset);
if (is_multi_node) {
command_builder.add_nodes(nodes_offset);
}
builder.CheckOk(builder.Send(command_builder.Finish()));
}
timeout_timer_->Schedule(event_loop_->monotonic_now() + timeout);
}
bool StarterClient::CheckCommandsSucceeded() {
if (current_commands_.empty()) {
return false;
}
for (auto &pair : status_fetchers_) {
pair.second.Fetch();
}
bool succeeded = true;
for (const auto &pair : current_commands_) {
if (pair.second.empty()) {
continue;
}
CHECK(status_fetchers_[pair.first].get() != nullptr)
<< ": No status available for node " << pair.first;
const Status &status = *status_fetchers_[pair.first];
for (const auto &command : pair.second) {
const ApplicationStatus *application_status =
FindApplicationStatus(status, command.application);
CHECK(application_status != nullptr);
if (application_status->state() == command.expected_state) {
if (command.expected_state == State::RUNNING &&
application_status->id() == command.old_id) {
succeeded = false;
}
} else {
succeeded = false;
}
}
}
return succeeded;
}
void StarterClient::Timeout() {
// Clear commands prior to calling handlers to allow the handler to call
// SendCommands() again if desired.
current_commands_.clear();
if (timeout_handler_) {
timeout_handler_();
}
}
void StarterClient::Succeed() {
// Clear commands prior to calling handlers to allow the handler to call
// SendCommands() again if desired.
current_commands_.clear();
// Clear the timer before calling success handler, in case the success
// handler needs to modify timeout handler.
timeout_timer_->Disable();
if (success_handler_) {
success_handler_();
}
}
bool SendCommandBlocking(aos::starter::Command command, std::string_view name,
const aos::Configuration *config,
std::chrono::milliseconds timeout,
std::vector<const aos::Node *> nodes) {
return SendCommandBlocking({{command, name, nodes}}, config, timeout);
}
bool SendCommandBlocking(const std::vector<ApplicationCommand> &commands,
const aos::Configuration *config,
std::chrono::milliseconds timeout) {
aos::ShmEventLoop event_loop(config);
event_loop.SkipAosLog();
StarterClient client(&event_loop);
// Wait until event loop starts to send all commands so the watcher is ready
event_loop.OnRun([&commands, &client, timeout]() {
client.SendCommands(commands, timeout);
});
// If still waiting after timeout milliseconds, exit the loop
client.SetTimeoutHandler([&event_loop]() { event_loop.Exit(); });
bool success = false;
client.SetSuccessHandler([&event_loop, &success]() {
success = true;
event_loop.Exit();
});
event_loop.Run();
return success;
}
const std::optional<
std::pair<aos::monotonic_clock::time_point,
FlatbufferDetachedBuffer<aos::starter::ApplicationStatus>>>
GetStatus(std::string_view name, const Configuration *config,
const aos::Node *node) {
ShmEventLoop event_loop(config);
event_loop.SkipAosLog();
auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
StatusChannelForNode(config, node)->name()->string_view());
status_fetcher.Fetch();
if (status_fetcher.get() != nullptr) {
const aos::starter::ApplicationStatus *status =
FindApplicationStatus(*status_fetcher, name);
if (status != nullptr) {
return std::make_pair(status_fetcher.context().monotonic_remote_time,
aos::CopyFlatBuffer(status));
}
}
return std::nullopt;
}
std::optional<std::pair<aos::monotonic_clock::time_point,
const aos::FlatbufferVector<aos::starter::Status>>>
GetStarterStatus(const aos::Configuration *config, const aos::Node *node) {
ShmEventLoop event_loop(config);
event_loop.SkipAosLog();
auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
StatusChannelForNode(config, node)->name()->string_view());
status_fetcher.Fetch();
return (status_fetcher.get() == nullptr)
? std::nullopt
: std::make_optional(std::make_pair(
status_fetcher.context().monotonic_remote_time,
status_fetcher.CopyFlatBuffer()));
}
} // namespace aos::starter