blob: 754fc2cc8959b89a122494ae7c67048f7f7834b1 [file] [log] [blame]
Philipp Schrader790cb542023-07-05 21:06:52 -07001#include "aos/starter/starter_rpc_lib.h"
2
3#include "glog/logging.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -07004
5#include "aos/events/shm_event_loop.h"
6#include "aos/flatbuffer_merge.h"
James Kuszmaul293b2172021-11-10 16:20:48 -08007#include "aos/starter/starterd_lib.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -07008
Stephan Pleinesf63bde82024-01-13 15:59:33 -08009namespace aos::starter {
Tyler Chatowa79419d2020-08-12 20:12:11 -070010
James Kuszmaul293b2172021-11-10 16:20:48 -080011namespace {
12State ExpectedStateForCommand(Command command) {
13 switch (command) {
14 case Command::START:
15 case Command::RESTART:
16 return State::RUNNING;
17 case Command::STOP:
18 return State::STOPPED;
19 }
20 return State::STOPPED;
21}
22} // namespace
23
Tyler Chatowa79419d2020-08-12 20:12:11 -070024const aos::starter::ApplicationStatus *FindApplicationStatus(
25 const aos::starter::Status &status, std::string_view name) {
26 if (!status.has_statuses()) {
27 return nullptr;
28 }
29
30 auto statuses = status.statuses();
31
32 auto search =
33 std::find_if(statuses->begin(), statuses->end(),
34 [name](const aos::starter::ApplicationStatus *app_status) {
35 return app_status->has_name() &&
36 app_status->name()->string_view() == name;
37 });
38 if (search == statuses->end()) {
39 return nullptr;
40 }
41 return *search;
42}
43
Austin Schuhdae1e8d2022-03-26 15:09:31 -070044std::string_view FindApplication(const std::string_view name,
milind upadhyay4272f382021-04-07 18:03:08 -070045 const aos::Configuration *config) {
46 std::string_view app_name = name;
47 for (const auto app : *config->applications()) {
Austin Schuhdae1e8d2022-03-26 15:09:31 -070048 if (app->name()->string_view() == name) {
49 return name;
50 }
51 }
52 for (const auto app : *config->applications()) {
Milind Upadhyay7641ce82021-04-10 14:15:28 -070053 if (app->has_executable_name() &&
milind upadhyay4272f382021-04-07 18:03:08 -070054 app->executable_name()->string_view() == name) {
55 app_name = app->name()->string_view();
56 break;
57 }
58 }
59 return app_name;
60}
61
James Kuszmaul293b2172021-11-10 16:20:48 -080062StarterClient::StarterClient(EventLoop *event_loop)
63 : event_loop_(event_loop),
64 timeout_timer_(event_loop_->AddTimer([this]() { Timeout(); })),
65 cmd_sender_(event_loop_->MakeSender<StarterRpc>("/aos")) {
James Kuszmaul06a8f352024-03-15 14:15:57 -070066 timeout_timer_->set_name("rpc_timeout");
James Kuszmaul293b2172021-11-10 16:20:48 -080067 if (configuration::MultiNode(event_loop_->configuration())) {
68 for (const aos::Node *node :
69 configuration::GetNodes(event_loop_->configuration())) {
70 const Channel *channel =
71 StatusChannelForNode(event_loop_->configuration(), node);
72 CHECK(channel != nullptr) << ": Failed to find channel /aos for "
73 << Status::GetFullyQualifiedName() << " on "
74 << node->name()->string_view();
75 if (!configuration::ChannelIsReadableOnNode(channel,
76 event_loop_->node())) {
James Kuszmaul9d36a622021-11-10 16:48:00 -080077 VLOG(1) << "Status channel "
78 << configuration::StrippedChannelToString(channel)
79 << " is not readable on "
80 << event_loop_->node()->name()->string_view();
James Kuszmaul293b2172021-11-10 16:20:48 -080081 } else if (!configuration::ChannelIsReadableOnNode(
82 StarterRpcChannelForNode(event_loop_->configuration(),
83 event_loop_->node()),
84 node)) {
85 // Don't attempt to construct a status fetcher if the other node won't
86 // even be able to receive our commands.
James Kuszmaul9d36a622021-11-10 16:48:00 -080087 VLOG(1) << "StarterRpc channel for "
88 << event_loop_->node()->name()->string_view()
89 << " is not readable on " << node->name()->string_view();
James Kuszmaul293b2172021-11-10 16:20:48 -080090 } else {
91 status_fetchers_[node->name()->str()] =
92 event_loop_->MakeFetcher<Status>(channel->name()->string_view());
93 event_loop_->MakeNoArgWatcher<Status>(channel->name()->string_view(),
94 [this]() {
95 if (CheckCommandsSucceeded()) {
96 Succeed();
97 }
98 });
99 }
100 }
101 } else {
102 status_fetchers_[""] = event_loop_->MakeFetcher<Status>("/aos");
103 event_loop_->MakeNoArgWatcher<Status>("/aos", [this]() {
104 if (CheckCommandsSucceeded()) {
105 Succeed();
106 }
107 });
108 }
Austin Schuhe4b748a2021-10-16 14:19:58 -0700109}
110
James Kuszmaul293b2172021-11-10 16:20:48 -0800111void StarterClient::SendCommands(
112 const std::vector<ApplicationCommand> &commands,
113 monotonic_clock::duration timeout) {
114 CHECK(current_commands_.empty());
115 for (auto &pair : status_fetchers_) {
116 pair.second.Fetch();
117 }
118 const bool is_multi_node =
119 aos::configuration::MultiNode(event_loop_->configuration());
120 for (const auto &command : commands) {
121 auto builder = cmd_sender_.MakeBuilder();
122 const auto application_offset =
123 builder.fbb()->CreateString(command.application);
124 std::vector<flatbuffers::Offset<flatbuffers::String>> node_offsets;
125 CHECK(!command.nodes.empty())
126 << "At least one node must be specified for application "
127 << command.application;
128 for (const aos::Node *node : command.nodes) {
129 const std::string node_name((node == nullptr) ? "" : node->name()->str());
130 if (status_fetchers_.count(node_name) == 0) {
131 if (is_multi_node) {
132 LOG(FATAL) << "Node \"" << node_name
133 << "\" must be configured to both receive StarterRpc "
134 "messages from \""
135 << event_loop_->node()->name()->string_view()
136 << "\" as well as to send starter Status messages back.";
137 } else {
138 LOG(FATAL) << "On single-node configs, use an empty string for the "
139 "node name.";
140 }
141 }
James Kuszmaul43d70162023-03-04 18:25:14 -0800142 if (status_fetchers_[node_name].get() == nullptr) {
143 LOG(WARNING) << ": No status available for node " << node_name
144 << "; not executing commands for that node.";
145 continue;
146 }
James Kuszmaul293b2172021-11-10 16:20:48 -0800147 if (is_multi_node) {
148 node_offsets.push_back(builder.fbb()->CreateString(node_name));
149 }
150 const ApplicationStatus *last_status =
151 CHECK_NOTNULL(FindApplicationStatus(*status_fetchers_[node_name],
152 command.application));
153 current_commands_[node_name].push_back(CommandStatus{
154 .expected_state = ExpectedStateForCommand(command.command),
155 .application = std::string(command.application),
156 .old_id = std::nullopt});
157 // If we are restarting, then we need to track what the current ID of the
158 // process is to ensure that it actually got restarted. For just starting,
159 // we leave the application running and so don't care.
160 if (command.command == Command::RESTART && last_status->has_id()) {
161 current_commands_[node_name].back().old_id = last_status->id();
162 }
163 }
164 flatbuffers::Offset<
165 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
166 nodes_offset;
167 if (is_multi_node) {
168 nodes_offset = builder.fbb()->CreateVector(node_offsets);
169 }
170 auto command_builder = builder.MakeBuilder<StarterRpc>();
171 command_builder.add_command(command.command);
172 command_builder.add_name(application_offset);
173 if (is_multi_node) {
174 command_builder.add_nodes(nodes_offset);
175 }
milind1f1dca32021-07-03 13:50:07 -0700176 builder.CheckOk(builder.Send(command_builder.Finish()));
James Kuszmaul293b2172021-11-10 16:20:48 -0800177 }
178
Philipp Schradera6712522023-07-05 20:25:11 -0700179 timeout_timer_->Schedule(event_loop_->monotonic_now() + timeout);
James Kuszmaul293b2172021-11-10 16:20:48 -0800180}
181
182bool StarterClient::CheckCommandsSucceeded() {
183 if (current_commands_.empty()) {
184 return false;
185 }
186
187 for (auto &pair : status_fetchers_) {
188 pair.second.Fetch();
189 }
190
191 bool succeeded = true;
192
193 for (const auto &pair : current_commands_) {
194 if (pair.second.empty()) {
195 continue;
196 }
197 CHECK(status_fetchers_[pair.first].get() != nullptr)
198 << ": No status available for node " << pair.first;
199 const Status &status = *status_fetchers_[pair.first];
200 for (const auto &command : pair.second) {
201 const ApplicationStatus *application_status =
202 CHECK_NOTNULL(FindApplicationStatus(status, command.application));
203 if (application_status->state() == command.expected_state) {
204 if (command.expected_state == State::RUNNING &&
205 application_status->id() == command.old_id) {
206 succeeded = false;
207 }
208 } else {
209 succeeded = false;
210 }
211 }
212 }
213 return succeeded;
214}
215
216void StarterClient::Timeout() {
217 // Clear commands prior to calling handlers to allow the handler to call
218 // SendCommands() again if desired.
219 current_commands_.clear();
220 if (timeout_handler_) {
221 timeout_handler_();
222 }
223}
224
225void StarterClient::Succeed() {
226 // Clear commands prior to calling handlers to allow the handler to call
227 // SendCommands() again if desired.
228 current_commands_.clear();
Austin Schuh59398d32023-05-03 08:10:55 -0700229 // Clear the timer before calling success handler, in case the success
230 // handler needs to modify timeout handler.
231 timeout_timer_->Disable();
James Kuszmaul293b2172021-11-10 16:20:48 -0800232 if (success_handler_) {
233 success_handler_();
234 }
James Kuszmaul293b2172021-11-10 16:20:48 -0800235}
236
237bool SendCommandBlocking(aos::starter::Command command, std::string_view name,
238 const aos::Configuration *config,
239 std::chrono::milliseconds timeout,
240 std::vector<const aos::Node *> nodes) {
241 return SendCommandBlocking({{command, name, nodes}}, config, timeout);
242}
243
244bool SendCommandBlocking(const std::vector<ApplicationCommand> &commands,
245 const aos::Configuration *config,
246 std::chrono::milliseconds timeout) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700247 aos::ShmEventLoop event_loop(config);
248 event_loop.SkipAosLog();
249
James Kuszmaul293b2172021-11-10 16:20:48 -0800250 StarterClient client(&event_loop);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700251
Austin Schuhe4b748a2021-10-16 14:19:58 -0700252 // Wait until event loop starts to send all commands so the watcher is ready
James Kuszmaul293b2172021-11-10 16:20:48 -0800253 event_loop.OnRun([&commands, &client, timeout]() {
254 client.SendCommands(commands, timeout);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700255 });
256
257 // If still waiting after timeout milliseconds, exit the loop
James Kuszmaul293b2172021-11-10 16:20:48 -0800258 client.SetTimeoutHandler([&event_loop]() { event_loop.Exit(); });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700259
Tyler Chatowa79419d2020-08-12 20:12:11 -0700260 bool success = false;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700261
James Kuszmaul293b2172021-11-10 16:20:48 -0800262 client.SetSuccessHandler([&event_loop, &success]() {
263 success = true;
264 event_loop.Exit();
Austin Schuhe4b748a2021-10-16 14:19:58 -0700265 });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700266
267 event_loop.Run();
268
269 return success;
270}
271
James Kuszmaul2ca441b2022-01-07 18:16:23 -0800272const std::optional<
273 std::pair<aos::monotonic_clock::time_point,
274 FlatbufferDetachedBuffer<aos::starter::ApplicationStatus>>>
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800275GetStatus(std::string_view name, const Configuration *config,
276 const aos::Node *node) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700277 ShmEventLoop event_loop(config);
278 event_loop.SkipAosLog();
279
James Kuszmaul293b2172021-11-10 16:20:48 -0800280 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
281 StatusChannelForNode(config, node)->name()->string_view());
Tyler Chatowa79419d2020-08-12 20:12:11 -0700282 status_fetcher.Fetch();
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800283 if (status_fetcher.get() != nullptr) {
284 const aos::starter::ApplicationStatus *status =
285 FindApplicationStatus(*status_fetcher, name);
286 if (status != nullptr) {
James Kuszmaul2ca441b2022-01-07 18:16:23 -0800287 return std::make_pair(status_fetcher.context().monotonic_remote_time,
288 aos::CopyFlatBuffer(status));
James Kuszmaule4bb0a22022-01-07 18:14:43 -0800289 }
290 }
291 return std::nullopt;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700292}
293
James Kuszmaul293b2172021-11-10 16:20:48 -0800294std::optional<std::pair<aos::monotonic_clock::time_point,
295 const aos::FlatbufferVector<aos::starter::Status>>>
296GetStarterStatus(const aos::Configuration *config, const aos::Node *node) {
Philipp Schrader08537492021-01-23 16:17:55 -0800297 ShmEventLoop event_loop(config);
298 event_loop.SkipAosLog();
299
James Kuszmaul293b2172021-11-10 16:20:48 -0800300 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
301 StatusChannelForNode(config, node)->name()->string_view());
Philipp Schrader08537492021-01-23 16:17:55 -0800302 status_fetcher.Fetch();
James Kuszmaul293b2172021-11-10 16:20:48 -0800303 return (status_fetcher.get() == nullptr)
304 ? std::nullopt
305 : std::make_optional(std::make_pair(
306 status_fetcher.context().monotonic_remote_time,
307 status_fetcher.CopyFlatBuffer()));
Philipp Schrader08537492021-01-23 16:17:55 -0800308}
309
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800310} // namespace aos::starter