blob: 6917bb5047e9331e8ab2454ba48dc12605c750fd [file] [log] [blame]
Philipp Schrader790cb542023-07-05 21:06:52 -07001#include "aos/starter/starterd_lib.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -07002
Stephan Pleinesf581a072024-05-23 20:59:27 -07003#include <string.h>
4
Tyler Chatowa79419d2020-08-12 20:12:11 -07005#include <algorithm>
Stephan Pleinesf581a072024-05-23 20:59:27 -07006#include <chrono>
7#include <functional>
8#include <ostream>
9#include <set>
10#include <string_view>
11#include <thread>
Tyler Chatowa79419d2020-08-12 20:12:11 -070012#include <utility>
13
Stephan Pleinesf581a072024-05-23 20:59:27 -070014#include "flatbuffers/buffer.h"
15#include "flatbuffers/flatbuffer_builder.h"
16#include "flatbuffers/string.h"
17#include "flatbuffers/vector.h"
18#include "gflags/gflags.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -070019#include "glog/logging.h"
Tyler Chatowa79419d2020-08-12 20:12:11 -070020
Stephan Pleinesf581a072024-05-23 20:59:27 -070021#include "aos/events/context.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070022#include "aos/json_to_flatbuffer.h"
23
Austin Schuh4d275fc2022-09-16 15:42:45 -070024// FLAGS_shm_base is defined elsewhere, declare it here so it can be used
25// to override the shared memory folder for unit testing.
26DECLARE_string(shm_base);
27// FLAGS_permissions is defined elsewhere, declare it here so it can be used
28// to set the file permissions on the shared memory block.
29DECLARE_uint32(permissions);
30
Austin Schuh816a1162023-05-31 16:29:47 -070031DEFINE_uint32(queue_initialization_threads, 0,
32 "Number of threads to spin up to initialize the queue. 0 means "
33 "use the main thread.");
Pallavi Madhukaraa17d242023-12-20 13:42:41 -080034DECLARE_bool(enable_ftrace);
Austin Schuh816a1162023-05-31 16:29:47 -070035
Stephan Pleinesf63bde82024-01-13 15:59:33 -080036namespace aos::starter {
Tyler Chatowa79419d2020-08-12 20:12:11 -070037
James Kuszmaul293b2172021-11-10 16:20:48 -080038const aos::Channel *StatusChannelForNode(const aos::Configuration *config,
39 const aos::Node *node) {
40 return configuration::GetChannel<Status>(config, "/aos", "", node);
41}
42const aos::Channel *StarterRpcChannelForNode(const aos::Configuration *config,
43 const aos::Node *node) {
44 return configuration::GetChannel<StarterRpc>(config, "/aos", "", node);
45}
46
Tyler Chatowa79419d2020-08-12 20:12:11 -070047Starter::Starter(const aos::Configuration *event_loop_config)
48 : config_msg_(event_loop_config),
49 event_loop_(event_loop_config),
50 status_sender_(event_loop_.MakeSender<aos::starter::Status>("/aos")),
James Kuszmaul2c10e052023-08-09 10:22:36 -070051 status_timer_(event_loop_.AddPhasedLoop(
52 [this](int elapsed_cycles) {
53 ServiceTimingReportFetcher(elapsed_cycles);
54 SendStatus();
55 status_count_ = 0;
56 },
57 std::chrono::milliseconds(1000))),
Austin Schuh59398d32023-05-03 08:10:55 -070058 cleanup_timer_(event_loop_.AddTimer([this] {
59 event_loop_.Exit();
60 LOG(INFO) << "Starter event loop exit finished.";
61 })),
Austin Schuhfc304942021-10-16 14:20:05 -070062 max_status_count_(
63 event_loop_.GetChannel<aos::starter::Status>("/aos")->frequency() -
64 1),
James Kuszmaul8544c492023-07-31 15:00:38 -070065 timing_report_fetcher_(
66 event_loop_.MakeFetcher<aos::timing::Report>("/aos")),
Austin Schuh4d275fc2022-09-16 15:42:45 -070067 shm_base_(FLAGS_shm_base),
Tyler Chatowa79419d2020-08-12 20:12:11 -070068 listener_(&event_loop_,
James Kuszmaul6295a642022-03-22 15:23:59 -070069 [this](signalfd_siginfo signal) { OnSignal(signal); }),
Maxwell Gumleyb27245f2024-04-11 15:46:22 -060070 top_(&event_loop_, aos::util::Top::TrackThreadsMode::kDisabled,
71 aos::util::Top::TrackPerThreadInfoMode::kEnabled) {
Tyler Chatowa79419d2020-08-12 20:12:11 -070072 event_loop_.SkipAosLog();
73
James Kuszmaul06a8f352024-03-15 14:15:57 -070074 cleanup_timer_->set_name("cleanup");
75
James Kuszmaul293b2172021-11-10 16:20:48 -080076 if (!aos::configuration::MultiNode(config_msg_)) {
77 event_loop_.MakeWatcher(
78 "/aos",
79 [this](const aos::starter::StarterRpc &cmd) { HandleStarterRpc(cmd); });
80 } else {
81 for (const aos::Node *node : aos::configuration::GetNodes(config_msg_)) {
82 const Channel *channel = StarterRpcChannelForNode(config_msg_, node);
83 CHECK(channel != nullptr) << ": Failed to find channel /aos for "
84 << StarterRpc::GetFullyQualifiedName() << " on "
85 << node->name()->string_view();
86 if (!aos::configuration::ChannelIsReadableOnNode(channel,
87 event_loop_.node())) {
88 LOG(INFO) << "StarterRpc channel "
89 << aos::configuration::StrippedChannelToString(channel)
90 << " is not readable on "
91 << event_loop_.node()->name()->string_view();
92 } else {
93 event_loop_.MakeWatcher(channel->name()->string_view(),
94 [this](const aos::starter::StarterRpc &cmd) {
95 HandleStarterRpc(cmd);
96 });
97 }
Tyler Chatowa79419d2020-08-12 20:12:11 -070098 }
James Kuszmaul293b2172021-11-10 16:20:48 -080099 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700100
Austin Schuh4d275fc2022-09-16 15:42:45 -0700101 // Catalogue all the applications for this node, so we can keep an eye on
102 // them.
Tyler Chatowa79419d2020-08-12 20:12:11 -0700103 if (config_msg_->has_applications()) {
104 const flatbuffers::Vector<flatbuffers::Offset<aos::Application>>
105 *applications = config_msg_->applications();
Ravago Jones7e2dd322020-11-21 15:58:58 -0800106
107 if (aos::configuration::MultiNode(config_msg_)) {
108 std::string_view current_node = event_loop_.node()->name()->string_view();
109 for (const aos::Application *application : *applications) {
Austin Schuh228609b2023-03-21 15:43:11 -0700110 CHECK(application->has_nodes())
111 << ": Missing nodes on " << aos::FlatbufferToJson(application);
Ravago Jones7e2dd322020-11-21 15:58:58 -0800112 for (const flatbuffers::String *node : *application->nodes()) {
113 if (node->string_view() == current_node) {
114 AddApplication(application);
115 break;
116 }
117 }
118 }
119 } else {
120 for (const aos::Application *application : *applications) {
121 AddApplication(application);
122 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700123 }
124 }
Austin Schuh4d275fc2022-09-16 15:42:45 -0700125
126 // Catalogue all the intranode channels for this node, and create
127 // MemoryMappedQueues for each one to allocate the shared memory before
128 // spawning any shasta process.
129 if (config_msg_->has_channels()) {
Austin Schuh816a1162023-05-31 16:29:47 -0700130 LOG(INFO) << "Starting to initialize shared memory.";
Austin Schuh4d275fc2022-09-16 15:42:45 -0700131 const aos::Node *this_node = event_loop_.node();
Austin Schuh816a1162023-05-31 16:29:47 -0700132 std::vector<const aos::Channel *> channels_to_construct;
Austin Schuh4d275fc2022-09-16 15:42:45 -0700133 for (const aos::Channel *channel : *config_msg_->channels()) {
134 if (aos::configuration::ChannelIsReadableOnNode(channel, this_node)) {
Austin Schuh816a1162023-05-31 16:29:47 -0700135 if (FLAGS_queue_initialization_threads == 0) {
136 AddChannel(channel);
137 } else {
138 channels_to_construct.push_back(channel);
139 }
Austin Schuh4d275fc2022-09-16 15:42:45 -0700140 }
141 }
Austin Schuh816a1162023-05-31 16:29:47 -0700142
143 if (FLAGS_queue_initialization_threads != 0) {
144 std::mutex pool_mutex;
145 std::vector<std::thread> threads;
146 threads.reserve(FLAGS_queue_initialization_threads);
147 for (size_t i = 0; i < FLAGS_queue_initialization_threads; ++i) {
148 threads.emplace_back([this, &pool_mutex, &channels_to_construct]() {
149 while (true) {
150 const aos::Channel *channel;
151 {
152 std::unique_lock<std::mutex> locker(pool_mutex);
153 if (channels_to_construct.empty()) {
154 return;
155 }
156 channel = channels_to_construct.back();
157 channels_to_construct.pop_back();
158 }
159 AddChannel(channel);
160 }
161 });
162 }
163 for (size_t i = 0; i < FLAGS_queue_initialization_threads; ++i) {
164 threads[i].join();
165 }
166 }
167 LOG(INFO) << "Starting applications.";
Austin Schuh4d275fc2022-09-16 15:42:45 -0700168 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700169}
170
James Kuszmaul293b2172021-11-10 16:20:48 -0800171void Starter::HandleStarterRpc(const StarterRpc &command) {
172 if (!command.has_command() || !command.has_name() || exiting_) {
173 return;
174 }
175
176 LOG(INFO) << "Received " << aos::FlatbufferToJson(&command);
177
178 if (command.has_nodes()) {
179 CHECK(aos::configuration::MultiNode(config_msg_));
180 bool relevant_to_this_node = false;
181 for (const flatbuffers::String *node : *command.nodes()) {
182 if (node->string_view() == event_loop_.node()->name()->string_view()) {
183 relevant_to_this_node = true;
184 }
185 }
186 if (!relevant_to_this_node) {
187 return;
188 }
189 }
190 // If not populated, restart regardless of node.
191
192 auto search = applications_.find(command.name()->str());
193 if (search != applications_.end()) {
194 // If an applicatione exists by the given name, dispatch the command
195 search->second.HandleCommand(command.command());
196 }
197}
198
James Kuszmaul6295a642022-03-22 15:23:59 -0700199void Starter::HandleStateChange() {
200 std::set<pid_t> all_pids;
201 for (const auto &pair : applications_) {
202 if (pair.second.get_pid() > 0 &&
203 pair.second.status() != aos::starter::State::STOPPED) {
204 all_pids.insert(pair.second.get_pid());
205 }
206 }
207 top_.set_track_pids(all_pids);
208
Austin Schuhfc304942021-10-16 14:20:05 -0700209 if (status_count_ < max_status_count_) {
210 SendStatus();
211 ++status_count_;
212 } else {
213 VLOG(1) << "That's enough " << status_count_ << " " << max_status_count_;
214 }
215}
216
Tyler Chatowa79419d2020-08-12 20:12:11 -0700217void Starter::Cleanup() {
218 if (exiting_) {
219 return;
220 }
221 exiting_ = true;
222 for (auto &application : applications_) {
223 application.second.Terminate();
224 }
Philipp Schradera6712522023-07-05 20:25:11 -0700225 cleanup_timer_->Schedule(event_loop_.monotonic_now() +
226 std::chrono::milliseconds(1500));
Tyler Chatowa79419d2020-08-12 20:12:11 -0700227}
228
229void Starter::OnSignal(signalfd_siginfo info) {
Tyler Chatowa79419d2020-08-12 20:12:11 -0700230 if (info.ssi_signo == SIGCHLD) {
231 // SIGCHLD messages can be collapsed if multiple are received, so all
232 // applications must check their status.
Pallavi Madhukaraa17d242023-12-20 13:42:41 -0800233 if (FLAGS_enable_ftrace) {
234 ftrace_.FormatMessage("SIGCHLD");
235 ftrace_.TurnOffOrDie();
236 }
237
Tyler Chatowa79419d2020-08-12 20:12:11 -0700238 for (auto iter = applications_.begin(); iter != applications_.end();) {
239 if (iter->second.MaybeHandleSignal()) {
240 iter = applications_.erase(iter);
241 } else {
242 ++iter;
243 }
244 }
245
246 if (exiting_ && applications_.empty()) {
247 event_loop_.Exit();
248 }
Austin Schuh3204b332021-10-16 14:20:10 -0700249 } else {
250 LOG(INFO) << "Received signal '" << strsignal(info.ssi_signo) << "'";
251
252 if (std::find(kStarterDeath.begin(), kStarterDeath.end(), info.ssi_signo) !=
253 kStarterDeath.end()) {
254 LOG(WARNING) << "Starter shutting down";
255 Cleanup();
256 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700257 }
258}
259
260Application *Starter::AddApplication(const aos::Application *application) {
James Kuszmaul6295a642022-03-22 15:23:59 -0700261 auto [iter, success] = applications_.try_emplace(
262 application->name()->str(), application, &event_loop_,
263 [this]() { HandleStateChange(); });
Tyler Chatowa79419d2020-08-12 20:12:11 -0700264 if (success) {
James Kuszmauld42edb42022-01-07 18:00:16 -0800265 // We should be catching and handling SIGCHLD correctly in the starter, so
266 // don't leave in the crutch for polling for the child process status (this
267 // is less about efficiency, and more about making sure bit rot doesn't
268 // result in the signal handling breaking).
269 iter->second.DisableChildDeathPolling();
Tyler Chatowa79419d2020-08-12 20:12:11 -0700270 return &(iter->second);
271 }
272 return nullptr;
273}
274
275void Starter::Run() {
Tyler Chatow03fdb2a2020-12-26 18:39:36 -0800276#ifdef AOS_ARCHITECTURE_arm_frc
277 PCHECK(setuid(0) == 0) << "Failed to change user to root";
278#endif
279
Tyler Chatowa79419d2020-08-12 20:12:11 -0700280 for (auto &application : applications_) {
Austin Schuh5f79a5a2021-10-12 17:46:50 -0700281 if (application.second.autostart()) {
282 application.second.Start();
283 }
Tyler Chatowa79419d2020-08-12 20:12:11 -0700284 }
285
286 event_loop_.Run();
287}
288
James Kuszmaul2c10e052023-08-09 10:22:36 -0700289void Starter::ServiceTimingReportFetcher(int elapsed_cycles) {
290 // If there is any chance that it has been longer than one cycle since we last
291 // serviced the fetcher, call Fetch(). This reduces the chances that the
292 // fetcher falls behind when the system is under heavy load. Dropping a few
293 // timing report messages when the system is under stress is fine.
294 if (timing_report_fetcher_.get() == nullptr || elapsed_cycles > 1) {
295 timing_report_fetcher_.Fetch();
296 }
James Kuszmaul8544c492023-07-31 15:00:38 -0700297 while (timing_report_fetcher_.FetchNext()) {
298 for (auto &application : applications_) {
299 application.second.ObserveTimingReport(
300 timing_report_fetcher_.context().monotonic_event_time,
301 timing_report_fetcher_.get());
302 }
303 }
304}
305
Tyler Chatowa79419d2020-08-12 20:12:11 -0700306void Starter::SendStatus() {
307 aos::Sender<aos::starter::Status>::Builder builder =
308 status_sender_.MakeBuilder();
309
310 std::vector<flatbuffers::Offset<aos::starter::ApplicationStatus>> statuses;
311
312 for (auto &application : applications_) {
James Kuszmaul6295a642022-03-22 15:23:59 -0700313 statuses.push_back(application.second.PopulateStatus(builder.fbb(), &top_));
Tyler Chatowa79419d2020-08-12 20:12:11 -0700314 }
315
316 auto statuses_fbs = builder.fbb()->CreateVector(statuses);
317
318 aos::starter::Status::Builder status_builder(*builder.fbb());
319 status_builder.add_statuses(statuses_fbs);
milind1f1dca32021-07-03 13:50:07 -0700320 builder.CheckOk(builder.Send(status_builder.Finish()));
Tyler Chatowa79419d2020-08-12 20:12:11 -0700321}
322
Austin Schuh4d275fc2022-09-16 15:42:45 -0700323void Starter::AddChannel(const aos::Channel *channel) {
324 CHECK_NOTNULL(channel);
Austin Schuh816a1162023-05-31 16:29:47 -0700325 std::unique_ptr<aos::ipc_lib::MemoryMappedQueue> queue =
326 std::make_unique<aos::ipc_lib::MemoryMappedQueue>(
327 shm_base_, FLAGS_permissions, event_loop_.configuration(), channel);
328
329 {
330 std::unique_lock<std::mutex> locker(queue_mutex_);
331 shm_queues_.emplace_back(std::move(queue));
332 }
Austin Schuh4d275fc2022-09-16 15:42:45 -0700333 VLOG(1) << "Created MemoryMappedQueue for "
334 << aos::configuration::StrippedChannelToString(channel) << " under "
335 << shm_base_;
336}
337
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800338} // namespace aos::starter