blob: b4977923b551e86115a967f0fd82b09e5074fd63 [file] [log] [blame]
Philipp Schrader790cb542023-07-05 21:06:52 -07001#include "aos/starter/starter_rpc_lib.h"
2
Stephan Pleinesf581a072024-05-23 20:59:27 -07003#include <algorithm>
4#include <ostream>
5
6#include "flatbuffers/buffer.h"
7#include "flatbuffers/flatbuffer_builder.h"
8#include "flatbuffers/string.h"
9#include "flatbuffers/vector.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070010#include "glog/logging.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -070011
Stephan Pleinesf581a072024-05-23 20:59:27 -070012#include "aos/events/context.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -070013#include "aos/events/shm_event_loop.h"
14#include "aos/flatbuffer_merge.h"
James Kuszmaul293b2172021-11-10 16:20:48 -080015#include "aos/starter/starterd_lib.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -070016
Stephan Pleinesf63bde82024-01-13 15:59:33 -080017namespace aos::starter {
Tyler Chatowa79419d2020-08-12 20:12:11 -070018
James Kuszmaul293b2172021-11-10 16:20:48 -080019namespace {
20State ExpectedStateForCommand(Command command) {
21 switch (command) {
22 case Command::START:
23 case Command::RESTART:
24 return State::RUNNING;
25 case Command::STOP:
26 return State::STOPPED;
27 }
28 return State::STOPPED;
29}
30} // namespace
31
Tyler Chatowa79419d2020-08-12 20:12:11 -070032const aos::starter::ApplicationStatus *FindApplicationStatus(
33 const aos::starter::Status &status, std::string_view name) {
34 if (!status.has_statuses()) {
35 return nullptr;
36 }
37
38 auto statuses = status.statuses();
39
40 auto search =
41 std::find_if(statuses->begin(), statuses->end(),
42 [name](const aos::starter::ApplicationStatus *app_status) {
43 return app_status->has_name() &&
44 app_status->name()->string_view() == name;
45 });
46 if (search == statuses->end()) {
47 return nullptr;
48 }
49 return *search;
50}
51
Austin Schuhdae1e8d2022-03-26 15:09:31 -070052std::string_view FindApplication(const std::string_view name,
milind upadhyay4272f382021-04-07 18:03:08 -070053 const aos::Configuration *config) {
54 std::string_view app_name = name;
55 for (const auto app : *config->applications()) {
Austin Schuhdae1e8d2022-03-26 15:09:31 -070056 if (app->name()->string_view() == name) {
57 return name;
58 }
59 }
60 for (const auto app : *config->applications()) {
Milind Upadhyay7641ce82021-04-10 14:15:28 -070061 if (app->has_executable_name() &&
milind upadhyay4272f382021-04-07 18:03:08 -070062 app->executable_name()->string_view() == name) {
63 app_name = app->name()->string_view();
64 break;
65 }
66 }
67 return app_name;
68}
69
James Kuszmaul293b2172021-11-10 16:20:48 -080070StarterClient::StarterClient(EventLoop *event_loop)
71 : event_loop_(event_loop),
72 timeout_timer_(event_loop_->AddTimer([this]() { Timeout(); })),
73 cmd_sender_(event_loop_->MakeSender<StarterRpc>("/aos")) {
James Kuszmaul06a8f352024-03-15 14:15:57 -070074 timeout_timer_->set_name("rpc_timeout");
James Kuszmaul293b2172021-11-10 16:20:48 -080075 if (configuration::MultiNode(event_loop_->configuration())) {
76 for (const aos::Node *node :
77 configuration::GetNodes(event_loop_->configuration())) {
78 const Channel *channel =
79 StatusChannelForNode(event_loop_->configuration(), node);
80 CHECK(channel != nullptr) << ": Failed to find channel /aos for "
81 << Status::GetFullyQualifiedName() << " on "
82 << node->name()->string_view();
83 if (!configuration::ChannelIsReadableOnNode(channel,
84 event_loop_->node())) {
James Kuszmaul9d36a622021-11-10 16:48:00 -080085 VLOG(1) << "Status channel "
86 << configuration::StrippedChannelToString(channel)
87 << " is not readable on "
88 << event_loop_->node()->name()->string_view();
James Kuszmaul293b2172021-11-10 16:20:48 -080089 } else if (!configuration::ChannelIsReadableOnNode(
90 StarterRpcChannelForNode(event_loop_->configuration(),
91 event_loop_->node()),
92 node)) {
93 // Don't attempt to construct a status fetcher if the other node won't
94 // even be able to receive our commands.
James Kuszmaul9d36a622021-11-10 16:48:00 -080095 VLOG(1) << "StarterRpc channel for "
96 << event_loop_->node()->name()->string_view()
97 << " is not readable on " << node->name()->string_view();
James Kuszmaul293b2172021-11-10 16:20:48 -080098 } else {
99 status_fetchers_[node->name()->str()] =
100 event_loop_->MakeFetcher<Status>(channel->name()->string_view());
101 event_loop_->MakeNoArgWatcher<Status>(channel->name()->string_view(),
102 [this]() {
103 if (CheckCommandsSucceeded()) {
104 Succeed();
105 }
106 });
107 }
108 }
109 } else {
110 status_fetchers_[""] = event_loop_->MakeFetcher<Status>("/aos");
111 event_loop_->MakeNoArgWatcher<Status>("/aos", [this]() {
112 if (CheckCommandsSucceeded()) {
113 Succeed();
114 }
115 });
116 }
Austin Schuhe4b748a2021-10-16 14:19:58 -0700117}
118
James Kuszmaul293b2172021-11-10 16:20:48 -0800119void StarterClient::SendCommands(
120 const std::vector<ApplicationCommand> &commands,
121 monotonic_clock::duration timeout) {
122 CHECK(current_commands_.empty());
123 for (auto &pair : status_fetchers_) {
124 pair.second.Fetch();
125 }
126 const bool is_multi_node =
127 aos::configuration::MultiNode(event_loop_->configuration());
128 for (const auto &command : commands) {
129 auto builder = cmd_sender_.MakeBuilder();
130 const auto application_offset =
131 builder.fbb()->CreateString(command.application);
132 std::vector<flatbuffers::Offset<flatbuffers::String>> node_offsets;
133 CHECK(!command.nodes.empty())
134 << "At least one node must be specified for application "
135 << command.application;
136 for (const aos::Node *node : command.nodes) {
137 const std::string node_name((node == nullptr) ? "" : node->name()->str());
138 if (status_fetchers_.count(node_name) == 0) {
139 if (is_multi_node) {
140 LOG(FATAL) << "Node \"" << node_name
141 << "\" must be configured to both receive StarterRpc "
142 "messages from \""
143 << event_loop_->node()->name()->string_view()
144 << "\" as well as to send starter Status messages back.";
145 } else {
146 LOG(FATAL) << "On single-node configs, use an empty string for the "
147 "node name.";
148 }
149 }
James Kuszmaul43d70162023-03-04 18:25:14 -0800150 if (status_fetchers_[node_name].get() == nullptr) {
151 LOG(WARNING) << ": No status available for node " << node_name
152 << "; not executing commands for that node.";
153 continue;
154 }
James Kuszmaul293b2172021-11-10 16:20:48 -0800155 if (is_multi_node) {
156 node_offsets.push_back(builder.fbb()->CreateString(node_name));
157 }
Austin Schuh6bdcc372024-06-27 14:49:11 -0700158 const ApplicationStatus *last_status = FindApplicationStatus(
159 *status_fetchers_[node_name], command.application);
160 CHECK(last_status != nullptr);
James Kuszmaul293b2172021-11-10 16:20:48 -0800161 current_commands_[node_name].push_back(CommandStatus{
162 .expected_state = ExpectedStateForCommand(command.command),
163 .application = std::string(command.application),
164 .old_id = std::nullopt});
165 // If we are restarting, then we need to track what the current ID of the
166 // process is to ensure that it actually got restarted. For just starting,
167 // we leave the application running and so don't care.
168 if (command.command == Command::RESTART && last_status->has_id()) {
169 current_commands_[node_name].back().old_id = last_status->id();
170 }
171 }
172 flatbuffers::Offset<
173 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
174 nodes_offset;
175 if (is_multi_node) {
176 nodes_offset = builder.fbb()->CreateVector(node_offsets);
177 }
178 auto command_builder = builder.MakeBuilder<StarterRpc>();
179 command_builder.add_command(command.command);
180 command_builder.add_name(application_offset);
181 if (is_multi_node) {
182 command_builder.add_nodes(nodes_offset);
183 }
milind1f1dca32021-07-03 13:50:07 -0700184 builder.CheckOk(builder.Send(command_builder.Finish()));
James Kuszmaul293b2172021-11-10 16:20:48 -0800185 }
186
Philipp Schradera6712522023-07-05 20:25:11 -0700187 timeout_timer_->Schedule(event_loop_->monotonic_now() + timeout);
James Kuszmaul293b2172021-11-10 16:20:48 -0800188}
189
190bool StarterClient::CheckCommandsSucceeded() {
191 if (current_commands_.empty()) {
192 return false;
193 }
194
195 for (auto &pair : status_fetchers_) {
196 pair.second.Fetch();
197 }
198
199 bool succeeded = true;
200
201 for (const auto &pair : current_commands_) {
202 if (pair.second.empty()) {
203 continue;
204 }
205 CHECK(status_fetchers_[pair.first].get() != nullptr)
206 << ": No status available for node " << pair.first;
207 const Status &status = *status_fetchers_[pair.first];
208 for (const auto &command : pair.second) {
209 const ApplicationStatus *application_status =
Austin Schuh6bdcc372024-06-27 14:49:11 -0700210 FindApplicationStatus(status, command.application);
211 CHECK(application_status != nullptr);
James Kuszmaul293b2172021-11-10 16:20:48 -0800212 if (application_status->state() == command.expected_state) {
213 if (command.expected_state == State::RUNNING &&
214 application_status->id() == command.old_id) {
215 succeeded = false;
216 }
217 } else {
218 succeeded = false;
219 }
220 }
221 }
222 return succeeded;
223}
224
225void StarterClient::Timeout() {
226 // Clear commands prior to calling handlers to allow the handler to call
227 // SendCommands() again if desired.
228 current_commands_.clear();
229 if (timeout_handler_) {
230 timeout_handler_();
231 }
232}
233
234void StarterClient::Succeed() {
235 // Clear commands prior to calling handlers to allow the handler to call
236 // SendCommands() again if desired.
237 current_commands_.clear();
Austin Schuh59398d32023-05-03 08:10:55 -0700238 // Clear the timer before calling success handler, in case the success
239 // handler needs to modify timeout handler.
240 timeout_timer_->Disable();
James Kuszmaul293b2172021-11-10 16:20:48 -0800241 if (success_handler_) {
242 success_handler_();
243 }
James Kuszmaul293b2172021-11-10 16:20:48 -0800244}
245
246bool SendCommandBlocking(aos::starter::Command command, std::string_view name,
247 const aos::Configuration *config,
248 std::chrono::milliseconds timeout,
249 std::vector<const aos::Node *> nodes) {
250 return SendCommandBlocking({{command, name, nodes}}, config, timeout);
251}
252
253bool SendCommandBlocking(const std::vector<ApplicationCommand> &commands,
254 const aos::Configuration *config,
255 std::chrono::milliseconds timeout) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700256 aos::ShmEventLoop event_loop(config);
257 event_loop.SkipAosLog();
258
James Kuszmaul293b2172021-11-10 16:20:48 -0800259 StarterClient client(&event_loop);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700260
Austin Schuhe4b748a2021-10-16 14:19:58 -0700261 // Wait until event loop starts to send all commands so the watcher is ready
James Kuszmaul293b2172021-11-10 16:20:48 -0800262 event_loop.OnRun([&commands, &client, timeout]() {
263 client.SendCommands(commands, timeout);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700264 });
265
266 // If still waiting after timeout milliseconds, exit the loop
James Kuszmaul293b2172021-11-10 16:20:48 -0800267 client.SetTimeoutHandler([&event_loop]() { event_loop.Exit(); });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700268
Tyler Chatowa79419d2020-08-12 20:12:11 -0700269 bool success = false;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700270
James Kuszmaul293b2172021-11-10 16:20:48 -0800271 client.SetSuccessHandler([&event_loop, &success]() {
272 success = true;
273 event_loop.Exit();
Austin Schuhe4b748a2021-10-16 14:19:58 -0700274 });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700275
276 event_loop.Run();
277
278 return success;
279}
280
James Kuszmaul2ca441b2022-01-07 18:16:23 -0800281const std::optional<
282 std::pair<aos::monotonic_clock::time_point,
283 FlatbufferDetachedBuffer<aos::starter::ApplicationStatus>>>
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800284GetStatus(std::string_view name, const Configuration *config,
285 const aos::Node *node) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700286 ShmEventLoop event_loop(config);
287 event_loop.SkipAosLog();
288
James Kuszmaul293b2172021-11-10 16:20:48 -0800289 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
290 StatusChannelForNode(config, node)->name()->string_view());
Tyler Chatowa79419d2020-08-12 20:12:11 -0700291 status_fetcher.Fetch();
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800292 if (status_fetcher.get() != nullptr) {
293 const aos::starter::ApplicationStatus *status =
294 FindApplicationStatus(*status_fetcher, name);
295 if (status != nullptr) {
James Kuszmaul2ca441b2022-01-07 18:16:23 -0800296 return std::make_pair(status_fetcher.context().monotonic_remote_time,
297 aos::CopyFlatBuffer(status));
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800298 }
299 }
300 return std::nullopt;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700301}
302
James Kuszmaul293b2172021-11-10 16:20:48 -0800303std::optional<std::pair<aos::monotonic_clock::time_point,
304 const aos::FlatbufferVector<aos::starter::Status>>>
305GetStarterStatus(const aos::Configuration *config, const aos::Node *node) {
Philipp Schrader08537492021-01-23 16:17:55 -0800306 ShmEventLoop event_loop(config);
307 event_loop.SkipAosLog();
308
James Kuszmaul293b2172021-11-10 16:20:48 -0800309 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
310 StatusChannelForNode(config, node)->name()->string_view());
Philipp Schrader08537492021-01-23 16:17:55 -0800311 status_fetcher.Fetch();
James Kuszmaul293b2172021-11-10 16:20:48 -0800312 return (status_fetcher.get() == nullptr)
313 ? std::nullopt
314 : std::make_optional(std::make_pair(
315 status_fetcher.context().monotonic_remote_time,
316 status_fetcher.CopyFlatBuffer()));
Philipp Schrader08537492021-01-23 16:17:55 -0800317}
318
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800319} // namespace aos::starter