blob: a5bdbdd7e2fc76a3ba396a7e07aa4e32b730e517 [file] [log] [blame]
Tyler Chatowa79419d2020-08-12 20:12:11 -07001#include "starter_rpc_lib.h"
2
3#include "aos/events/shm_event_loop.h"
4#include "aos/flatbuffer_merge.h"
James Kuszmaul293b2172021-11-10 16:20:48 -08005#include "aos/starter/starterd_lib.h"
milind1f1dca32021-07-03 13:50:07 -07006#include "glog/logging.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -07007
8namespace aos {
9namespace starter {
10
James Kuszmaul293b2172021-11-10 16:20:48 -080011namespace {
12State ExpectedStateForCommand(Command command) {
13 switch (command) {
14 case Command::START:
15 case Command::RESTART:
16 return State::RUNNING;
17 case Command::STOP:
18 return State::STOPPED;
19 }
20 return State::STOPPED;
21}
22} // namespace
23
Tyler Chatowa79419d2020-08-12 20:12:11 -070024const aos::starter::ApplicationStatus *FindApplicationStatus(
25 const aos::starter::Status &status, std::string_view name) {
26 if (!status.has_statuses()) {
27 return nullptr;
28 }
29
30 auto statuses = status.statuses();
31
32 auto search =
33 std::find_if(statuses->begin(), statuses->end(),
34 [name](const aos::starter::ApplicationStatus *app_status) {
35 return app_status->has_name() &&
36 app_status->name()->string_view() == name;
37 });
38 if (search == statuses->end()) {
39 return nullptr;
40 }
41 return *search;
42}
43
milind upadhyay4272f382021-04-07 18:03:08 -070044std::string_view FindApplication(const std::string_view &name,
45 const aos::Configuration *config) {
46 std::string_view app_name = name;
47 for (const auto app : *config->applications()) {
Milind Upadhyay7641ce82021-04-10 14:15:28 -070048 if (app->has_executable_name() &&
milind upadhyay4272f382021-04-07 18:03:08 -070049 app->executable_name()->string_view() == name) {
50 app_name = app->name()->string_view();
51 break;
52 }
53 }
54 return app_name;
55}
56
James Kuszmaul293b2172021-11-10 16:20:48 -080057StarterClient::StarterClient(EventLoop *event_loop)
58 : event_loop_(event_loop),
59 timeout_timer_(event_loop_->AddTimer([this]() { Timeout(); })),
60 cmd_sender_(event_loop_->MakeSender<StarterRpc>("/aos")) {
61 if (configuration::MultiNode(event_loop_->configuration())) {
62 for (const aos::Node *node :
63 configuration::GetNodes(event_loop_->configuration())) {
64 const Channel *channel =
65 StatusChannelForNode(event_loop_->configuration(), node);
66 CHECK(channel != nullptr) << ": Failed to find channel /aos for "
67 << Status::GetFullyQualifiedName() << " on "
68 << node->name()->string_view();
69 if (!configuration::ChannelIsReadableOnNode(channel,
70 event_loop_->node())) {
James Kuszmaul9d36a622021-11-10 16:48:00 -080071 VLOG(1) << "Status channel "
72 << configuration::StrippedChannelToString(channel)
73 << " is not readable on "
74 << event_loop_->node()->name()->string_view();
James Kuszmaul293b2172021-11-10 16:20:48 -080075 } else if (!configuration::ChannelIsReadableOnNode(
76 StarterRpcChannelForNode(event_loop_->configuration(),
77 event_loop_->node()),
78 node)) {
79 // Don't attempt to construct a status fetcher if the other node won't
80 // even be able to receive our commands.
James Kuszmaul9d36a622021-11-10 16:48:00 -080081 VLOG(1) << "StarterRpc channel for "
82 << event_loop_->node()->name()->string_view()
83 << " is not readable on " << node->name()->string_view();
James Kuszmaul293b2172021-11-10 16:20:48 -080084 } else {
85 status_fetchers_[node->name()->str()] =
86 event_loop_->MakeFetcher<Status>(channel->name()->string_view());
87 event_loop_->MakeNoArgWatcher<Status>(channel->name()->string_view(),
88 [this]() {
89 if (CheckCommandsSucceeded()) {
90 Succeed();
91 }
92 });
93 }
94 }
95 } else {
96 status_fetchers_[""] = event_loop_->MakeFetcher<Status>("/aos");
97 event_loop_->MakeNoArgWatcher<Status>("/aos", [this]() {
98 if (CheckCommandsSucceeded()) {
99 Succeed();
100 }
101 });
102 }
Austin Schuhe4b748a2021-10-16 14:19:58 -0700103}
104
James Kuszmaul293b2172021-11-10 16:20:48 -0800105void StarterClient::SendCommands(
106 const std::vector<ApplicationCommand> &commands,
107 monotonic_clock::duration timeout) {
108 CHECK(current_commands_.empty());
109 for (auto &pair : status_fetchers_) {
110 pair.second.Fetch();
111 }
112 const bool is_multi_node =
113 aos::configuration::MultiNode(event_loop_->configuration());
114 for (const auto &command : commands) {
115 auto builder = cmd_sender_.MakeBuilder();
116 const auto application_offset =
117 builder.fbb()->CreateString(command.application);
118 std::vector<flatbuffers::Offset<flatbuffers::String>> node_offsets;
119 CHECK(!command.nodes.empty())
120 << "At least one node must be specified for application "
121 << command.application;
122 for (const aos::Node *node : command.nodes) {
123 const std::string node_name((node == nullptr) ? "" : node->name()->str());
124 if (status_fetchers_.count(node_name) == 0) {
125 if (is_multi_node) {
126 LOG(FATAL) << "Node \"" << node_name
127 << "\" must be configured to both receive StarterRpc "
128 "messages from \""
129 << event_loop_->node()->name()->string_view()
130 << "\" as well as to send starter Status messages back.";
131 } else {
132 LOG(FATAL) << "On single-node configs, use an empty string for the "
133 "node name.";
134 }
135 }
136 CHECK(status_fetchers_[node_name].get() != nullptr)
137 << ": No status available for node " << node_name;
138 if (is_multi_node) {
139 node_offsets.push_back(builder.fbb()->CreateString(node_name));
140 }
141 const ApplicationStatus *last_status =
142 CHECK_NOTNULL(FindApplicationStatus(*status_fetchers_[node_name],
143 command.application));
144 current_commands_[node_name].push_back(CommandStatus{
145 .expected_state = ExpectedStateForCommand(command.command),
146 .application = std::string(command.application),
147 .old_id = std::nullopt});
148 // If we are restarting, then we need to track what the current ID of the
149 // process is to ensure that it actually got restarted. For just starting,
150 // we leave the application running and so don't care.
151 if (command.command == Command::RESTART && last_status->has_id()) {
152 current_commands_[node_name].back().old_id = last_status->id();
153 }
154 }
155 flatbuffers::Offset<
156 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
157 nodes_offset;
158 if (is_multi_node) {
159 nodes_offset = builder.fbb()->CreateVector(node_offsets);
160 }
161 auto command_builder = builder.MakeBuilder<StarterRpc>();
162 command_builder.add_command(command.command);
163 command_builder.add_name(application_offset);
164 if (is_multi_node) {
165 command_builder.add_nodes(nodes_offset);
166 }
milind1f1dca32021-07-03 13:50:07 -0700167 builder.CheckOk(builder.Send(command_builder.Finish()));
James Kuszmaul293b2172021-11-10 16:20:48 -0800168 }
169
170 timeout_timer_->Setup(event_loop_->monotonic_now() + timeout);
171}
172
173bool StarterClient::CheckCommandsSucceeded() {
174 if (current_commands_.empty()) {
175 return false;
176 }
177
178 for (auto &pair : status_fetchers_) {
179 pair.second.Fetch();
180 }
181
182 bool succeeded = true;
183
184 for (const auto &pair : current_commands_) {
185 if (pair.second.empty()) {
186 continue;
187 }
188 CHECK(status_fetchers_[pair.first].get() != nullptr)
189 << ": No status available for node " << pair.first;
190 const Status &status = *status_fetchers_[pair.first];
191 for (const auto &command : pair.second) {
192 const ApplicationStatus *application_status =
193 CHECK_NOTNULL(FindApplicationStatus(status, command.application));
194 if (application_status->state() == command.expected_state) {
195 if (command.expected_state == State::RUNNING &&
196 application_status->id() == command.old_id) {
197 succeeded = false;
198 }
199 } else {
200 succeeded = false;
201 }
202 }
203 }
204 return succeeded;
205}
206
207void StarterClient::Timeout() {
208 // Clear commands prior to calling handlers to allow the handler to call
209 // SendCommands() again if desired.
210 current_commands_.clear();
211 if (timeout_handler_) {
212 timeout_handler_();
213 }
214}
215
216void StarterClient::Succeed() {
217 // Clear commands prior to calling handlers to allow the handler to call
218 // SendCommands() again if desired.
219 current_commands_.clear();
220 if (success_handler_) {
221 success_handler_();
222 }
223 timeout_timer_->Disable();
224}
225
226bool SendCommandBlocking(aos::starter::Command command, std::string_view name,
227 const aos::Configuration *config,
228 std::chrono::milliseconds timeout,
229 std::vector<const aos::Node *> nodes) {
230 return SendCommandBlocking({{command, name, nodes}}, config, timeout);
231}
232
233bool SendCommandBlocking(const std::vector<ApplicationCommand> &commands,
234 const aos::Configuration *config,
235 std::chrono::milliseconds timeout) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700236 aos::ShmEventLoop event_loop(config);
237 event_loop.SkipAosLog();
238
James Kuszmaul293b2172021-11-10 16:20:48 -0800239 StarterClient client(&event_loop);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700240
Austin Schuhe4b748a2021-10-16 14:19:58 -0700241 // Wait until event loop starts to send all commands so the watcher is ready
James Kuszmaul293b2172021-11-10 16:20:48 -0800242 event_loop.OnRun([&commands, &client, timeout]() {
243 client.SendCommands(commands, timeout);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700244 });
245
246 // If still waiting after timeout milliseconds, exit the loop
James Kuszmaul293b2172021-11-10 16:20:48 -0800247 client.SetTimeoutHandler([&event_loop]() { event_loop.Exit(); });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700248
Tyler Chatowa79419d2020-08-12 20:12:11 -0700249 bool success = false;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700250
James Kuszmaul293b2172021-11-10 16:20:48 -0800251 client.SetSuccessHandler([&event_loop, &success]() {
252 success = true;
253 event_loop.Exit();
Austin Schuhe4b748a2021-10-16 14:19:58 -0700254 });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700255
256 event_loop.Run();
257
258 return success;
259}
260
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800261const std::optional<FlatbufferDetachedBuffer<aos::starter::ApplicationStatus>>
262GetStatus(std::string_view name, const Configuration *config,
263 const aos::Node *node) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700264 ShmEventLoop event_loop(config);
265 event_loop.SkipAosLog();
266
James Kuszmaul293b2172021-11-10 16:20:48 -0800267 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
268 StatusChannelForNode(config, node)->name()->string_view());
Tyler Chatowa79419d2020-08-12 20:12:11 -0700269 status_fetcher.Fetch();
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800270 if (status_fetcher.get() != nullptr) {
271 const aos::starter::ApplicationStatus *status =
272 FindApplicationStatus(*status_fetcher, name);
273 if (status != nullptr) {
274 return aos::CopyFlatBuffer(status);
275 }
276 }
277 return std::nullopt;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700278}
279
James Kuszmaul293b2172021-11-10 16:20:48 -0800280std::optional<std::pair<aos::monotonic_clock::time_point,
281 const aos::FlatbufferVector<aos::starter::Status>>>
282GetStarterStatus(const aos::Configuration *config, const aos::Node *node) {
Philipp Schrader08537492021-01-23 16:17:55 -0800283 ShmEventLoop event_loop(config);
284 event_loop.SkipAosLog();
285
James Kuszmaul293b2172021-11-10 16:20:48 -0800286 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
287 StatusChannelForNode(config, node)->name()->string_view());
Philipp Schrader08537492021-01-23 16:17:55 -0800288 status_fetcher.Fetch();
James Kuszmaul293b2172021-11-10 16:20:48 -0800289 return (status_fetcher.get() == nullptr)
290 ? std::nullopt
291 : std::make_optional(std::make_pair(
292 status_fetcher.context().monotonic_remote_time,
293 status_fetcher.CopyFlatBuffer()));
Philipp Schrader08537492021-01-23 16:17:55 -0800294}
295
Tyler Chatowa79419d2020-08-12 20:12:11 -0700296} // namespace starter
297} // namespace aos