Make starter client & server work across nodes
So that we can readily start on one node from another node, add in logic
to make it so that starterd will listen to StarterRpc calls from any
available other nodes. Add a StarterClient class that then supports
interacting across nodes.
Also updates the aos_starter command line utility with --node and
--all_nodes flags to support this.
Change-Id: I234b08b3a3a836966f117f55b9e6a321d2b2ff6c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/starter/BUILD b/aos/starter/BUILD
index 8097f6a..73353f6 100644
--- a/aos/starter/BUILD
+++ b/aos/starter/BUILD
@@ -1,4 +1,5 @@
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
# This target is everything which should get deployed to the robot.
filegroup(
@@ -36,21 +37,47 @@
],
)
+aos_config(
+ name = "multinode_pingpong_config",
+ src = "multinode_pingpong.json",
+ flatbuffers = [
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ ":starter_rpc_fbs",
+ ":starter_fbs",
+ "//aos/logging:log_message_fbs",
+ "//aos/events:event_loop_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:remote_message_fbs",
+ "//aos/network:timestamp_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+)
+
cc_test(
name = "starter_test",
srcs = ["starter_test.cc"],
data = [
+ ":multinode_pingpong_config",
"//aos/events:ping",
"//aos/events:pingpong_config",
"//aos/events:pong",
],
+ linkopts = ["-lstdc++fs"],
shard_count = 3,
- target_compatible_with = ["@platforms//os:linux"],
+ # The roborio compiler doesn't support <filesystem>.
+ target_compatible_with =
+ select({
+ "//tools/platforms/hardware:roborio": ["@platforms//:incompatible"],
+ "//conditions:default": ["@platforms//os:linux"],
+ }),
deps = [
":starter_rpc_lib",
":starterd_lib",
"//aos/events:ping_fbs",
"//aos/events:pong_fbs",
+ "//aos/events:simulated_event_loop",
"//aos/testing:googletest",
"//aos/testing:path",
"//aos/testing:tmpdir",
@@ -73,9 +100,11 @@
srcs = ["starter_rpc_lib.cc"],
hdrs = ["starter_rpc_lib.h"],
target_compatible_with = ["@platforms//os:linux"],
+ visibility = ["//visibility:public"],
deps = [
":starter_fbs",
":starter_rpc_fbs",
+ ":starterd_lib",
"//aos:configuration",
"//aos:init",
"//aos/events:shm_event_loop",
diff --git a/aos/starter/multinode_pingpong.json b/aos/starter/multinode_pingpong.json
new file mode 100644
index 0000000..0bb1ee0
--- /dev/null
+++ b/aos/starter/multinode_pingpong.json
@@ -0,0 +1,200 @@
+{
+ "channels": [
+ {
+ "name": "/pi1/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi1",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi2",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.starter.Status",
+ "source_node": "pi1",
+ "frequency": 10,
+ "num_senders": 2,
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "timestamp_logger": "NOT_LOGGED"
+ }
+ ]
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.starter.Status",
+ "source_node": "pi2",
+ "frequency": 10,
+ "num_senders": 2,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "timestamp_logger": "NOT_LOGGED"
+ }
+ ]
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.starter.StarterRpc",
+ "source_node": "pi1",
+ "frequency": 10,
+ "num_senders": 2,
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "timestamp_logger": "NOT_LOGGED"
+ }
+ ]
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.starter.StarterRpc",
+ "source_node": "pi2",
+ "frequency": 10,
+ "num_senders": 2,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "timestamp_logger": "NOT_LOGGED"
+ }
+ ]
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi1",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
+ }
+ ]
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi2",
+ "frequency": 10,
+ "num_senders": 2,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"]
+ }
+ ]
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1",
+ "type": "aos.message_bridge.RemoteMessage",
+ "logger": "NOT_LOGGED",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi1"
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/pi1/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/pi2/aos"
+ }
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "pi1",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "pi2",
+ "port": 9971
+ }
+ ]
+}
diff --git a/aos/starter/starter_cmd.cc b/aos/starter/starter_cmd.cc
index d306306..ae65ff0 100644
--- a/aos/starter/starter_cmd.cc
+++ b/aos/starter/starter_cmd.cc
@@ -6,6 +6,7 @@
#include <unordered_map>
#include "absl/strings/str_format.h"
+#include "absl/strings/str_join.h"
#include "aos/init.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/time/time.h"
@@ -13,6 +14,11 @@
#include "starter_rpc_lib.h"
DEFINE_string(config, "./config.json", "File path of aos configuration");
+// TODO(james): Bash autocompletion for node names.
+DEFINE_string(
+ node, "",
+ "Node to interact with. If empty, just interact with local node.");
+DEFINE_bool(all_nodes, false, "Interact with all nodes.");
DEFINE_bool(_bash_autocomplete, false,
"Internal use: Outputs commands or applications for use with "
@@ -29,47 +35,71 @@
{"stop", aos::starter::Command::STOP},
{"restart", aos::starter::Command::RESTART}};
-const aos::Node *MaybeMyNode(const aos::Configuration *configuration) {
+std::vector<const aos::Node *> InteractNodes(
+ const aos::Configuration *configuration) {
if (!configuration->has_nodes()) {
- return nullptr;
+ return {nullptr};
}
- return aos::configuration::GetMyNode(configuration);
+ if (!FLAGS_node.empty()) {
+ CHECK(!FLAGS_all_nodes) << "Can't specify both --node and --all_nodes.";
+ return {aos::configuration::GetNode(configuration, FLAGS_node)};
+ }
+
+ if (FLAGS_all_nodes) {
+ return aos::configuration::GetNodes(configuration);
+ }
+
+ return {aos::configuration::GetMyNode(configuration)};
}
-bool ValidApplication(const aos::Configuration *config,
- std::string_view application_name) {
- const aos::Node *node = MaybeMyNode(config);
- const aos::Application *application =
- aos::configuration::GetApplication(config, node, application_name);
- if (application == nullptr) {
- if (node) {
- std::cout << "Unknown application '" << application_name << "' on node '"
- << node->name()->string_view() << "'" << std::endl;
- } else {
- std::cout << "Unknown application '" << application_name << "'"
- << std::endl;
+std::vector<const aos::Node *> InteractNodesForApplication(
+ const aos::Configuration *config, std::string_view application_name) {
+ const std::vector<const aos::Node *> interact_nodes = InteractNodes(config);
+ std::vector<const aos::Node *> application_nodes;
+ std::vector<std::string> debug_node_names;
+ for (const aos::Node *node : interact_nodes) {
+ if (aos::configuration::GetApplication(config, node, application_name) !=
+ nullptr) {
+ application_nodes.push_back(node);
}
- return false;
+ if (node != nullptr) {
+ debug_node_names.push_back(node->name()->str());
+ }
}
- return true;
+
+ if (application_nodes.empty()) {
+ if (interact_nodes.size() == 1 && interact_nodes[0] == nullptr) {
+ std::cout << "Unknown application " << application_name << std::endl;
+ } else {
+ std::cout << "Unknown application " << application_name
+ << " on any of node(s) "
+ << absl::StrJoin(debug_node_names, ", ") << std::endl;
+ }
+ }
+ return application_nodes;
}
void PrintKey() {
- absl::PrintF("%-30s %-8s %-6s %-9s\n", "Name", "State", "PID", "Uptime");
+ absl::PrintF("%-30s %-10s %-8s %-6s %-9s\n", "Name", "Node", "State", "PID",
+ "Uptime");
}
void PrintApplicationStatus(const aos::starter::ApplicationStatus *app_status,
- const aos::monotonic_clock::time_point &time) {
+ const aos::monotonic_clock::time_point &time,
+ const aos::Node *node) {
const auto last_start_time = aos::monotonic_clock::time_point(
chrono::nanoseconds(app_status->last_start_time()));
const auto time_running =
chrono::duration_cast<chrono::seconds>(time - last_start_time);
if (app_status->state() == aos::starter::State::STOPPED) {
- absl::PrintF("%-30s %-8s\n", app_status->name()->string_view(),
+ absl::PrintF("%-30s %-10s %-8s\n", app_status->name()->string_view(),
+ (node == nullptr) ? "none" : node->name()->string_view(),
aos::starter::EnumNameState(app_status->state()));
} else {
- absl::PrintF("%-30s %-8s %-6d %-9s\n", app_status->name()->string_view(),
+ absl::PrintF("%-30s %-10s %-8s %-6d %-9s\n",
+ app_status->name()->string_view(),
+ (node == nullptr) ? "none" : node->name()->string_view(),
aos::starter::EnumNameState(app_status->state()),
app_status->pid(), std::to_string(time_running.count()) + 's');
}
@@ -77,18 +107,30 @@
// Prints the status for all applications.
void GetAllStarterStatus(const aos::Configuration *config) {
- // Print status for all processes.
- const auto optional_status = aos::starter::GetStarterStatus(config);
- if (optional_status) {
- auto status = *optional_status;
- const auto time = aos::monotonic_clock::now();
- PrintKey();
- for (const aos::starter::ApplicationStatus *app_status :
- *status.message().statuses()) {
- PrintApplicationStatus(app_status, time);
+ PrintKey();
+ std::vector<const aos::Node *> missing_nodes;
+ for (const aos::Node *node : InteractNodes(config)) {
+ // Print status for all processes.
+ const auto optional_status = aos::starter::GetStarterStatus(config, node);
+ if (optional_status) {
+ const aos::FlatbufferVector<aos::starter::Status> &status =
+ optional_status->second;
+ const aos::monotonic_clock::time_point time = optional_status->first;
+ for (const aos::starter::ApplicationStatus *app_status :
+ *status.message().statuses()) {
+ PrintApplicationStatus(app_status, time, node);
+ }
+ } else {
+ missing_nodes.push_back(node);
}
- } else {
- LOG(WARNING) << "No status found";
+ }
+ for (const aos::Node *node : missing_nodes) {
+ if (node == nullptr) {
+ LOG(WARNING) << "No status found.";
+ } else {
+ LOG(WARNING) << "No status found for node "
+ << node->name()->string_view();
+ }
}
}
@@ -106,12 +148,17 @@
return false;
}
- if (!ValidApplication(config, application_name)) {
+ const std::vector<const aos::Node *> application_nodes =
+ InteractNodesForApplication(config, application_name);
+ if (application_nodes.empty()) {
return false;
}
- auto status = aos::starter::GetStatus(application_name, config);
PrintKey();
- PrintApplicationStatus(&status.message(), aos::monotonic_clock::now());
+ for (const aos::Node *node : application_nodes) {
+ auto status = aos::starter::GetStatus(application_name, config, node);
+ PrintApplicationStatus(&status.message(), aos::monotonic_clock::now(),
+ node);
+ }
} else {
LOG(ERROR) << "The \"status\" command requires zero or one arguments.";
return true;
@@ -125,34 +172,69 @@
const aos::starter::Command command,
std::string_view success_text,
std::string_view failure_text) {
- const auto optional_status = aos::starter::GetStarterStatus(config);
- if (optional_status) {
- auto status = *optional_status;
- const aos::Node *my_node = MaybeMyNode(config);
- std::vector<std::pair<aos::starter::Command, std::string_view>> commands;
+ std::map<const aos::Node *,
+ std::unique_ptr<aos::FlatbufferVector<aos::starter::Status>>>
+ statuses;
+
+ for (const aos::Node *node : InteractNodes(config)) {
+ std::optional<std::pair<aos::monotonic_clock::time_point,
+ const aos::FlatbufferVector<aos::starter::Status>>>
+ optional_status = aos::starter::GetStarterStatus(config, node);
+ if (optional_status.has_value()) {
+ statuses[node] =
+ std::make_unique<aos::FlatbufferVector<aos::starter::Status>>(
+ optional_status.value().second);
+ } else {
+ if (node == nullptr) {
+ LOG(WARNING) << "Starter not running";
+ } else {
+ LOG(WARNING) << "Starter not running on node "
+ << node->name()->string_view();
+ }
+ }
+ }
+
+ if (!statuses.empty()) {
+ std::vector<aos::starter::ApplicationCommand> commands;
for (const aos::Application *application : *config->applications()) {
- // Ignore any applications which aren't supposed to be started on this
- // node.
- if (!aos::configuration::ApplicationShouldStart(config, my_node,
- application)) {
+ const std::string_view application_name =
+ application->name()->string_view();
+ const std::vector<const aos::Node *> application_nodes =
+ InteractNodesForApplication(config, application_name);
+ // Ignore any applications which aren't supposed to be started.
+ if (application_nodes.empty()) {
continue;
}
- const std::string_view application_name =
- application->name()->string_view();
- if (!application->autostart()) {
- const aos::starter::ApplicationStatus *application_status =
- aos::starter::FindApplicationStatus(status.message(),
- application_name);
- if (application_status->state() == aos::starter::State::STOPPED) {
- std::cout << "Skipping " << application_name
- << " because it is STOPPED\n";
- continue;
+ std::vector<const aos::Node *> running_nodes;
+ if (application->autostart()) {
+ running_nodes = application_nodes;
+ } else {
+ for (const aos::Node *node : application_nodes) {
+ const aos::starter::ApplicationStatus *application_status =
+ aos::starter::FindApplicationStatus(statuses[node]->message(),
+ application_name);
+ if (application_status->state() == aos::starter::State::STOPPED) {
+ if (node == nullptr) {
+ std::cout << "Skipping " << application_name
+ << " because it is STOPPED\n";
+ } else {
+ std::cout << "Skipping " << application_name << " on "
+ << node->name()->string_view()
+ << " because it is STOPPED\n";
+ }
+ continue;
+ } else {
+ running_nodes.push_back(node);
+ }
}
}
- commands.emplace_back(command, application_name);
+ if (!running_nodes.empty()) {
+ commands.emplace_back(aos::starter::ApplicationCommand{
+ command, application_name, running_nodes});
+ }
}
// Restart each running process
@@ -163,7 +245,7 @@
std::cout << failure_text << "all \n";
}
} else {
- LOG(WARNING) << "Starter not running";
+ LOG(WARNING) << "None of the starters we care about are running.";
}
}
@@ -206,12 +288,16 @@
InteractWithAll(config, command, success_text, failure_text);
return false;
}
- if (!ValidApplication(config, application_name)) {
+
+ const std::vector<const aos::Node *> application_nodes =
+ InteractNodesForApplication(config, application_name);
+ if (application_nodes.empty()) {
return false;
}
if (aos::starter::SendCommandBlocking(command, application_name, config,
- chrono::seconds(5))) {
+ chrono::seconds(5),
+ application_nodes)) {
std::cout << success_text << application_name << '\n';
} else {
std::cout << failure_text << application_name << '\n';
diff --git a/aos/starter/starter_rpc.fbs b/aos/starter/starter_rpc.fbs
index 21b7117..bbf0605 100644
--- a/aos/starter/starter_rpc.fbs
+++ b/aos/starter/starter_rpc.fbs
@@ -17,11 +17,15 @@
}
table StarterRpc {
- command : Command (id: 0);
+ command:Command (id: 0);
// The name of the application to send the command to. Command is ignored if
// the given application does not exist.
- name: string (id: 1);
+ name:string (id: 1);
+
+ // This set of nodes to start/stop the application on. If empty, indicates that applications
+ // should be restarted on all nodes.
+ nodes:[string] (id: 2);
}
root_type StarterRpc;
diff --git a/aos/starter/starter_rpc_lib.cc b/aos/starter/starter_rpc_lib.cc
index eb9a403..9f666ff 100644
--- a/aos/starter/starter_rpc_lib.cc
+++ b/aos/starter/starter_rpc_lib.cc
@@ -2,10 +2,24 @@
#include "aos/events/shm_event_loop.h"
#include "aos/flatbuffer_merge.h"
+#include "aos/starter/starterd_lib.h"
namespace aos {
namespace starter {
+namespace {
+State ExpectedStateForCommand(Command command) {
+ switch (command) {
+ case Command::START:
+ case Command::RESTART:
+ return State::RUNNING;
+ case Command::STOP:
+ return State::STOPPED;
+ }
+ return State::STOPPED;
+}
+} // namespace
+
const aos::starter::ApplicationStatus *FindApplicationStatus(
const aos::starter::Status &status, std::string_view name) {
if (!status.has_statuses()) {
@@ -39,121 +53,203 @@
return app_name;
}
-bool SendCommandBlocking(aos::starter::Command command, std::string_view name,
- const aos::Configuration *config,
- std::chrono::milliseconds timeout) {
- return SendCommandBlocking(
- std::vector<std::pair<aos::starter::Command, std::string_view>>{
- {command, name}},
- config, timeout);
+StarterClient::StarterClient(EventLoop *event_loop)
+ : event_loop_(event_loop),
+ timeout_timer_(event_loop_->AddTimer([this]() { Timeout(); })),
+ cmd_sender_(event_loop_->MakeSender<StarterRpc>("/aos")) {
+ if (configuration::MultiNode(event_loop_->configuration())) {
+ for (const aos::Node *node :
+ configuration::GetNodes(event_loop_->configuration())) {
+ const Channel *channel =
+ StatusChannelForNode(event_loop_->configuration(), node);
+ CHECK(channel != nullptr) << ": Failed to find channel /aos for "
+ << Status::GetFullyQualifiedName() << " on "
+ << node->name()->string_view();
+ if (!configuration::ChannelIsReadableOnNode(channel,
+ event_loop_->node())) {
+ LOG(INFO) << "Status channel "
+ << configuration::StrippedChannelToString(channel)
+ << " is not readable on "
+ << event_loop_->node()->name()->string_view();
+ } else if (!configuration::ChannelIsReadableOnNode(
+ StarterRpcChannelForNode(event_loop_->configuration(),
+ event_loop_->node()),
+ node)) {
+ // Don't attempt to construct a status fetcher if the other node won't
+ // even be able to receive our commands.
+ LOG(INFO) << "StarterRpc channel for "
+ << event_loop_->node()->name()->string_view()
+ << " is not readable on " << node->name()->string_view();
+ } else {
+ status_fetchers_[node->name()->str()] =
+ event_loop_->MakeFetcher<Status>(channel->name()->string_view());
+ event_loop_->MakeNoArgWatcher<Status>(channel->name()->string_view(),
+ [this]() {
+ if (CheckCommandsSucceeded()) {
+ Succeed();
+ }
+ });
+ }
+ }
+ } else {
+ status_fetchers_[""] = event_loop_->MakeFetcher<Status>("/aos");
+ event_loop_->MakeNoArgWatcher<Status>("/aos", [this]() {
+ if (CheckCommandsSucceeded()) {
+ Succeed();
+ }
+ });
+ }
}
-bool SendCommandBlocking(
- std::vector<std::pair<aos::starter::Command, std::string_view>> commands,
- const aos::Configuration *config, std::chrono::milliseconds timeout) {
+void StarterClient::SendCommands(
+ const std::vector<ApplicationCommand> &commands,
+ monotonic_clock::duration timeout) {
+ CHECK(current_commands_.empty());
+ for (auto &pair : status_fetchers_) {
+ pair.second.Fetch();
+ }
+ const bool is_multi_node =
+ aos::configuration::MultiNode(event_loop_->configuration());
+ for (const auto &command : commands) {
+ auto builder = cmd_sender_.MakeBuilder();
+ const auto application_offset =
+ builder.fbb()->CreateString(command.application);
+ std::vector<flatbuffers::Offset<flatbuffers::String>> node_offsets;
+ CHECK(!command.nodes.empty())
+ << "At least one node must be specified for application "
+ << command.application;
+ for (const aos::Node *node : command.nodes) {
+ const std::string node_name((node == nullptr) ? "" : node->name()->str());
+ if (status_fetchers_.count(node_name) == 0) {
+ if (is_multi_node) {
+ LOG(FATAL) << "Node \"" << node_name
+ << "\" must be configured to both receive StarterRpc "
+ "messages from \""
+ << event_loop_->node()->name()->string_view()
+ << "\" as well as to send starter Status messages back.";
+ } else {
+ LOG(FATAL) << "On single-node configs, use an empty string for the "
+ "node name.";
+ }
+ }
+ CHECK(status_fetchers_[node_name].get() != nullptr)
+ << ": No status available for node " << node_name;
+ if (is_multi_node) {
+ node_offsets.push_back(builder.fbb()->CreateString(node_name));
+ }
+ const ApplicationStatus *last_status =
+ CHECK_NOTNULL(FindApplicationStatus(*status_fetchers_[node_name],
+ command.application));
+ current_commands_[node_name].push_back(CommandStatus{
+ .expected_state = ExpectedStateForCommand(command.command),
+ .application = std::string(command.application),
+ .old_id = std::nullopt});
+ // If we are restarting, then we need to track what the current ID of the
+ // process is to ensure that it actually got restarted. For just starting,
+ // we leave the application running and so don't care.
+ if (command.command == Command::RESTART && last_status->has_id()) {
+ current_commands_[node_name].back().old_id = last_status->id();
+ }
+ }
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ nodes_offset;
+ if (is_multi_node) {
+ nodes_offset = builder.fbb()->CreateVector(node_offsets);
+ }
+ auto command_builder = builder.MakeBuilder<StarterRpc>();
+ command_builder.add_command(command.command);
+ command_builder.add_name(application_offset);
+ if (is_multi_node) {
+ command_builder.add_nodes(nodes_offset);
+ }
+ CHECK(builder.Send(command_builder.Finish()));
+ }
+
+ timeout_timer_->Setup(event_loop_->monotonic_now() + timeout);
+}
+
+bool StarterClient::CheckCommandsSucceeded() {
+ if (current_commands_.empty()) {
+ return false;
+ }
+
+ for (auto &pair : status_fetchers_) {
+ pair.second.Fetch();
+ }
+
+ bool succeeded = true;
+
+ for (const auto &pair : current_commands_) {
+ if (pair.second.empty()) {
+ continue;
+ }
+ CHECK(status_fetchers_[pair.first].get() != nullptr)
+ << ": No status available for node " << pair.first;
+ const Status &status = *status_fetchers_[pair.first];
+ for (const auto &command : pair.second) {
+ const ApplicationStatus *application_status =
+ CHECK_NOTNULL(FindApplicationStatus(status, command.application));
+ if (application_status->state() == command.expected_state) {
+ if (command.expected_state == State::RUNNING &&
+ application_status->id() == command.old_id) {
+ succeeded = false;
+ }
+ } else {
+ succeeded = false;
+ }
+ }
+ }
+ return succeeded;
+}
+
+void StarterClient::Timeout() {
+ // Clear commands prior to calling handlers to allow the handler to call
+ // SendCommands() again if desired.
+ current_commands_.clear();
+ if (timeout_handler_) {
+ timeout_handler_();
+ }
+}
+
+void StarterClient::Succeed() {
+ // Clear commands prior to calling handlers to allow the handler to call
+ // SendCommands() again if desired.
+ current_commands_.clear();
+ if (success_handler_) {
+ success_handler_();
+ }
+ timeout_timer_->Disable();
+}
+
+bool SendCommandBlocking(aos::starter::Command command, std::string_view name,
+ const aos::Configuration *config,
+ std::chrono::milliseconds timeout,
+ std::vector<const aos::Node *> nodes) {
+ return SendCommandBlocking({{command, name, nodes}}, config, timeout);
+}
+
+bool SendCommandBlocking(const std::vector<ApplicationCommand> &commands,
+ const aos::Configuration *config,
+ std::chrono::milliseconds timeout) {
aos::ShmEventLoop event_loop(config);
event_loop.SkipAosLog();
- ::aos::Sender<aos::starter::StarterRpc> cmd_sender =
- event_loop.MakeSender<aos::starter::StarterRpc>("/aos");
+ StarterClient client(&event_loop);
// Wait until event loop starts to send all commands so the watcher is ready
- event_loop.OnRun([&cmd_sender, &commands] {
- for (const std::pair<aos::starter::Command, std::string_view>
- &command_pair : commands) {
- const aos::starter::Command command = command_pair.first;
- const std::string_view name = command_pair.second;
- aos::Sender<aos::starter::StarterRpc>::Builder builder =
- cmd_sender.MakeBuilder();
-
- auto name_str = builder.fbb()->CreateString(name);
-
- aos::starter::StarterRpc::Builder cmd_builder =
- builder.MakeBuilder<aos::starter::StarterRpc>();
-
- cmd_builder.add_name(name_str);
- cmd_builder.add_command(command);
-
- builder.Send(cmd_builder.Finish());
- }
+ event_loop.OnRun([&commands, &client, timeout]() {
+ client.SendCommands(commands, timeout);
});
// If still waiting after timeout milliseconds, exit the loop
- event_loop.AddTimer([&event_loop] { event_loop.Exit(); })
- ->Setup(event_loop.monotonic_now() + timeout);
+ client.SetTimeoutHandler([&event_loop]() { event_loop.Exit(); });
- // Fetch the last list of statuses. The id field changes every time the
- // application restarts. By detecting when the application is running with a
- // different ID, we can detect restarts.
- auto initial_status_fetcher =
- event_loop.MakeFetcher<aos::starter::Status>("/aos");
- initial_status_fetcher.Fetch();
-
- std::vector<std::optional<uint64_t>> initial_ids;
-
- for (const std::pair<aos::starter::Command, std::string_view> &command_pair :
- commands) {
- const std::string_view name = command_pair.second;
- auto initial_status =
- initial_status_fetcher.get()
- ? FindApplicationStatus(*initial_status_fetcher, name)
- : nullptr;
-
- initial_ids.emplace_back(
- (initial_status != nullptr && initial_status->has_id())
- ? std::make_optional(initial_status->id())
- : std::nullopt);
- }
-
- std::vector<bool> successes(commands.size(), false);
bool success = false;
- event_loop.MakeWatcher("/aos", [&event_loop, &commands, &initial_ids, &success,
- &successes](
- const aos::starter::Status &status) {
- size_t index = 0;
- for (const std::pair<aos::starter::Command, std::string_view>
- &command_pair : commands) {
- const aos::starter::Command command = command_pair.first;
- const std::string_view name = command_pair.second;
- const aos::starter::ApplicationStatus *app_status =
- FindApplicationStatus(status, name);
-
- const std::optional<aos::starter::State> state =
- (app_status != nullptr && app_status->has_state())
- ? std::make_optional(app_status->state())
- : std::nullopt;
-
- switch (command) {
- case aos::starter::Command::START: {
- if (state == aos::starter::State::RUNNING) {
- successes[index] = true;
- }
- break;
- }
- case aos::starter::Command::STOP: {
- if (state == aos::starter::State::STOPPED) {
- successes[index] = true;
- }
- break;
- }
- case aos::starter::Command::RESTART: {
- if (state == aos::starter::State::RUNNING && app_status->has_id() &&
- app_status->id() != initial_ids[index]) {
- successes[index] = true;
- }
- break;
- }
- }
- ++index;
- }
-
- // Wait until all applications are ready.
- if (std::count(successes.begin(), successes.end(), true) ==
- static_cast<ssize_t>(successes.size())) {
- event_loop.Exit();
- success = true;
- }
+ client.SetSuccessHandler([&event_loop, &success]() {
+ success = true;
+ event_loop.Exit();
});
event_loop.Run();
@@ -162,11 +258,12 @@
}
const FlatbufferDetachedBuffer<aos::starter::ApplicationStatus> GetStatus(
- std::string_view name, const Configuration *config) {
+ std::string_view name, const Configuration *config, const aos::Node *node) {
ShmEventLoop event_loop(config);
event_loop.SkipAosLog();
- auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>("/aos");
+ auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
+ StatusChannelForNode(config, node)->name()->string_view());
status_fetcher.Fetch();
auto status = status_fetcher.get()
? FindApplicationStatus(*status_fetcher, name)
@@ -176,16 +273,20 @@
aos::starter::ApplicationStatus>::Empty();
}
-std::optional<const aos::FlatbufferVector<aos::starter::Status>>
-GetStarterStatus(const aos::Configuration *config) {
+std::optional<std::pair<aos::monotonic_clock::time_point,
+ const aos::FlatbufferVector<aos::starter::Status>>>
+GetStarterStatus(const aos::Configuration *config, const aos::Node *node) {
ShmEventLoop event_loop(config);
event_loop.SkipAosLog();
- auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>("/aos");
+ auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
+ StatusChannelForNode(config, node)->name()->string_view());
status_fetcher.Fetch();
- return (status_fetcher.get()
- ? std::make_optional(status_fetcher.CopyFlatBuffer())
- : std::nullopt);
+ return (status_fetcher.get() == nullptr)
+ ? std::nullopt
+ : std::make_optional(std::make_pair(
+ status_fetcher.context().monotonic_remote_time,
+ status_fetcher.CopyFlatBuffer()));
}
} // namespace starter
diff --git a/aos/starter/starter_rpc_lib.h b/aos/starter/starter_rpc_lib.h
index fdea0b7..7b93e24 100644
--- a/aos/starter/starter_rpc_lib.h
+++ b/aos/starter/starter_rpc_lib.h
@@ -2,15 +2,72 @@
#define AOS_STARTER_STARTER_RPC_LIB_H_
#include <chrono>
+#include <map>
#include <optional>
+#include <vector>
#include "aos/configuration.h"
+#include "aos/events/event_loop.h"
#include "aos/starter/starter_generated.h"
#include "aos/starter/starter_rpc_generated.h"
namespace aos {
namespace starter {
+// Data required to command that starter start/stop/restart a given application.
+struct ApplicationCommand {
+ Command command;
+ std::string_view application;
+ std::vector<const aos::Node *> nodes;
+};
+
+// This class manages interacting with starterd so that you can conveniently
+// start/stop applications programmatically.
+// Note that the StarterClient only maintains internal state for a single set of
+// commands at once, so once the user calls SendCommands() they must wait for
+// the timeout or success handler to be called before calling SendCommands
+// again.
+class StarterClient {
+ public:
+ StarterClient(EventLoop *event_loop);
+
+ void SendCommands(const std::vector<ApplicationCommand> &commands,
+ monotonic_clock::duration timeout);
+
+ void SetTimeoutHandler(std::function<void()> handler) {
+ timeout_handler_ = handler;
+ }
+
+ void SetSuccessHandler(std::function<void()> handler) {
+ success_handler_ = handler;
+ }
+
+ private:
+ struct CommandStatus {
+ State expected_state;
+ std::string application;
+ std::optional<uint64_t> old_id;
+ };
+
+ bool CheckCommandsSucceeded();
+
+ void Timeout();
+
+ void Succeed();
+
+ EventLoop *event_loop_;
+ TimerHandler *timeout_timer_;
+ Sender<StarterRpc> cmd_sender_;
+ // Map of fetchers by node name.
+ std::map<std::string, Fetcher<Status>> status_fetchers_;
+
+ // Mapping of node name to a list of applications with pending commands.
+ std::map<std::string, std::vector<CommandStatus>> current_commands_;
+
+ std::function<void()> timeout_handler_;
+ std::function<void()> success_handler_;
+};
+
// Finds the status of an individual application within a starter status message
// Returns nullptr if no application found by the given name.
const aos::starter::ApplicationStatus *FindApplicationStatus(
@@ -28,26 +85,35 @@
// achieved within timeout.
bool SendCommandBlocking(aos::starter::Command, std::string_view name,
const aos::Configuration *config,
- std::chrono::milliseconds timeout);
+ std::chrono::milliseconds timeout,
+ std::vector<const aos::Node *> nodes = {});
// Sends lots of commands and waits for them all to succeed. There must not be
// more than 1 conflicting command in here which modifies the state of a single
// application otherwise it will never succeed. An example is having both a
// start and stop command for a single application.
-bool SendCommandBlocking(
- std::vector<std::pair<aos::starter::Command, std::string_view>> commands,
- const aos::Configuration *config, std::chrono::milliseconds timeout);
+bool SendCommandBlocking(const std::vector<ApplicationCommand> &commands,
+ const aos::Configuration *config,
+ std::chrono::milliseconds timeout);
// Fetches the status of the application with the given name. Creates a
// temporary event loop from the provided config for fetching. Returns an empty
// flatbuffer if the application is not found.
const aos::FlatbufferDetachedBuffer<aos::starter::ApplicationStatus> GetStatus(
- std::string_view name, const aos::Configuration *config);
+ std::string_view name, const aos::Configuration *config,
+ const aos::Node *node);
// Fetches the entire status message of starter. Creates a temporary event loop
// from the provided config for fetching.
-std::optional<const aos::FlatbufferVector<aos::starter::Status>>
-GetStarterStatus(const aos::Configuration *config);
+// The returned pair is the time at which the Status was sent on the node it was
+// sent from, to allow calculating uptimes on remote nodes.
+// TODO(james): Use the ServerStatistics message and return the monotonic offset
+// instead, so that we can correctly handle high message latencies. Because
+// people don't generally care about ultra-high-precision uptime calculations,
+// this hasn't been prioritized.
+std::optional<std::pair<aos::monotonic_clock::time_point,
+ const aos::FlatbufferVector<aos::starter::Status>>>
+GetStarterStatus(const aos::Configuration *config, const aos::Node *node);
} // namespace starter
} // namespace aos
diff --git a/aos/starter/starter_test.cc b/aos/starter/starter_test.cc
index a6f9204..3a61fe7 100644
--- a/aos/starter/starter_test.cc
+++ b/aos/starter/starter_test.cc
@@ -1,9 +1,11 @@
#include <csignal>
+#include <experimental/filesystem>
#include <future>
#include <thread>
#include "aos/events/ping_generated.h"
#include "aos/events/pong_generated.h"
+#include "aos/network/team_number.h"
#include "aos/testing/path.h"
#include "aos/testing/tmpdir.h"
#include "gtest/gtest.h"
@@ -15,31 +17,57 @@
namespace aos {
namespace starter {
-TEST(StarterdTest, StartStopTest) {
- const std::string config_file =
- ArtifactPath("aos/events/pingpong_config.json");
+class StarterdTest : public ::testing::Test {
+ public:
+ StarterdTest() : shm_dir_(aos::testing::TestTmpDir() + "/aos") {
+ FLAGS_shm_base = shm_dir_;
+
+ // Nuke the shm dir:
+ std::experimental::filesystem::remove_all(shm_dir_);
+ }
+
+ protected:
+ gflags::FlagSaver flag_saver_;
+ std::string shm_dir_;
+};
+
+struct TestParams {
+ std::string config;
+ std::string hostname;
+};
+
+class StarterdConfigParamTest
+ : public StarterdTest,
+ public ::testing::WithParamInterface<TestParams> {};
+
+TEST_P(StarterdConfigParamTest, MultiNodeStartStopTest) {
+ gflags::FlagSaver flag_saver;
+ FLAGS_override_hostname = GetParam().hostname;
+ const std::string config_file = ArtifactPath(GetParam().config);
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(config_file);
- const std::string test_dir = aos::testing::TestTmpDir();
-
auto new_config = aos::configuration::MergeWithConfig(
- &config.message(), absl::StrFormat(
- R"({"applications": [
+ &config.message(),
+ absl::StrFormat(
+ R"({"applications": [
{
"name": "ping",
"executable_name": "%s",
- "args": ["--shm_base", "%s/aos"]
+ "nodes": ["pi1"],
+ "args": ["--shm_base", "%s", "--config", "%s", "--override_hostname", "%s"]
},
{
"name": "pong",
"executable_name": "%s",
- "args": ["--shm_base", "%s/aos"]
+ "nodes": ["pi1"],
+ "args": ["--shm_base", "%s", "--config", "%s", "--override_hostname", "%s"]
}
]})",
- ArtifactPath("aos/events/ping"), test_dir,
- ArtifactPath("aos/events/pong"), test_dir));
+ ArtifactPath("aos/events/ping"), shm_dir_, config_file,
+ GetParam().hostname, ArtifactPath("aos/events/pong"), shm_dir_,
+ config_file, GetParam().hostname));
const aos::Configuration *config_msg = &new_config.message();
@@ -51,6 +79,17 @@
aos::ShmEventLoop watcher_loop(config_msg);
watcher_loop.SkipAosLog();
+ aos::ShmEventLoop client_loop(config_msg);
+ client_loop.SkipAosLog();
+ StarterClient client(&client_loop);
+ client.SetTimeoutHandler(
+ []() { FAIL() << ": Command should not have timed out."; });
+ bool success = false;
+ client.SetSuccessHandler([&success, &client_loop]() {
+ client_loop.Exit();
+ success = true;
+ });
+
watcher_loop
.AddTimer([&watcher_loop] {
watcher_loop.Exit();
@@ -58,27 +97,25 @@
})
->Setup(watcher_loop.monotonic_now() + std::chrono::seconds(7));
- int test_stage = 0;
- watcher_loop.MakeWatcher(
- "/test", [&test_stage, config_msg](const aos::examples::Ping &) {
- switch (test_stage) {
- case 1: {
- test_stage = 2;
- break;
- }
- case 2: {
- std::thread([config_msg] {
- LOG(INFO) << "Send command";
- ASSERT_TRUE(aos::starter::SendCommandBlocking(
- aos::starter::Command::STOP, "ping", config_msg,
- std::chrono::seconds(3)));
- })
- .detach();
- test_stage = 3;
- break;
- }
+ std::atomic<int> test_stage = 0;
+ // Watch on the client loop since we need to interact with the StarterClient.
+ client_loop.MakeWatcher("/test", [&test_stage, &client,
+ &client_loop](const aos::examples::Ping &) {
+ switch (test_stage) {
+ case 1: {
+ test_stage = 2;
+ break;
+ }
+ case 2: {
+ {
+ client.SendCommands({{Command::STOP, "ping", {client_loop.node()}}},
+ std::chrono::seconds(3));
}
- });
+ test_stage = 3;
+ break;
+ }
+ }
+ });
watcher_loop.MakeWatcher(
"/aos", [&test_stage, &watcher_loop](const aos::starter::Status &status) {
@@ -109,37 +146,43 @@
});
std::thread starterd_thread([&starter] { starter.Run(); });
+ std::thread client_thread([&client_loop] { client_loop.Run(); });
watcher_loop.Run();
starter.Cleanup();
+ client_thread.join();
starterd_thread.join();
}
-TEST(StarterdTest, DeathTest) {
+INSTANTIATE_TEST_SUITE_P(
+ StarterdConfigParamTest, StarterdConfigParamTest,
+ ::testing::Values(TestParams{"aos/events/pingpong_config.json", ""},
+ TestParams{"aos/starter/multinode_pingpong_config.json",
+ "pi1"}));
+
+TEST_F(StarterdTest, DeathTest) {
const std::string config_file =
ArtifactPath("aos/events/pingpong_config.json");
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(config_file);
- const std::string test_dir = aos::testing::TestTmpDir();
-
auto new_config = aos::configuration::MergeWithConfig(
&config.message(), absl::StrFormat(
R"({"applications": [
{
"name": "ping",
"executable_name": "%s",
- "args": ["--shm_base", "%s/aos"]
+ "args": ["--shm_base", "%s"]
},
{
"name": "pong",
"executable_name": "%s",
- "args": ["--shm_base", "%s/aos"]
+ "args": ["--shm_base", "%s"]
}
]})",
- ArtifactPath("aos/events/ping"), test_dir,
- ArtifactPath("aos/events/pong"), test_dir));
+ ArtifactPath("aos/events/ping"), shm_dir_,
+ ArtifactPath("aos/events/pong"), shm_dir_));
const aos::Configuration *config_msg = &new_config.message();
@@ -203,32 +246,30 @@
starterd_thread.join();
}
-TEST(StarterdTest, Autostart) {
+TEST_F(StarterdTest, Autostart) {
const std::string config_file =
ArtifactPath("aos/events/pingpong_config.json");
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(config_file);
- const std::string test_dir = aos::testing::TestTmpDir();
-
auto new_config = aos::configuration::MergeWithConfig(
&config.message(), absl::StrFormat(
R"({"applications": [
{
"name": "ping",
"executable_name": "%s",
- "args": ["--shm_base", "%s/aos"],
+ "args": ["--shm_base", "%s"],
"autostart": false
},
{
"name": "pong",
"executable_name": "%s",
- "args": ["--shm_base", "%s/aos"]
+ "args": ["--shm_base", "%s"]
}
]})",
- ArtifactPath("aos/events/ping"), test_dir,
- ArtifactPath("aos/events/pong"), test_dir));
+ ArtifactPath("aos/events/ping"), shm_dir_,
+ ArtifactPath("aos/events/pong"), shm_dir_));
const aos::Configuration *config_msg = &new_config.message();
diff --git a/aos/starter/starterd_lib.cc b/aos/starter/starterd_lib.cc
index 19051bb..bc83768 100644
--- a/aos/starter/starterd_lib.cc
+++ b/aos/starter/starterd_lib.cc
@@ -8,6 +8,8 @@
#include <algorithm>
#include <utility>
+#include "absl/strings/str_format.h"
+#include "aos/json_to_flatbuffer.h"
#include "glog/logging.h"
#include "glog/stl_logging.h"
@@ -406,6 +408,15 @@
SignalListener::~SignalListener() { loop_->epoll()->DeleteFd(signalfd_.fd()); }
+const aos::Channel *StatusChannelForNode(const aos::Configuration *config,
+ const aos::Node *node) {
+ return configuration::GetChannel<Status>(config, "/aos", "", node);
+}
+const aos::Channel *StarterRpcChannelForNode(const aos::Configuration *config,
+ const aos::Node *node) {
+ return configuration::GetChannel<StarterRpc>(config, "/aos", "", node);
+}
+
Starter::Starter(const aos::Configuration *event_loop_config)
: config_msg_(event_loop_config),
event_loop_(event_loop_config),
@@ -427,20 +438,30 @@
std::chrono::milliseconds(1000));
});
- event_loop_.MakeWatcher("/aos", [this](const aos::starter::StarterRpc &cmd) {
- if (!cmd.has_command() || !cmd.has_name() || exiting_) {
- return;
+ if (!aos::configuration::MultiNode(config_msg_)) {
+ event_loop_.MakeWatcher(
+ "/aos",
+ [this](const aos::starter::StarterRpc &cmd) { HandleStarterRpc(cmd); });
+ } else {
+ for (const aos::Node *node : aos::configuration::GetNodes(config_msg_)) {
+ const Channel *channel = StarterRpcChannelForNode(config_msg_, node);
+ CHECK(channel != nullptr) << ": Failed to find channel /aos for "
+ << StarterRpc::GetFullyQualifiedName() << " on "
+ << node->name()->string_view();
+ if (!aos::configuration::ChannelIsReadableOnNode(channel,
+ event_loop_.node())) {
+ LOG(INFO) << "StarterRpc channel "
+ << aos::configuration::StrippedChannelToString(channel)
+ << " is not readable on "
+ << event_loop_.node()->name()->string_view();
+ } else {
+ event_loop_.MakeWatcher(channel->name()->string_view(),
+ [this](const aos::starter::StarterRpc &cmd) {
+ HandleStarterRpc(cmd);
+ });
+ }
}
- LOG(INFO) << "Received command "
- << aos::starter::EnumNameCommand(cmd.command()) << ' '
- << cmd.name()->string_view();
-
- auto search = applications_.find(cmd.name()->str());
- if (search != applications_.end()) {
- // If an applicatione exists by the given name, dispatch the command
- search->second.HandleCommand(cmd.command());
- }
- });
+ }
if (config_msg_->has_applications()) {
const flatbuffers::Vector<flatbuffers::Offset<aos::Application>>
@@ -465,6 +486,34 @@
}
}
+void Starter::HandleStarterRpc(const StarterRpc &command) {
+ if (!command.has_command() || !command.has_name() || exiting_) {
+ return;
+ }
+
+ LOG(INFO) << "Received " << aos::FlatbufferToJson(&command);
+
+ if (command.has_nodes()) {
+ CHECK(aos::configuration::MultiNode(config_msg_));
+ bool relevant_to_this_node = false;
+ for (const flatbuffers::String *node : *command.nodes()) {
+ if (node->string_view() == event_loop_.node()->name()->string_view()) {
+ relevant_to_this_node = true;
+ }
+ }
+ if (!relevant_to_this_node) {
+ return;
+ }
+ }
+ // If not populated, restart regardless of node.
+
+ auto search = applications_.find(command.name()->str());
+ if (search != applications_.end()) {
+ // If an applicatione exists by the given name, dispatch the command
+ search->second.HandleCommand(command.command());
+ }
+}
+
void Starter::MaybeSendStatus() {
if (status_count_ < max_status_count_) {
SendStatus();
diff --git a/aos/starter/starterd_lib.h b/aos/starter/starterd_lib.h
index b5888e0..1809326 100644
--- a/aos/starter/starterd_lib.h
+++ b/aos/starter/starterd_lib.h
@@ -159,6 +159,11 @@
DISALLOW_COPY_AND_ASSIGN(SignalListener);
};
+const aos::Channel *StatusChannelForNode(const aos::Configuration *config,
+ const aos::Node *node);
+const aos::Channel *StarterRpcChannelForNode(const aos::Configuration *config,
+ const aos::Node *node);
+
class Starter {
public:
Starter(const aos::Configuration *event_loop_config);
@@ -180,6 +185,7 @@
SIGSEGV, SIGPIPE, SIGTERM, SIGBUS, SIGXCPU};
void OnSignal(signalfd_siginfo signal);
+ void HandleStarterRpc(const StarterRpc &command);
// Sends the Status message if it wouldn't exceed the rate limit.
void MaybeSendStatus();