Add ThreadInfo to process_info.fbs

Add a `track_per_thread_info` argument to the aos Top constructor.
When enabled, Top will include per-thread CPU usage for every
process it reports on.
The process_info.fbs file has been updated to include this new
optional field.
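
A minimal usage sketch (the event loop setup below and the nesting of
the mode enums inside Top are assumptions for illustration, not part
of this change):

  #include "aos/configuration.h"
  #include "aos/events/shm_event_loop.h"
  #include "aos/init.h"
  #include "aos/util/top.h"

  int main(int argc, char **argv) {
    aos::InitGoogle(&argc, &argv);
    aos::FlatbufferDetachedBuffer<aos::Configuration> config =
        aos::configuration::ReadConfig("aos_config.json");
    aos::ShmEventLoop event_loop(&config.message());
    // Report per-thread CPU usage in addition to per-process readings.
    aos::util::Top top(&event_loop,
                       aos::util::Top::TrackThreadsMode::kDisabled,
                       aos::util::Top::TrackPerThreadInfoMode::kEnabled);
    event_loop.Run();
    return 0;
  }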

Change-Id: I394a2ccb09f714b0edf0249fdab4332742bf3d3e
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/top.cc b/aos/util/top.cc
index a74b019..c22fee4 100644
--- a/aos/util/top.cc
+++ b/aos/util/top.cc
@@ -1,8 +1,10 @@
 #include "aos/util/top.h"
 
 #include <dirent.h>
+#include <sys/types.h>  // for pid_t
 #include <unistd.h>
 
+#include <cerrno>
+#include <cstring>
 #include <queue>
 #include <string>
 
@@ -31,9 +33,12 @@
 }
 }  // namespace
 
-std::optional<ProcStat> ReadProcStat(pid_t pid) {
-  std::optional<std::string> contents =
-      ReadShortFile(absl::StrFormat("/proc/%d/stat", pid));
+std::optional<ProcStat> ReadProcStat(const pid_t pid,
+                                     const std::optional<pid_t> tid) {
+  const std::string path =
+      tid.has_value() ? absl::StrFormat("/proc/%d/task/%d/stat", pid, *tid)
+                      : absl::StrFormat("/proc/%d/stat", pid);
+  const std::optional<std::string> contents = ReadShortFile(path);
   if (!contents.has_value()) {
     return std::nullopt;
   }
@@ -124,11 +129,13 @@
       .exit_code = static_cast<int64_t>(numbers.at(48))};
 }
 
-Top::Top(aos::EventLoop *event_loop, bool track_threads)
+Top::Top(aos::EventLoop *event_loop, TrackThreadsMode track_threads,
+         TrackPerThreadInfoMode track_per_thread_info)
     : event_loop_(event_loop),
       clock_tick_(std::chrono::nanoseconds(1000000000 / sysconf(_SC_CLK_TCK))),
       page_size_(sysconf(_SC_PAGESIZE)),
-      track_threads_(track_threads) {
+      track_threads_(track_threads),
+      track_per_thread_info_(track_per_thread_info) {
   TimerHandler *timer = event_loop_->AddTimer([this]() { UpdateReadings(); });
   event_loop_->OnRun([timer, this]() {
     timer->Schedule(event_loop_->monotonic_now(), kSamplePeriod);
@@ -151,7 +158,7 @@
 }
 
 void Top::MaybeAddThreadIds(pid_t pid, std::set<pid_t> *pids) {
-  if (!track_threads_) {
+  if (track_threads_ == TrackThreadsMode::kDisabled) {
     return;
   }
 
@@ -176,6 +183,121 @@
   closedir(dir);
 }
 
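+// Maps the single-character state field from /proc/<pid>/task/<tid>/stat to
+// the ThreadState enum. Unrecognized states are fatal.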
+ThreadState CharToThreadState(const char state) {
+  switch (state) {
+    case 'R':
+      return ThreadState::RUNNING;
+    case 'S':
+      return ThreadState::SLEEPING_INTERRUPTIBLE;
+    case 'D':
+      return ThreadState::SLEEPING_UNINTERRUPTIBLE;
+    case 'T':
+      return ThreadState::STOPPED;
+    case 'Z':
+      return ThreadState::ZOMBIE;
+    case 'I':
+      return ThreadState::IDLE;
+    default:
+      LOG(FATAL) << "Invalid thread state character: " << state;
+  }
+}
+
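+// Scans /proc/<pid>/task and refreshes the ThreadReadings for every thread
+// of the given process, dropping entries for threads that no longer exist.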
+void Top::UpdateThreadReadings(pid_t pid, ProcessReadings &process) {
+  // Construct the path to the task directory, which lists all threads.
+  const std::string task_dir = absl::StrFormat("/proc/%d/task", pid);
+
+  // Verify we can open the directory.
+  DIR *dir = opendir(task_dir.c_str());
+  if (dir == nullptr) {
+    LOG_EVERY_T(WARNING, 10) << "Unable to open directory: " << task_dir
+                             << ", error: " << strerror(errno);
+    return;
+  }
+
+  // Use a set to track all the threads that we process.
+  std::set<pid_t> updated_threads;
+
+  // Iterate over all entries in the directory.
+  struct dirent *entry;
+  while ((entry = readdir(dir)) != nullptr) {
+    // Skip non-directories.
+    if (entry->d_type != DT_DIR) {
+      continue;
+    }
+
+    // Skip "." and "..".
+    const bool is_current_dir = strcmp(entry->d_name, ".") == 0;
+    const bool is_parent_dir = strcmp(entry->d_name, "..") == 0;
+    if (is_current_dir || is_parent_dir) {
+      continue;
+    }
+
+    // Verify the entry is a valid thread ID.
+    pid_t tid;
+    const bool is_valid_thread_id = absl::SimpleAtoi(entry->d_name, &tid);
+    if (!is_valid_thread_id) {
+      continue;
+    }
+
+    // Read the stats for the thread.
+    const std::optional<ProcStat> thread_stats = ReadProcStat(pid, tid);
+
+    // If no stats could be read (thread may have exited), remove it.
+    if (!thread_stats.has_value()) {
+      VLOG(2) << "Removing thread " << tid << " from process " << pid;
+      process.thread_readings.erase(tid);
+      continue;
+    }
+
+    const ThreadState thread_state = CharToThreadState(thread_stats->state);
+
+    // Find or create the ThreadReadings entry for this thread.
+    ThreadReadings &thread_reading = process.thread_readings[tid];
+
+    // Update the thread's name and start time.
+    thread_reading.name = thread_stats.value().name;
+    thread_reading.start_time = ProcessStartTime(thread_stats.value());
+
+    // Record the latest cumulative run time reading for this thread.
+    aos::RingBuffer<ThreadReading, kRingBufferSize> &readings =
+        thread_reading.readings;
+    const aos::monotonic_clock::time_point now = event_loop_->monotonic_now();
+    const std::chrono::nanoseconds run_time =
+        TotalProcessTime(thread_stats.value());
+    // The ring buffer will push out the oldest entry if it is full.
+    readings.Push({now, run_time});
+
+    // If the buffer is full, update the CPU usage percentage.
+    if (readings.full()) {
+      const ThreadReading &previous = readings[0];
+      const ThreadReading &current = readings[1];
+      const std::chrono::nanoseconds run_time_delta =
+          current.total_run_time - previous.total_run_time;
+      const std::chrono::nanoseconds reading_time =
+          current.reading_time - previous.reading_time;
+      thread_reading.cpu_percent =
+          aos::time::DurationInSeconds(run_time_delta) /
+          aos::time::DurationInSeconds(reading_time);
+      thread_reading.state = thread_state;
+    }
+    updated_threads.insert(tid);
+  }
+
+  // Remove all threads from process.thread_readings that didn't get updated.
+  std::vector<pid_t> threads_to_remove;
+  for (const auto &[tid, thread_reading] : process.thread_readings) {
+    if (!updated_threads.contains(tid)) {
+      threads_to_remove.push_back(tid);
+    }
+  }
+  for (const pid_t tid : threads_to_remove) {
+    process.thread_readings.erase(tid);
+  }
+
+  // Close the directory.
+  closedir(dir);
+}
+
 void Top::UpdateReadings() {
   aos::monotonic_clock::time_point now = event_loop_->monotonic_now();
   // Get all the processes that we *might* care about.
@@ -220,12 +342,15 @@
       reading_iter =
           readings_
               .insert(std::make_pair(
-                  pid, ProcessReadings{.name = proc_stat->name,
-                                       .start_time = start_time,
-                                       .cpu_percent = 0.0,
-                                       .kthread = !!(proc_stat->kernel_flags &
-                                                     PF_KTHREAD),
-                                       .readings = {}}))
+                  pid,
+                  ProcessReadings{
+                      .name = proc_stat->name,
+                      .start_time = start_time,
+                      .cpu_percent = 0.0,
+                      .kthread = !!(proc_stat->kernel_flags & PF_KTHREAD),
+                      .readings = {},
+                      .thread_readings = {},
+                  }))
               .first;
     }
     ProcessReadings &process = reading_iter->second;
@@ -243,7 +368,7 @@
 
     process.readings.Push(Reading{now, TotalProcessTime(*proc_stat),
                                   RealMemoryUsage(*proc_stat)});
-    if (process.readings.size() == 2) {
+    if (process.readings.full()) {
       process.cpu_percent =
           aos::time::DurationInSeconds(process.readings[1].total_run_time -
                                        process.readings[0].total_run_time) /
@@ -252,6 +377,11 @@
     } else {
       process.cpu_percent = 0.0;
     }
+
+    // Update thread readings for this process.
+    if (track_per_thread_info_ == TrackPerThreadInfoMode::kEnabled) {
+      UpdateThreadReadings(pid, process);
+    }
   }
 
   if (on_reading_update_) {
@@ -266,6 +396,44 @@
     return {};
   }
   const ProcessReadings &reading = reading_iter->second;
+
+  if (reading.readings.empty()) {
+    return {};  // Return an empty offset if readings is empty.
+  }
+
+  std::vector<flatbuffers::Offset<ThreadInfo>> thread_infos_offsets;
+  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ThreadInfo>>>
+      threads_vector_offset;
+
+  if (track_per_thread_info_ == TrackPerThreadInfoMode::kEnabled &&
+      !reading.thread_readings.empty()) {
+    thread_infos_offsets.reserve(reading.thread_readings.size());
+    for (const auto &[tid, thread_reading] : reading.thread_readings) {
+      // Record the thread's start time as nanoseconds since the monotonic
+      // clock epoch.
+      const aos::monotonic_clock::time_point start_time =
+          thread_reading.start_time;
+      const int64_t start_time_ns = start_time.time_since_epoch().count();
+
+      const flatbuffers::Offset<flatbuffers::String> thread_name =
+          fbb->CreateString(thread_reading.name);
+      ThreadInfo::Builder thread_info_builder(*fbb);
+      thread_info_builder.add_tid(tid);
+      thread_info_builder.add_name(thread_name);
+      thread_info_builder.add_cpu_usage(thread_reading.cpu_percent);
+      thread_info_builder.add_start_time(start_time_ns);
+      thread_info_builder.add_state(thread_reading.state);
+      const flatbuffers::Offset<ThreadInfo> thread_info =
+          thread_info_builder.Finish();
+      thread_infos_offsets.push_back(thread_info);
+    }
+    threads_vector_offset = fbb->CreateVector(thread_infos_offsets);
+  } else {
+    threads_vector_offset = 0;
+  }
+
+  // Create the process name string offset.
   const flatbuffers::Offset<flatbuffers::String> name =
       fbb->CreateString(reading.name);
   ProcessInfo::Builder builder(*fbb);
@@ -274,6 +442,10 @@
   builder.add_cpu_usage(reading.cpu_percent);
   builder.add_physical_memory(
       reading.readings[reading.readings.size() - 1].memory_usage);
+  if (!threads_vector_offset.IsNull()) {
+    builder.add_threads(threads_vector_offset);
+  }
+
   return builder.Finish();
 }