blob: 4c710576f80c099c32c6490ac0b111b7f8820549 [file] [log] [blame]
Philipp Schrader790cb542023-07-05 21:06:52 -07001#include "aos/starter/starterd_lib.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -07002
Tyler Chatowa79419d2020-08-12 20:12:11 -07003#include <algorithm>
4#include <utility>
5
James Kuszmaul293b2172021-11-10 16:20:48 -08006#include "absl/strings/str_format.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -07007#include "glog/logging.h"
8#include "glog/stl_logging.h"
9
Philipp Schrader790cb542023-07-05 21:06:52 -070010#include "aos/json_to_flatbuffer.h"
11
Austin Schuh4d275fc2022-09-16 15:42:45 -070012// FLAGS_shm_base is defined elsewhere, declare it here so it can be used
13// to override the shared memory folder for unit testing.
14DECLARE_string(shm_base);
15// FLAGS_permissions is defined elsewhere, declare it here so it can be used
16// to set the file permissions on the shared memory block.
17DECLARE_uint32(permissions);
18
Austin Schuh816a1162023-05-31 16:29:47 -070019DEFINE_uint32(queue_initialization_threads, 0,
20 "Number of threads to spin up to initialize the queue. 0 means "
21 "use the main thread.");
Pallavi Madhukaraa17d242023-12-20 13:42:41 -080022DECLARE_bool(enable_ftrace);
Austin Schuh816a1162023-05-31 16:29:47 -070023
Stephan Pleinesf63bde82024-01-13 15:59:33 -080024namespace aos::starter {
Tyler Chatowa79419d2020-08-12 20:12:11 -070025
James Kuszmaul293b2172021-11-10 16:20:48 -080026const aos::Channel *StatusChannelForNode(const aos::Configuration *config,
27 const aos::Node *node) {
28 return configuration::GetChannel<Status>(config, "/aos", "", node);
29}
30const aos::Channel *StarterRpcChannelForNode(const aos::Configuration *config,
31 const aos::Node *node) {
32 return configuration::GetChannel<StarterRpc>(config, "/aos", "", node);
33}
34
Tyler Chatowa79419d2020-08-12 20:12:11 -070035Starter::Starter(const aos::Configuration *event_loop_config)
36 : config_msg_(event_loop_config),
37 event_loop_(event_loop_config),
38 status_sender_(event_loop_.MakeSender<aos::starter::Status>("/aos")),
James Kuszmaul2c10e052023-08-09 10:22:36 -070039 status_timer_(event_loop_.AddPhasedLoop(
40 [this](int elapsed_cycles) {
41 ServiceTimingReportFetcher(elapsed_cycles);
42 SendStatus();
43 status_count_ = 0;
44 },
45 std::chrono::milliseconds(1000))),
Austin Schuh59398d32023-05-03 08:10:55 -070046 cleanup_timer_(event_loop_.AddTimer([this] {
47 event_loop_.Exit();
48 LOG(INFO) << "Starter event loop exit finished.";
49 })),
Austin Schuhfc304942021-10-16 14:20:05 -070050 max_status_count_(
51 event_loop_.GetChannel<aos::starter::Status>("/aos")->frequency() -
52 1),
James Kuszmaul8544c492023-07-31 15:00:38 -070053 timing_report_fetcher_(
54 event_loop_.MakeFetcher<aos::timing::Report>("/aos")),
Austin Schuh4d275fc2022-09-16 15:42:45 -070055 shm_base_(FLAGS_shm_base),
Tyler Chatowa79419d2020-08-12 20:12:11 -070056 listener_(&event_loop_,
James Kuszmaul6295a642022-03-22 15:23:59 -070057 [this](signalfd_siginfo signal) { OnSignal(signal); }),
Maxwell Gumleyb27245f2024-04-11 15:46:22 -060058 top_(&event_loop_, aos::util::Top::TrackThreadsMode::kDisabled,
59 aos::util::Top::TrackPerThreadInfoMode::kEnabled) {
Tyler Chatowa79419d2020-08-12 20:12:11 -070060 event_loop_.SkipAosLog();
61
James Kuszmaul06a8f352024-03-15 14:15:57 -070062 cleanup_timer_->set_name("cleanup");
63
James Kuszmaul293b2172021-11-10 16:20:48 -080064 if (!aos::configuration::MultiNode(config_msg_)) {
65 event_loop_.MakeWatcher(
66 "/aos",
67 [this](const aos::starter::StarterRpc &cmd) { HandleStarterRpc(cmd); });
68 } else {
69 for (const aos::Node *node : aos::configuration::GetNodes(config_msg_)) {
70 const Channel *channel = StarterRpcChannelForNode(config_msg_, node);
71 CHECK(channel != nullptr) << ": Failed to find channel /aos for "
72 << StarterRpc::GetFullyQualifiedName() << " on "
73 << node->name()->string_view();
74 if (!aos::configuration::ChannelIsReadableOnNode(channel,
75 event_loop_.node())) {
76 LOG(INFO) << "StarterRpc channel "
77 << aos::configuration::StrippedChannelToString(channel)
78 << " is not readable on "
79 << event_loop_.node()->name()->string_view();
80 } else {
81 event_loop_.MakeWatcher(channel->name()->string_view(),
82 [this](const aos::starter::StarterRpc &cmd) {
83 HandleStarterRpc(cmd);
84 });
85 }
Tyler Chatowa79419d2020-08-12 20:12:11 -070086 }
James Kuszmaul293b2172021-11-10 16:20:48 -080087 }
Tyler Chatowa79419d2020-08-12 20:12:11 -070088
Austin Schuh4d275fc2022-09-16 15:42:45 -070089 // Catalogue all the applications for this node, so we can keep an eye on
90 // them.
Tyler Chatowa79419d2020-08-12 20:12:11 -070091 if (config_msg_->has_applications()) {
92 const flatbuffers::Vector<flatbuffers::Offset<aos::Application>>
93 *applications = config_msg_->applications();
Ravago Jones7e2dd322020-11-21 15:58:58 -080094
95 if (aos::configuration::MultiNode(config_msg_)) {
96 std::string_view current_node = event_loop_.node()->name()->string_view();
97 for (const aos::Application *application : *applications) {
Austin Schuh228609b2023-03-21 15:43:11 -070098 CHECK(application->has_nodes())
99 << ": Missing nodes on " << aos::FlatbufferToJson(application);
Ravago Jones7e2dd322020-11-21 15:58:58 -0800100 for (const flatbuffers::String *node : *application->nodes()) {
101 if (node->string_view() == current_node) {
102 AddApplication(application);
103 break;
104 }
105 }
106 }
107 } else {
108 for (const aos::Application *application : *applications) {
109 AddApplication(application);
110 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700111 }
112 }
Austin Schuh4d275fc2022-09-16 15:42:45 -0700113
114 // Catalogue all the intranode channels for this node, and create
115 // MemoryMappedQueues for each one to allocate the shared memory before
116 // spawning any shasta process.
117 if (config_msg_->has_channels()) {
Austin Schuh816a1162023-05-31 16:29:47 -0700118 LOG(INFO) << "Starting to initialize shared memory.";
Austin Schuh4d275fc2022-09-16 15:42:45 -0700119 const aos::Node *this_node = event_loop_.node();
Austin Schuh816a1162023-05-31 16:29:47 -0700120 std::vector<const aos::Channel *> channels_to_construct;
Austin Schuh4d275fc2022-09-16 15:42:45 -0700121 for (const aos::Channel *channel : *config_msg_->channels()) {
122 if (aos::configuration::ChannelIsReadableOnNode(channel, this_node)) {
Austin Schuh816a1162023-05-31 16:29:47 -0700123 if (FLAGS_queue_initialization_threads == 0) {
124 AddChannel(channel);
125 } else {
126 channels_to_construct.push_back(channel);
127 }
Austin Schuh4d275fc2022-09-16 15:42:45 -0700128 }
129 }
Austin Schuh816a1162023-05-31 16:29:47 -0700130
131 if (FLAGS_queue_initialization_threads != 0) {
132 std::mutex pool_mutex;
133 std::vector<std::thread> threads;
134 threads.reserve(FLAGS_queue_initialization_threads);
135 for (size_t i = 0; i < FLAGS_queue_initialization_threads; ++i) {
136 threads.emplace_back([this, &pool_mutex, &channels_to_construct]() {
137 while (true) {
138 const aos::Channel *channel;
139 {
140 std::unique_lock<std::mutex> locker(pool_mutex);
141 if (channels_to_construct.empty()) {
142 return;
143 }
144 channel = channels_to_construct.back();
145 channels_to_construct.pop_back();
146 }
147 AddChannel(channel);
148 }
149 });
150 }
151 for (size_t i = 0; i < FLAGS_queue_initialization_threads; ++i) {
152 threads[i].join();
153 }
154 }
155 LOG(INFO) << "Starting applications.";
Austin Schuh4d275fc2022-09-16 15:42:45 -0700156 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700157}
158
James Kuszmaul293b2172021-11-10 16:20:48 -0800159void Starter::HandleStarterRpc(const StarterRpc &command) {
160 if (!command.has_command() || !command.has_name() || exiting_) {
161 return;
162 }
163
164 LOG(INFO) << "Received " << aos::FlatbufferToJson(&command);
165
166 if (command.has_nodes()) {
167 CHECK(aos::configuration::MultiNode(config_msg_));
168 bool relevant_to_this_node = false;
169 for (const flatbuffers::String *node : *command.nodes()) {
170 if (node->string_view() == event_loop_.node()->name()->string_view()) {
171 relevant_to_this_node = true;
172 }
173 }
174 if (!relevant_to_this_node) {
175 return;
176 }
177 }
178 // If not populated, restart regardless of node.
179
180 auto search = applications_.find(command.name()->str());
181 if (search != applications_.end()) {
182 // If an applicatione exists by the given name, dispatch the command
183 search->second.HandleCommand(command.command());
184 }
185}
186
James Kuszmaul6295a642022-03-22 15:23:59 -0700187void Starter::HandleStateChange() {
188 std::set<pid_t> all_pids;
189 for (const auto &pair : applications_) {
190 if (pair.second.get_pid() > 0 &&
191 pair.second.status() != aos::starter::State::STOPPED) {
192 all_pids.insert(pair.second.get_pid());
193 }
194 }
195 top_.set_track_pids(all_pids);
196
Austin Schuhfc304942021-10-16 14:20:05 -0700197 if (status_count_ < max_status_count_) {
198 SendStatus();
199 ++status_count_;
200 } else {
201 VLOG(1) << "That's enough " << status_count_ << " " << max_status_count_;
202 }
203}
204
Tyler Chatowa79419d2020-08-12 20:12:11 -0700205void Starter::Cleanup() {
206 if (exiting_) {
207 return;
208 }
209 exiting_ = true;
210 for (auto &application : applications_) {
211 application.second.Terminate();
212 }
Philipp Schradera6712522023-07-05 20:25:11 -0700213 cleanup_timer_->Schedule(event_loop_.monotonic_now() +
214 std::chrono::milliseconds(1500));
Tyler Chatowa79419d2020-08-12 20:12:11 -0700215}
216
217void Starter::OnSignal(signalfd_siginfo info) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700218 if (info.ssi_signo == SIGCHLD) {
219 // SIGCHLD messages can be collapsed if multiple are received, so all
220 // applications must check their status.
Pallavi Madhukaraa17d242023-12-20 13:42:41 -0800221 if (FLAGS_enable_ftrace) {
222 ftrace_.FormatMessage("SIGCHLD");
223 ftrace_.TurnOffOrDie();
224 }
225
Tyler Chatowa79419d2020-08-12 20:12:11 -0700226 for (auto iter = applications_.begin(); iter != applications_.end();) {
227 if (iter->second.MaybeHandleSignal()) {
228 iter = applications_.erase(iter);
229 } else {
230 ++iter;
231 }
232 }
233
234 if (exiting_ && applications_.empty()) {
235 event_loop_.Exit();
236 }
Austin Schuh3204b332021-10-16 14:20:10 -0700237 } else {
238 LOG(INFO) << "Received signal '" << strsignal(info.ssi_signo) << "'";
239
240 if (std::find(kStarterDeath.begin(), kStarterDeath.end(), info.ssi_signo) !=
241 kStarterDeath.end()) {
242 LOG(WARNING) << "Starter shutting down";
243 Cleanup();
244 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700245 }
246}
247
248Application *Starter::AddApplication(const aos::Application *application) {
James Kuszmaul6295a642022-03-22 15:23:59 -0700249 auto [iter, success] = applications_.try_emplace(
250 application->name()->str(), application, &event_loop_,
251 [this]() { HandleStateChange(); });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700252 if (success) {
James Kuszmauld42edb42022-01-07 18:00:16 -0800253 // We should be catching and handling SIGCHLD correctly in the starter, so
254 // don't leave in the crutch for polling for the child process status (this
255 // is less about efficiency, and more about making sure bit rot doesn't
256 // result in the signal handling breaking).
257 iter->second.DisableChildDeathPolling();
Tyler Chatowa79419d2020-08-12 20:12:11 -0700258 return &(iter->second);
259 }
260 return nullptr;
261}
262
263void Starter::Run() {
Tyler Chatow03fdb2a2020-12-26 18:39:36 -0800264#ifdef AOS_ARCHITECTURE_arm_frc
265 PCHECK(setuid(0) == 0) << "Failed to change user to root";
266#endif
267
Tyler Chatowa79419d2020-08-12 20:12:11 -0700268 for (auto &application : applications_) {
Austin Schuh5f79a5a2021-10-12 17:46:50 -0700269 if (application.second.autostart()) {
270 application.second.Start();
271 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700272 }
273
274 event_loop_.Run();
275}
276
James Kuszmaul2c10e052023-08-09 10:22:36 -0700277void Starter::ServiceTimingReportFetcher(int elapsed_cycles) {
278 // If there is any chance that it has been longer than one cycle since we last
279 // serviced the fetcher, call Fetch(). This reduces the chances that the
280 // fetcher falls behind when the system is under heavy load. Dropping a few
281 // timing report messages when the system is under stress is fine.
282 if (timing_report_fetcher_.get() == nullptr || elapsed_cycles > 1) {
283 timing_report_fetcher_.Fetch();
284 }
James Kuszmaul8544c492023-07-31 15:00:38 -0700285 while (timing_report_fetcher_.FetchNext()) {
286 for (auto &application : applications_) {
287 application.second.ObserveTimingReport(
288 timing_report_fetcher_.context().monotonic_event_time,
289 timing_report_fetcher_.get());
290 }
291 }
292}
293
Tyler Chatowa79419d2020-08-12 20:12:11 -0700294void Starter::SendStatus() {
295 aos::Sender<aos::starter::Status>::Builder builder =
296 status_sender_.MakeBuilder();
297
298 std::vector<flatbuffers::Offset<aos::starter::ApplicationStatus>> statuses;
299
300 for (auto &application : applications_) {
James Kuszmaul6295a642022-03-22 15:23:59 -0700301 statuses.push_back(application.second.PopulateStatus(builder.fbb(), &top_));
Tyler Chatowa79419d2020-08-12 20:12:11 -0700302 }
303
304 auto statuses_fbs = builder.fbb()->CreateVector(statuses);
305
306 aos::starter::Status::Builder status_builder(*builder.fbb());
307 status_builder.add_statuses(statuses_fbs);
milind1f1dca32021-07-03 13:50:07 -0700308 builder.CheckOk(builder.Send(status_builder.Finish()));
Tyler Chatowa79419d2020-08-12 20:12:11 -0700309}
310
Austin Schuh4d275fc2022-09-16 15:42:45 -0700311void Starter::AddChannel(const aos::Channel *channel) {
312 CHECK_NOTNULL(channel);
Austin Schuh816a1162023-05-31 16:29:47 -0700313 std::unique_ptr<aos::ipc_lib::MemoryMappedQueue> queue =
314 std::make_unique<aos::ipc_lib::MemoryMappedQueue>(
315 shm_base_, FLAGS_permissions, event_loop_.configuration(), channel);
316
317 {
318 std::unique_lock<std::mutex> locker(queue_mutex_);
319 shm_queues_.emplace_back(std::move(queue));
320 }
Austin Schuh4d275fc2022-09-16 15:42:45 -0700321 VLOG(1) << "Created MemoryMappedQueue for "
322 << aos::configuration::StrippedChannelToString(channel) << " under "
323 << shm_base_;
324}
325
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800326} // namespace aos::starter