Add ThreadInfo to process_info.fbs
Add `per_thread_info` argument for the aos Top constructor.
When enabled, this will cause Top to include per-thread CPU usage
for all processes it reports on.
The process_info.fbs file has been updated to include this new
optional field.
Change-Id: I394a2ccb09f714b0edf0249fdab4332742bf3d3e
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/dump_rtprio.cc b/aos/dump_rtprio.cc
index e1286e2..74ade74 100644
--- a/aos/dump_rtprio.cc
+++ b/aos/dump_rtprio.cc
@@ -258,7 +258,8 @@
aos::ShmEventLoop event_loop(&config.message());
event_loop.SkipTimingReport();
event_loop.SkipAosLog();
- aos::util::Top top(&event_loop, true);
+ aos::util::Top top(&event_loop, aos::util::Top::TrackThreadsMode::kEnabled,
+ aos::util::Top::TrackPerThreadInfoMode::kDisabled);
top.set_track_top_processes(true);
const cpu_set_t all_cpus = FindAllCpus();
diff --git a/aos/starter/irq_affinity.cc b/aos/starter/irq_affinity.cc
index a188127..7459e5e 100644
--- a/aos/starter/irq_affinity.cc
+++ b/aos/starter/irq_affinity.cc
@@ -148,7 +148,8 @@
EventLoop *event_loop,
const aos::FlatbufferDetachedBuffer<aos::starter::IrqAffinityConfig>
&irq_affinity_config)
- : top_(event_loop) {
+ : top_(event_loop, aos::util::Top::TrackThreadsMode::kDisabled,
+ aos::util::Top::TrackPerThreadInfoMode::kDisabled) {
if (irq_affinity_config.message().has_kthreads()) {
PopulateThreads(irq_affinity_config.message().kthreads(), &kthreads_);
}
diff --git a/aos/starter/starterd_lib.cc b/aos/starter/starterd_lib.cc
index 6a1d3ed..4c71057 100644
--- a/aos/starter/starterd_lib.cc
+++ b/aos/starter/starterd_lib.cc
@@ -55,7 +55,8 @@
shm_base_(FLAGS_shm_base),
listener_(&event_loop_,
[this](signalfd_siginfo signal) { OnSignal(signal); }),
- top_(&event_loop_) {
+ top_(&event_loop_, aos::util::Top::TrackThreadsMode::kDisabled,
+ aos::util::Top::TrackPerThreadInfoMode::kEnabled) {
event_loop_.SkipAosLog();
cleanup_timer_->set_name("cleanup");
diff --git a/aos/util/process_info.fbs b/aos/util/process_info.fbs
index aafdba3..1d63a14 100644
--- a/aos/util/process_info.fbs
+++ b/aos/util/process_info.fbs
@@ -1,5 +1,39 @@
namespace aos.util;
+// Defines the various states a thread can be in within a Linux environment, as commonly displayed in system monitoring tools and the Linux /proc directory.
+enum ThreadState : byte {
+ // 'R' - Running: The thread is currently running or ready to run.
+ RUNNING = 0,
+ // 'S' - Sleeping interruptible: The thread is sleeping until some event happens.
+ SLEEPING_INTERRUPTIBLE = 1,
+ // 'D' - Sleeping uninterruptible: The thread is sleeping and cannot be interrupted (typically waiting for I/O).
+ SLEEPING_UNINTERRUPTIBLE = 2,
+ // 'T' - Stopped: The thread is stopped (suspended) by a signal or because it is being debugged.
+ STOPPED = 3,
+ // 'Z' - Zombie: The thread has completed execution but still has an entry in the process table to report its exit status to its parent.
+ ZOMBIE = 4,
+ // 'I' - Idle: The thread is an idle kernel thread not associated with any process (typically used by the kernel to perform functions such as handling deferred work).
+ IDLE = 5
+}
+
+
+table ThreadInfo {
+ // Thread ID of the thread in question.
+ tid: uint (id: 0);
+ // Name of the thread.
+ name: string (id: 1);
+ // CPU usage of the thread as a ratio of a single CPU core, calculated
+ // from the /proc/[pid]/task/[tid]/stat file. It involves parsing CPU time fields (utime and stime)
+ // and computing the usage relative to the elapsed time since the last check. This value is
+ // expressed as a ratio in the range [0, 1], where 1.0 usage means full utilization of one core.
+ cpu_usage: float (id: 2);
+ // In units of nanoseconds, reference clock monotonic. The time at which the thread
+ // started executing.
+ start_time: int64 (id: 3);
+ // State of the thread.
+ state: ThreadState (id: 4);
+}
+
// ProcessInfo captures state information associated with a given process.
table ProcessInfo {
// Process ID of the process in question.
@@ -14,6 +48,8 @@
// Amount of physical RAM taken by this process, in bytes. Will be a multiple of the
// system's page size.
physical_memory: uint64 (id: 3);
+ // List of threads, each with its own CPU usage, name, and ID.
+ threads: [ThreadInfo] (id: 4);
}
table TopProcessesFbs {
diff --git a/aos/util/top.cc b/aos/util/top.cc
index a74b019..c22fee4 100644
--- a/aos/util/top.cc
+++ b/aos/util/top.cc
@@ -1,8 +1,10 @@
#include "aos/util/top.h"
#include <dirent.h>
+#include <sys/types.h> // used for DIR
#include <unistd.h>
+#include <cstring>
#include <queue>
#include <string>
@@ -31,9 +33,12 @@
}
} // namespace
-std::optional<ProcStat> ReadProcStat(pid_t pid) {
- std::optional<std::string> contents =
- ReadShortFile(absl::StrFormat("/proc/%d/stat", pid));
+std::optional<ProcStat> ReadProcStat(const pid_t pid,
+ const std::optional<pid_t> tid) {
+ const std::string path =
+ tid.has_value() ? absl::StrFormat("/proc/%d/task/%d/stat", pid, *tid)
+ : absl::StrFormat("/proc/%d/stat", pid);
+ const std::optional<std::string> contents = ReadShortFile(path);
if (!contents.has_value()) {
return std::nullopt;
}
@@ -124,11 +129,13 @@
.exit_code = static_cast<int64_t>(numbers.at(48))};
}
-Top::Top(aos::EventLoop *event_loop, bool track_threads)
+Top::Top(aos::EventLoop *event_loop, TrackThreadsMode track_threads,
+ TrackPerThreadInfoMode track_per_thread_info)
: event_loop_(event_loop),
clock_tick_(std::chrono::nanoseconds(1000000000 / sysconf(_SC_CLK_TCK))),
page_size_(sysconf(_SC_PAGESIZE)),
- track_threads_(track_threads) {
+ track_threads_(track_threads),
+ track_per_thread_info_(track_per_thread_info) {
TimerHandler *timer = event_loop_->AddTimer([this]() { UpdateReadings(); });
event_loop_->OnRun([timer, this]() {
timer->Schedule(event_loop_->monotonic_now(), kSamplePeriod);
@@ -151,7 +158,7 @@
}
void Top::MaybeAddThreadIds(pid_t pid, std::set<pid_t> *pids) {
- if (!track_threads_) {
+ if (track_threads_ == TrackThreadsMode::kDisabled) {
return;
}
@@ -176,6 +183,121 @@
closedir(dir);
}
+// Maps a single-character thread state (as passed in from the `state` field of
+// a parsed /proc stat file) onto the ThreadState flatbuffer enum.
+//
+// Both 'T' (stopped by a signal) and 't' (tracing stop, i.e. the thread is
+// halted under a debugger/ptrace) are reported as STOPPED, so that attaching a
+// debugger to a tracked process does not take down the process running Top.
+// Any other unrecognized character is treated as a fatal error.
+ThreadState CharToThreadState(const char state) {
+  switch (state) {
+    case 'R':
+      return ThreadState::RUNNING;
+    case 'S':
+      return ThreadState::SLEEPING_INTERRUPTIBLE;
+    case 'D':
+      return ThreadState::SLEEPING_UNINTERRUPTIBLE;
+    case 'T':
+    case 't':
+      return ThreadState::STOPPED;
+    case 'Z':
+      return ThreadState::ZOMBIE;
+    case 'I':
+      return ThreadState::IDLE;
+    default:
+      LOG(FATAL) << "Invalid thread state character: " << state;
+  }
+}
+
+// Refreshes process.thread_readings from the entries of /proc/[pid]/task.
+//
+// For every numeric subdirectory (one per thread id) this reads the thread's
+// stat file, records its name, start time and state, and pushes a new
+// (sample time, total run time) reading. Once the ring buffer holds two
+// readings, cpu_percent is set to the run-time delta divided by the
+// sample-time delta between them (despite its name this is a ratio, it is not
+// multiplied by 100). Threads whose stat file cannot be read, or that no longer
+// appear in the task directory, are dropped from process.thread_readings.
+//
+// pid: The process whose threads should be sampled.
+// process: The readings entry for pid; its thread_readings map is updated in
+//     place.
+void Top::UpdateThreadReadings(pid_t pid, ProcessReadings &process) {
+  // Construct the path to the task directory which lists all threads.
+  const std::string task_dir = absl::StrFormat("/proc/%d/task", pid);
+
+  // Verify we can open the directory.
+  DIR *dir = opendir(task_dir.c_str());
+  if (dir == nullptr) {
+    LOG_EVERY_T(WARNING, 10) << "Unable to open directory: " << task_dir
+                             << ", error: " << strerror(errno);
+    return;
+  }
+
+  // Use a set to track all the threads that we process.
+  std::set<pid_t> updated_threads;
+
+  // Iterate over all entries in the directory.
+  struct dirent *entry;
+  while ((entry = readdir(dir)) != nullptr) {
+    // Skip non-directories.
+    if (entry->d_type != DT_DIR) {
+      continue;
+    }
+
+    // Skip "." and "..".
+    const bool is_current_dir = strcmp(entry->d_name, ".") == 0;
+    const bool is_parent_dir = strcmp(entry->d_name, "..") == 0;
+    if (is_current_dir || is_parent_dir) {
+      continue;
+    }
+
+    // Verify the entry is a valid thread ID.
+    pid_t tid;
+    const bool is_valid_thread_id = absl::SimpleAtoi(entry->d_name, &tid);
+    if (!is_valid_thread_id) {
+      continue;
+    }
+
+    // Read the stats for the thread.
+    const std::optional<ProcStat> thread_stats = ReadProcStat(pid, tid);
+
+    // If no stats could be read (thread may have exited), remove it.
+    if (!thread_stats.has_value()) {
+      VLOG(2) << "Removing thread " << tid << " from process " << pid;
+      process.thread_readings.erase(tid);
+      continue;
+    }
+
+    const ThreadState thread_state = CharToThreadState(thread_stats->state);
+
+    // Find or create new thread reading entry.
+    ThreadReadings &thread_reading = process.thread_readings[tid];
+
+    // Update the per-sample metadata. The state is stored on every sample
+    // (not only once two readings exist) so that a thread which has been
+    // sampled a single time is not reported with a stale/default state.
+    thread_reading.name = thread_stats->name;
+    thread_reading.start_time = ProcessStartTime(*thread_stats);
+    thread_reading.state = thread_state;
+
+    // Update ThreadReadings with the latest cpu usage.
+    aos::RingBuffer<ThreadReading, kRingBufferSize> &readings =
+        thread_reading.readings;
+    const aos::monotonic_clock::time_point now = event_loop_->monotonic_now();
+    const std::chrono::nanoseconds total_run_time =
+        TotalProcessTime(*thread_stats);
+    // The ring buffer will push out the oldest entry if it is full.
+    readings.Push({now, total_run_time});
+
+    // If the buffer is full, we have a previous and a current reading and can
+    // compute the CPU usage ratio between them.
+    if (readings.full()) {
+      const ThreadReading &previous = readings[0];
+      const ThreadReading &current = readings[1];
+      const std::chrono::nanoseconds run_time_delta =
+          current.total_run_time - previous.total_run_time;
+      const std::chrono::nanoseconds reading_time_delta =
+          current.reading_time - previous.reading_time;
+      thread_reading.cpu_percent =
+          aos::time::DurationInSeconds(run_time_delta) /
+          aos::time::DurationInSeconds(reading_time_delta);
+    }
+    updated_threads.insert(tid);
+  }
+
+  // We are done with the directory stream.
+  closedir(dir);
+
+  // Remove all threads from process.thread_readings that didn't get updated.
+  for (auto it = process.thread_readings.begin();
+       it != process.thread_readings.end();) {
+    if (updated_threads.contains(it->first)) {
+      ++it;
+    } else {
+      it = process.thread_readings.erase(it);
+    }
+  }
+}
+
void Top::UpdateReadings() {
aos::monotonic_clock::time_point now = event_loop_->monotonic_now();
// Get all the processes that we *might* care about.
@@ -220,12 +342,15 @@
reading_iter =
readings_
.insert(std::make_pair(
- pid, ProcessReadings{.name = proc_stat->name,
- .start_time = start_time,
- .cpu_percent = 0.0,
- .kthread = !!(proc_stat->kernel_flags &
- PF_KTHREAD),
- .readings = {}}))
+ pid,
+ ProcessReadings{
+ .name = proc_stat->name,
+ .start_time = start_time,
+ .cpu_percent = 0.0,
+ .kthread = !!(proc_stat->kernel_flags & PF_KTHREAD),
+ .readings = {},
+ .thread_readings = {},
+ }))
.first;
}
ProcessReadings &process = reading_iter->second;
@@ -243,7 +368,7 @@
process.readings.Push(Reading{now, TotalProcessTime(*proc_stat),
RealMemoryUsage(*proc_stat)});
- if (process.readings.size() == 2) {
+ if (process.readings.full()) {
process.cpu_percent =
aos::time::DurationInSeconds(process.readings[1].total_run_time -
process.readings[0].total_run_time) /
@@ -252,6 +377,11 @@
} else {
process.cpu_percent = 0.0;
}
+
+ // Update thread readings for this process
+ if (track_per_thread_info_ == TrackPerThreadInfoMode::kEnabled) {
+ UpdateThreadReadings(pid, process);
+ }
}
if (on_reading_update_) {
@@ -266,6 +396,44 @@
return {};
}
const ProcessReadings &reading = reading_iter->second;
+
+ if (reading.readings.empty()) {
+ return {}; // Return an empty offset if readings is empty.
+ }
+
+ std::vector<flatbuffers::Offset<ThreadInfo>> thread_infos_offsets;
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ThreadInfo>>>
+ threads_vector_offset;
+
+ if (track_per_thread_info_ == TrackPerThreadInfoMode::kEnabled &&
+ !reading.thread_readings.empty()) {
+ thread_infos_offsets.reserve(reading.thread_readings.size());
+ for (const auto &[tid, thread_reading] : reading.thread_readings) {
+ // Report the thread's start time as a raw count since the monotonic clock
+ // epoch; no elapsed-time (age) calculation is performed here.
+ const aos::monotonic_clock::time_point start_time =
+ thread_reading.start_time;
+ // convert start_time to int64
+ const int64_t start_time_ns = start_time.time_since_epoch().count();
+
+ const flatbuffers::Offset<flatbuffers::String> threadName =
+ fbb->CreateString(thread_reading.name);
+ ThreadInfo::Builder thread_info_builder(*fbb);
+ thread_info_builder.add_tid(tid);
+ thread_info_builder.add_name(threadName);
+ thread_info_builder.add_cpu_usage(thread_reading.cpu_percent);
+ thread_info_builder.add_start_time(start_time_ns);
+ thread_info_builder.add_state(thread_reading.state);
+ const flatbuffers::Offset<ThreadInfo> threadInfo =
+ thread_info_builder.Finish();
+ thread_infos_offsets.push_back(threadInfo);
+ }
+ threads_vector_offset = fbb->CreateVector(thread_infos_offsets);
+ } else {
+ threads_vector_offset = 0;
+ }
+
+ // Create name string offset
const flatbuffers::Offset<flatbuffers::String> name =
fbb->CreateString(reading.name);
ProcessInfo::Builder builder(*fbb);
@@ -274,6 +442,10 @@
builder.add_cpu_usage(reading.cpu_percent);
builder.add_physical_memory(
reading.readings[reading.readings.size() - 1].memory_usage);
+ if (!threads_vector_offset.IsNull()) {
+ builder.add_threads(threads_vector_offset);
+ }
+
return builder.Finish();
}
diff --git a/aos/util/top.h b/aos/util/top.h
index a71873a..c24d56c 100644
--- a/aos/util/top.h
+++ b/aos/util/top.h
@@ -78,11 +78,13 @@
int64_t exit_code;
};
-// Retrieves the stats for a particular process (note that there also exists a
-// /proc/[pid]/task/[tid]/stat with the same format for per-thread information;
-// we currently do not read that).
-// Returns nullopt if unable to read/parse the file.
-std::optional<ProcStat> ReadProcStat(int pid);
+// Retrieves the statistics for a particular process or thread. If only a pid is
+// provided, it reads the process's stat file at /proc/[pid]/stat. If both pid
+// and tid are provided, it reads the thread's stat file at
+// /proc/[pid]/task/[tid]/stat. Returns nullopt if unable to read or parse the
+// file.
+std::optional<ProcStat> ReadProcStat(pid_t pid,
+ std::optional<pid_t> tid = std::nullopt);
// This class provides a basic utility for retrieving general performance
// information on running processes (named after the top utility). It can either
@@ -95,6 +97,10 @@
// extremely short-lived loads, this may do a poor job of capturing information.
class Top {
public:
+ // Set the ring buffer size to 2 so we can keep track of a current reading and
+ // previous reading.
+ static constexpr int kRingBufferSize = 2;
+
// A snapshot of the resource usage of a process.
struct Reading {
aos::monotonic_clock::time_point reading_time;
@@ -103,6 +109,19 @@
uint64_t memory_usage;
};
+ struct ThreadReading {
+ aos::monotonic_clock::time_point reading_time;
+ std::chrono::nanoseconds total_run_time;
+ };
+
+ struct ThreadReadings {
+ aos::RingBuffer<ThreadReading, kRingBufferSize> readings;
+ double cpu_percent;
+ std::string name; // Name of the thread
+ aos::monotonic_clock::time_point start_time;
+ ThreadState state;
+ };
+
// All the information we have about a process.
struct ProcessReadings {
std::string name;
@@ -112,10 +131,28 @@
// True if this is a kernel thread, false if this is a userspace thread.
bool kthread;
// Last 2 readings
- aos::RingBuffer<Reading, 2> readings;
+ aos::RingBuffer<Reading, kRingBufferSize> readings;
+ std::map<pid_t, ThreadReadings> thread_readings;
};
- Top(aos::EventLoop *event_loop, bool track_threads = false);
+ // An enum for track_threads with enabled and disabled
+ enum class TrackThreadsMode {
+ kDisabled,
+ kEnabled // Track the thread ids for each process.
+ };
+
+ // An enum for track_per_thread_info with enabled and disabled
+ enum class TrackPerThreadInfoMode {
+ kDisabled,
+ kEnabled // Track statistics for each thread.
+ };
+
+ // Constructs a new Top object.
+ // event_loop: The event loop object to be used.
+ // track_threads: kEnabled to track the thread IDs for each process.
+ // track_per_thread_info_mode: kEnabled to track statistics for each thread.
+ Top(aos::EventLoop *event_loop, TrackThreadsMode track_threads,
+ TrackPerThreadInfoMode track_per_thread_info_mode);
// Set whether to track all the top processes (this will result in us having
// to track every single process on the system, so that we can sort them).
@@ -150,6 +187,7 @@
aos::monotonic_clock::time_point ProcessStartTime(const ProcStat &proc_stat);
uint64_t RealMemoryUsage(const ProcStat &proc_stat);
void UpdateReadings();
+ void UpdateThreadReadings(pid_t pid, ProcessReadings &process);
// Adds thread ids for the given pid to the pids set,
// if we are tracking threads.
void MaybeAddThreadIds(pid_t pid, std::set<pid_t> *pids);
@@ -164,7 +202,10 @@
std::set<pid_t> pids_to_track_;
bool track_all_ = false;
- bool track_threads_;
+ TrackThreadsMode track_threads_;
+
+ // Whether to include per-thread information in the top processes.
+ TrackPerThreadInfoMode track_per_thread_info_;
std::map<pid_t, ProcessReadings> readings_;
diff --git a/aos/util/top_test.cc b/aos/util/top_test.cc
index 6888457..ce9986f 100644
--- a/aos/util/top_test.cc
+++ b/aos/util/top_test.cc
@@ -7,6 +7,7 @@
#include <thread>
#include "gtest/gtest.h"
+#include <gmock/gmock.h>
#include "aos/events/shm_event_loop.h"
#include "aos/json_to_flatbuffer.h"
@@ -15,11 +16,18 @@
namespace aos::util::testing {
+void SetThreadName(const std::string &name) {
+ pthread_setname_np(pthread_self(), name.c_str());
+}
+
+constexpr std::string_view kTestCPUConsumer = "TestCPUConsumer";
+
class TopTest : public ::testing::Test {
protected:
TopTest()
: shm_dir_(aos::testing::TestTmpDir() + "/aos"),
cpu_consumer_([this]() {
+ SetThreadName(std::string(kTestCPUConsumer));
while (!stop_flag_.load()) {
}
}),
@@ -60,7 +68,8 @@
TEST_F(TopTest, QuerySingleProcess) {
const pid_t pid = getpid();
- Top top(&event_loop_);
+ Top top(&event_loop_, Top::TrackThreadsMode::kDisabled,
+ Top::TrackPerThreadInfoMode::kDisabled);
top.set_track_pids({pid});
event_loop_.AddTimer([this]() { event_loop_.Exit(); })
->Schedule(event_loop_.monotonic_now() + std::chrono::seconds(2));
@@ -80,6 +89,49 @@
// Sanity check memory usage.
ASSERT_LT(1000000, info.message().physical_memory());
ASSERT_GT(1000000000, info.message().physical_memory());
+
+ // Verify no per-thread information is included by default.
+ ASSERT_FALSE(info.message().has_threads());
+}
+
+// Verifies that constructing Top with TrackPerThreadInfoMode::kEnabled makes
+// InfoForProcess() report per-thread entries (name, CPU usage and state) in
+// addition to the process-level totals.
+TEST_F(TopTest, QuerySingleProcessWithThreads) {
+  const pid_t pid = getpid();
+  Top top(&event_loop_, Top::TrackThreadsMode::kDisabled,
+          Top::TrackPerThreadInfoMode::kEnabled);
+  top.set_track_pids({pid});
+  event_loop_.AddTimer([this]() { event_loop_.Exit(); })
+      ->Schedule(event_loop_.monotonic_now() + std::chrono::seconds(2));
+  event_loop_.Run();
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+  fbb.Finish(top.InfoForProcess(&fbb, pid));
+  aos::FlatbufferDetachedBuffer<ProcessInfo> info = fbb.Release();
+  ASSERT_EQ(pid, info.message().pid());
+  ASSERT_TRUE(info.message().has_name());
+  ASSERT_EQ("top_test", info.message().name()->string_view());
+  // Check that we did indeed consume ~1 CPU core (because we're multi-threaded,
+  // we could've consumed a bit more; and on systems where we are competing with
+  // other processes for CPU time, we may not get a full 100% load).
+  ASSERT_LT(0.5, info.message().cpu_usage());
+  ASSERT_GT(1.1, info.message().cpu_usage());
+  // Sanity check memory usage.
+  ASSERT_LT(1000000, info.message().physical_memory());
+  ASSERT_GT(1000000000, info.message().physical_memory());
+
+  // Validate that we have some per-thread information.
+  ASSERT_TRUE(info.message().has_threads());
+  ASSERT_GT(info.message().threads()->size(), 0u);
+  // The views stored here point into the flatbuffer owned by `info`, which
+  // outlives this set. (Inserting the temporary std::string returned by str()
+  // would leave dangling views behind.)
+  std::set<std::string_view> thread_names;
+  double thread_cpu_usage = 0.0;
+  for (const ThreadInfo *thread_info : *info.message().threads()) {
+    thread_names.insert(thread_info->name()->string_view());
+    thread_cpu_usage += thread_info->cpu_usage();
+    ASSERT_TRUE(thread_info->has_state());
+  }
+  // Validate that at least one thread was named correctly.
+  ASSERT_THAT(thread_names, ::testing::Contains(kTestCPUConsumer));
+  // Validate that we consumed some CPU on at least one thread.
+  ASSERT_GT(thread_cpu_usage, 0);
}
TEST_F(TopTest, TopProcesses) {
@@ -108,7 +160,8 @@
}
}
- Top top(&event_loop_);
+ Top top(&event_loop_, Top::TrackThreadsMode::kDisabled,
+ Top::TrackPerThreadInfoMode::kDisabled);
top.set_track_top_processes(true);
event_loop_.AddTimer([this]() { event_loop_.Exit(); })
->Schedule(event_loop_.monotonic_now() + std::chrono::seconds(2));
@@ -152,7 +205,8 @@
TEST_F(TopTest, AllTopProcesses) {
constexpr int kNProcesses = 1000000;
- Top top(&event_loop_);
+ Top top(&event_loop_, Top::TrackThreadsMode::kDisabled,
+ Top::TrackPerThreadInfoMode::kDisabled);
top.set_track_top_processes(true);
event_loop_.AddTimer([this]() { event_loop_.Exit(); })
->Schedule(event_loop_.monotonic_now() + std::chrono::seconds(2));