Add ThreadInfo to process_info.fbs
Add `track_per_thread_info` argument for the aos Top constructor.
When enabled, this will cause Top to include CPU usage per
thread for all processes it reports on.
The process_info.fbs file has been updated to include this new
optional field.
Change-Id: I394a2ccb09f714b0edf0249fdab4332742bf3d3e
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/top.cc b/aos/util/top.cc
index a74b019..c22fee4 100644
--- a/aos/util/top.cc
+++ b/aos/util/top.cc
@@ -1,8 +1,10 @@
#include "aos/util/top.h"
#include <dirent.h>
+#include <sys/types.h> // used for DIR
#include <unistd.h>
+#include <cstring>
#include <queue>
#include <string>
@@ -31,9 +33,12 @@
}
} // namespace
-std::optional<ProcStat> ReadProcStat(pid_t pid) {
- std::optional<std::string> contents =
- ReadShortFile(absl::StrFormat("/proc/%d/stat", pid));
+std::optional<ProcStat> ReadProcStat(const pid_t pid,
+ const std::optional<pid_t> tid) {
+ const std::string path =
+ tid.has_value() ? absl::StrFormat("/proc/%d/task/%d/stat", pid, *tid)
+ : absl::StrFormat("/proc/%d/stat", pid);
+ const std::optional<std::string> contents = ReadShortFile(path);
if (!contents.has_value()) {
return std::nullopt;
}
@@ -124,11 +129,13 @@
.exit_code = static_cast<int64_t>(numbers.at(48))};
}
-Top::Top(aos::EventLoop *event_loop, bool track_threads)
+Top::Top(aos::EventLoop *event_loop, TrackThreadsMode track_threads,
+ TrackPerThreadInfoMode track_per_thread_info)
: event_loop_(event_loop),
clock_tick_(std::chrono::nanoseconds(1000000000 / sysconf(_SC_CLK_TCK))),
page_size_(sysconf(_SC_PAGESIZE)),
- track_threads_(track_threads) {
+ track_threads_(track_threads),
+ track_per_thread_info_(track_per_thread_info) {
TimerHandler *timer = event_loop_->AddTimer([this]() { UpdateReadings(); });
event_loop_->OnRun([timer, this]() {
timer->Schedule(event_loop_->monotonic_now(), kSamplePeriod);
@@ -151,7 +158,7 @@
}
void Top::MaybeAddThreadIds(pid_t pid, std::set<pid_t> *pids) {
- if (!track_threads_) {
+ if (track_threads_ == TrackThreadsMode::kDisabled) {
return;
}
@@ -176,6 +183,121 @@
closedir(dir);
}
+// Converts the single-character state field of a parsed stat file into the
+// corresponding ThreadState enum value.
+//
+// Both 'T' and 't' are reported as STOPPED. proc(5) documents 't' as "tracing
+// stop", which is what a thread reports while a debugger or tracer has it
+// stopped; treating it as fatal would take down the process running Top
+// whenever a monitored process is being debugged.
+//
+// Any other unrecognized character is still a fatal error.
+// NOTE(review): proc(5) lists further state characters (e.g. 'X', 'P') that
+// have no obvious ThreadState equivalent and so still reach the fatal branch;
+// confirm whether they need dedicated enum values.
+ThreadState CharToThreadState(const char state) {
+  switch (state) {
+    case 'R':
+      return ThreadState::RUNNING;
+    case 'S':
+      return ThreadState::SLEEPING_INTERRUPTIBLE;
+    case 'D':
+      return ThreadState::SLEEPING_UNINTERRUPTIBLE;
+    case 'T':
+    case 't':
+      return ThreadState::STOPPED;
+    case 'Z':
+      return ThreadState::ZOMBIE;
+    case 'I':
+      return ThreadState::IDLE;
+    default:
+      LOG(FATAL) << "Invalid thread state character: " << state;
+  }
+}
+
+// Refreshes process.thread_readings for the process with ID `pid` by scanning
+// /proc/<pid>/task.
+//
+// For each thread for which ReadProcStat() returns a value, the thread's name
+// and start time are updated and a new (reading time, total run time) sample
+// is pushed into its ring buffer. Once that ring buffer is full, the CPU usage
+// is recomputed from the samples at indices 0 and 1 and the thread state is
+// refreshed. Threads that were not successfully read during this pass are
+// removed from process.thread_readings.
+//
+// If the task directory cannot be opened, a warning is logged (via
+// LOG_EVERY_T) and process.thread_readings is left unchanged.
+void Top::UpdateThreadReadings(pid_t pid, ProcessReadings &process) {
+  // Path to the task directory which lists all threads of the process.
+  const std::string task_dir = absl::StrFormat("/proc/%d/task", pid);
+
+  DIR *dir = opendir(task_dir.c_str());
+  if (dir == nullptr) {
+    LOG_EVERY_T(WARNING, 10) << "Unable to open directory: " << task_dir
+                             << ", error: " << strerror(errno);
+    return;
+  }
+
+  // IDs of the threads that were successfully refreshed during this pass.
+  std::set<pid_t> updated_threads;
+
+  struct dirent *entry;
+  while ((entry = readdir(dir)) != nullptr) {
+    // Skip non-directories.
+    if (entry->d_type != DT_DIR) {
+      continue;
+    }
+
+    // Skip "." and "..".
+    const bool is_current_dir = strcmp(entry->d_name, ".") == 0;
+    const bool is_parent_dir = strcmp(entry->d_name, "..") == 0;
+    if (is_current_dir || is_parent_dir) {
+      continue;
+    }
+
+    // Only entries whose name parses as a thread ID are of interest.
+    pid_t tid;
+    if (!absl::SimpleAtoi(entry->d_name, &tid)) {
+      continue;
+    }
+
+    const std::optional<ProcStat> thread_stats = ReadProcStat(pid, tid);
+
+    // If no stats could be read (thread may have exited), remove it.
+    if (!thread_stats.has_value()) {
+      VLOG(2) << "Removing thread " << tid << " from process " << pid;
+      process.thread_readings.erase(tid);
+      continue;
+    }
+
+    const ThreadState thread_state = CharToThreadState(thread_stats->state);
+
+    // Find or create the reading entry for this thread and refresh the fields
+    // that are valid regardless of how many samples have been collected.
+    ThreadReadings &thread_reading = process.thread_readings[tid];
+    thread_reading.name = thread_stats->name;
+    thread_reading.start_time = ProcessStartTime(*thread_stats);
+
+    // Record the latest sample. The ring buffer will push out the oldest entry
+    // if it is full.
+    aos::RingBuffer<ThreadReading, kRingBufferSize> &readings =
+        thread_reading.readings;
+    const aos::monotonic_clock::time_point now = event_loop_->monotonic_now();
+    const std::chrono::nanoseconds total_run_time =
+        TotalProcessTime(*thread_stats);
+    readings.Push({now, total_run_time});
+
+    // Once the buffer is full, the CPU usage is the run time accumulated
+    // between the two samples divided by the wall time between them.
+    // NOTE(review): the state is only refreshed in this branch, so a newly
+    // seen thread keeps its default state until the buffer fills - confirm
+    // that this is intended.
+    if (readings.full()) {
+      const ThreadReading &previous = readings[0];
+      const ThreadReading &current = readings[1];
+      const std::chrono::nanoseconds run_time_delta =
+          current.total_run_time - previous.total_run_time;
+      const std::chrono::nanoseconds reading_time_delta =
+          current.reading_time - previous.reading_time;
+      thread_reading.cpu_percent =
+          aos::time::DurationInSeconds(run_time_delta) /
+          aos::time::DurationInSeconds(reading_time_delta);
+      thread_reading.state = thread_state;
+    }
+    updated_threads.insert(tid);
+  }
+
+  // The directory is no longer needed once iteration has finished.
+  closedir(dir);
+
+  // Remove all threads from process.thread_readings that didn't get updated.
+  // The IDs are collected first so that the container is not modified while
+  // it is being iterated over.
+  std::vector<pid_t> threads_to_remove;
+  for (const auto &[tid, thread_reading] : process.thread_readings) {
+    if (!updated_threads.contains(tid)) {
+      threads_to_remove.push_back(tid);
+    }
+  }
+  for (const pid_t tid : threads_to_remove) {
+    process.thread_readings.erase(tid);
+  }
+}
+
void Top::UpdateReadings() {
aos::monotonic_clock::time_point now = event_loop_->monotonic_now();
// Get all the processes that we *might* care about.
@@ -220,12 +342,15 @@
reading_iter =
readings_
.insert(std::make_pair(
- pid, ProcessReadings{.name = proc_stat->name,
- .start_time = start_time,
- .cpu_percent = 0.0,
- .kthread = !!(proc_stat->kernel_flags &
- PF_KTHREAD),
- .readings = {}}))
+ pid,
+ ProcessReadings{
+ .name = proc_stat->name,
+ .start_time = start_time,
+ .cpu_percent = 0.0,
+ .kthread = !!(proc_stat->kernel_flags & PF_KTHREAD),
+ .readings = {},
+ .thread_readings = {},
+ }))
.first;
}
ProcessReadings &process = reading_iter->second;
@@ -243,7 +368,7 @@
process.readings.Push(Reading{now, TotalProcessTime(*proc_stat),
RealMemoryUsage(*proc_stat)});
- if (process.readings.size() == 2) {
+ if (process.readings.full()) {
process.cpu_percent =
aos::time::DurationInSeconds(process.readings[1].total_run_time -
process.readings[0].total_run_time) /
@@ -252,6 +377,11 @@
} else {
process.cpu_percent = 0.0;
}
+
+ // Update thread readings for this process
+ if (track_per_thread_info_ == TrackPerThreadInfoMode::kEnabled) {
+ UpdateThreadReadings(pid, process);
+ }
}
if (on_reading_update_) {
@@ -266,6 +396,44 @@
return {};
}
const ProcessReadings &reading = reading_iter->second;
+
+ if (reading.readings.empty()) {
+ return {}; // Return an empty offset if readings is empty.
+ }
+
+ std::vector<flatbuffers::Offset<ThreadInfo>> thread_infos_offsets;
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ThreadInfo>>>
+ threads_vector_offset;
+
+ if (track_per_thread_info_ == TrackPerThreadInfoMode::kEnabled &&
+ !reading.thread_readings.empty()) {
+ thread_infos_offsets.reserve(reading.thread_readings.size());
+ for (const auto &[tid, thread_reading] : reading.thread_readings) {
+ // Calculate how long the thread has been alive by comparing the thread
+ // start time to the current time.
+ const aos::monotonic_clock::time_point start_time =
+ thread_reading.start_time;
+ // convert start_time to int64
+ const int64_t start_time_ns = start_time.time_since_epoch().count();
+
+ const flatbuffers::Offset<flatbuffers::String> threadName =
+ fbb->CreateString(thread_reading.name);
+ ThreadInfo::Builder thread_info_builder(*fbb);
+ thread_info_builder.add_tid(tid);
+ thread_info_builder.add_name(threadName);
+ thread_info_builder.add_cpu_usage(thread_reading.cpu_percent);
+ thread_info_builder.add_start_time(start_time_ns);
+ thread_info_builder.add_state(thread_reading.state);
+ const flatbuffers::Offset<ThreadInfo> threadInfo =
+ thread_info_builder.Finish();
+ thread_infos_offsets.push_back(threadInfo);
+ }
+ threads_vector_offset = fbb->CreateVector(thread_infos_offsets);
+ } else {
+ threads_vector_offset = 0;
+ }
+
+ // Create name string offset
const flatbuffers::Offset<flatbuffers::String> name =
fbb->CreateString(reading.name);
ProcessInfo::Builder builder(*fbb);
@@ -274,6 +442,10 @@
builder.add_cpu_usage(reading.cpu_percent);
builder.add_physical_memory(
reading.readings[reading.readings.size() - 1].memory_usage);
+ if (!threads_vector_offset.IsNull()) {
+ builder.add_threads(threads_vector_offset);
+ }
+
return builder.Finish();
}