blob: 9f666ff7a594c55c3ea144caee1f4cbaa1e86e15 [file] [log] [blame]
Tyler Chatowa79419d2020-08-12 20:12:11 -07001#include "starter_rpc_lib.h"
2
3#include "aos/events/shm_event_loop.h"
4#include "aos/flatbuffer_merge.h"
James Kuszmaul293b2172021-11-10 16:20:48 -08005#include "aos/starter/starterd_lib.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -07006
7namespace aos {
8namespace starter {
9
James Kuszmaul293b2172021-11-10 16:20:48 -080010namespace {
11State ExpectedStateForCommand(Command command) {
12 switch (command) {
13 case Command::START:
14 case Command::RESTART:
15 return State::RUNNING;
16 case Command::STOP:
17 return State::STOPPED;
18 }
19 return State::STOPPED;
20}
21} // namespace
22
Tyler Chatowa79419d2020-08-12 20:12:11 -070023const aos::starter::ApplicationStatus *FindApplicationStatus(
24 const aos::starter::Status &status, std::string_view name) {
25 if (!status.has_statuses()) {
26 return nullptr;
27 }
28
29 auto statuses = status.statuses();
30
31 auto search =
32 std::find_if(statuses->begin(), statuses->end(),
33 [name](const aos::starter::ApplicationStatus *app_status) {
34 return app_status->has_name() &&
35 app_status->name()->string_view() == name;
36 });
37 if (search == statuses->end()) {
38 return nullptr;
39 }
40 return *search;
41}
42
milind upadhyay4272f382021-04-07 18:03:08 -070043std::string_view FindApplication(const std::string_view &name,
44 const aos::Configuration *config) {
45 std::string_view app_name = name;
46 for (const auto app : *config->applications()) {
Milind Upadhyay7641ce82021-04-10 14:15:28 -070047 if (app->has_executable_name() &&
milind upadhyay4272f382021-04-07 18:03:08 -070048 app->executable_name()->string_view() == name) {
49 app_name = app->name()->string_view();
50 break;
51 }
52 }
53 return app_name;
54}
55
James Kuszmaul293b2172021-11-10 16:20:48 -080056StarterClient::StarterClient(EventLoop *event_loop)
57 : event_loop_(event_loop),
58 timeout_timer_(event_loop_->AddTimer([this]() { Timeout(); })),
59 cmd_sender_(event_loop_->MakeSender<StarterRpc>("/aos")) {
60 if (configuration::MultiNode(event_loop_->configuration())) {
61 for (const aos::Node *node :
62 configuration::GetNodes(event_loop_->configuration())) {
63 const Channel *channel =
64 StatusChannelForNode(event_loop_->configuration(), node);
65 CHECK(channel != nullptr) << ": Failed to find channel /aos for "
66 << Status::GetFullyQualifiedName() << " on "
67 << node->name()->string_view();
68 if (!configuration::ChannelIsReadableOnNode(channel,
69 event_loop_->node())) {
70 LOG(INFO) << "Status channel "
71 << configuration::StrippedChannelToString(channel)
72 << " is not readable on "
73 << event_loop_->node()->name()->string_view();
74 } else if (!configuration::ChannelIsReadableOnNode(
75 StarterRpcChannelForNode(event_loop_->configuration(),
76 event_loop_->node()),
77 node)) {
78 // Don't attempt to construct a status fetcher if the other node won't
79 // even be able to receive our commands.
80 LOG(INFO) << "StarterRpc channel for "
81 << event_loop_->node()->name()->string_view()
82 << " is not readable on " << node->name()->string_view();
83 } else {
84 status_fetchers_[node->name()->str()] =
85 event_loop_->MakeFetcher<Status>(channel->name()->string_view());
86 event_loop_->MakeNoArgWatcher<Status>(channel->name()->string_view(),
87 [this]() {
88 if (CheckCommandsSucceeded()) {
89 Succeed();
90 }
91 });
92 }
93 }
94 } else {
95 status_fetchers_[""] = event_loop_->MakeFetcher<Status>("/aos");
96 event_loop_->MakeNoArgWatcher<Status>("/aos", [this]() {
97 if (CheckCommandsSucceeded()) {
98 Succeed();
99 }
100 });
101 }
Austin Schuhe4b748a2021-10-16 14:19:58 -0700102}
103
James Kuszmaul293b2172021-11-10 16:20:48 -0800104void StarterClient::SendCommands(
105 const std::vector<ApplicationCommand> &commands,
106 monotonic_clock::duration timeout) {
107 CHECK(current_commands_.empty());
108 for (auto &pair : status_fetchers_) {
109 pair.second.Fetch();
110 }
111 const bool is_multi_node =
112 aos::configuration::MultiNode(event_loop_->configuration());
113 for (const auto &command : commands) {
114 auto builder = cmd_sender_.MakeBuilder();
115 const auto application_offset =
116 builder.fbb()->CreateString(command.application);
117 std::vector<flatbuffers::Offset<flatbuffers::String>> node_offsets;
118 CHECK(!command.nodes.empty())
119 << "At least one node must be specified for application "
120 << command.application;
121 for (const aos::Node *node : command.nodes) {
122 const std::string node_name((node == nullptr) ? "" : node->name()->str());
123 if (status_fetchers_.count(node_name) == 0) {
124 if (is_multi_node) {
125 LOG(FATAL) << "Node \"" << node_name
126 << "\" must be configured to both receive StarterRpc "
127 "messages from \""
128 << event_loop_->node()->name()->string_view()
129 << "\" as well as to send starter Status messages back.";
130 } else {
131 LOG(FATAL) << "On single-node configs, use an empty string for the "
132 "node name.";
133 }
134 }
135 CHECK(status_fetchers_[node_name].get() != nullptr)
136 << ": No status available for node " << node_name;
137 if (is_multi_node) {
138 node_offsets.push_back(builder.fbb()->CreateString(node_name));
139 }
140 const ApplicationStatus *last_status =
141 CHECK_NOTNULL(FindApplicationStatus(*status_fetchers_[node_name],
142 command.application));
143 current_commands_[node_name].push_back(CommandStatus{
144 .expected_state = ExpectedStateForCommand(command.command),
145 .application = std::string(command.application),
146 .old_id = std::nullopt});
147 // If we are restarting, then we need to track what the current ID of the
148 // process is to ensure that it actually got restarted. For just starting,
149 // we leave the application running and so don't care.
150 if (command.command == Command::RESTART && last_status->has_id()) {
151 current_commands_[node_name].back().old_id = last_status->id();
152 }
153 }
154 flatbuffers::Offset<
155 flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
156 nodes_offset;
157 if (is_multi_node) {
158 nodes_offset = builder.fbb()->CreateVector(node_offsets);
159 }
160 auto command_builder = builder.MakeBuilder<StarterRpc>();
161 command_builder.add_command(command.command);
162 command_builder.add_name(application_offset);
163 if (is_multi_node) {
164 command_builder.add_nodes(nodes_offset);
165 }
166 CHECK(builder.Send(command_builder.Finish()));
167 }
168
169 timeout_timer_->Setup(event_loop_->monotonic_now() + timeout);
170}
171
172bool StarterClient::CheckCommandsSucceeded() {
173 if (current_commands_.empty()) {
174 return false;
175 }
176
177 for (auto &pair : status_fetchers_) {
178 pair.second.Fetch();
179 }
180
181 bool succeeded = true;
182
183 for (const auto &pair : current_commands_) {
184 if (pair.second.empty()) {
185 continue;
186 }
187 CHECK(status_fetchers_[pair.first].get() != nullptr)
188 << ": No status available for node " << pair.first;
189 const Status &status = *status_fetchers_[pair.first];
190 for (const auto &command : pair.second) {
191 const ApplicationStatus *application_status =
192 CHECK_NOTNULL(FindApplicationStatus(status, command.application));
193 if (application_status->state() == command.expected_state) {
194 if (command.expected_state == State::RUNNING &&
195 application_status->id() == command.old_id) {
196 succeeded = false;
197 }
198 } else {
199 succeeded = false;
200 }
201 }
202 }
203 return succeeded;
204}
205
206void StarterClient::Timeout() {
207 // Clear commands prior to calling handlers to allow the handler to call
208 // SendCommands() again if desired.
209 current_commands_.clear();
210 if (timeout_handler_) {
211 timeout_handler_();
212 }
213}
214
215void StarterClient::Succeed() {
216 // Clear commands prior to calling handlers to allow the handler to call
217 // SendCommands() again if desired.
218 current_commands_.clear();
219 if (success_handler_) {
220 success_handler_();
221 }
222 timeout_timer_->Disable();
223}
224
225bool SendCommandBlocking(aos::starter::Command command, std::string_view name,
226 const aos::Configuration *config,
227 std::chrono::milliseconds timeout,
228 std::vector<const aos::Node *> nodes) {
229 return SendCommandBlocking({{command, name, nodes}}, config, timeout);
230}
231
232bool SendCommandBlocking(const std::vector<ApplicationCommand> &commands,
233 const aos::Configuration *config,
234 std::chrono::milliseconds timeout) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700235 aos::ShmEventLoop event_loop(config);
236 event_loop.SkipAosLog();
237
James Kuszmaul293b2172021-11-10 16:20:48 -0800238 StarterClient client(&event_loop);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700239
Austin Schuhe4b748a2021-10-16 14:19:58 -0700240 // Wait until event loop starts to send all commands so the watcher is ready
James Kuszmaul293b2172021-11-10 16:20:48 -0800241 event_loop.OnRun([&commands, &client, timeout]() {
242 client.SendCommands(commands, timeout);
Tyler Chatowa79419d2020-08-12 20:12:11 -0700243 });
244
245 // If still waiting after timeout milliseconds, exit the loop
James Kuszmaul293b2172021-11-10 16:20:48 -0800246 client.SetTimeoutHandler([&event_loop]() { event_loop.Exit(); });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700247
Tyler Chatowa79419d2020-08-12 20:12:11 -0700248 bool success = false;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700249
James Kuszmaul293b2172021-11-10 16:20:48 -0800250 client.SetSuccessHandler([&event_loop, &success]() {
251 success = true;
252 event_loop.Exit();
Austin Schuhe4b748a2021-10-16 14:19:58 -0700253 });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700254
255 event_loop.Run();
256
257 return success;
258}
259
260const FlatbufferDetachedBuffer<aos::starter::ApplicationStatus> GetStatus(
James Kuszmaul293b2172021-11-10 16:20:48 -0800261 std::string_view name, const Configuration *config, const aos::Node *node) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700262 ShmEventLoop event_loop(config);
263 event_loop.SkipAosLog();
264
James Kuszmaul293b2172021-11-10 16:20:48 -0800265 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
266 StatusChannelForNode(config, node)->name()->string_view());
Tyler Chatowa79419d2020-08-12 20:12:11 -0700267 status_fetcher.Fetch();
Milind Upadhyay49174a72021-04-10 16:24:57 -0700268 auto status = status_fetcher.get()
269 ? FindApplicationStatus(*status_fetcher, name)
270 : nullptr;
Tyler Chatowa79419d2020-08-12 20:12:11 -0700271 return status ? aos::CopyFlatBuffer(status)
272 : FlatbufferDetachedBuffer<
273 aos::starter::ApplicationStatus>::Empty();
274}
275
James Kuszmaul293b2172021-11-10 16:20:48 -0800276std::optional<std::pair<aos::monotonic_clock::time_point,
277 const aos::FlatbufferVector<aos::starter::Status>>>
278GetStarterStatus(const aos::Configuration *config, const aos::Node *node) {
Philipp Schrader08537492021-01-23 16:17:55 -0800279 ShmEventLoop event_loop(config);
280 event_loop.SkipAosLog();
281
James Kuszmaul293b2172021-11-10 16:20:48 -0800282 auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>(
283 StatusChannelForNode(config, node)->name()->string_view());
Philipp Schrader08537492021-01-23 16:17:55 -0800284 status_fetcher.Fetch();
James Kuszmaul293b2172021-11-10 16:20:48 -0800285 return (status_fetcher.get() == nullptr)
286 ? std::nullopt
287 : std::make_optional(std::make_pair(
288 status_fetcher.context().monotonic_remote_time,
289 status_fetcher.CopyFlatBuffer()));
Philipp Schrader08537492021-01-23 16:17:55 -0800290}
291
Tyler Chatowa79419d2020-08-12 20:12:11 -0700292} // namespace starter
293} // namespace aos