Add ThreadInfo to process_info.fbs

Add a `track_per_thread_info` argument to the aos Top constructor.
When enabled, this will cause Top to include CPU usage
per thread for all processes it reports on.
The process_info.fbs file has been updated to include this new
optional field.

Change-Id: I394a2ccb09f714b0edf0249fdab4332742bf3d3e
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/dump_rtprio.cc b/aos/dump_rtprio.cc
index e1286e2..74ade74 100644
--- a/aos/dump_rtprio.cc
+++ b/aos/dump_rtprio.cc
@@ -258,7 +258,8 @@
   aos::ShmEventLoop event_loop(&config.message());
   event_loop.SkipTimingReport();
   event_loop.SkipAosLog();
-  aos::util::Top top(&event_loop, true);
+  aos::util::Top top(&event_loop, aos::util::Top::TrackThreadsMode::kEnabled,
+                     aos::util::Top::TrackPerThreadInfoMode::kDisabled);
   top.set_track_top_processes(true);
 
   const cpu_set_t all_cpus = FindAllCpus();
diff --git a/aos/starter/irq_affinity.cc b/aos/starter/irq_affinity.cc
index a188127..7459e5e 100644
--- a/aos/starter/irq_affinity.cc
+++ b/aos/starter/irq_affinity.cc
@@ -148,7 +148,8 @@
       EventLoop *event_loop,
       const aos::FlatbufferDetachedBuffer<aos::starter::IrqAffinityConfig>
           &irq_affinity_config)
-      : top_(event_loop) {
+      : top_(event_loop, aos::util::Top::TrackThreadsMode::kDisabled,
+             aos::util::Top::TrackPerThreadInfoMode::kDisabled) {
     if (irq_affinity_config.message().has_kthreads()) {
       PopulateThreads(irq_affinity_config.message().kthreads(), &kthreads_);
     }
diff --git a/aos/starter/starterd_lib.cc b/aos/starter/starterd_lib.cc
index 6a1d3ed..4c71057 100644
--- a/aos/starter/starterd_lib.cc
+++ b/aos/starter/starterd_lib.cc
@@ -55,7 +55,8 @@
       shm_base_(FLAGS_shm_base),
       listener_(&event_loop_,
                 [this](signalfd_siginfo signal) { OnSignal(signal); }),
-      top_(&event_loop_) {
+      top_(&event_loop_, aos::util::Top::TrackThreadsMode::kDisabled,
+           aos::util::Top::TrackPerThreadInfoMode::kEnabled) {
   event_loop_.SkipAosLog();
 
   cleanup_timer_->set_name("cleanup");
diff --git a/aos/util/process_info.fbs b/aos/util/process_info.fbs
index aafdba3..1d63a14 100644
--- a/aos/util/process_info.fbs
+++ b/aos/util/process_info.fbs
@@ -1,5 +1,39 @@
 namespace aos.util;
 
+// Defines the various states a thread can be in within a Linux environment, as commonly displayed in system monitoring tools and the Linux /proc directory.
+enum ThreadState : byte {
+  // 'R' - Running: The thread is currently running or ready to run.
+  RUNNING = 0,
+  // 'S' - Sleeping interruptible: The thread is sleeping until some event happens.
+  SLEEPING_INTERRUPTIBLE = 1,
+  // 'D' - Sleeping uninterruptible: The thread is sleeping and cannot be interrupted (typically waiting for I/O).
+  SLEEPING_UNINTERRUPTIBLE = 2,
+  // 'T' - Stopped: The thread is stopped (suspended) by a signal or because it is being debugged.
+  STOPPED = 3,
+  // 'Z' - Zombie: The thread has completed execution but still has an entry in the process table to report its exit status to its parent.
+  ZOMBIE = 4,
+  // 'I' - Idle: The thread is an idle kernel thread not associated with any process (typically used by the kernel to perform functions such as handling deferred work).
+  IDLE = 5
+}
+
+
+table ThreadInfo {
+  // Thread ID of the thread in question.
+  tid: uint (id: 0);
+  // Name of the thread.
+  name: string (id: 1);
+  // CPU usage of the thread as a ratio of a single CPU core, calculated
+  // from the /proc/[pid]/task/[tid]/stat file. It involves parsing CPU time fields (utime and stime)
+  // and computing the usage relative to the elapsed time since the last check. This value is
+  // expressed as a ratio in the range [0, 1], where 1.0 usage means full utilization of one core.
+  cpu_usage: float (id: 2);
+  // In units of nanoseconds, reference clock monotonic. The time at which the thread
+  // started executing.
+  start_time: int64 (id: 3);
+  // State of the thread.
+  state: ThreadState (id: 4);
+}
+
 // ProcessInfo captures state information associated with a given process.
 table ProcessInfo {
   // Process ID of the process in question.
@@ -14,6 +48,8 @@
   // Amount of physical RAM taken by this process, in bytes. Will be a multiple of the
   // system's page size.
   physical_memory: uint64 (id: 3);
+  // List of threads, each with its own CPU usage, name, and ID. Only populated when per-thread tracking is enabled.
+  threads: [ThreadInfo] (id: 4);
 }
 
 table TopProcessesFbs {
diff --git a/aos/util/top.cc b/aos/util/top.cc
index a74b019..c22fee4 100644
--- a/aos/util/top.cc
+++ b/aos/util/top.cc
@@ -1,8 +1,10 @@
 #include "aos/util/top.h"
 
 #include <dirent.h>
+#include <sys/types.h>  // used for DIR
 #include <unistd.h>
 
+#include <cstring>
 #include <queue>
 #include <string>
 
@@ -31,9 +33,12 @@
 }
 }  // namespace
 
-std::optional<ProcStat> ReadProcStat(pid_t pid) {
-  std::optional<std::string> contents =
-      ReadShortFile(absl::StrFormat("/proc/%d/stat", pid));
+std::optional<ProcStat> ReadProcStat(const pid_t pid,
+                                     const std::optional<pid_t> tid) {
+  const std::string path =
+      tid.has_value() ? absl::StrFormat("/proc/%d/task/%d/stat", pid, *tid)
+                      : absl::StrFormat("/proc/%d/stat", pid);
+  const std::optional<std::string> contents = ReadShortFile(path);
   if (!contents.has_value()) {
     return std::nullopt;
   }
@@ -124,11 +129,13 @@
       .exit_code = static_cast<int64_t>(numbers.at(48))};
 }
 
-Top::Top(aos::EventLoop *event_loop, bool track_threads)
+Top::Top(aos::EventLoop *event_loop, TrackThreadsMode track_threads,
+         TrackPerThreadInfoMode track_per_thread_info)
     : event_loop_(event_loop),
       clock_tick_(std::chrono::nanoseconds(1000000000 / sysconf(_SC_CLK_TCK))),
       page_size_(sysconf(_SC_PAGESIZE)),
-      track_threads_(track_threads) {
+      track_threads_(track_threads),
+      track_per_thread_info_(track_per_thread_info) {
   TimerHandler *timer = event_loop_->AddTimer([this]() { UpdateReadings(); });
   event_loop_->OnRun([timer, this]() {
     timer->Schedule(event_loop_->monotonic_now(), kSamplePeriod);
@@ -151,7 +158,7 @@
 }
 
 void Top::MaybeAddThreadIds(pid_t pid, std::set<pid_t> *pids) {
-  if (!track_threads_) {
+  if (track_threads_ == TrackThreadsMode::kDisabled) {
     return;
   }
 
@@ -176,6 +183,122 @@
   closedir(dir);
 }
 
+// Maps the state character from a /proc stat file onto the ThreadState
+// flatbuffer enum. Dies on any character without a ThreadState equivalent.
+ThreadState CharToThreadState(const char state) {
+  switch (state) {
+    case 'R':
+      return ThreadState::RUNNING;
+    case 'S':
+      return ThreadState::SLEEPING_INTERRUPTIBLE;
+    case 'D':
+      return ThreadState::SLEEPING_UNINTERRUPTIBLE;
+    case 'T':
+    // 't' is "tracing stop": the thread is stopped under a debugger/tracer.
+    // ThreadState::STOPPED is documented as covering that case, and attaching
+    // a debugger to a monitored process must not make us LOG(FATAL).
+    case 't':
+      return ThreadState::STOPPED;
+    case 'Z':
+      return ThreadState::ZOMBIE;
+    case 'I':
+      return ThreadState::IDLE;
+    default:
+      LOG(FATAL) << "Invalid thread state character: " << state;
+  }
+}
+
+// Refreshes process.thread_readings from /proc/[pid]/task: pushes a new
+// CPU-time sample for every thread listed there and drops the entries of
+// threads that are no longer listed or whose stat file could not be read.
+void Top::UpdateThreadReadings(pid_t pid, ProcessReadings &process) {
+  // Construct the path to the task directory which lists all threads.
+  const std::string task_dir = absl::StrFormat("/proc/%d/task", pid);
+
+  // Verify we can open the directory.
+  DIR *dir = opendir(task_dir.c_str());
+  if (dir == nullptr) {
+    LOG_EVERY_T(WARNING, 10) << "Unable to open directory: " << task_dir
+                             << ", error: " << strerror(errno);
+    return;
+  }
+
+  // Use a set to track all the threads that we process.
+  std::set<pid_t> updated_threads;
+
+  // Iterate over all entries in the directory.
+  struct dirent *entry;
+  while ((entry = readdir(dir)) != nullptr) {
+    // Skip non-directories
+    if (entry->d_type != DT_DIR) {
+      continue;
+    }
+
+    // Skip "." and "..".
+    const bool is_current_dir = strcmp(entry->d_name, ".") == 0;
+    const bool is_parent_dir = strcmp(entry->d_name, "..") == 0;
+    if (is_current_dir || is_parent_dir) {
+      continue;
+    }
+
+    // Verify the entry is a valid thread ID.
+    pid_t tid;
+    const bool is_valid_thread_id = absl::SimpleAtoi(entry->d_name, &tid);
+    if (!is_valid_thread_id) {
+      continue;
+    }
+
+    // Read the stats for the thread.
+    const std::optional<ProcStat> thread_stats = ReadProcStat(pid, tid);
+
+    // If no stats could be read (thread may have exited), remove it.
+    if (!thread_stats.has_value()) {
+      VLOG(2) << "Removing thread " << tid << " from process " << pid;
+      process.thread_readings.erase(tid);
+      continue;
+    }
+
+    // Find or create new thread reading entry.
+    ThreadReadings &thread_reading = process.thread_readings[tid];
+
+    // Refresh everything that is valid from a single sample. The state is set
+    // here so that a thread's first sample already reports its real state.
+    thread_reading.name = thread_stats->name;
+    thread_reading.start_time = ProcessStartTime(*thread_stats);
+    thread_reading.state = CharToThreadState(thread_stats->state);
+
+    // Update ThreadReadings with the latest cpu usage.
+    aos::RingBuffer<ThreadReading, kRingBufferSize> &readings =
+        thread_reading.readings;
+    const aos::monotonic_clock::time_point now = event_loop_->monotonic_now();
+    // The ring buffer will push out the oldest entry if it is full.
+    readings.Push({now, TotalProcessTime(*thread_stats)});
+
+    // Once we have two samples, the CPU usage is the CPU time accumulated
+    // between them divided by the wall time between them.
+    if (readings.full()) {
+      const ThreadReading &previous = readings[0];
+      const ThreadReading &current = readings[1];
+      const std::chrono::nanoseconds run_time_delta =
+          current.total_run_time - previous.total_run_time;
+      const std::chrono::nanoseconds reading_time_delta =
+          current.reading_time - previous.reading_time;
+      thread_reading.cpu_percent =
+          aos::time::DurationInSeconds(run_time_delta) /
+          aos::time::DurationInSeconds(reading_time_delta);
+    }
+    updated_threads.insert(tid);
+  }
+
+  // Close the directory.
+  closedir(dir);
+
+  // Remove all threads from process.thread_readings that didn't get updated.
+  std::erase_if(process.thread_readings, [&updated_threads](const auto &item) {
+    return !updated_threads.contains(item.first);
+  });
+}
+
 void Top::UpdateReadings() {
   aos::monotonic_clock::time_point now = event_loop_->monotonic_now();
   // Get all the processes that we *might* care about.
@@ -220,12 +342,15 @@
       reading_iter =
           readings_
               .insert(std::make_pair(
-                  pid, ProcessReadings{.name = proc_stat->name,
-                                       .start_time = start_time,
-                                       .cpu_percent = 0.0,
-                                       .kthread = !!(proc_stat->kernel_flags &
-                                                     PF_KTHREAD),
-                                       .readings = {}}))
+                  pid,
+                  ProcessReadings{
+                      .name = proc_stat->name,
+                      .start_time = start_time,
+                      .cpu_percent = 0.0,
+                      .kthread = !!(proc_stat->kernel_flags & PF_KTHREAD),
+                      .readings = {},
+                      .thread_readings = {},
+                  }))
               .first;
     }
     ProcessReadings &process = reading_iter->second;
@@ -243,7 +368,7 @@
 
     process.readings.Push(Reading{now, TotalProcessTime(*proc_stat),
                                   RealMemoryUsage(*proc_stat)});
-    if (process.readings.size() == 2) {
+    if (process.readings.full()) {
       process.cpu_percent =
           aos::time::DurationInSeconds(process.readings[1].total_run_time -
                                        process.readings[0].total_run_time) /
@@ -252,6 +377,11 @@
     } else {
       process.cpu_percent = 0.0;
     }
+
+    // Update thread readings for this process
+    if (track_per_thread_info_ == TrackPerThreadInfoMode::kEnabled) {
+      UpdateThreadReadings(pid, process);
+    }
   }
 
   if (on_reading_update_) {
@@ -266,6 +396,44 @@
     return {};
   }
   const ProcessReadings &reading = reading_iter->second;
+
+  if (reading.readings.empty()) {
+    return {};  // Return an empty offset if readings is empty.
+  }
+
+  std::vector<flatbuffers::Offset<ThreadInfo>> thread_infos_offsets;
+  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ThreadInfo>>>
+      threads_vector_offset;
+
+  if (track_per_thread_info_ == TrackPerThreadInfoMode::kEnabled &&
+      !reading.thread_readings.empty()) {
+    thread_infos_offsets.reserve(reading.thread_readings.size());
+    for (const auto &[tid, thread_reading] : reading.thread_readings) {
+      // Report the thread's start time (monotonic clock) rather than its age;
+      // ThreadInfo.start_time is documented as nanoseconds.
+      const aos::monotonic_clock::time_point start_time =
+          thread_reading.start_time;
+      // convert start_time to int64
+      const int64_t start_time_ns = start_time.time_since_epoch().count();
+
+      const flatbuffers::Offset<flatbuffers::String> threadName =
+          fbb->CreateString(thread_reading.name);
+      ThreadInfo::Builder thread_info_builder(*fbb);
+      thread_info_builder.add_tid(tid);
+      thread_info_builder.add_name(threadName);
+      thread_info_builder.add_cpu_usage(thread_reading.cpu_percent);
+      thread_info_builder.add_start_time(start_time_ns);
+      thread_info_builder.add_state(thread_reading.state);
+      const flatbuffers::Offset<ThreadInfo> threadInfo =
+          thread_info_builder.Finish();
+      thread_infos_offsets.push_back(threadInfo);
+    }
+    threads_vector_offset = fbb->CreateVector(thread_infos_offsets);
+  } else {
+    threads_vector_offset = 0;
+  }
+
+  // Create name string offset
   const flatbuffers::Offset<flatbuffers::String> name =
       fbb->CreateString(reading.name);
   ProcessInfo::Builder builder(*fbb);
@@ -274,6 +442,10 @@
   builder.add_cpu_usage(reading.cpu_percent);
   builder.add_physical_memory(
       reading.readings[reading.readings.size() - 1].memory_usage);
+  if (!threads_vector_offset.IsNull()) {
+    builder.add_threads(threads_vector_offset);
+  }
+
   return builder.Finish();
 }
 
diff --git a/aos/util/top.h b/aos/util/top.h
index a71873a..c24d56c 100644
--- a/aos/util/top.h
+++ b/aos/util/top.h
@@ -78,11 +78,13 @@
   int64_t exit_code;
 };
 
-// Retrieves the stats for a particular process (note that there also exists a
-// /proc/[pid]/task/[tid]/stat with the same format for per-thread information;
-// we currently do not read that).
-// Returns nullopt if unable to read/parse the file.
-std::optional<ProcStat> ReadProcStat(int pid);
+// Retrieves the statistics for a particular process or thread. If only a pid is
+// provided, it reads the process's stat file at /proc/[pid]/stat. If both pid
+// and tid are provided, it reads the thread's stat file at
+// /proc/[pid]/task/[tid]/stat. Returns nullopt if unable to read or parse the
+// file.
+std::optional<ProcStat> ReadProcStat(pid_t pid,
+                                     std::optional<pid_t> tid = std::nullopt);
 
 // This class provides a basic utility for retrieving general performance
 // information on running processes (named after the top utility). It can either
@@ -95,6 +97,10 @@
 // extremely short-lived loads, this may do a poor job of capturing information.
 class Top {
  public:
+  // Set the ring buffer size to 2 so we can keep track of a current reading and
+  // previous reading.
+  static constexpr int kRingBufferSize = 2;
+
   // A snapshot of the resource usage of a process.
   struct Reading {
     aos::monotonic_clock::time_point reading_time;
@@ -103,6 +109,19 @@
     uint64_t memory_usage;
   };
 
+  struct ThreadReading {
+    aos::monotonic_clock::time_point reading_time;
+    std::chrono::nanoseconds total_run_time;
+  };
+
+  struct ThreadReadings {
+    aos::RingBuffer<ThreadReading, kRingBufferSize> readings;
+    double cpu_percent;
+    std::string name;  // Name of the thread
+    aos::monotonic_clock::time_point start_time;
+    ThreadState state;
+  };
+
   // All the information we have about a process.
   struct ProcessReadings {
     std::string name;
@@ -112,10 +131,28 @@
     // True if this is a kernel thread, false if this is a userspace thread.
     bool kthread;
     // Last 2 readings
-    aos::RingBuffer<Reading, 2> readings;
+    aos::RingBuffer<Reading, kRingBufferSize> readings;
+    std::map<pid_t, ThreadReadings> thread_readings;
   };
 
-  Top(aos::EventLoop *event_loop, bool track_threads = false);
+  // An enum for track_threads with enabled and disabled
+  enum class TrackThreadsMode {
+    kDisabled,
+    kEnabled  // Track the thread ids for each process.
+  };
+
+  // An enum for track_per_thread_info with enabled and disabled
+  enum class TrackPerThreadInfoMode {
+    kDisabled,
+    kEnabled  // Track statistics for each thread.
+  };
+
+  // Constructs a new Top object.
+  //   event_loop: The event loop object to be used.
+  //   track_threads: kEnabled to track the thread IDs for each process.
+  //   track_per_thread_info: kEnabled to track statistics for each thread.
+  Top(aos::EventLoop *event_loop, TrackThreadsMode track_threads,
+      TrackPerThreadInfoMode track_per_thread_info_mode);
 
   // Set whether to track all the top processes (this will result in us having
   // to track every single process on the system, so that we can sort them).
@@ -150,6 +187,7 @@
   aos::monotonic_clock::time_point ProcessStartTime(const ProcStat &proc_stat);
   uint64_t RealMemoryUsage(const ProcStat &proc_stat);
   void UpdateReadings();
+  void UpdateThreadReadings(pid_t pid, ProcessReadings &process);
   // Adds thread ids for the given pid to the pids set,
   // if we are tracking threads.
   void MaybeAddThreadIds(pid_t pid, std::set<pid_t> *pids);
@@ -164,7 +202,10 @@
 
   std::set<pid_t> pids_to_track_;
   bool track_all_ = false;
-  bool track_threads_;
+  TrackThreadsMode track_threads_;
+
+  // Whether to include per-thread information in the top processes.
+  TrackPerThreadInfoMode track_per_thread_info_;
 
   std::map<pid_t, ProcessReadings> readings_;
 
diff --git a/aos/util/top_test.cc b/aos/util/top_test.cc
index 6888457..ce9986f 100644
--- a/aos/util/top_test.cc
+++ b/aos/util/top_test.cc
@@ -7,6 +7,7 @@
 #include <thread>
 
 #include "gtest/gtest.h"
+#include <gmock/gmock.h>
 
 #include "aos/events/shm_event_loop.h"
 #include "aos/json_to_flatbuffer.h"
@@ -15,11 +16,18 @@
 
 namespace aos::util::testing {
 
+void SetThreadName(const std::string &name) {
+  pthread_setname_np(pthread_self(), name.c_str());
+}
+
+constexpr std::string_view kTestCPUConsumer = "TestCPUConsumer";
+
 class TopTest : public ::testing::Test {
  protected:
   TopTest()
       : shm_dir_(aos::testing::TestTmpDir() + "/aos"),
         cpu_consumer_([this]() {
+          SetThreadName(std::string(kTestCPUConsumer));
           while (!stop_flag_.load()) {
           }
         }),
@@ -60,7 +68,8 @@
 
 TEST_F(TopTest, QuerySingleProcess) {
   const pid_t pid = getpid();
-  Top top(&event_loop_);
+  Top top(&event_loop_, Top::TrackThreadsMode::kDisabled,
+          Top::TrackPerThreadInfoMode::kDisabled);
   top.set_track_pids({pid});
   event_loop_.AddTimer([this]() { event_loop_.Exit(); })
       ->Schedule(event_loop_.monotonic_now() + std::chrono::seconds(2));
@@ -80,6 +89,53 @@
   // Sanity check memory usage.
   ASSERT_LT(1000000, info.message().physical_memory());
   ASSERT_GT(1000000000, info.message().physical_memory());
+
+  // Verify no per-thread information is included when it is disabled.
+  ASSERT_FALSE(info.message().has_threads());
+}
+
+// Tests that enabling per-thread info populates ProcessInfo.threads, including
+// the named CPU-consuming thread started by the fixture.
+TEST_F(TopTest, QuerySingleProcessWithThreads) {
+  const pid_t pid = getpid();
+  Top top(&event_loop_, Top::TrackThreadsMode::kDisabled,
+          Top::TrackPerThreadInfoMode::kEnabled);
+  top.set_track_pids({pid});
+  event_loop_.AddTimer([this]() { event_loop_.Exit(); })
+      ->Schedule(event_loop_.monotonic_now() + std::chrono::seconds(2));
+  event_loop_.Run();
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+  fbb.Finish(top.InfoForProcess(&fbb, pid));
+  aos::FlatbufferDetachedBuffer<ProcessInfo> info = fbb.Release();
+  ASSERT_EQ(pid, info.message().pid());
+  ASSERT_TRUE(info.message().has_name());
+  ASSERT_EQ("top_test", info.message().name()->string_view());
+  // Check that we did indeed consume ~1 CPU core (because we're multi-threaded,
+  // we could've consumed a bit more; and on systems where we are competing with
+  // other processes for CPU time, we may not get a full 100% load).
+  ASSERT_LT(0.5, info.message().cpu_usage());
+  ASSERT_GT(1.1, info.message().cpu_usage());
+  // Sanity check memory usage.
+  ASSERT_LT(1000000, info.message().physical_memory());
+  ASSERT_GT(1000000000, info.message().physical_memory());
+
+  // Validate that we have some per-thread information.
+  ASSERT_TRUE(info.message().has_threads());
+  ASSERT_GT(info.message().threads()->size(), 0u);
+  std::set<std::string_view> thread_names;
+  double thread_cpu_usage = 0.0;
+  for (const ThreadInfo *thread_info : *info.message().threads()) {
+    // string_view() points into the flatbuffer owned by info; str() would
+    // return a temporary std::string and leave a dangling view in the set.
+    thread_names.insert(thread_info->name()->string_view());
+    thread_cpu_usage += thread_info->cpu_usage();
+    ASSERT_TRUE(thread_info->has_state());
+  }
+  // Validate that at least one thread was named correctly.
+  ASSERT_THAT(thread_names, ::testing::Contains(kTestCPUConsumer));
+  // Validate that we consumed some CPU on at least one thread.
+  ASSERT_GT(thread_cpu_usage, 0);
 }
 
 TEST_F(TopTest, TopProcesses) {
@@ -108,7 +160,8 @@
     }
   }
 
-  Top top(&event_loop_);
+  Top top(&event_loop_, Top::TrackThreadsMode::kDisabled,
+          Top::TrackPerThreadInfoMode::kDisabled);
   top.set_track_top_processes(true);
   event_loop_.AddTimer([this]() { event_loop_.Exit(); })
       ->Schedule(event_loop_.monotonic_now() + std::chrono::seconds(2));
@@ -152,7 +205,8 @@
 TEST_F(TopTest, AllTopProcesses) {
   constexpr int kNProcesses = 1000000;
 
-  Top top(&event_loop_);
+  Top top(&event_loop_, Top::TrackThreadsMode::kDisabled,
+          Top::TrackPerThreadInfoMode::kDisabled);
   top.set_track_top_processes(true);
   event_loop_.AddTimer([this]() { event_loop_.Exit(); })
       ->Schedule(event_loop_.monotonic_now() + std::chrono::seconds(2));