Add utility for tracking process CPU usage

Being able to log this sort of information can be helpful for debugging
things when something weird happened on the system.

References: PRO-13362
Change-Id: Ie2847536fdc58279f62c9b7b0208d7fe51a90a5c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/BUILD b/aos/util/BUILD
index 3b96cfd..8d21c47 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -1,3 +1,5 @@
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+
 package(default_visibility = ["//visibility:public"])
 
 cc_library(
@@ -272,6 +274,45 @@
     ],
 )
 
+flatbuffer_cc_library(
+    name = "process_info_fbs",
+    srcs = ["process_info.fbs"],
+    gen_reflections = True,
+    target_compatible_with = ["@platforms//os:linux"],
+    visibility = ["//visibility:public"],
+)
+
+cc_library(
+    name = "top",
+    srcs = ["top.cc"],
+    hdrs = ["top.h"],
+    target_compatible_with = ["@platforms//os:linux"],
+    deps = [
+        ":process_info_fbs",
+        "//aos/containers:ring_buffer",
+        "//aos/events:event_loop",
+        "@com_github_google_glog//:glog",
+        "@com_google_absl//absl/strings",
+    ],
+)
+
+cc_test(
+    name = "top_test",
+    srcs = ["top_test.cc"],
+    data = [
+        "//aos/events:pingpong_config",
+    ],
+    flaky = True,
+    target_compatible_with = ["@platforms//os:linux"],
+    deps = [
+        ":top",
+        "//aos/events:shm_event_loop",
+        "//aos/testing:googletest",
+        "//aos/testing:path",
+        "//aos/testing:tmpdir",
+    ],
+)
+
 cc_library(
     name = "scoped_pipe",
     srcs = ["scoped_pipe.cc"],
diff --git a/aos/util/process_info.fbs b/aos/util/process_info.fbs
new file mode 100644
index 0000000..aafdba3
--- /dev/null
+++ b/aos/util/process_info.fbs
@@ -0,0 +1,23 @@
+namespace aos.util;
+
+// ProcessInfo captures state information associated with a given process.
+table ProcessInfo {
+  // Process ID of the process in question.
+  pid: uint (id: 0);
+  // Name of the running executable.
+  name: string (id: 1);
+  // Time that the process spent executing over the past ~1 second, divided by
+  // the amount of wall-clock time that elapsed in that period. I.e., if a process is
+  // consuming all of one CPU core then this would be 1.0. Multi-threaded processes
+  // can exceed 1.0.
+  cpu_usage: float (id: 2);
+  // Amount of physical RAM taken by this process, in bytes. Will be a multiple of the
+  // system's page size.
+  physical_memory: uint64 (id: 3);
+}
+
+table TopProcessesFbs {
+  // List of processes consuming the most CPU in the last sample period, in order from
+  // most CPU to least.
+  processes: [ProcessInfo] (id: 0);
+}
diff --git a/aos/util/top.cc b/aos/util/top.cc
new file mode 100644
index 0000000..4882af7
--- /dev/null
+++ b/aos/util/top.cc
@@ -0,0 +1,254 @@
+#include "aos/util/top.h"
+
+#include <dirent.h>
+#include <unistd.h>
+
+#include <queue>
+#include <string>
+
+#include "absl/strings/numbers.h"
+#include "absl/strings/str_format.h"
+#include "absl/strings/str_split.h"
+
+namespace aos::util {
+namespace {
+std::optional<std::string> ReadShortFile(std::string_view file_name) {
+  // Open as input; we read at most one buffer and require hitting EOF in it.
+  std::ifstream file(std::string(file_name), std::ios_base::in);
+  if (!file.good()) {
+    VLOG(1) << "Can't read " << file_name;
+    return std::nullopt;
+  }
+  const size_t kMaxLineLength = 4096;
+  char buffer[kMaxLineLength];
+  file.read(buffer, kMaxLineLength);
+  if (!file.eof()) {
+    return std::nullopt;
+  }
+  return std::string(buffer, file.gcount());
+}
+}  // namespace
+
+std::optional<ProcStat> ReadProcStat(pid_t pid) {
+  std::optional<std::string> contents =
+      ReadShortFile(absl::StrFormat("/proc/%d/stat", pid));
+  if (!contents.has_value()) {
+    return std::nullopt;
+  }
+  const size_t start_name = contents->find_first_of('(');
+  const size_t end_name = contents->find_last_of(')');
+  if (start_name == std::string::npos || end_name == std::string::npos ||
+      end_name < start_name) {
+    VLOG(1) << "No name found in stat line " << contents.value();
+    return std::nullopt;
+  }
+  std::string_view name(contents->c_str() + start_name + 1,
+                        end_name - start_name - 1);
+
+  std::vector<std::string_view> fields =
+      absl::StrSplit(std::string_view(contents->c_str() + end_name + 1,
+                                      contents->size() - end_name - 1),
+                     ' ', absl::SkipWhitespace());
+  constexpr int kNumFieldsAfterName = 50;
+  if (fields.size() != kNumFieldsAfterName) {
+    VLOG(1) << "Incorrect number of fields " << fields.size();
+    return std::nullopt;
+  }
+  // The first field is a character for the current process state; every single
+  // field after that should be an integer.
+  if (fields[0].size() != 1) {
+    VLOG(1) << "State field is too long: " << fields[0];
+    return std::nullopt;
+  }
+  std::array<absl::int128, kNumFieldsAfterName - 1> numbers;
+  for (int ii = 1; ii < kNumFieldsAfterName; ++ii) {
+    if (!absl::SimpleAtoi(fields[ii], &numbers[ii - 1])) {
+      VLOG(1) << "Failed to parse field " << ii << " as number: " << fields[ii];
+      return std::nullopt;
+    }
+  }
+  return ProcStat{
+      .pid = pid,
+      .name = std::string(name),
+      .state = fields.at(0).at(0),
+      .parent_pid = static_cast<int64_t>(numbers.at(0)),
+      .group_id = static_cast<int64_t>(numbers.at(1)),
+      .session_id = static_cast<int64_t>(numbers.at(2)),
+      .tty = static_cast<int64_t>(numbers.at(3)),
+      .tpgid = static_cast<int64_t>(numbers.at(4)),
+      .kernel_flags = static_cast<uint64_t>(numbers.at(5)),
+      .minor_faults = static_cast<uint64_t>(numbers.at(6)),
+      .children_minor_faults = static_cast<uint64_t>(numbers.at(7)),
+      .major_faults = static_cast<uint64_t>(numbers.at(8)),
+      .children_major_faults = static_cast<uint64_t>(numbers.at(9)),
+      .user_mode_ticks = static_cast<uint64_t>(numbers.at(10)),
+      .kernel_mode_ticks = static_cast<uint64_t>(numbers.at(11)),
+      .children_user_mode_ticks = static_cast<int64_t>(numbers.at(12)),
+      .children_kernel_mode_ticks = static_cast<int64_t>(numbers.at(13)),
+      .priority = static_cast<int64_t>(numbers.at(14)),
+      .nice = static_cast<int64_t>(numbers.at(15)),
+      .num_threads = static_cast<int64_t>(numbers.at(16)),
+      .itrealvalue = static_cast<int64_t>(numbers.at(17)),
+      .start_time_ticks = static_cast<uint64_t>(numbers.at(18)),
+      .virtual_memory_size = static_cast<uint64_t>(numbers.at(19)),
+      .resident_set_size = static_cast<int64_t>(numbers.at(20)),
+      .rss_soft_limit = static_cast<uint64_t>(numbers.at(21)),
+      .start_code_address = static_cast<uint64_t>(numbers.at(22)),
+      .end_code_address = static_cast<uint64_t>(numbers.at(23)),
+      .start_stack_address = static_cast<uint64_t>(numbers.at(24)),
+      .stack_pointer = static_cast<uint64_t>(numbers.at(25)),
+      .instruction_pointer = static_cast<uint64_t>(numbers.at(26)),
+      .signal_bitmask = static_cast<uint64_t>(numbers.at(27)),
+      .blocked_signals = static_cast<uint64_t>(numbers.at(28)),
+      .ignored_signals = static_cast<uint64_t>(numbers.at(29)),
+      .caught_signals = static_cast<uint64_t>(numbers.at(30)),
+      .wchan = static_cast<uint64_t>(numbers.at(31)),
+      .swap_pages = static_cast<uint64_t>(numbers.at(32)),
+      .children_swap_pages = static_cast<uint64_t>(numbers.at(33)),
+      .exit_signal = static_cast<int64_t>(numbers.at(34)),
+      .processor = static_cast<int64_t>(numbers.at(35)),
+      .rt_priority = static_cast<uint64_t>(numbers.at(36)),
+      .scheduling_policy = static_cast<uint64_t>(numbers.at(37)),
+      .block_io_delay_ticks = static_cast<uint64_t>(numbers.at(38)),
+      .guest_ticks = static_cast<uint64_t>(numbers.at(39)),
+      .children_guest_ticks = static_cast<uint64_t>(numbers.at(40)),
+      .start_data_address = static_cast<uint64_t>(numbers.at(41)),
+      .end_data_address = static_cast<uint64_t>(numbers.at(42)),
+      .start_brk_address = static_cast<uint64_t>(numbers.at(43)),
+      .start_arg_address = static_cast<uint64_t>(numbers.at(44)),
+      .end_arg_address = static_cast<uint64_t>(numbers.at(45)),
+      .start_env_address = static_cast<uint64_t>(numbers.at(46)),
+      .end_env_address = static_cast<uint64_t>(numbers.at(47)),
+      .exit_code = static_cast<int64_t>(numbers.at(48))};
+}
+
+Top::Top(aos::EventLoop *event_loop)
+    : event_loop_(event_loop),
+      clock_tick_(std::chrono::nanoseconds(1000000000 / sysconf(_SC_CLK_TCK))),
+      page_size_(sysconf(_SC_PAGESIZE)) {
+  TimerHandler *timer = event_loop_->AddTimer([this]() { UpdateReadings(); });
+  event_loop_->OnRun([timer, this]() {
+    timer->Setup(event_loop_->monotonic_now(), kSamplePeriod);
+  });
+}
+
+std::chrono::nanoseconds Top::TotalProcessTime(const ProcStat &proc_stat) {
+  return (proc_stat.user_mode_ticks + proc_stat.kernel_mode_ticks) *
+         clock_tick_;
+}
+
+aos::monotonic_clock::time_point Top::ProcessStartTime(
+    const ProcStat &proc_stat) {
+  return aos::monotonic_clock::time_point(proc_stat.start_time_ticks *
+                                          clock_tick_);
+}
+
+uint64_t Top::RealMemoryUsage(const ProcStat &proc_stat) {
+  return proc_stat.resident_set_size * page_size_;
+}
+
+// Samples /proc/[pid]/stat for each relevant pid and refreshes readings_.
+void Top::UpdateReadings() {
+  aos::monotonic_clock::time_point now = event_loop_->monotonic_now();
+  // Get all the processes that we *might* care about.
+  std::set<pid_t> pids = pids_to_track_;
+  // Also re-check every process that we are already tracking, so that ones
+  // which have exited fail to read below and get dropped from readings_.
+  for (const auto &reading : readings_) {
+    pids.insert(reading.first);
+  }
+  if (track_all_) {
+    DIR *const dir = opendir("/proc");
+    if (dir == nullptr) {
+      PLOG(FATAL) << "Failed to open /proc";
+    }
+    for (struct dirent *dir_entry = readdir(dir); dir_entry != nullptr;
+         dir_entry = readdir(dir)) {
+      pid_t pid;
+      if (dir_entry->d_type == DT_DIR &&
+          absl::SimpleAtoi(dir_entry->d_name, &pid)) {
+        pids.insert(pid);
+      }
+    }
+    // Close the stream: this runs every kSamplePeriod, so a leak adds up.
+    closedir(dir);
+  }
+
+  for (const pid_t pid : pids) {
+    std::optional<ProcStat> proc_stat = ReadProcStat(pid);
+    // Stop tracking processes that have died.
+    if (!proc_stat.has_value()) {
+      readings_.erase(pid);
+      continue;
+    }
+    const auto start_time = ProcessStartTime(*proc_stat);
+    auto reading_iter = readings_.find(pid);
+    if (reading_iter == readings_.end()) {
+      ProcessReadings fresh{.name = proc_stat->name, .start_time = start_time,
+                            .cpu_percent = 0.0, .readings = {}};
+      reading_iter = readings_.emplace(pid, std::move(fresh)).first;
+    }
+    ProcessReadings &process = reading_iter->second;
+    // The process associated with the PID has changed; reset the state.
+    if (process.start_time != start_time) {
+      process.name = proc_stat->name;
+      process.start_time = start_time;
+      process.readings.Reset();
+    }
+
+    process.readings.Push(Reading{now, TotalProcessTime(*proc_stat),
+                                  RealMemoryUsage(*proc_stat)});
+    if (process.readings.size() == 2) {
+      process.cpu_percent =
+          aos::time::DurationInSeconds(process.readings[1].total_run_time -
+                                       process.readings[0].total_run_time) /
+          aos::time::DurationInSeconds(process.readings[1].reading_time -
+                                       process.readings[0].reading_time);
+    } else {
+      process.cpu_percent = 0.0;
+    }
+  }
+}
+
+flatbuffers::Offset<ProcessInfo> Top::InfoForProcess(
+    flatbuffers::FlatBufferBuilder *fbb, pid_t pid) {
+  auto reading_iter = readings_.find(pid);
+  if (reading_iter == readings_.end()) {
+    return {};
+  }
+  const ProcessReadings &reading = reading_iter->second;
+  const flatbuffers::Offset<flatbuffers::String> name =
+      fbb->CreateString(reading.name);
+  ProcessInfo::Builder builder(*fbb);
+  builder.add_pid(pid);
+  builder.add_name(name);
+  builder.add_cpu_usage(reading.cpu_percent);
+  builder.add_physical_memory(
+      reading.readings[reading.readings.size() - 1].memory_usage);
+  return builder.Finish();
+}
+
+flatbuffers::Offset<TopProcessesFbs> Top::TopProcesses(
+    flatbuffers::FlatBufferBuilder *fbb, int n) {
+  // Pair is {cpu_usage, pid}.
+  std::priority_queue<std::pair<double, pid_t>> cpu_usages;
+  for (const auto &pair : readings_) {
+    // Deliberately include 0.0 percent CPU things in the usage list so that if
+    // the user asks for an arbitrarily large number of processes they'll get
+    // everything.
+    cpu_usages.push(std::make_pair(pair.second.cpu_percent, pair.first));
+  }
+  std::vector<flatbuffers::Offset<ProcessInfo>> offsets;
+  for (int ii = 0; ii < n && !cpu_usages.empty(); ++ii) {
+    offsets.push_back(InfoForProcess(fbb, cpu_usages.top().second));
+    cpu_usages.pop();
+  }
+  const flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<ProcessInfo>>>
+      vector_offset = fbb->CreateVector(offsets);
+  TopProcessesFbs::Builder builder(*fbb);
+  builder.add_processes(vector_offset);
+  return builder.Finish();
+}
+
+}  // namespace aos::util
diff --git a/aos/util/top.h b/aos/util/top.h
new file mode 100644
index 0000000..32ff65d
--- /dev/null
+++ b/aos/util/top.h
@@ -0,0 +1,157 @@
+#ifndef AOS_UTIL_TOP_H_
+#define AOS_UTIL_TOP_H_
+
+#include <map>
+#include <string>
+
+#include "aos/containers/ring_buffer.h"
+#include "aos/events/event_loop.h"
+#include "aos/util/process_info_generated.h"
+
+namespace aos::util {
+
+// ProcStat is a struct to hold all the fields available in /proc/[pid]/stat.
+// Currently we only use a small subset of the fields. See man 5 proc for
+// details on what the fields are--these are in the same order as they appear in
+// the stat file.
+//
+// Things are signed or unsigned based on whether they are listed
+// as signed/unsigned in man 5 proc. We just make everything 64 bits wide
+// because otherwise we have to write out way too many casts everywhere.
+struct ProcStat {
+  int pid;
+  std::string name;
+  char state;
+  int64_t parent_pid;
+  int64_t group_id;
+  int64_t session_id;
+  int64_t tty;
+  int64_t tpgid;
+  uint64_t kernel_flags;
+  uint64_t minor_faults;
+  uint64_t children_minor_faults;
+  uint64_t major_faults;
+  uint64_t children_major_faults;
+  uint64_t user_mode_ticks;
+  uint64_t kernel_mode_ticks;
+  int64_t children_user_mode_ticks;
+  int64_t children_kernel_mode_ticks;
+  int64_t priority;
+  int64_t nice;
+  int64_t num_threads;
+  int64_t itrealvalue;  // always zero.
+  uint64_t start_time_ticks;
+  uint64_t virtual_memory_size;
+  // Number of pages in real memory.
+  int64_t resident_set_size;
+  uint64_t rss_soft_limit;
+  uint64_t start_code_address;
+  uint64_t end_code_address;
+  uint64_t start_stack_address;
+  uint64_t stack_pointer;
+  uint64_t instruction_pointer;
+  uint64_t signal_bitmask;
+  uint64_t blocked_signals;
+  uint64_t ignored_signals;
+  uint64_t caught_signals;
+  uint64_t wchan;
+  // swap_pages fields are not maintained.
+  uint64_t swap_pages;
+  uint64_t children_swap_pages;
+  int64_t exit_signal;
+  // CPU number last executed on.
+  int64_t processor;
+  // Zero for non-realtime processes.
+  uint64_t rt_priority;
+  uint64_t scheduling_policy;
+  // Aggregated block I/O delay.
+  uint64_t block_io_delay_ticks;
+  uint64_t guest_ticks;
+  uint64_t children_guest_ticks;
+  uint64_t start_data_address;
+  uint64_t end_data_address;
+  uint64_t start_brk_address;
+  uint64_t start_arg_address;
+  uint64_t end_arg_address;
+  uint64_t start_env_address;
+  uint64_t end_env_address;
+  int64_t exit_code;
+};
+
+// Retrieves the stats for a particular process (note that there also exists a
+// /proc/[pid]/task/[tid]/stat with the same format for per-thread information;
+// we currently do not read that).
+// Returns nullopt if unable to read/parse the file.
+std::optional<ProcStat> ReadProcStat(int pid);
+
+// This class provides a basic utility for retrieving general performance
+// information on running processes (named after the top utility). It can either
+// be used to directly get information on individual processes (via
+// set_track_pids()) or used to track a list of the top N processes with the
+// highest CPU usage.
+// Note that this currently relies on sampling processes in /proc every second
+// and using the differences between the two readings to calculate CPU usage.
+// For crash-looping processes or other situations with highly variable or
+// extremely short-lived loads, this may do a poor job of capturing information.
+class Top {
+ public:
+  explicit Top(aos::EventLoop *event_loop);
+
+  // Set whether to track all the top processes (this will result in us having
+  // to track every single process on the system, so that we can sort them).
+  void set_track_top_processes(bool track_all) { track_all_ = track_all; }
+
+  // Specify a set of individual processes to track statistics for.
+  // This can be changed at run-time, although it may take up to kSamplePeriod
+  // to have full statistics on all the relevant processes, since we need at
+  // least two samples to estimate CPU usage.
+  void set_track_pids(const std::set<pid_t> &pids) { pids_to_track_ = pids; }
+
+  // Retrieve statistics for the specified process. Will return the null offset
+  // if no such pid is being tracked.
+  flatbuffers::Offset<ProcessInfo> InfoForProcess(
+      flatbuffers::FlatBufferBuilder *fbb, pid_t pid);
+
+  // Returns information on up to n processes, sorted by CPU usage.
+  flatbuffers::Offset<TopProcessesFbs> TopProcesses(
+      flatbuffers::FlatBufferBuilder *fbb, int n);
+
+ private:
+  // Rate at which to sample /proc/[pid]/stat.
+  static constexpr std::chrono::seconds kSamplePeriod{1};
+
+  struct Reading {
+    aos::monotonic_clock::time_point reading_time;
+    std::chrono::nanoseconds total_run_time;
+    uint64_t memory_usage;
+  };
+
+  struct ProcessReadings {
+    std::string name;
+    aos::monotonic_clock::time_point start_time;
+    // CPU usage is based on the past two readings.
+    double cpu_percent;
+    aos::RingBuffer<Reading, 2> readings;
+  };
+
+  std::chrono::nanoseconds TotalProcessTime(const ProcStat &proc_stat);
+  aos::monotonic_clock::time_point ProcessStartTime(const ProcStat &proc_stat);
+  uint64_t RealMemoryUsage(const ProcStat &proc_stat);
+  void UpdateReadings();
+
+  aos::EventLoop *event_loop_;
+
+  // Length of a clock tick (used to convert from raw numbers in /proc to actual
+  // times).
+  const std::chrono::nanoseconds clock_tick_;
+  // Page size, in bytes, on the current system.
+  const long page_size_;
+
+  std::set<pid_t> pids_to_track_;
+  bool track_all_ = false;
+
+  std::map<pid_t, ProcessReadings> readings_;
+};
+
+}  // namespace aos::util
+#endif  // AOS_UTIL_TOP_H_
diff --git a/aos/util/top_test.cc b/aos/util/top_test.cc
new file mode 100644
index 0000000..cf7e03e
--- /dev/null
+++ b/aos/util/top_test.cc
@@ -0,0 +1,173 @@
+#include "aos/util/top.h"
+
+#include <unistd.h>
+
+#include <array>
+#include <string>
+#include <thread>
+
+#include "aos/events/shm_event_loop.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/testing/path.h"
+#include "aos/testing/tmpdir.h"
+#include "gtest/gtest.h"
+
+namespace aos::util::testing {
+
+class TopTest : public ::testing::Test {
+ protected:
+  TopTest()
+      : shm_dir_(aos::testing::TestTmpDir() + "/aos"),
+        cpu_consumer_([this]() {
+          while (!stop_flag_.load()) {
+          }
+        }),
+        config_file_(
+            aos::testing::ArtifactPath("aos/events/pingpong_config.json")),
+        config_(aos::configuration::ReadConfig(config_file_)),
+        event_loop_(&config_.message()) {
+    FLAGS_shm_base = shm_dir_;
+
+    // Nuke the shm dir, to ensure we aren't being affected by any preexisting tests.
+    aos::util::UnlinkRecursive(shm_dir_);
+  }
+  ~TopTest() {
+    stop_flag_ = true;
+    cpu_consumer_.join();
+  }
+
+  gflags::FlagSaver flag_saver_;
+  std::string shm_dir_;
+  // Must be declared before cpu_consumer_, whose thread reads it immediately.
+  std::atomic<bool> stop_flag_{false};
+  std::thread cpu_consumer_;
+  const std::string config_file_;
+  const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+  aos::ShmEventLoop event_loop_;
+};
+
+TEST_F(TopTest, TestSelfStat) {
+  const pid_t pid = getpid();
+  std::optional<ProcStat> proc_stat = ReadProcStat(pid);
+  ASSERT_TRUE(proc_stat.has_value());
+  ASSERT_EQ(pid, proc_stat->pid);
+  ASSERT_EQ("top_test", proc_stat->name);
+  ASSERT_EQ('R', proc_stat->state);
+  ASSERT_LT(1, proc_stat->num_threads);
+}
+
+TEST_F(TopTest, QuerySingleProcess) {
+  const pid_t pid = getpid();
+  Top top(&event_loop_);
+  top.set_track_pids({pid});
+  event_loop_.AddTimer([this]() { event_loop_.Exit(); })
+      ->Setup(event_loop_.monotonic_now() + std::chrono::seconds(2));
+  event_loop_.Run();
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+  fbb.Finish(top.InfoForProcess(&fbb, pid));
+  aos::FlatbufferDetachedBuffer<ProcessInfo> info = fbb.Release();
+  ASSERT_EQ(pid, info.message().pid());
+  ASSERT_TRUE(info.message().has_name());
+  ASSERT_EQ("top_test", info.message().name()->string_view());
+  // Check that we did indeed consume ~1 CPU core (because we're multi-threaded,
+  // we could've consumed a bit more; and on systems where we are competing with
+  // other processes for CPU time, we may not get a full 100% load).
+  ASSERT_LT(0.5, info.message().cpu_usage());
+  ASSERT_GT(1.1, info.message().cpu_usage());
+  // Sanity check memory usage.
+  ASSERT_LT(1000000, info.message().physical_memory());
+  ASSERT_GT(1000000000, info.message().physical_memory());
+}
+
+TEST_F(TopTest, TopProcesses) {
+  // Make some dummy processes that will just spin and get killed off at the
+  // end, so that we actually have things to query.
+  constexpr int kNProcesses = 2;
+  std::vector<pid_t> children;
+  // This will create kNProcesses children + ourself, which means we have enough
+  // processes to test that we correctly exclude extras when requesting fewer
+  // processes than exist.
+  for (int ii = 0; ii < kNProcesses; ++ii) {
+    const pid_t pid = fork();
+    PCHECK(pid >= 0);
+    if (pid == 0) {
+      while (true) {
+      }
+    } else {
+      children.push_back(pid);
+    }
+  }
+
+  Top top(&event_loop_);
+  top.set_track_top_processes(true);
+  event_loop_.AddTimer([this]() { event_loop_.Exit(); })
+      ->Setup(event_loop_.monotonic_now() + std::chrono::seconds(2));
+  event_loop_.SkipTimingReport();
+  event_loop_.SkipAosLog();
+  event_loop_.Run();
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+  fbb.Finish(top.TopProcesses(&fbb, kNProcesses));
+  aos::FlatbufferDetachedBuffer<TopProcessesFbs> info = fbb.Release();
+  ASSERT_EQ(kNProcesses, info.message().processes()->size());
+  double last_cpu = std::numeric_limits<double>::infinity();
+  std::set<pid_t> observed_pids;
+  int process_index = 0;
+  for (const ProcessInfo *info : *info.message().processes()) {
+    SCOPED_TRACE(aos::FlatbufferToJson(info));
+    ASSERT_EQ(0, observed_pids.count(info->pid()));
+    observed_pids.insert(info->pid());
+    ASSERT_TRUE(info->has_name());
+    // Confirm that the top process has non-zero CPU usage, but allow the
+    // lower-down processes to have not been scheduled in the last measurement
+    // cycle.
+    if (process_index < 1) {
+      ASSERT_LT(0.0, info->cpu_usage());
+    } else {
+      ASSERT_LE(0.0, info->cpu_usage());
+    }
+    ++process_index;
+    ASSERT_GE(last_cpu, info->cpu_usage());
+    last_cpu = info->cpu_usage();
+    ASSERT_LT(0, info->physical_memory());
+  }
+
+  for (const pid_t child : children) {
+    kill(child, SIGINT);
+  }
+}
+
+// Test that if we request arbitrarily many processes that we only get back as
+// many processes as actually exist and that nothing breaks.
+TEST_F(TopTest, AllTopProcesses) {
+  constexpr int kNProcesses = 1000000;
+
+  Top top(&event_loop_);
+  top.set_track_top_processes(true);
+  event_loop_.AddTimer([this]() { event_loop_.Exit(); })
+      ->Setup(event_loop_.monotonic_now() + std::chrono::seconds(2));
+  event_loop_.Run();
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+  // There should only be at most 2-3 processes visible inside the bazel
+  // sandbox.
+  fbb.Finish(top.TopProcesses(&fbb, kNProcesses));
+  aos::FlatbufferDetachedBuffer<TopProcessesFbs> info = fbb.Release();
+  ASSERT_GT(kNProcesses, info.message().processes()->size());
+  double last_cpu = std::numeric_limits<double>::infinity();
+  std::set<pid_t> observed_pids;
+  for (const ProcessInfo *info : *info.message().processes()) {
+    SCOPED_TRACE(aos::FlatbufferToJson(info));
+    LOG(INFO) << aos::FlatbufferToJson(info);
+    ASSERT_EQ(0, observed_pids.count(info->pid()));
+    observed_pids.insert(info->pid());
+    ASSERT_TRUE(info->has_name());
+    ASSERT_LE(0.0, info->cpu_usage());
+    ASSERT_GE(last_cpu, info->cpu_usage());
+    last_cpu = info->cpu_usage();
+    ASSERT_LE(0, info->physical_memory());
+  }
+}
+
+}  // namespace aos::util::testing