blob: 4a54f712c9fa04420351e3acfa585f69e865f9c7 [file] [log] [blame]
Philipp Schrader790cb542023-07-05 21:06:52 -07001#include "aos/starter/starter_rpc_lib.h"
2
Stephan Pleinesf581a072024-05-23 20:59:27 -07003#include <algorithm>
4#include <ostream>
5
Austin Schuh99f7c6a2024-06-25 22:07:44 -07006#include "absl/log/check.h"
7#include "absl/log/log.h"
Stephan Pleinesf581a072024-05-23 20:59:27 -07008#include "flatbuffers/buffer.h"
9#include "flatbuffers/flatbuffer_builder.h"
10#include "flatbuffers/string.h"
11#include "flatbuffers/vector.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -070012
Stephan Pleinesf581a072024-05-23 20:59:27 -070013#include "aos/events/context.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -070014#include "aos/events/shm_event_loop.h"
15#include "aos/flatbuffer_merge.h"
James Kuszmaul293b2172021-11-10 16:20:48 -080016#include "aos/starter/starterd_lib.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -070017
Stephan Pleinesf63bde82024-01-13 15:59:33 -080018namespace aos::starter {
Tyler Chatowa79419d2020-08-12 20:12:11 -070019
James Kuszmaul293b2172021-11-10 16:20:48 -080020namespace {
21State ExpectedStateForCommand(Command command) {
22 switch (command) {
23 case Command::START:
24 case Command::RESTART:
25 return State::RUNNING;
26 case Command::STOP:
27 return State::STOPPED;
28 }
29 return State::STOPPED;
30}
31} // namespace
32
Tyler Chatowa79419d2020-08-12 20:12:11 -070033const aos::starter::ApplicationStatus *FindApplicationStatus(
34 const aos::starter::Status &status, std::string_view name) {
35 if (!status.has_statuses()) {
36 return nullptr;
37 }
38
39 auto statuses = status.statuses();
40
41 auto search =
42 std::find_if(statuses->begin(), statuses->end(),
43 [name](const aos::starter::ApplicationStatus *app_status) {
44 return app_status->has_name() &&
45 app_status->name()->string_view() == name;
46 });
47 if (search == statuses->end()) {
48 return nullptr;
49 }
50 return *search;
51}
52
Austin Schuhdae1e8d2022-03-26 15:09:31 -070053std::string_view FindApplication(const std::string_view name,
milind upadhyay4272f382021-04-07 18:03:08 -070054 const aos::Configuration *config) {
55 std::string_view app_name = name;
56 for (const auto app : *config->applications()) {
Austin Schuhdae1e8d2022-03-26 15:09:31 -070057 if (app->name()->string_view() == name) {
58 return name;
59 }
60 }
61 for (const auto app : *config->applications()) {
Milind Upadhyay7641ce82021-04-10 14:15:28 -070062 if (app->has_executable_name() &&
milind upadhyay4272f382021-04-07 18:03:08 -070063 app->executable_name()->string_view() == name) {
64 app_name = app->name()->string_view();
65 break;
66 }
67 }
68 return app_name;
69}
70
James Kuszmaul293b2172021-11-10 16:20:48 -080071StarterClient::StarterClient(EventLoop *event_loop)
72 : event_loop_(event_loop),
73 timeout_timer_(event_loop_->AddTimer([this]() { Timeout(); })),
74 cmd_sender_(event_loop_->MakeSender<StarterRpc>("/aos")) {
James Kuszmaul06a8f352024-03-15 14:15:57 -070075 timeout_timer_->set_name("rpc_timeout");
James Kuszmaul293b2172021-11-10 16:20:48 -080076 if (configuration::MultiNode(event_loop_->configuration())) {
77 for (const aos::Node *node :
78 configuration::GetNodes(event_loop_->configuration())) {
79 const Channel *channel =
80 StatusChannelForNode(event_loop_->configuration(), node);
81 CHECK(channel != nullptr) << ": Failed to find channel /aos for "
82 << Status::GetFullyQualifiedName() << " on "
83 << node->name()->string_view();
84 if (!configuration::ChannelIsReadableOnNode(channel,
85 event_loop_->node())) {
James Kuszmaul9d36a622021-11-10 16:48:00 -080086 VLOG(1) << "Status channel "
87 << configuration::StrippedChannelToString(channel)
88 << " is not readable on "
89 << event_loop_->node()->name()->string_view();
James Kuszmaul293b2172021-11-10 16:20:48 -080090 } else if (!configuration::ChannelIsReadableOnNode(
91 StarterRpcChannelForNode(event_loop_->configuration(),
92 event_loop_->node()),
93 node)) {
94 // Don't attempt to construct a status fetcher if the other node won't
95 // even be able to receive our commands.
James Kuszmaul9d36a622021-11-10 16:48:00 -080096 VLOG(1) << "StarterRpc channel for "
97 << event_loop_->node()->name()->string_view()
98 << " is not readable on " << node->name()->string_view();
James Kuszmaul293b2172021-11-10 16:20:48 -080099 } else {
100 status_fetchers_[node->name()->str()] =
101 event_loop_->MakeFetcher<Status>(channel->name()->string_view());
102 event_loop_->MakeNoArgWatcher<Status>(channel->name()->string_view(),
103 [this]() {
104 if (CheckCommandsSucceeded()) {
105 Succeed();
106 }
107 });
108 }
109 }
110 } else {
111 status_fetchers_[""] = event_loop_->MakeFetcher<Status>("/aos");
112 event_loop_->MakeNoArgWatcher<Status>("/aos", [this]() {
113 if (CheckCommandsSucceeded()) {
114 Succeed();
115 }
116 });
117 }
Austin Schuhe4b748a2021-10-16 14:19:58 -0700118}
119
James Kuszmaul293b2172021-11-10 16:20:48 -0800120void StarterClient::SendCommands(
121 const std::vector<ApplicationCommand> &commands,
122 monotonic_clock::duration timeout) {
123 CHECK(current_commands_.empty());
124 for (auto &pair : status_fetchers_) {
125 pair.second.Fetch();
126 }
127 const bool is_multi_node =
128 aos::configuration::MultiNode(event_loop_->configuration());
129 for (const auto &command : commands) {
130 auto builder = cmd_sender_.MakeBuilder();
131 const auto application_offset =
132 builder.fbb()->CreateString(command.application);
133 std::vector<flatbuffers::Offset<flatbuffers::String>> node_offsets;
134 CHECK(!command.nodes.empty())
135 << "At least one node must be specified for application "
136 << command.application;
137 for (const aos::Node *node : command.nodes) {
138 const std::string node_name((node == nullptr) ? "" : node->name()->str());
139 if (status_fetchers_.count(node_name) == 0) {
140 if (is_multi_node) {
141 LOG(FATAL) << "Node \"" << node_name
142 << "\" must be configured to both receive StarterRpc "
143 "messages from \""
144 << event_loop_->node()->name()->string_view()
145 << "\" as well as to send starter Status messages back.";
146 } else {
147 LOG(FATAL) << "On single-node configs, use an empty string for the "
148 "node name.";
149 }
150 }
James Kuszmaul43d70162023-03-04 18:25:14 -0800151 if (status_fetchers_[node_name].get() == nullptr) {
152 LOG(WARNING) << ": No status available for node " << node_name
153 << "; not executing commands for that node.";
154 continue;
155 }
James Kuszmaul293b2172021-11-10 16:20:48 -0800156 if (is_multi_node) {
157 node_offsets.push_back(builder.fbb()->CreateString(node_name));
158 }
Austin Schuh6bdcc372024-06-27 14:49:11 -0700159 const ApplicationStatus *last_status = FindApplicationStatus(
160 *status_fetchers_[node_name], command.application);
161 CHECK(last_status != nullptr);
James Kuszmaul293b2172021-11-10 16:20:48 -0800162 current_commands_[node_name].push_back(CommandStatus{
163 .expected_state = ExpectedStateForCommand(command.command),
164 .application = std::string(command.application),
165 .old_id = std::nullopt});
166 // If we are restarting, then we need to track what the current ID of the
167 // process is to ensure that it actually got restarted. For just starting,
168 // we leave the application running and so don't care.
169 if (command.command == Command::RESTART && last_status->has_id()) {
170 current_commands_[node_name].back().old_id = last_status->id();
171 }
172 }
173 flatbuffers::Offset<
174 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
175 nodes_offset;
176 if (is_multi_node) {
177 nodes_offset = builder.fbb()->CreateVector(node_offsets);
178 }
179 auto command_builder = builder.MakeBuilder<StarterRpc>();
180 command_builder.add_command(command.command);
181 command_builder.add_name(application_offset);
182 if (is_multi_node) {
183 command_builder.add_nodes(nodes_offset);
184 }
milind1f1dca32021-07-03 13:50:07 -0700185 builder.CheckOk(builder.Send(command_builder.Finish()));
James Kuszmaul293b2172021-11-10 16:20:48 -0800186 }
187
Philipp Schradera6712522023-07-05 20:25:11 -0700188 timeout_timer_->Schedule(event_loop_->monotonic_now() + timeout);
James Kuszmaul293b2172021-11-10 16:20:48 -0800189}
190
191bool StarterClient::CheckCommandsSucceeded() {
192 if (current_commands_.empty()) {
193 return false;
194 }
195
196 for (auto &pair : status_fetchers_) {
197 pair.second.Fetch();
198 }
199
200 bool succeeded = true;
201
202 for (const auto &pair : current_commands_) {
203 if (pair.second.empty()) {
204 continue;
205 }
206 CHECK(status_fetchers_[pair.first].get() != nullptr)
207 << ": No status available for node " << pair.first;
208 const Status &status = *status_fetchers_[pair.first];
209 for (const auto &command : pair.second) {
210 const ApplicationStatus *application_status =
Austin Schuh6bdcc372024-06-27 14:49:11 -0700211 FindApplicationStatus(status, command.application);
212 CHECK(application_status != nullptr);
James Kuszmaul293b2172021-11-10 16:20:48 -0800213 if (application_status->state() == command.expected_state) {
214 if (command.expected_state == State::RUNNING &&
215 application_status->id() == command.old_id) {
216 succeeded = false;
217 }
218 } else {
219 succeeded = false;
220 }
221 }
222 }
223 return succeeded;
224}
225
226void StarterClient::Timeout() {
227 // Clear commands prior to calling handlers to allow the handler to call
228 // SendCommands() again if desired.
229 current_commands_.clear();
230 if (timeout_handler_) {
231 timeout_handler_();
232 }
233}
234
235void StarterClient::Succeed() {
236 // Clear commands prior to calling handlers to allow the handler to call
237 // SendCommands() again if desired.
238 current_commands_.clear();
Austin Schuh59398d32023-05-03 08:10:55 -0700239 // Clear the timer before calling success handler, in case the success
240 // handler needs to modify timeout handler.
241 timeout_timer_->Disable();
James Kuszmaul293b2172021-11-10 16:20:48 -0800242 if (success_handler_) {
243 success_handler_();
244 }
James Kuszmaul293b2172021-11-10 16:20:48 -0800245}
246
247bool SendCommandBlocking(aos::starter::Command command, std::string_view name,
248 const aos::Configuration *config,
249 std::chrono::milliseconds timeout,
250 std::vector<const aos::Node *> nodes) {
251 return SendCommandBlocking({{command, name, nodes}}, config, timeout);
252}
253
254bool SendCommandBlocking(const std::vector<ApplicationCommand> &commands,
255 const aos::Configuration *config,
256 std::chrono::milliseconds timeout) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700257 aos::ShmEventLoop event_loop(config);
258 event_loop.SkipAosLog();
259
James Kuszmaul293b2172021-11-10 16:20:48 -0800260 StarterClient client(&event_loop);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700261
Austin Schuhe4b748a2021-10-16 14:19:58 -0700262 // Wait until event loop starts to send all commands so the watcher is ready
James Kuszmaul293b2172021-11-10 16:20:48 -0800263 event_loop.OnRun([&commands, &client, timeout]() {
264 client.SendCommands(commands, timeout);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700265 });
266
267 // If still waiting after timeout milliseconds, exit the loop
James Kuszmaul293b2172021-11-10 16:20:48 -0800268 client.SetTimeoutHandler([&event_loop]() { event_loop.Exit(); });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700269
Tyler Chatowa79419d2020-08-12 20:12:11 -0700270 bool success = false;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700271
James Kuszmaul293b2172021-11-10 16:20:48 -0800272 client.SetSuccessHandler([&event_loop, &success]() {
273 success = true;
274 event_loop.Exit();
Austin Schuhe4b748a2021-10-16 14:19:58 -0700275 });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700276
277 event_loop.Run();
278
279 return success;
280}
281
James Kuszmaul2ca441b2022-01-07 18:16:23 -0800282const std::optional<
283 std::pair<aos::monotonic_clock::time_point,
284 FlatbufferDetachedBuffer<aos::starter::ApplicationStatus>>>
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800285GetStatus(std::string_view name, const Configuration *config,
286 const aos::Node *node) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700287 ShmEventLoop event_loop(config);
288 event_loop.SkipAosLog();
289
James Kuszmaul293b2172021-11-10 16:20:48 -0800290 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
291 StatusChannelForNode(config, node)->name()->string_view());
Tyler Chatowa79419d2020-08-12 20:12:11 -0700292 status_fetcher.Fetch();
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800293 if (status_fetcher.get() != nullptr) {
294 const aos::starter::ApplicationStatus *status =
295 FindApplicationStatus(*status_fetcher, name);
296 if (status != nullptr) {
James Kuszmaul2ca441b2022-01-07 18:16:23 -0800297 return std::make_pair(status_fetcher.context().monotonic_remote_time,
298 aos::CopyFlatBuffer(status));
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800299 }
300 }
301 return std::nullopt;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700302}
303
James Kuszmaul293b2172021-11-10 16:20:48 -0800304std::optional<std::pair<aos::monotonic_clock::time_point,
305 const aos::FlatbufferVector<aos::starter::Status>>>
306GetStarterStatus(const aos::Configuration *config, const aos::Node *node) {
Philipp Schrader08537492021-01-23 16:17:55 -0800307 ShmEventLoop event_loop(config);
308 event_loop.SkipAosLog();
309
James Kuszmaul293b2172021-11-10 16:20:48 -0800310 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
311 StatusChannelForNode(config, node)->name()->string_view());
Philipp Schrader08537492021-01-23 16:17:55 -0800312 status_fetcher.Fetch();
James Kuszmaul293b2172021-11-10 16:20:48 -0800313 return (status_fetcher.get() == nullptr)
314 ? std::nullopt
315 : std::make_optional(std::make_pair(
316 status_fetcher.context().monotonic_remote_time,
317 status_fetcher.CopyFlatBuffer()));
Philipp Schrader08537492021-01-23 16:17:55 -0800318}
319
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800320} // namespace aos::starter