Make starter client & server work across nodes

So that we can readily start on one node from another node, add in logic
to make it so that starterd will listen to StarterRpc calls from any
available other nodes. Add a StarterClient class that then supports
interacting across nodes.

Also updates the aos_starter command line utility with --node and
--all_nodes flags to support this.

Change-Id: I234b08b3a3a836966f117f55b9e6a321d2b2ff6c
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/starter/starter_cmd.cc b/aos/starter/starter_cmd.cc
index d306306..ae65ff0 100644
--- a/aos/starter/starter_cmd.cc
+++ b/aos/starter/starter_cmd.cc
@@ -6,6 +6,7 @@
 #include <unordered_map>
 
 #include "absl/strings/str_format.h"
+#include "absl/strings/str_join.h"
 #include "aos/init.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/time/time.h"
@@ -13,6 +14,11 @@
 #include "starter_rpc_lib.h"
 
 DEFINE_string(config, "./config.json", "File path of aos configuration");
+// TODO(james): Bash autocompletion for node names.
+DEFINE_string(
+    node, "",
+    "Node to interact with. If empty, just interact with local node.");
+DEFINE_bool(all_nodes, false, "Interact with all nodes.");
 
 DEFINE_bool(_bash_autocomplete, false,
             "Internal use: Outputs commands or applications for use with "
@@ -29,47 +35,71 @@
                         {"stop", aos::starter::Command::STOP},
                         {"restart", aos::starter::Command::RESTART}};
 
-const aos::Node *MaybeMyNode(const aos::Configuration *configuration) {
+std::vector<const aos::Node *> InteractNodes(
+    const aos::Configuration *configuration) {
   if (!configuration->has_nodes()) {
-    return nullptr;
+    return {nullptr};
   }
 
-  return aos::configuration::GetMyNode(configuration);
+  if (!FLAGS_node.empty()) {
+    CHECK(!FLAGS_all_nodes) << "Can't specify both --node and --all_nodes.";
+    return {aos::configuration::GetNode(configuration, FLAGS_node)};
+  }
+
+  if (FLAGS_all_nodes) {
+    return aos::configuration::GetNodes(configuration);
+  }
+
+  return {aos::configuration::GetMyNode(configuration)};
 }
 
-bool ValidApplication(const aos::Configuration *config,
-                      std::string_view application_name) {
-  const aos::Node *node = MaybeMyNode(config);
-  const aos::Application *application =
-      aos::configuration::GetApplication(config, node, application_name);
-  if (application == nullptr) {
-    if (node) {
-      std::cout << "Unknown application '" << application_name << "' on node '"
-                << node->name()->string_view() << "'" << std::endl;
-    } else {
-      std::cout << "Unknown application '" << application_name << "'"
-                << std::endl;
+std::vector<const aos::Node *> InteractNodesForApplication(
+    const aos::Configuration *config, std::string_view application_name) {
+  const std::vector<const aos::Node *> interact_nodes = InteractNodes(config);
+  std::vector<const aos::Node *> application_nodes;
+  std::vector<std::string> debug_node_names;
+  for (const aos::Node *node : interact_nodes) {
+    if (aos::configuration::GetApplication(config, node, application_name) !=
+        nullptr) {
+      application_nodes.push_back(node);
     }
-    return false;
+    if (node != nullptr) {
+      debug_node_names.push_back(node->name()->str());
+    }
   }
-  return true;
+
+  if (application_nodes.empty()) {
+    if (interact_nodes.size() == 1 && interact_nodes[0] == nullptr) {
+      std::cout << "Unknown application " << application_name << std::endl;
+    } else {
+      std::cout << "Unknown application " << application_name
+                << " on any of node(s) "
+                << absl::StrJoin(debug_node_names, ", ") << std::endl;
+    }
+  }
+  return application_nodes;
 }
 
 void PrintKey() {
-  absl::PrintF("%-30s %-8s %-6s %-9s\n", "Name", "State", "PID", "Uptime");
+  absl::PrintF("%-30s %-10s %-8s %-6s %-9s\n", "Name", "Node", "State", "PID",
+               "Uptime");
 }
 
 void PrintApplicationStatus(const aos::starter::ApplicationStatus *app_status,
-                            const aos::monotonic_clock::time_point &time) {
+                            const aos::monotonic_clock::time_point &time,
+                            const aos::Node *node) {
   const auto last_start_time = aos::monotonic_clock::time_point(
       chrono::nanoseconds(app_status->last_start_time()));
   const auto time_running =
       chrono::duration_cast<chrono::seconds>(time - last_start_time);
   if (app_status->state() == aos::starter::State::STOPPED) {
-    absl::PrintF("%-30s %-8s\n", app_status->name()->string_view(),
+    absl::PrintF("%-30s %-10s %-8s\n", app_status->name()->string_view(),
+                 (node == nullptr) ? "none" : node->name()->string_view(),
                  aos::starter::EnumNameState(app_status->state()));
   } else {
-    absl::PrintF("%-30s %-8s %-6d %-9s\n", app_status->name()->string_view(),
+    absl::PrintF("%-30s %-10s %-8s %-6d %-9s\n",
+                 app_status->name()->string_view(),
+                 (node == nullptr) ? "none" : node->name()->string_view(),
                  aos::starter::EnumNameState(app_status->state()),
                  app_status->pid(), std::to_string(time_running.count()) + 's');
   }
@@ -77,18 +107,30 @@
 
 // Prints the status for all applications.
 void GetAllStarterStatus(const aos::Configuration *config) {
-  // Print status for all processes.
-  const auto optional_status = aos::starter::GetStarterStatus(config);
-  if (optional_status) {
-    auto status = *optional_status;
-    const auto time = aos::monotonic_clock::now();
-    PrintKey();
-    for (const aos::starter::ApplicationStatus *app_status :
-         *status.message().statuses()) {
-      PrintApplicationStatus(app_status, time);
+  PrintKey();
+  std::vector<const aos::Node *> missing_nodes;
+  for (const aos::Node *node : InteractNodes(config)) {
+    // Print status for all processes.
+    const auto optional_status = aos::starter::GetStarterStatus(config, node);
+    if (optional_status) {
+      const aos::FlatbufferVector<aos::starter::Status> &status =
+          optional_status->second;
+      const aos::monotonic_clock::time_point time = optional_status->first;
+      for (const aos::starter::ApplicationStatus *app_status :
+           *status.message().statuses()) {
+        PrintApplicationStatus(app_status, time, node);
+      }
+    } else {
+      missing_nodes.push_back(node);
     }
-  } else {
-    LOG(WARNING) << "No status found";
+  }
+  for (const aos::Node *node : missing_nodes) {
+    if (node == nullptr) {
+      LOG(WARNING) << "No status found.";
+    } else {
+      LOG(WARNING) << "No status found for node "
+                   << node->name()->string_view();
+    }
   }
 }
 
@@ -106,12 +148,17 @@
       return false;
     }
 
-    if (!ValidApplication(config, application_name)) {
+    const std::vector<const aos::Node *> application_nodes =
+        InteractNodesForApplication(config, application_name);
+    if (application_nodes.empty()) {
       return false;
     }
-    auto status = aos::starter::GetStatus(application_name, config);
     PrintKey();
-    PrintApplicationStatus(&status.message(), aos::monotonic_clock::now());
+    for (const aos::Node *node : application_nodes) {
+      auto status = aos::starter::GetStatus(application_name, config, node);
+      PrintApplicationStatus(&status.message(), aos::monotonic_clock::now(),
+                             node);
+    }
   } else {
     LOG(ERROR) << "The \"status\" command requires zero or one arguments.";
     return true;
@@ -125,34 +172,69 @@
                      const aos::starter::Command command,
                      std::string_view success_text,
                      std::string_view failure_text) {
-  const auto optional_status = aos::starter::GetStarterStatus(config);
-  if (optional_status) {
-    auto status = *optional_status;
-    const aos::Node *my_node = MaybeMyNode(config);
-    std::vector<std::pair<aos::starter::Command, std::string_view>> commands;
+  std::map<const aos::Node *,
+           std::unique_ptr<aos::FlatbufferVector<aos::starter::Status>>>
+      statuses;
+
+  for (const aos::Node *node : InteractNodes(config)) {
+    std::optional<std::pair<aos::monotonic_clock::time_point,
+                            const aos::FlatbufferVector<aos::starter::Status>>>
+        optional_status = aos::starter::GetStarterStatus(config, node);
+    if (optional_status.has_value()) {
+      statuses[node] =
+          std::make_unique<aos::FlatbufferVector<aos::starter::Status>>(
+              optional_status.value().second);
+    } else {
+      if (node == nullptr) {
+        LOG(WARNING) << "Starter not running";
+      } else {
+        LOG(WARNING) << "Starter not running on node "
+                     << node->name()->string_view();
+      }
+    }
+  }
+
+  if (!statuses.empty()) {
+    std::vector<aos::starter::ApplicationCommand> commands;
 
     for (const aos::Application *application : *config->applications()) {
-      // Ignore any applications which aren't supposed to be started on this
-      // node.
-      if (!aos::configuration::ApplicationShouldStart(config, my_node,
-                                                      application)) {
+      const std::string_view application_name =
+          application->name()->string_view();
+      const std::vector<const aos::Node *> application_nodes =
+          InteractNodesForApplication(config, application_name);
+      // Ignore any applications which aren't supposed to be started.
+      if (application_nodes.empty()) {
         continue;
       }
 
-      const std::string_view application_name =
-          application->name()->string_view();
-      if (!application->autostart()) {
-        const aos::starter::ApplicationStatus *application_status =
-            aos::starter::FindApplicationStatus(status.message(),
-                                                application_name);
-        if (application_status->state() == aos::starter::State::STOPPED) {
-          std::cout << "Skipping " << application_name
-                    << " because it is STOPPED\n";
-          continue;
+      std::vector<const aos::Node *> running_nodes;
+      if (application->autostart()) {
+        running_nodes = application_nodes;
+      } else {
+        for (const aos::Node *node : application_nodes) {
+          const aos::starter::ApplicationStatus *application_status =
+              aos::starter::FindApplicationStatus(statuses[node]->message(),
+                                                  application_name);
+          if (application_status->state() == aos::starter::State::STOPPED) {
+            if (node == nullptr) {
+              std::cout << "Skipping " << application_name
+                        << " because it is STOPPED\n";
+            } else {
+              std::cout << "Skipping " << application_name << " on "
+                        << node->name()->string_view()
+                        << " because it is STOPPED\n";
+            }
+            continue;
+          } else {
+            running_nodes.push_back(node);
+          }
         }
       }
 
-      commands.emplace_back(command, application_name);
+      if (!running_nodes.empty()) {
+        commands.emplace_back(aos::starter::ApplicationCommand{
+            command, application_name, running_nodes});
+      }
     }
 
     // Restart each running process
@@ -163,7 +245,7 @@
       std::cout << failure_text << "all \n";
     }
   } else {
-    LOG(WARNING) << "Starter not running";
+    LOG(WARNING) << "None of the starters we care about are running.";
   }
 }
 
@@ -206,12 +288,16 @@
     InteractWithAll(config, command, success_text, failure_text);
     return false;
   }
-  if (!ValidApplication(config, application_name)) {
+
+  const std::vector<const aos::Node *> application_nodes =
+      InteractNodesForApplication(config, application_name);
+  if (application_nodes.empty()) {
     return false;
   }
 
   if (aos::starter::SendCommandBlocking(command, application_name, config,
-                                        chrono::seconds(5))) {
+                                        chrono::seconds(5),
+                                        application_nodes)) {
     std::cout << success_text << application_name << '\n';
   } else {
     std::cout << failure_text << application_name << '\n';