blob: e507507056187fc9c56c85271af2c47b30b50a97 [file] [log] [blame]
Philipp Schrader790cb542023-07-05 21:06:52 -07001#include "aos/starter/starterd_lib.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -07002
Stephan Pleinesf581a072024-05-23 20:59:27 -07003#include <string.h>
4
Tyler Chatowa79419d2020-08-12 20:12:11 -07005#include <algorithm>
Stephan Pleinesf581a072024-05-23 20:59:27 -07006#include <chrono>
7#include <functional>
8#include <ostream>
9#include <set>
10#include <string_view>
11#include <thread>
Tyler Chatowa79419d2020-08-12 20:12:11 -070012#include <utility>
13
Austin Schuh99f7c6a2024-06-25 22:07:44 -070014#include "absl/flags/declare.h"
15#include "absl/flags/flag.h"
16#include "absl/log/check.h"
17#include "absl/log/log.h"
Stephan Pleinesf581a072024-05-23 20:59:27 -070018#include "flatbuffers/buffer.h"
19#include "flatbuffers/flatbuffer_builder.h"
20#include "flatbuffers/string.h"
21#include "flatbuffers/vector.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -070022
Stephan Pleinesf581a072024-05-23 20:59:27 -070023#include "aos/events/context.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070024#include "aos/json_to_flatbuffer.h"
25
Austin Schuh4d275fc2022-09-16 15:42:45 -070026// FLAGS_shm_base is defined elsewhere, declare it here so it can be used
27// to override the shared memory folder for unit testing.
Austin Schuh99f7c6a2024-06-25 22:07:44 -070028ABSL_DECLARE_FLAG(std::string, shm_base);
Austin Schuh4d275fc2022-09-16 15:42:45 -070029// FLAGS_permissions is defined elsewhere, declare it here so it can be used
30// to set the file permissions on the shared memory block.
Austin Schuh99f7c6a2024-06-25 22:07:44 -070031ABSL_DECLARE_FLAG(uint32_t, permissions);
Austin Schuh4d275fc2022-09-16 15:42:45 -070032
Austin Schuh99f7c6a2024-06-25 22:07:44 -070033ABSL_FLAG(uint32_t, queue_initialization_threads, 0,
34 "Number of threads to spin up to initialize the queue. 0 means "
35 "use the main thread.");
36ABSL_DECLARE_FLAG(bool, enable_ftrace);
Austin Schuh816a1162023-05-31 16:29:47 -070037
Stephan Pleinesf63bde82024-01-13 15:59:33 -080038namespace aos::starter {
Tyler Chatowa79419d2020-08-12 20:12:11 -070039
James Kuszmaul293b2172021-11-10 16:20:48 -080040const aos::Channel *StatusChannelForNode(const aos::Configuration *config,
41 const aos::Node *node) {
42 return configuration::GetChannel<Status>(config, "/aos", "", node);
43}
44const aos::Channel *StarterRpcChannelForNode(const aos::Configuration *config,
45 const aos::Node *node) {
46 return configuration::GetChannel<StarterRpc>(config, "/aos", "", node);
47}
48
Tyler Chatowa79419d2020-08-12 20:12:11 -070049Starter::Starter(const aos::Configuration *event_loop_config)
50 : config_msg_(event_loop_config),
51 event_loop_(event_loop_config),
52 status_sender_(event_loop_.MakeSender<aos::starter::Status>("/aos")),
James Kuszmaul2c10e052023-08-09 10:22:36 -070053 status_timer_(event_loop_.AddPhasedLoop(
54 [this](int elapsed_cycles) {
55 ServiceTimingReportFetcher(elapsed_cycles);
56 SendStatus();
57 status_count_ = 0;
58 },
59 std::chrono::milliseconds(1000))),
Austin Schuh59398d32023-05-03 08:10:55 -070060 cleanup_timer_(event_loop_.AddTimer([this] {
61 event_loop_.Exit();
62 LOG(INFO) << "Starter event loop exit finished.";
63 })),
Austin Schuhfc304942021-10-16 14:20:05 -070064 max_status_count_(
65 event_loop_.GetChannel<aos::starter::Status>("/aos")->frequency() -
66 1),
James Kuszmaul8544c492023-07-31 15:00:38 -070067 timing_report_fetcher_(
68 event_loop_.MakeFetcher<aos::timing::Report>("/aos")),
Austin Schuh99f7c6a2024-06-25 22:07:44 -070069 shm_base_(absl::GetFlag(FLAGS_shm_base)),
Tyler Chatowa79419d2020-08-12 20:12:11 -070070 listener_(&event_loop_,
James Kuszmaul6295a642022-03-22 15:23:59 -070071 [this](signalfd_siginfo signal) { OnSignal(signal); }),
Maxwell Gumleyb27245f2024-04-11 15:46:22 -060072 top_(&event_loop_, aos::util::Top::TrackThreadsMode::kDisabled,
73 aos::util::Top::TrackPerThreadInfoMode::kEnabled) {
Tyler Chatowa79419d2020-08-12 20:12:11 -070074 event_loop_.SkipAosLog();
75
James Kuszmaul06a8f352024-03-15 14:15:57 -070076 cleanup_timer_->set_name("cleanup");
77
James Kuszmaul293b2172021-11-10 16:20:48 -080078 if (!aos::configuration::MultiNode(config_msg_)) {
79 event_loop_.MakeWatcher(
80 "/aos",
81 [this](const aos::starter::StarterRpc &cmd) { HandleStarterRpc(cmd); });
82 } else {
83 for (const aos::Node *node : aos::configuration::GetNodes(config_msg_)) {
84 const Channel *channel = StarterRpcChannelForNode(config_msg_, node);
85 CHECK(channel != nullptr) << ": Failed to find channel /aos for "
86 << StarterRpc::GetFullyQualifiedName() << " on "
87 << node->name()->string_view();
88 if (!aos::configuration::ChannelIsReadableOnNode(channel,
89 event_loop_.node())) {
90 LOG(INFO) << "StarterRpc channel "
91 << aos::configuration::StrippedChannelToString(channel)
92 << " is not readable on "
93 << event_loop_.node()->name()->string_view();
94 } else {
95 event_loop_.MakeWatcher(channel->name()->string_view(),
96 [this](const aos::starter::StarterRpc &cmd) {
97 HandleStarterRpc(cmd);
98 });
99 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700100 }
James Kuszmaul293b2172021-11-10 16:20:48 -0800101 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700102
Austin Schuh4d275fc2022-09-16 15:42:45 -0700103 // Catalogue all the applications for this node, so we can keep an eye on
104 // them.
Tyler Chatowa79419d2020-08-12 20:12:11 -0700105 if (config_msg_->has_applications()) {
106 const flatbuffers::Vector<flatbuffers::Offset<aos::Application>>
107 *applications = config_msg_->applications();
Ravago Jones7e2dd322020-11-21 15:58:58 -0800108
109 if (aos::configuration::MultiNode(config_msg_)) {
110 std::string_view current_node = event_loop_.node()->name()->string_view();
111 for (const aos::Application *application : *applications) {
Austin Schuh228609b2023-03-21 15:43:11 -0700112 CHECK(application->has_nodes())
113 << ": Missing nodes on " << aos::FlatbufferToJson(application);
Ravago Jones7e2dd322020-11-21 15:58:58 -0800114 for (const flatbuffers::String *node : *application->nodes()) {
115 if (node->string_view() == current_node) {
116 AddApplication(application);
117 break;
118 }
119 }
120 }
121 } else {
122 for (const aos::Application *application : *applications) {
123 AddApplication(application);
124 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700125 }
126 }
Austin Schuh4d275fc2022-09-16 15:42:45 -0700127
128 // Catalogue all the intranode channels for this node, and create
129 // MemoryMappedQueues for each one to allocate the shared memory before
130 // spawning any shasta process.
131 if (config_msg_->has_channels()) {
Austin Schuh816a1162023-05-31 16:29:47 -0700132 LOG(INFO) << "Starting to initialize shared memory.";
Austin Schuh4d275fc2022-09-16 15:42:45 -0700133 const aos::Node *this_node = event_loop_.node();
Austin Schuh816a1162023-05-31 16:29:47 -0700134 std::vector<const aos::Channel *> channels_to_construct;
Austin Schuh4d275fc2022-09-16 15:42:45 -0700135 for (const aos::Channel *channel : *config_msg_->channels()) {
136 if (aos::configuration::ChannelIsReadableOnNode(channel, this_node)) {
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700137 if (absl::GetFlag(FLAGS_queue_initialization_threads) == 0) {
Austin Schuh816a1162023-05-31 16:29:47 -0700138 AddChannel(channel);
139 } else {
140 channels_to_construct.push_back(channel);
141 }
Austin Schuh4d275fc2022-09-16 15:42:45 -0700142 }
143 }
Austin Schuh816a1162023-05-31 16:29:47 -0700144
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700145 if (absl::GetFlag(FLAGS_queue_initialization_threads) != 0) {
Austin Schuh816a1162023-05-31 16:29:47 -0700146 std::mutex pool_mutex;
147 std::vector<std::thread> threads;
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700148 threads.reserve(absl::GetFlag(FLAGS_queue_initialization_threads));
149 for (size_t i = 0; i < absl::GetFlag(FLAGS_queue_initialization_threads);
150 ++i) {
Austin Schuh816a1162023-05-31 16:29:47 -0700151 threads.emplace_back([this, &pool_mutex, &channels_to_construct]() {
152 while (true) {
153 const aos::Channel *channel;
154 {
155 std::unique_lock<std::mutex> locker(pool_mutex);
156 if (channels_to_construct.empty()) {
157 return;
158 }
159 channel = channels_to_construct.back();
160 channels_to_construct.pop_back();
161 }
162 AddChannel(channel);
163 }
164 });
165 }
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700166 for (size_t i = 0; i < absl::GetFlag(FLAGS_queue_initialization_threads);
167 ++i) {
Austin Schuh816a1162023-05-31 16:29:47 -0700168 threads[i].join();
169 }
170 }
171 LOG(INFO) << "Starting applications.";
Austin Schuh4d275fc2022-09-16 15:42:45 -0700172 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700173}
174
James Kuszmaul293b2172021-11-10 16:20:48 -0800175void Starter::HandleStarterRpc(const StarterRpc &command) {
176 if (!command.has_command() || !command.has_name() || exiting_) {
177 return;
178 }
179
180 LOG(INFO) << "Received " << aos::FlatbufferToJson(&command);
181
182 if (command.has_nodes()) {
183 CHECK(aos::configuration::MultiNode(config_msg_));
184 bool relevant_to_this_node = false;
185 for (const flatbuffers::String *node : *command.nodes()) {
186 if (node->string_view() == event_loop_.node()->name()->string_view()) {
187 relevant_to_this_node = true;
188 }
189 }
190 if (!relevant_to_this_node) {
191 return;
192 }
193 }
194 // If not populated, restart regardless of node.
195
196 auto search = applications_.find(command.name()->str());
197 if (search != applications_.end()) {
198 // If an applicatione exists by the given name, dispatch the command
199 search->second.HandleCommand(command.command());
200 }
201}
202
James Kuszmaul6295a642022-03-22 15:23:59 -0700203void Starter::HandleStateChange() {
204 std::set<pid_t> all_pids;
205 for (const auto &pair : applications_) {
206 if (pair.second.get_pid() > 0 &&
207 pair.second.status() != aos::starter::State::STOPPED) {
208 all_pids.insert(pair.second.get_pid());
209 }
210 }
211 top_.set_track_pids(all_pids);
212
Austin Schuhfc304942021-10-16 14:20:05 -0700213 if (status_count_ < max_status_count_) {
214 SendStatus();
215 ++status_count_;
216 } else {
217 VLOG(1) << "That's enough " << status_count_ << " " << max_status_count_;
218 }
219}
220
Tyler Chatowa79419d2020-08-12 20:12:11 -0700221void Starter::Cleanup() {
222 if (exiting_) {
223 return;
224 }
225 exiting_ = true;
226 for (auto &application : applications_) {
227 application.second.Terminate();
228 }
Philipp Schradera6712522023-07-05 20:25:11 -0700229 cleanup_timer_->Schedule(event_loop_.monotonic_now() +
230 std::chrono::milliseconds(1500));
Tyler Chatowa79419d2020-08-12 20:12:11 -0700231}
232
233void Starter::OnSignal(signalfd_siginfo info) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700234 if (info.ssi_signo == SIGCHLD) {
235 // SIGCHLD messages can be collapsed if multiple are received, so all
236 // applications must check their status.
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700237 if (absl::GetFlag(FLAGS_enable_ftrace)) {
Pallavi Madhukaraa17d242023-12-20 13:42:41 -0800238 ftrace_.FormatMessage("SIGCHLD");
239 ftrace_.TurnOffOrDie();
240 }
241
Tyler Chatowa79419d2020-08-12 20:12:11 -0700242 for (auto iter = applications_.begin(); iter != applications_.end();) {
243 if (iter->second.MaybeHandleSignal()) {
244 iter = applications_.erase(iter);
245 } else {
246 ++iter;
247 }
248 }
249
250 if (exiting_ && applications_.empty()) {
251 event_loop_.Exit();
252 }
Austin Schuh3204b332021-10-16 14:20:10 -0700253 } else {
254 LOG(INFO) << "Received signal '" << strsignal(info.ssi_signo) << "'";
255
256 if (std::find(kStarterDeath.begin(), kStarterDeath.end(), info.ssi_signo) !=
257 kStarterDeath.end()) {
258 LOG(WARNING) << "Starter shutting down";
259 Cleanup();
260 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700261 }
262}
263
264Application *Starter::AddApplication(const aos::Application *application) {
James Kuszmaul6295a642022-03-22 15:23:59 -0700265 auto [iter, success] = applications_.try_emplace(
266 application->name()->str(), application, &event_loop_,
267 [this]() { HandleStateChange(); });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700268 if (success) {
James Kuszmauld42edb42022-01-07 18:00:16 -0800269 // We should be catching and handling SIGCHLD correctly in the starter, so
270 // don't leave in the crutch for polling for the child process status (this
271 // is less about efficiency, and more about making sure bit rot doesn't
272 // result in the signal handling breaking).
273 iter->second.DisableChildDeathPolling();
Tyler Chatowa79419d2020-08-12 20:12:11 -0700274 return &(iter->second);
275 }
276 return nullptr;
277}
278
279void Starter::Run() {
Tyler Chatow03fdb2a2020-12-26 18:39:36 -0800280#ifdef AOS_ARCHITECTURE_arm_frc
281 PCHECK(setuid(0) == 0) << "Failed to change user to root";
282#endif
283
Tyler Chatowa79419d2020-08-12 20:12:11 -0700284 for (auto &application : applications_) {
Austin Schuh5f79a5a2021-10-12 17:46:50 -0700285 if (application.second.autostart()) {
286 application.second.Start();
287 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700288 }
289
290 event_loop_.Run();
291}
292
James Kuszmaul2c10e052023-08-09 10:22:36 -0700293void Starter::ServiceTimingReportFetcher(int elapsed_cycles) {
294 // If there is any chance that it has been longer than one cycle since we last
295 // serviced the fetcher, call Fetch(). This reduces the chances that the
296 // fetcher falls behind when the system is under heavy load. Dropping a few
297 // timing report messages when the system is under stress is fine.
298 if (timing_report_fetcher_.get() == nullptr || elapsed_cycles > 1) {
299 timing_report_fetcher_.Fetch();
300 }
James Kuszmaul8544c492023-07-31 15:00:38 -0700301 while (timing_report_fetcher_.FetchNext()) {
302 for (auto &application : applications_) {
303 application.second.ObserveTimingReport(
304 timing_report_fetcher_.context().monotonic_event_time,
305 timing_report_fetcher_.get());
306 }
307 }
308}
309
Tyler Chatowa79419d2020-08-12 20:12:11 -0700310void Starter::SendStatus() {
311 aos::Sender<aos::starter::Status>::Builder builder =
312 status_sender_.MakeBuilder();
313
314 std::vector<flatbuffers::Offset<aos::starter::ApplicationStatus>> statuses;
315
316 for (auto &application : applications_) {
James Kuszmaul6295a642022-03-22 15:23:59 -0700317 statuses.push_back(application.second.PopulateStatus(builder.fbb(), &top_));
Tyler Chatowa79419d2020-08-12 20:12:11 -0700318 }
319
320 auto statuses_fbs = builder.fbb()->CreateVector(statuses);
321
322 aos::starter::Status::Builder status_builder(*builder.fbb());
323 status_builder.add_statuses(statuses_fbs);
milind1f1dca32021-07-03 13:50:07 -0700324 builder.CheckOk(builder.Send(status_builder.Finish()));
Tyler Chatowa79419d2020-08-12 20:12:11 -0700325}
326
Austin Schuh4d275fc2022-09-16 15:42:45 -0700327void Starter::AddChannel(const aos::Channel *channel) {
Austin Schuh6bdcc372024-06-27 14:49:11 -0700328 CHECK(channel != nullptr);
Austin Schuh816a1162023-05-31 16:29:47 -0700329 std::unique_ptr<aos::ipc_lib::MemoryMappedQueue> queue =
330 std::make_unique<aos::ipc_lib::MemoryMappedQueue>(
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700331 shm_base_, absl::GetFlag(FLAGS_permissions),
332 event_loop_.configuration(), channel);
Austin Schuh816a1162023-05-31 16:29:47 -0700333
334 {
335 std::unique_lock<std::mutex> locker(queue_mutex_);
336 shm_queues_.emplace_back(std::move(queue));
337 }
Austin Schuh4d275fc2022-09-16 15:42:45 -0700338 VLOG(1) << "Created MemoryMappedQueue for "
339 << aos::configuration::StrippedChannelToString(channel) << " under "
340 << shm_base_;
341}
342
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800343} // namespace aos::starter