Add event loop application starter

New application starter reads a configuration file with the list of
applications and their properties (arguments, binary name, etc.) and
manages starting and restarting them. The status of each application is
reported on an event loop channel, and applications can be controlled
with a separate starter_cmd tool.

Change-Id: I7691840be38dc28887e48efcdff7926590710eb7
diff --git a/aos/configuration.fbs b/aos/configuration.fbs
index cdf21b8..bfac6a2 100644
--- a/aos/configuration.fbs
+++ b/aos/configuration.fbs
@@ -111,6 +111,11 @@
 table Application {
   // Name of the application.
   name:string (id: 0);
+
+  // Path of the executable relative to starter. If this field is unset, use
+  // name as the path. Not permitted to change while starter is running.
+  executable_name:string (id: 5);
+
   // List of maps to apply for this specific application.  Application specific
   // maps are applied in reverse order, and before the global maps.
   // For example
@@ -135,6 +140,9 @@
   // The user to run this application as. If this field is unset, run it as
   // the current user of the application starter.
   user:string (id: 3);
+
+  // List of arguments to be passed to application
+  args:[string] (id: 4);
 }
 
 // Per node data and connection information.
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 7c1abd0..b4c2385 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -123,6 +123,8 @@
     flatbuffers = [
         ":event_loop_fbs",
         "//aos/logging:log_message_fbs",
+        "//aos/starter:starter_fbs",
+        "//aos/starter:starter_rpc_fbs",
     ],
 )
 
diff --git a/aos/events/aos.json b/aos/events/aos.json
index 36b4610..34842b2 100644
--- a/aos/events/aos.json
+++ b/aos/events/aos.json
@@ -13,6 +13,18 @@
       "type": "aos.logging.LogMessageFbs",
       "frequency": 200,
       "num_senders": 20
+    },
+    {
+      "name": "/aos",
+      "type": "aos.starter.Status",
+      "frequency": 50,
+      "num_senders": 20
+    },
+    {
+      "name": "/aos",
+      "type": "aos.starter.StarterRpc",
+      "frequency": 10,
+      "num_senders": 2
     }
   ]
 }
diff --git a/aos/events/pingpong.json b/aos/events/pingpong.json
index db727c6..d7c3bae 100644
--- a/aos/events/pingpong.json
+++ b/aos/events/pingpong.json
@@ -1,4 +1,12 @@
 {
+  "applications": [
+    {
+      "name": "ping"
+    },
+    {
+      "name": "pong"
+    }
+  ],
   "channels": [
     {
       "name": "/test",
diff --git a/aos/starter/BUILD b/aos/starter/BUILD
index 4d9f17d..91061e3 100644
--- a/aos/starter/BUILD
+++ b/aos/starter/BUILD
@@ -1,3 +1,5 @@
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+
 # This target is everything which should get deployed to the robot.
 filegroup(
     name = "starter",
@@ -25,3 +27,80 @@
         "//third_party/libevent",
     ],
 )
+
+cc_library(
+    name = "starterd_lib",
+    srcs = ["starterd_lib.cc"],
+    hdrs = ["starterd_lib.h"],
+    deps = [
+        ":starter_fbs",
+        ":starter_rpc_fbs",
+        "//aos:configuration",
+        "//aos:macros",
+        "//aos/events:shm_event_loop",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+cc_test(
+    name = "starter_test",
+    srcs = ["starter_test.cc"],
+    data = [
+        "//aos/events:ping",
+        "//aos/events:pingpong_config",
+        "//aos/events:pong",
+    ],
+    deps = [
+        ":starter_rpc_lib",
+        ":starterd_lib",
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+        "//aos/testing:googletest",
+        "//aos/testing:tmpdir",
+    ],
+)
+
+cc_binary(
+    name = "starterd",
+    srcs = ["starterd.cc"],
+    deps = [
+        ":starterd_lib",
+        "//aos:init",
+    ],
+)
+
+cc_library(
+    name = "starter_rpc_lib",
+    srcs = ["starter_rpc_lib.cc"],
+    hdrs = ["starter_rpc_lib.h"],
+    deps = [
+        ":starter_fbs",
+        ":starter_rpc_fbs",
+        "//aos:configuration",
+        "//aos:init",
+        "//aos/events:shm_event_loop",
+    ],
+)
+
+cc_binary(
+    name = "starter_cmd",
+    srcs = ["starter_cmd.cc"],
+    deps = [
+        ":starter_rpc_lib",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
+flatbuffer_cc_library(
+    name = "starter_fbs",
+    srcs = ["starter.fbs"],
+    gen_reflections = True,
+    visibility = ["//visibility:public"],
+)
+
+flatbuffer_cc_library(
+    name = "starter_rpc_fbs",
+    srcs = ["starter_rpc.fbs"],
+    gen_reflections = True,
+    visibility = ["//visibility:public"],
+)
diff --git a/aos/starter/starter.fbs b/aos/starter/starter.fbs
new file mode 100644
index 0000000..2234518
--- /dev/null
+++ b/aos/starter/starter.fbs
@@ -0,0 +1,75 @@
+namespace aos.starter;
+
+enum State : short {
+  // Process has recently stopped and is waiting to restart.
+  WAITING,
+
+  // Process has forked, waiting to move to RUNNING after verifying it started
+  // successfully.
+  STARTING,
+
+  // Process is running. pid, id, and last_start_time represent the current
+  // running process.
+  RUNNING,
+
+  // Process has been sent SIGINT to nicely stop and starter is waiting for it
+  // to exit.
+  STOPPING,
+
+  // Process is stopped and will not automatically restart unless sent a command
+  STOPPED
+}
+
+enum LastStopReason : uint {
+  // Application received stop command message
+  STOP_REQUESTED,
+
+  // Application received restart command message
+  RESTART_REQUESTED,
+
+  // Application terminated - only occurs when starter is shutting down
+  TERMINATE,
+
+  // System failed to fork and create a new process
+  FORK_ERR,
+
+  // Failed to set parent death handler on child
+  SET_PRCTL_ERR,
+
+  // Failed to change to the requested user
+  SET_USR_ERR,
+
+  // Failed to execute application - likely due to a missing executable or
+  // invalid permissions. This is not reported if an application dies for
+  // another reason after it is already running.
+  EXECV_ERR
+}
+
+table Status {
+  statuses: [ApplicationStatus];
+}
+
+table ApplicationStatus {
+  name: string;
+
+  state: State;
+
+  // Last exit code (or terminating signal) of the process. 0 if not started.
+  last_exit_code: ubyte;
+
+  // Last pid of the process. Could be associated with a different process
+  // unless state == RUNNING. Not present if the process has not started.
+  pid: uint;
+
+  // Unique id of this application and process
+  id: uint64;
+
+  // Start time in nanoseconds relative to monotonic clock
+  last_start_time: int64;
+
+  // Indicates the reason the application is not running. Only valid if
+  // application is STOPPED.
+  last_stop_reason: LastStopReason;
+}
+
+root_type Status;
diff --git a/aos/starter/starter_cmd.cc b/aos/starter/starter_cmd.cc
new file mode 100644
index 0000000..abc2816
--- /dev/null
+++ b/aos/starter/starter_cmd.cc
@@ -0,0 +1,58 @@
+#include <chrono>
+#include <iostream>
+#include <unordered_map>
+
+#include "aos/init.h"
+#include "aos/json_to_flatbuffer.h"
+#include "gflags/gflags.h"
+#include "starter_rpc_lib.h"
+
+DEFINE_string(config, "./config.json", "File path of aos configuration");
+
+static const std::unordered_map<std::string, aos::starter::Command> kCommands{
+    {"start", aos::starter::Command::START},
+    {"stop", aos::starter::Command::STOP},
+    {"restart", aos::starter::Command::RESTART}};
+
+// Sends one command to the starter; exits non-zero if the command failed.
+int main(int argc, char **argv) {
+  aos::InitGoogle(&argc, &argv);
+  aos::InitNRT();
+  CHECK(argc == 3) << "Invalid number of command arguments";
+
+  const std::string application_name = argv[1];
+  const std::string command_str = argv[2];
+
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(FLAGS_config);
+
+  if (command_str == "status") {
+    auto status = aos::starter::GetStatus(application_name, &config.message());
+    std::cout << aos::FlatbufferToJson(&status.message()) << '\n';
+    return 0;
+  }
+
+  const auto command_search = kCommands.find(command_str);
+  CHECK(command_search != kCommands.end())
+      << "Invalid command \"" << command_str << "\"";
+  const aos::starter::Command command = command_search->second;
+  if (!aos::starter::SendCommandBlocking(command, application_name,
+                                         &config.message(),
+                                         std::chrono::seconds(3))) {
+    std::cout << "Failed to " << command_str << ' ' << application_name << '\n';
+    return 1;  // Let calling scripts detect the failure.
+  }
+
+  switch (command) {
+    case aos::starter::Command::START:
+      std::cout << "Successfully started " << application_name << '\n';
+      break;
+    case aos::starter::Command::STOP:
+      std::cout << "Successfully stopped " << application_name << '\n';
+      break;
+    case aos::starter::Command::RESTART:
+      std::cout << "Successfully restarted " << application_name << '\n';
+      break;
+  }
+  return 0;
+}
diff --git a/aos/starter/starter_rpc.fbs b/aos/starter/starter_rpc.fbs
new file mode 100644
index 0000000..0e72cff
--- /dev/null
+++ b/aos/starter/starter_rpc.fbs
@@ -0,0 +1,27 @@
+namespace aos.starter;
+
+enum Command : short {
+  // Requests that the application run. Skips the delay if WAITING, starts if
+  // STOPPED, queues a restart if STOPPING, no-op if STARTING or RUNNING.
+  START,
+
+  // Requests that the application shut down and become STOPPED. Application
+  // will not automatically restart unless a START command is sent. Cancels
+  // start if WAITING, kills gracefully with timeout if STARTING or RUNNING,
+  // cancels a queued restart if STOPPING, no-op if STOPPED.
+  STOP,
+
+  // Performs the equivalent of a STOP, followed by a START operation.
+  // Application restarts immediately (no WAITING delay).
+  RESTART,
+}
+
+table StarterRpc {
+  command : Command;
+
+  // The name of the application to send the command to. Command is ignored if
+  // the given application does not exist.
+  name: string;
+}
+
+root_type StarterRpc;
diff --git a/aos/starter/starter_rpc_lib.cc b/aos/starter/starter_rpc_lib.cc
new file mode 100644
index 0000000..efb4042
--- /dev/null
+++ b/aos/starter/starter_rpc_lib.cc
@@ -0,0 +1,131 @@
+#include "starter_rpc_lib.h"
+
+#include "aos/events/shm_event_loop.h"
+#include "aos/flatbuffer_merge.h"
+
+namespace aos {
+namespace starter {
+
+// Looks up the entry for |name| inside a starter Status message.
+//
+// Entries lacking a name are skipped. Yields the first matching entry, or
+// nullptr when the message has no statuses or none of them match.
+const aos::starter::ApplicationStatus *FindApplicationStatus(
+    const aos::starter::Status &status, std::string_view name) {
+  if (status.has_statuses()) {
+    for (const aos::starter::ApplicationStatus *candidate :
+         *status.statuses()) {
+      if (candidate->has_name() &&
+          candidate->name()->string_view() == name) {
+        return candidate;
+      }
+    }
+  }
+
+  // Either there was no list at all, or nothing in it carried this name.
+  return nullptr;
+}
+
+bool SendCommandBlocking(aos::starter::Command command, std::string_view name,
+                         const aos::Configuration *config,
+                         std::chrono::milliseconds timeout) {
+  aos::ShmEventLoop event_loop(config);
+  event_loop.SkipAosLog();
+
+  ::aos::Sender<aos::starter::StarterRpc> cmd_sender =
+      event_loop.MakeSender<aos::starter::StarterRpc>("/aos");
+
+  // Defer sending until the loop runs so the status watcher is already set up.
+  event_loop.OnRun([&cmd_sender, command, name] {
+    aos::Sender<aos::starter::StarterRpc>::Builder builder =
+        cmd_sender.MakeBuilder();
+
+    auto name_str = builder.fbb()->CreateString(name);
+
+    aos::starter::StarterRpc::Builder cmd_builder =
+        builder.MakeBuilder<aos::starter::StarterRpc>();
+
+    cmd_builder.add_name(name_str);
+    cmd_builder.add_command(command);
+
+    builder.Send(cmd_builder.Finish());
+  });
+
+  // Give up once |timeout| has elapsed: exit the loop with success still false.
+  event_loop.AddTimer([&event_loop] { event_loop.Exit(); })
+      ->Setup(event_loop.monotonic_now() + timeout);
+
+  // Record the application's id from the most recent status (if any) so that
+  // RESTART can tell a newly started process apart from the old one.
+  auto initial_status_fetcher =
+      event_loop.MakeFetcher<aos::starter::Status>("/aos");
+  initial_status_fetcher.Fetch();
+  auto initial_status =
+      initial_status_fetcher
+          ? FindApplicationStatus(*initial_status_fetcher, name)
+          : nullptr;
+
+  const std::optional<uint64_t> initial_id =
+      (initial_status != nullptr && initial_status->has_id())
+          ? std::make_optional(initial_status->id())
+          : std::nullopt;
+
+  bool success = false;  // Set by the watcher below.
+  event_loop.MakeWatcher(
+      "/aos", [&event_loop, command, name, initial_id,
+               &success](const aos::starter::Status &status) {
+        const aos::starter::ApplicationStatus *app_status =
+            FindApplicationStatus(status, name);
+
+        const std::optional<aos::starter::State> state =
+            (app_status != nullptr && app_status->has_state())
+                ? std::make_optional(app_status->state())
+                : std::nullopt;
+
+        switch (command) {
+          case aos::starter::Command::START: {
+            if (state == aos::starter::State::RUNNING) {
+              success = true;
+              event_loop.Exit();
+            }
+            break;
+          }
+          case aos::starter::Command::STOP: {
+            if (state == aos::starter::State::STOPPED) {
+              success = true;
+              event_loop.Exit();
+            }
+            break;
+          }
+          case aos::starter::Command::RESTART: {
+            if (state == aos::starter::State::RUNNING && app_status->has_id() &&
+                app_status->id() != initial_id) {
+              success = true;
+              event_loop.Exit();
+            }
+            break;
+          }
+        }
+      });
+
+  event_loop.Run();
+
+  return success;
+}
+
+// Reads the latest Status message once; empty buffer if |name| is not in it.
+const FlatbufferDetachedBuffer<aos::starter::ApplicationStatus> GetStatus(
+    std::string_view name, const Configuration *config) {
+  ShmEventLoop event_loop(config);
+  event_loop.SkipAosLog();
+
+  auto status_fetcher = event_loop.MakeFetcher<aos::starter::Status>("/aos");
+  status_fetcher.Fetch();
+  const auto *found =
+      status_fetcher ? FindApplicationStatus(*status_fetcher, name) : nullptr;
+  if (found != nullptr) return aos::CopyFlatBuffer(found);
+  return FlatbufferDetachedBuffer<aos::starter::ApplicationStatus>::Empty();
+}
+
+}  // namespace starter
+}  // namespace aos
diff --git a/aos/starter/starter_rpc_lib.h b/aos/starter/starter_rpc_lib.h
new file mode 100644
index 0000000..57c9e6b
--- /dev/null
+++ b/aos/starter/starter_rpc_lib.h
@@ -0,0 +1,36 @@
+#ifndef AOS_STARTER_STARTER_RPC_LIB_H_
+#define AOS_STARTER_STARTER_RPC_LIB_H_
+
+#include <chrono>
+
+#include "aos/configuration.h"
+#include "aos/starter/starter_generated.h"
+#include "aos/starter/starter_rpc_generated.h"
+
+namespace aos {
+namespace starter {
+
+// Finds the status of an individual application within a starter status message
+// Returns nullptr if no application found by the given name.
+const aos::starter::ApplicationStatus *FindApplicationStatus(
+    const aos::starter::Status &status, std::string_view name);
+
+// Sends the given command to the application with the name name. Creates a
+// temporary event loop from the provided config for sending the command and
+// receiving back status messages. Returns true if the command executed
+// successfully, or false otherwise. Returns false if the desired state was not
+// achieved within timeout.
+bool SendCommandBlocking(aos::starter::Command, std::string_view name,
+                         const aos::Configuration *config,
+                         std::chrono::milliseconds timeout);
+
+// Fetches the status of the application with the given name. Creates a
+// temporary event loop from the provided config for fetching. Returns an empty
+// flatbuffer if the application is not found.
+const aos::FlatbufferDetachedBuffer<aos::starter::ApplicationStatus> GetStatus(
+    std::string_view name, const aos::Configuration *config);
+
+}  // namespace starter
+}  // namespace aos
+
+#endif  // AOS_STARTER_STARTER_RPC_LIB_H_
diff --git a/aos/starter/starter_test.cc b/aos/starter/starter_test.cc
new file mode 100644
index 0000000..935c591
--- /dev/null
+++ b/aos/starter/starter_test.cc
@@ -0,0 +1,192 @@
+#include <signal.h>
+
+#include <future>
+#include <thread>
+
+#include "aos/events/ping_generated.h"
+#include "aos/events/pong_generated.h"
+#include "aos/testing/tmpdir.h"
+#include "gtest/gtest.h"
+#include "starter_rpc_lib.h"
+#include "starterd_lib.h"
+
+// Starts ping and pong, then checks that a STOP command moves ping to STOPPED.
+TEST(StarterdTest, StartStopTest) {
+  const std::string config_file = "aos/events/pingpong_config.json";
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(config_file);
+  const std::string test_dir = aos::testing::TestTmpDir();
+
+  auto new_config = aos::configuration::MergeWithConfig(
+      &config.message(), absl::StrFormat(
+                             R"({"applications": [
+                                  {
+                                    "name": "ping",
+                                    "executable_name": "aos/events/ping",
+                                    "args": ["--shm_base", "%s/aos"]
+                                  },
+                                  {
+                                    "name": "pong",
+                                    "executable_name": "aos/events/pong",
+                                    "args": ["--shm_base", "%s/aos"]
+                                  }
+                                ]})",
+                             test_dir, test_dir));
+
+  const aos::Configuration *config_msg = &new_config.message();
+  // Set up starter with config file
+  aos::starter::Starter starter(config_msg);
+
+  // Create an event loop to watch for ping messages, verifying it actually
+  // started.
+  aos::ShmEventLoop watcher_loop(config_msg);
+  watcher_loop.SkipAosLog();
+
+  watcher_loop
+      .AddTimer([&watcher_loop] {
+        watcher_loop.Exit();
+        FAIL();
+      })
+      ->Setup(watcher_loop.monotonic_now() + std::chrono::seconds(7));
+
+  int test_stage = 0;
+  // Runs the blocking STOP command; joined below so it never outlives the test.
+  std::thread stop_thread;
+  watcher_loop.MakeWatcher(
+      "/test",
+      [&test_stage, &stop_thread, config_msg](const aos::examples::Ping &) {
+        switch (test_stage) {
+          case 1: {
+            test_stage = 2;
+            break;
+          }
+          case 2: {
+            stop_thread = std::thread([config_msg] {
+              LOG(INFO) << "Send command";
+              ASSERT_TRUE(aos::starter::SendCommandBlocking(
+                  aos::starter::Command::STOP, "ping", config_msg,
+                  std::chrono::seconds(3)));
+            });
+            test_stage = 3;
+            break;
+          }
+        }
+      });
+
+  watcher_loop.MakeWatcher(
+      "/aos", [&test_stage, &watcher_loop](const aos::starter::Status &status) {
+        const aos::starter::ApplicationStatus *app_status =
+            FindApplicationStatus(status, "ping");
+        if (app_status == nullptr) {
+          return;
+        }
+        switch (test_stage) {
+          case 0: {
+            if (app_status->has_state() &&
+                app_status->state() == aos::starter::State::RUNNING) {
+              test_stage = 1;
+            }
+            break;
+          }
+          case 3: {
+            if (app_status->has_state() &&
+                app_status->state() == aos::starter::State::STOPPED) {
+              watcher_loop.Exit();
+              SUCCEED();
+            }
+            break;
+          }
+        }
+      });
+
+  std::thread starterd_thread([&starter] { starter.Run(); });
+  watcher_loop.Run();
+  // Join while the starter is still running so the command can observe STOPPED.
+  if (stop_thread.joinable()) stop_thread.join();
+  starter.Cleanup();
+  starterd_thread.join();
+}
+
+// Kills ping once it is RUNNING and expects the starter to restart it.
+TEST(StarterdTest, DeathTest) {
+  const std::string config_file = "aos/events/pingpong_config.json";
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(config_file);
+
+  const std::string test_dir = aos::testing::TestTmpDir();
+
+  auto new_config = aos::configuration::MergeWithConfig(
+      &config.message(), absl::StrFormat(
+                             R"({"applications": [
+                                  {
+                                    "name": "ping",
+                                    "executable_name": "aos/events/ping",
+                                    "args": ["--shm_base", "%s/aos"]
+                                  },
+                                  {
+                                    "name": "pong",
+                                    "executable_name": "aos/events/pong",
+                                    "args": ["--shm_base", "%s/aos"]
+                                  }
+                                ]})",
+                             test_dir, test_dir));
+
+  const aos::Configuration *config_msg = &new_config.message();
+
+  // Set up starter with config file
+  aos::starter::Starter starter(config_msg);
+
+  // Create an event loop to watch the starter's status messages, verifying
+  // that ping comes back after being killed.
+  aos::ShmEventLoop watcher_loop(config_msg);
+  watcher_loop.SkipAosLog();
+
+  watcher_loop
+      .AddTimer([&watcher_loop] {
+        watcher_loop.Exit();
+        FAIL();
+      })
+      ->Setup(watcher_loop.monotonic_now() + std::chrono::seconds(7));
+
+  int test_stage = 0;
+  uint64_t id = 0;
+  // |id| holds the first ping process's id; initialized in case an ASSERT bails.
+  watcher_loop.MakeWatcher("/aos", [&test_stage, &watcher_loop,
+                                    &id](const aos::starter::Status &status) {
+    const aos::starter::ApplicationStatus *app_status =
+        FindApplicationStatus(status, "ping");
+    if (app_status == nullptr) {
+      return;
+    }
+
+    switch (test_stage) {
+      case 0: {
+        if (app_status->has_state() &&
+            app_status->state() == aos::starter::State::RUNNING) {
+          test_stage = 1;
+          ASSERT_TRUE(app_status->has_pid());
+          ASSERT_TRUE(kill(app_status->pid(), SIGINT) != -1);
+          ASSERT_TRUE(app_status->has_id());
+          id = app_status->id();
+        }
+        break;
+      }
+
+      case 1: {
+        if (app_status->has_state() &&
+            app_status->state() == aos::starter::State::RUNNING &&
+            app_status->has_id() && app_status->id() != id) {
+          watcher_loop.Exit();
+          SUCCEED();
+        }
+        break;
+      }
+    }
+  });
+
+  std::thread starterd_thread([&starter] { starter.Run(); });
+  watcher_loop.Run();
+
+  starter.Cleanup();
+  starterd_thread.join();
+}
diff --git a/aos/starter/starterd.cc b/aos/starter/starterd.cc
new file mode 100644
index 0000000..66786a9
--- /dev/null
+++ b/aos/starter/starterd.cc
@@ -0,0 +1,20 @@
+#include "aos/init.h"
+#include "gflags/gflags.h"
+#include "starterd_lib.h"
+
+DEFINE_string(config, "./config.json", "File path of aos configuration");
+
+// Runs the application starter using the configuration named by --config.
+int main(int argc, char **argv) {
+  aos::InitGoogle(&argc, &argv);
+
+  aos::FlatbufferDetachedBuffer<aos::Configuration> loaded_config =
+      aos::configuration::ReadConfig(FLAGS_config);
+
+  // The starter is handed a pointer to the message inside |loaded_config|.
+  aos::starter::Starter app_starter(&loaded_config.message());
+
+  app_starter.Run();
+
+  return 0;
+}
diff --git a/aos/starter/starterd_lib.cc b/aos/starter/starterd_lib.cc
new file mode 100644
index 0000000..ea7fdee
--- /dev/null
+++ b/aos/starter/starterd_lib.cc
@@ -0,0 +1,489 @@
+#include "starterd_lib.h"
+
+#include <fcntl.h>
+#include <pwd.h>
+#include <sys/fsuid.h>
+#include <sys/prctl.h>
+
+#include <algorithm>
+#include <utility>
+
+#include "glog/logging.h"
+#include "glog/stl_logging.h"
+
+namespace aos {
+namespace starter {
+
+Application::Application(const aos::Application *application,
+                         aos::ShmEventLoop *event_loop)
+    : name_(application->name()->string_view()),
+      path_(application->has_executable_name()  // Falls back to name.
+                ? application->executable_name()->string_view()
+                : application->name()->string_view()),
+      user_(application->has_user() ? FindUid(application->user()->c_str())
+                                    : std::nullopt),
+      event_loop_(event_loop),
+      start_timer_(event_loop_->AddTimer([this] {  // Armed by DoStart().
+        status_ = aos::starter::State::RUNNING;
+        LOG(INFO) << "Started " << name_;
+      })),
+      restart_timer_(event_loop_->AddTimer([this] { DoStart(); })),
+      stop_timer_(event_loop_->AddTimer([this] {  // Armed by DoStop().
+        if (kill(pid_, SIGKILL) == 0) {
+          LOG(WARNING) << "Sent SIGKILL to " << name_ << " pid: " << pid_;
+        }
+      }))
+// restart_timer_ is armed by QueueStart(); no timer is armed in this ctor.
+{}
+
+void Application::DoStart() {
+  if (status_ != aos::starter::State::WAITING) {  // Only WAITING may start.
+    return;
+  }
+
+  start_timer_->Disable();
+  restart_timer_->Disable();
+
+  LOG(INFO) << "Starting " << name_;
+
+  std::tie(read_pipe_, write_pipe_) = ScopedPipe::MakePipe();
+
+  const pid_t pid = fork();
+
+  if (pid != 0) {  // Parent process, or fork() failed.
+    if (pid == -1) {
+      PLOG(WARNING) << "Failed to fork";
+      stop_reason_ = aos::starter::LastStopReason::FORK_ERR;
+      status_ = aos::starter::State::STOPPED;
+    } else {
+      pid_ = pid;
+      id_ = next_id_++;
+      start_time_ = event_loop_->monotonic_now();
+      status_ = aos::starter::State::STARTING;
+
+      // Arm the timer which moves the application to RUNNING if it has not
+      // exited within 1 second.
+      start_timer_->Setup(event_loop_->monotonic_now() +
+                          std::chrono::seconds(1));
+    }
+    return;
+  }
+
+  // Child process from here on. Clear out the signal mask inherited from the
+  // parent so the forked process receives all signals normally.
+  sigset_t empty_mask;
+  sigemptyset(&empty_mask);
+  sigprocmask(SIG_SETMASK, &empty_mask, nullptr);
+
+  // Cleanup children if starter dies in a way that is not handled gracefully.
+  if (prctl(PR_SET_PDEATHSIG, SIGKILL) == -1) {
+    write_pipe_.Write(
+        static_cast<uint32_t>(aos::starter::LastStopReason::SET_PRCTL_ERR));
+    PLOG(FATAL) << "Could not set PR_SET_PDEATHSIG to SIGKILL";
+  }
+
+  if (user_) {
+    if (seteuid(*user_) == -1 || setfsuid(*user_) == -1) {
+      write_pipe_.Write(
+          static_cast<uint32_t>(aos::starter::LastStopReason::SET_USR_ERR));
+      PLOG(FATAL) << "Could not set user for " << name_ << " to " << *user_;
+    }
+  }
+
+  // argv[0] should be the program name
+  args_.insert(args_.begin(), path_.data());
+
+  execv(path_.c_str(), args_.data());  // NOTE(review): needs set_args() first.
+
+  // Reaching this point means execv failed: report the reason via the pipe.
+  write_pipe_.Write(
+      static_cast<uint32_t>(aos::starter::LastStopReason::EXECV_ERR));
+  PLOG(WARNING) << "Could not execute " << name_ << " (" << path_ << ')';
+
+  _exit(EXIT_FAILURE);
+}
+
+void Application::DoStop(bool restart) {
+  // A stop or restart request supersedes any pending delayed restart or
+  // start-confirmation timer, so cancel both.
+  restart_timer_->Disable();
+  start_timer_->Disable();
+
+  switch (status_) {
+    case aos::starter::State::STARTING:
+    case aos::starter::State::RUNNING: {
+      LOG(INFO) << "Killing " << name_ << " pid: " << pid_;
+      status_ = aos::starter::State::STOPPING;
+
+      kill(pid_, SIGINT);  // Ask nicely first; stop_timer_ escalates.
+
+      // Watchdog timer to SIGKILL application if it is still running 1 second
+      // after SIGINT
+      stop_timer_->Setup(event_loop_->monotonic_now() +
+                         std::chrono::seconds(1));
+      queue_restart_ = restart;  // Consumed by MaybeHandleSignal() on exit.
+      break;
+    }
+    case aos::starter::State::WAITING: {
+      // If waiting to restart, and receives restart, skip the waiting period
+      // and restart immediately. If stop received, all we have to do is move
+      // to the STOPPED state.
+      if (restart) {
+        DoStart();
+      } else {
+        status_ = aos::starter::State::STOPPED;
+      }
+      break;
+    }
+    case aos::starter::State::STOPPING: {
+      // If the application is already stopping, then we just need to update the
+      // restart flag to the most recent status.
+      queue_restart_ = restart;
+      break;
+    }
+    case aos::starter::State::STOPPED: {
+      // Restart immediately if the application is already stopped
+      if (restart) {
+        status_ = aos::starter::State::WAITING;  // DoStart() requires WAITING.
+        DoStart();
+      }
+      break;
+    }
+  }
+}
+
+// Moves to WAITING and arms the restart timer to fire one second from now.
+void Application::QueueStart() {
+  stop_timer_->Disable();
+  start_timer_->Disable();
+  LOG(INFO) << "Restarting " << name_ << " in 1 second";
+  status_ = aos::starter::State::WAITING;
+  restart_timer_->Setup(event_loop_->monotonic_now() + std::chrono::seconds(1));
+}
+
+// Rebuilds args_ as nullptr-terminated pointers into the strings of |v|.
+void Application::set_args(
+    const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> &v) {
+  args_.clear();
+  for (const flatbuffers::String *arg : v) {
+    args_.push_back(const_cast<char *>(arg->c_str()));
+  }
+  args_.push_back(nullptr);
+}
+
+// Resolves a user name to its uid via getpwnam; fatal if the user is unknown.
+std::optional<uid_t> Application::FindUid(const char *name) {
+  const struct passwd *entry = getpwnam(name);
+  if (entry == nullptr) {
+    LOG(FATAL) << "Could not find user " << name;
+    return std::nullopt;
+  }
+  return entry->pw_uid;
+}
+
+// Serializes the current status into |builder|; pid/id only when pid_ != -1.
+flatbuffers::Offset<aos::starter::ApplicationStatus>
+Application::PopulateStatus(flatbuffers::FlatBufferBuilder *builder) {
+  CHECK_NOTNULL(builder);
+  const auto name_offset = builder->CreateString(name_);
+  aos::starter::ApplicationStatus::Builder app_status(*builder);
+  app_status.add_name(name_offset);
+  app_status.add_state(status_);
+  app_status.add_last_exit_code(exit_code_);
+  app_status.add_last_stop_reason(stop_reason_);
+  if (pid_ != -1) {
+    app_status.add_pid(pid_);
+    app_status.add_id(id_);
+  }
+  app_status.add_last_start_time(start_time_.time_since_epoch().count());
+  return app_status.Finish();
+}
+
+void Application::Terminate() {
+  stop_reason_ = aos::starter::LastStopReason::TERMINATE;
+  DoStop(false);  // Plain stop: no restart is queued.
+  terminating_ = true;  // Makes MaybeHandleSignal() return true once stopped.
+}
+
+// Applies a start/stop/restart command to this application.
+void Application::HandleCommand(aos::starter::Command cmd) {
+  if (cmd == aos::starter::Command::STOP) {
+    stop_reason_ = aos::starter::LastStopReason::STOP_REQUESTED;
+    DoStop(false);
+    return;
+  }
+
+  if (cmd == aos::starter::Command::RESTART) {
+    stop_reason_ = aos::starter::LastStopReason::RESTART_REQUESTED;
+    DoStop(true);
+    return;
+  }
+
+  if (cmd != aos::starter::Command::START) {
+    return;
+  }
+  // START: what to do depends on where the application currently is.
+  switch (status_) {
+    case aos::starter::State::WAITING:
+      // Skip the remainder of the restart delay.
+      restart_timer_->Disable();
+      DoStart();
+      break;
+    case aos::starter::State::STOPPED:
+      // DoStart() only acts on applications in the WAITING state.
+      status_ = aos::starter::State::WAITING;
+      DoStart();
+      break;
+    case aos::starter::State::STOPPING:
+      // Start again as soon as the current process has exited.
+      queue_restart_ = true;
+      break;
+    case aos::starter::State::STARTING:
+    case aos::starter::State::RUNNING:
+      // Already starting or running; nothing to do.
+      break;
+  }
+}
+
+bool Application::MaybeHandleSignal() {  // True once a Terminate() completes.
+  int status;
+
+  // Non-blocking check for a state change of our child; bail if there is none.
+  if (pid_ == -1 || waitpid(pid_, &status, WNOHANG) != pid_) {
+    return false;
+  }
+
+  // Check that the event was the process exiting
+  if (!WIFEXITED(status) && !WIFSIGNALED(status)) {
+    return false;
+  }
+
+  exit_time_ = event_loop_->monotonic_now();
+  exit_code_ = WIFEXITED(status) ? WEXITSTATUS(status) : WTERMSIG(status);
+  // A value in the pipe is a failure reason written by the child in DoStart().
+  if (auto read_result = read_pipe_.Read()) {
+    stop_reason_ = static_cast<aos::starter::LastStopReason>(*read_result);
+  }
+
+  switch (status_) {
+    case aos::starter::State::STARTING: {
+      LOG(WARNING) << "Failed to start " << name_ << " on pid " << pid_
+                   << " : Exited with status " << exit_code_;
+      QueueStart();
+      break;
+    }
+    case aos::starter::State::RUNNING: {
+      QueueStart();
+      break;
+    }
+    case aos::starter::State::STOPPING: {
+      LOG(INFO) << "Successfully stopped " << name_;
+      status_ = aos::starter::State::STOPPED;
+
+      // Disable force stop timer since the process already died
+      stop_timer_->Disable();
+
+      if (terminating_) {  // Set by Terminate().
+        return true;
+      }
+
+      if (queue_restart_) {  // A (re)start was requested while stopping.
+        queue_restart_ = false;
+        status_ = aos::starter::State::WAITING;
+        DoStart();
+      }
+      break;
+    }
+    case aos::starter::State::WAITING:
+    case aos::starter::State::STOPPED: {
+      LOG(FATAL)
+          << "Received signal on process that was already stopped : name: "
+          << name_ << " pid: " << pid_;
+      break;
+    }
+  }
+
+  return false;
+}
+
+// Wraps `fd`, which the destructor will close. -1 means no descriptor is held.
+ScopedPipe::ScopedPipe(int fd) : fd_{fd} {}
+
+// Closes the held descriptor, if there is one. A failing close() trips PCHECK.
+ScopedPipe::~ScopedPipe() {
+  if (fd_ == -1) {
+    return;
+  }
+  PCHECK(close(fd_) != -1);
+}
+
+// Takes over the descriptor of `scoped_pipe` and leaves it holding -1, so its
+// destructor will not close the descriptor that now belongs to this object.
+ScopedPipe::ScopedPipe(ScopedPipe &&scoped_pipe) : fd_(-1) {
+  std::swap(fd_, scoped_pipe.fd_);
+}
+
+// Move assignment: closes any descriptor currently held, takes over the
+// descriptor of `scoped_pipe` and leaves `scoped_pipe` holding -1. A failing
+// close() trips PCHECK.
+//
+// Self-assignment is a no-op. Without the guard the descriptor would be closed
+// and then reset to -1, silently leaving this object without a pipe.
+ScopedPipe &ScopedPipe::operator=(ScopedPipe &&scoped_pipe) {
+  if (this == &scoped_pipe) {
+    return *this;
+  }
+  if (fd_ != -1) {
+    PCHECK(close(fd_) != -1);
+  }
+  fd_ = scoped_pipe.fd_;
+  scoped_pipe.fd_ = -1;
+  return *this;
+}
+
+// Creates a pipe, adds O_NONBLOCK to the flags of both ends and returns them
+// as (read end, write end). A failing pipe() or F_SETFL call trips PCHECK.
+std::tuple<ScopedPipe::ScopedReadPipe, ScopedPipe::ScopedWritePipe>
+ScopedPipe::MakePipe() {
+  int pipe_fds[2];
+  PCHECK(pipe(pipe_fds) != -1);
+
+  // Index 0 is the read end and index 1 the write end.
+  for (const int pipe_fd : pipe_fds) {
+    const int current_flags = fcntl(pipe_fd, F_GETFL);
+    PCHECK(fcntl(pipe_fd, F_SETFL, current_flags | O_NONBLOCK) != -1);
+  }
+
+  return {ScopedReadPipe(pipe_fds[0]), ScopedWritePipe(pipe_fds[1])};
+}
+
+// Attempts to read one uint32_t from the pipe. Returns the value when exactly
+// sizeof(uint32_t) bytes were read; returns std::nullopt for any other
+// outcome of read() (error, short read or nothing read).
+std::optional<uint32_t> ScopedPipe::ScopedReadPipe::Read() {
+  uint32_t value;
+  const ssize_t bytes_read = read(fd(), &value, sizeof(value));
+  if (bytes_read != sizeof(value)) {
+    return std::nullopt;
+  }
+  return value;
+}
+
+// Writes the raw bytes of `data` to the pipe. Dies if write() reports an error
+// or writes fewer than sizeof(data) bytes.
+void ScopedPipe::ScopedWritePipe::Write(uint32_t data) {
+  const ssize_t result = write(fd(), &data, sizeof(data));
+  // Distinguish a failed call (errno is reported) from a short write.
+  PCHECK(result != -1);
+  CHECK(result == sizeof(data));
+}
+
+// Creates a SignalFd for the listed signals and registers its descriptor with
+// the epoll of `loop`. Each time the descriptor becomes readable, one
+// signalfd_siginfo is read and passed to `callback`; a read that yields signal
+// number 0 is logged and dropped instead.
+SignalListener::SignalListener(aos::ShmEventLoop *loop,
+                               std::function<void(signalfd_siginfo)> callback)
+    : loop_(loop),
+      callback_(std::move(callback)),
+      signalfd_({SIGHUP, SIGINT, SIGQUIT, SIGABRT, SIGFPE, SIGSEGV, SIGPIPE,
+                 SIGTERM, SIGBUS, SIGXCPU, SIGCHLD}) {
+  const auto on_readable = [this] {
+    const signalfd_siginfo received = signalfd_.Read();
+
+    if (received.ssi_signo != 0) {
+      callback_(received);
+      return;
+    }
+
+    LOG(WARNING) << "Could not read " << sizeof(signalfd_siginfo) << " bytes";
+  };
+
+  loop->epoll()->OnReadable(signalfd_.fd(), on_readable);
+}
+
+// Removes the signalfd descriptor from the event loop's epoll.
+SignalListener::~SignalListener() {
+  loop_->epoll()->DeleteFd(signalfd_.fd());
+}
+
+// Builds the starter's event loop from `event_loop_config`, creates the status
+// sender, the status and cleanup timers and the signal listener, subscribes to
+// StarterRpc commands on "/aos", and adds every application listed in the
+// configuration. The applications are started by Run().
+Starter::Starter(const aos::Configuration *event_loop_config)
+    : config_msg_(event_loop_config),
+      event_loop_(event_loop_config),
+      status_sender_(event_loop_.MakeSender<aos::starter::Status>("/aos")),
+      status_timer_(event_loop_.AddTimer([this] { SendStatus(); })),
+      cleanup_timer_(event_loop_.AddTimer([this] { event_loop_.Exit(); })),
+      listener_(&event_loop_,
+                [this](signalfd_siginfo signal) { OnSignal(signal); }) {
+  event_loop_.SkipTimingReport();
+  event_loop_.SkipAosLog();
+
+  // Once the loop runs, send the status immediately and then every 500 ms.
+  event_loop_.OnRun([this] {
+    const auto first_send = event_loop_.monotonic_now();
+    status_timer_->Setup(first_send, std::chrono::milliseconds(500));
+  });
+
+  event_loop_.MakeWatcher("/aos", [this](const aos::starter::StarterRpc &cmd) {
+    // Drop incomplete commands and anything that arrives during shutdown.
+    const bool complete = cmd.has_command() && cmd.has_name();
+    if (!complete || exiting_) {
+      return;
+    }
+    LOG(INFO) << "Received command "
+              << aos::starter::EnumNameCommand(cmd.command()) << ' '
+              << cmd.name()->string_view();
+
+    // Commands that name an unknown application are ignored.
+    const auto target = applications_.find(cmd.name()->str());
+    if (target == applications_.end()) {
+      return;
+    }
+    target->second.HandleCommand(cmd.command());
+  });
+
+  if (!config_msg_->has_applications()) {
+    return;
+  }
+  for (const aos::Application *application : *config_msg_->applications()) {
+    AddApplication(application);
+  }
+}
+
+// Begins shutdown: marks the starter as exiting, calls Terminate() on every
+// application and arms the cleanup timer, whose callback exits the event loop,
+// for 1500 ms from now. Only the first call has any effect.
+void Starter::Cleanup() {
+  if (exiting_) {
+    return;
+  }
+  exiting_ = true;
+
+  for (auto &entry : applications_) {
+    entry.second.Terminate();
+  }
+
+  const auto exit_deadline =
+      event_loop_.monotonic_now() + std::chrono::milliseconds(1500);
+  cleanup_timer_->Setup(exit_deadline);
+}
+
+// Dispatches a signal read from the signalfd. SIGCHLD makes every application
+// check its child, removing the ones that report they are finished and exiting
+// the event loop once none are left during shutdown. A signal listed in
+// kStarterDeath starts shutdown through Cleanup(). Other signals are only
+// logged.
+void Starter::OnSignal(signalfd_siginfo info) {
+  LOG(INFO) << "Received signal " << strsignal(info.ssi_signo);
+
+  if (info.ssi_signo != SIGCHLD) {
+    const auto death_signal =
+        std::find(kStarterDeath.begin(), kStarterDeath.end(), info.ssi_signo);
+    if (death_signal != kStarterDeath.end()) {
+      LOG(WARNING) << "Starter shutting down";
+      Cleanup();
+    }
+    return;
+  }
+
+  // SIGCHLD messages can be collapsed if multiple are received, so all
+  // applications must check their status.
+  auto iter = applications_.begin();
+  while (iter != applications_.end()) {
+    if (iter->second.MaybeHandleSignal()) {
+      iter = applications_.erase(iter);
+    } else {
+      ++iter;
+    }
+  }
+
+  if (exiting_ && applications_.empty()) {
+    event_loop_.Exit();
+  }
+}
+
+// Inserts an Application built from `application`, keyed by its name, and
+// forwards the configured args when present. Returns a pointer to the new
+// entry, or nullptr when an application with that name already exists.
+Application *Starter::AddApplication(const aos::Application *application) {
+  const auto insertion = applications_.try_emplace(
+      application->name()->str(), application, &event_loop_);
+  if (!insertion.second) {
+    return nullptr;
+  }
+
+  Application &added = insertion.first->second;
+  if (application->has_args()) {
+    added.set_args(*application->args());
+  }
+  return &added;
+}
+
+// Issues a start to every registered application, then runs the event loop.
+void Starter::Run() {
+  for (auto &entry : applications_) {
+    entry.second.Start();
+  }
+
+  event_loop_.Run();
+}
+
+// Builds one Status message containing the ApplicationStatus of every
+// registered application and sends it. Dies if the send fails.
+void Starter::SendStatus() {
+  aos::Sender<aos::starter::Status>::Builder builder =
+      status_sender_.MakeBuilder();
+
+  // Serialize each application's status first, then gather the offsets into
+  // the vector referenced by the Status table.
+  std::vector<flatbuffers::Offset<aos::starter::ApplicationStatus>>
+      status_offsets;
+  status_offsets.reserve(applications_.size());
+  for (auto &entry : applications_) {
+    status_offsets.push_back(entry.second.PopulateStatus(builder.fbb()));
+  }
+  const auto statuses_fbs = builder.fbb()->CreateVector(status_offsets);
+
+  aos::starter::Status::Builder status_builder(*builder.fbb());
+  status_builder.add_statuses(statuses_fbs);
+  CHECK(builder.Send(status_builder.Finish()));
+}
+
+}  // namespace starter
+}  // namespace aos
diff --git a/aos/starter/starterd_lib.h b/aos/starter/starterd_lib.h
new file mode 100644
index 0000000..36109a9
--- /dev/null
+++ b/aos/starter/starterd_lib.h
@@ -0,0 +1,201 @@
+#ifndef AOS_STARTER_STARTERD_LIB_H_
+#define AOS_STARTER_STARTERD_LIB_H_
+
+#include <signal.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <sys/signalfd.h>
+#include <sys/wait.h>
+
+#include <functional>
+#include <optional>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "aos/configuration.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/ipc_lib/signalfd.h"
+#include "aos/macros.h"
+#include "aos/starter/starter_generated.h"
+#include "aos/starter/starter_rpc_generated.h"
+
+namespace aos {
+namespace starter {
+
+// RAII Pipe for sending individual ints between reader and writer.
+//
+// Base class that holds one file descriptor and closes it on destruction.
+// The two ends of a pipe are obtained together through MakePipe().
+class ScopedPipe {
+ public:
+  class ScopedReadPipe;
+  class ScopedWritePipe;
+
+  // Creates a pipe and returns its (read end, write end). Both ends are put
+  // into non-blocking mode.
+  static std::tuple<ScopedReadPipe, ScopedWritePipe> MakePipe();
+
+  // Closes the descriptor if one is held.
+  virtual ~ScopedPipe();
+
+  // Returns the held file descriptor, or -1 if none is held.
+  int fd() const { return fd_; }
+
+ private:
+  // Wraps `fd`; the default of -1 means no descriptor is held.
+  ScopedPipe(int fd = -1);
+
+  // Held descriptor, or -1.
+  int fd_;
+
+  // Copying is disabled. Moving transfers the descriptor and leaves the
+  // source holding -1.
+  ScopedPipe(const ScopedPipe &) = delete;
+  ScopedPipe &operator=(const ScopedPipe &) = delete;
+  ScopedPipe(ScopedPipe &&);
+  ScopedPipe &operator=(ScopedPipe &&);
+};
+
+// Read end of a pipe created by ScopedPipe::MakePipe().
+class ScopedPipe::ScopedReadPipe : public ScopedPipe {
+ public:
+  // Attempts to read one uint32_t. Returns std::nullopt unless a complete
+  // uint32_t was read.
+  std::optional<uint32_t> Read();
+
+ private:
+  // Inherits the private descriptor constructor; the friend declaration below
+  // lets ScopedPipe::MakePipe() use it.
+  using ScopedPipe::ScopedPipe;
+
+  friend class ScopedPipe;
+};
+
+// Write end of a pipe created by ScopedPipe::MakePipe().
+class ScopedPipe::ScopedWritePipe : public ScopedPipe {
+ public:
+  // Writes `data` to the pipe. Dies if the write fails or is incomplete.
+  void Write(uint32_t data);
+
+ private:
+  // Inherits the private descriptor constructor; the friend declaration below
+  // lets ScopedPipe::MakePipe() use it.
+  using ScopedPipe::ScopedPipe;
+
+  friend class ScopedPipe;
+};
+
+// Manages a running process, allowing starting and stopping, and restarting
+// automatically.
+//
+// The state machine is driven by HandleCommand() and by MaybeHandleSignal(),
+// which Starter calls on every application when a SIGCHLD arrives.
+class Application {
+ public:
+  // Creates the manager for the process described by `application`.
+  // NOTE(review): the constructor is defined elsewhere; confirm which fields
+  // it reads and how long `application` and `event_loop` must stay valid.
+  Application(const aos::Application *application,
+              aos::ShmEventLoop *event_loop);
+
+  // Serializes this application's status into `builder` and returns the
+  // offset of the resulting ApplicationStatus table.
+  flatbuffers::Offset<aos::starter::ApplicationStatus> PopulateStatus(
+      flatbuffers::FlatBufferBuilder *builder);
+
+  // Returns the last pid of this process. -1 if not started yet.
+  pid_t get_pid() const { return pid_; }
+
+  // Handles a SIGCHLD signal received by the parent. Does nothing if this
+  // process was not the target. Returns true if this Application should be
+  // removed.
+  bool MaybeHandleSignal();
+
+  // Handles a command. May do nothing if application is already in the desired
+  // state.
+  void HandleCommand(aos::starter::Command cmd);
+
+  // Convenience wrapper: issues a START command.
+  void Start() { HandleCommand(aos::starter::Command::START); }
+
+  // Convenience wrapper: issues a STOP command.
+  void Stop() { HandleCommand(aos::starter::Command::STOP); }
+
+  // Called by Starter::Cleanup() on every application during shutdown.
+  // NOTE(review): defined elsewhere; presumably sets terminating_ and stops
+  // the process - confirm.
+  void Terminate();
+
+  // Sets the argument list from the flatbuffer vector `args`.
+  // NOTE(review): defined elsewhere; confirm how the strings are stored given
+  // that args_ holds raw char pointers.
+  void set_args(
+      const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>
+          &args);
+
+ private:
+  // Starts the process. Called for a START command and when a queued restart
+  // is honoured after a stop completes.
+  void DoStart();
+
+  // Stops the process. HandleCommand() passes restart=false for STOP and
+  // restart=true for RESTART.
+  void DoStop(bool restart);
+
+  // Called when the process exits while STARTING or RUNNING.
+  // NOTE(review): presumably schedules a delayed start - confirm.
+  void QueueStart();
+
+  // Copy flatbuffer vector of strings to vector of std::string.
+  static std::vector<std::string> FbsVectorToVector(
+      const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> &v);
+
+  // Looks up the uid for user `name`; presumably std::nullopt when the user
+  // cannot be found - confirm against the definition.
+  static std::optional<uid_t> FindUid(const char *name);
+
+  // Next unique id for all applications
+  static inline uint64_t next_id_ = 0;
+
+  // Application name, used in log messages.
+  std::string name_;
+  // Presumably the executable path, argv and user to run as - confirm against
+  // the constructor and DoStart().
+  std::string path_;
+  std::vector<char *> args_;
+  std::optional<uid_t> user_;
+
+  // Pid of the most recently started process; -1 if not started yet.
+  pid_t pid_ = -1;
+  // MaybeHandleSignal() reads a LastStopReason value from read_pipe_ when the
+  // process exits. Presumably the child writes it through write_pipe_.
+  ScopedPipe::ScopedReadPipe read_pipe_;
+  ScopedPipe::ScopedWritePipe write_pipe_;
+  // Presumably this application's unique id, taken from next_id_ - confirm.
+  uint64_t id_;
+  // Exit status of the last exit, or the terminating signal number if the
+  // process was killed by a signal.
+  int exit_code_ = 0;
+  // exit_time_ is the monotonic time at which the last exit was observed.
+  aos::monotonic_clock::time_point start_time_, exit_time_;
+  // Set when a START arrives while STOPPING; once the stop completes the
+  // process is started again.
+  bool queue_restart_ = false;
+  // When set, a completed stop makes MaybeHandleSignal() return true.
+  bool terminating_ = false;
+
+  // Current state of the state machine.
+  aos::starter::State status_ = aos::starter::State::STOPPED;
+  // Reason recorded for the most recent stop.
+  aos::starter::LastStopReason stop_reason_ =
+      aos::starter::LastStopReason::STOP_REQUESTED;
+
+  aos::ShmEventLoop *event_loop_;
+  // restart_timer_ is disabled when a START arrives in the WAITING state;
+  // stop_timer_ is the force stop timer, disabled once the process has died.
+  aos::TimerHandler *start_timer_, *restart_timer_, *stop_timer_;
+
+  DISALLOW_COPY_AND_ASSIGN(Application);
+};
+
+// Registers a signalfd listener with the given event loop and calls callback
+// whenever a signal is received.
+class SignalListener {
+ public:
+  // Registers the signalfd with the epoll of `loop`. `callback` receives the
+  // signalfd_siginfo of each signal that is read.
+  SignalListener(aos::ShmEventLoop *loop,
+                 std::function<void(signalfd_siginfo)> callback);
+
+  // Removes the signalfd from the event loop's epoll.
+  ~SignalListener();
+
+ private:
+  // Event loop the signalfd is registered with; used again on destruction.
+  aos::ShmEventLoop *loop_;
+  // Invoked with every successfully read signal.
+  std::function<void(signalfd_siginfo)> callback_;
+  // Source of the signals.
+  aos::ipc_lib::SignalFd signalfd_;
+
+  DISALLOW_COPY_AND_ASSIGN(SignalListener);
+};
+
+// Owns the event loop and the set of managed applications. Starts them,
+// periodically publishes their status, dispatches StarterRpc commands to them
+// and shuts everything down when a terminating signal arrives.
+class Starter {
+ public:
+  // Builds the event loop from `event_loop_config` and adds every application
+  // listed in it.
+  Starter(const aos::Configuration *event_loop_config);
+
+  // Inserts a new application from config. Returns the inserted application if
+  // it was successful, otherwise nullptr if an application already exists
+  // with the given name.
+  Application *AddApplication(const aos::Application *application);
+
+  // Runs the event loop and starts all applications
+  void Run();
+
+  // Begins shutdown: terminates all applications and arms a timer that exits
+  // the event loop after 1500 ms. Only the first call has any effect.
+  void Cleanup();
+
+ private:
+  // Signals which indicate starter has died
+  // NOTE(review): SIGILL is listed here but is not among the signals that
+  // SignalListener passes to its SignalFd - confirm which list is intended.
+  static const inline std::vector<int> kStarterDeath = {
+      SIGHUP,  SIGINT,  SIGQUIT, SIGILL, SIGABRT, SIGFPE,
+      SIGSEGV, SIGPIPE, SIGTERM, SIGBUS, SIGXCPU};
+
+  // Handles one signal delivered by listener_.
+  void OnSignal(signalfd_siginfo signal);
+
+  // Publishes the status of every application; called from status_timer_.
+  void SendStatus();
+
+  // NOTE(review): not set in the constructor's initializer list, so it stays
+  // an empty string - appears unused; confirm.
+  const std::string config_path_;
+  // Configuration passed to the constructor; not owned.
+  const aos::Configuration *config_msg_;
+
+  aos::ShmEventLoop event_loop_;
+  // Sends aos::starter::Status messages on "/aos".
+  aos::Sender<aos::starter::Status> status_sender_;
+  // Set up to fire every 500 ms once the loop runs; calls SendStatus().
+  aos::TimerHandler *status_timer_;
+  // Armed by Cleanup(); exits the event loop when it fires.
+  aos::TimerHandler *cleanup_timer_;
+
+  // Managed applications, keyed by name.
+  std::unordered_map<std::string, Application> applications_;
+
+  // Set to true on cleanup to block rpc commands and ensure cleanup only
+  // happens once.
+  bool exiting_ = false;
+
+  // Delivers signals to OnSignal().
+  SignalListener listener_;
+
+  DISALLOW_COPY_AND_ASSIGN(Starter);
+};
+
+}  // namespace starter
+}  // namespace aos
+
+#endif  // AOS_STARTER_STARTERD_LIB_H_