Add tests for individual node loggers to config_validators

This makes it so that you can easily test that each individual node's
logs can be used to replay all of the state on that log. While we
already theoretically had logic to exercise this in the config
validator, said logic was both incorrectly written and not actually
being used by any of the config validators.

Simultaneously: While upstreaming this, start reducing the number of
explicit @repo_name that we have written out in the repository.

Change-Id: I0c7cb634b3d7bbc63503c607a5f8d91bc184d416
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/config_validator_config.fbs b/aos/util/config_validator_config.fbs
index cda84b2..2a64670 100644
--- a/aos/util/config_validator_config.fbs
+++ b/aos/util/config_validator_config.fbs
@@ -46,6 +46,10 @@
   // should at least include an entry that says that "logs from all nodes should
   // combine to allow you to replay all nodes."
   logger_sets:[LoggerNodeSetValidation] (id: 1);
+  // This provides a shortcut that implicitly adds
+  // {"loggers": ["node_name"], "replay_nodes": ["node_name"]}
+  // to logger_sets for every node_name in the set of nodes for the config.
+  validate_individual_node_loggers:bool = false (id: 2);
 }
 
 table ConfigValidatorConfig {
diff --git a/aos/util/config_validator_lib.cc b/aos/util/config_validator_lib.cc
index e388bb7..ad85027 100644
--- a/aos/util/config_validator_lib.cc
+++ b/aos/util/config_validator_lib.cc
@@ -8,6 +8,7 @@
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/timestamp_channel.h"
 #include "aos/testing/tmpdir.h"
+#include "aos/util/config_validator_config_static.h"
 #include "aos/util/simulation_logger.h"
 
 DECLARE_bool(validate_timestamp_logger_nodes);
@@ -42,11 +43,31 @@
 }  // namespace
 
 void ConfigIsValid(const aos::Configuration *config,
-                   const ConfigValidatorConfig *validation_config) {
+                   const ConfigValidatorConfig *validation_config_raw) {
   ASSERT_TRUE(config->has_channels())
       << "An AOS config must have channels. If you have a valid use-case for "
          "channels with no channels, please write a design proposal.";
 
+  aos::fbs::Builder<ConfigValidatorConfigStatic> validation_config;
+  CHECK(validation_config->FromFlatbuffer(validation_config_raw));
+
+  if (validation_config_raw->has_logging() &&
+      validation_config_raw->logging()->validate_individual_node_loggers() &&
+      configuration::MultiNode(config)) {
+    if (!validation_config->logging()->has_logger_sets()) {
+      validation_config->mutable_logging()->add_logger_sets();
+    }
+    auto logger_sets =
+        validation_config->mutable_logging()->mutable_logger_sets();
+    for (const aos::Node *node : configuration::GetNodes(config)) {
+      CHECK(logger_sets->reserve(logger_sets->size() + 1));
+      auto logger_set = logger_sets->emplace_back();
+      CHECK(logger_set->add_loggers()->FromFlatbuffer({node->name()->str()}));
+      CHECK(logger_set->add_replay_nodes()->FromFlatbuffer(
+          {node->name()->str()}));
+    }
+  }
+
   // First, we do some sanity checks--these are likely to indicate a malformed
   // config, and so catching them early with a clear error message is likely to
   // help.
@@ -72,7 +93,7 @@
 
     const bool check_for_not_logged_channels =
         !validation_config->has_logging() ||
-        validation_config->logging()->all_channels_logged();
+        validation_config->AsFlatbuffer().logging()->all_channels_logged();
     const bool channel_is_not_logged =
         channel->logger() == aos::LoggerConfig::NOT_LOGGED;
     if (check_for_not_logged_channels) {
@@ -184,15 +205,15 @@
     }
     // Send timing report when we are sending data.
     const bool do_skip_timing_report = !send_data_on_channels;
-    for (const LoggerNodeSetValidation *logger_set :
+    for (const LoggerNodeSetValidationStatic &logger_set :
          *validation_config->logging()->logger_sets()) {
-      SCOPED_TRACE(aos::FlatbufferToJson(logger_set));
+      SCOPED_TRACE(aos::FlatbufferToJson(&logger_set.AsFlatbuffer()));
       aos::SimulatedEventLoopFactory factory(config);
       std::vector<std::unique_ptr<LoggerState>> loggers;
-      if (logger_set->has_loggers() && logger_set->loggers()->size() > 0) {
+      if (logger_set.has_loggers() && logger_set.loggers()->size() > 0) {
         std::vector<std::string> logger_nodes;
-        for (const auto &node : *logger_set->loggers()) {
-          logger_nodes.push_back(node->str());
+        for (const auto &node : *logger_set.loggers()) {
+          logger_nodes.push_back(node.str());
         }
         loggers = MakeLoggersForNodes(&factory, logger_nodes, log_path,
                                       do_skip_timing_report);
@@ -247,47 +268,47 @@
       // Find every channel we deliberately sent data on, and if it is for a
       // node that we care about, confirm that we get it during replay.
       std::vector<std::unique_ptr<EventLoop>> replay_loops;
-      std::vector<std::unique_ptr<RawFetcher>> fetchers;
       for (const aos::Node *node :
            configuration::GetNodes(replay_factory.configuration())) {
         // If the user doesn't care about this node, don't check it.
-        if (!NodeInList(logger_set->replay_nodes(), node)) {
+        if (!NodeInList(logger_set.has_replay_nodes()
+                            ? logger_set.replay_nodes()->AsFlatbufferVector()
+                            : nullptr,
+                        node)) {
           continue;
         }
         replay_loops.emplace_back(replay_factory.MakeEventLoop("", node));
-        for (const auto &sender : test_senders[node->name()->str()]) {
-          const aos::Channel *channel = configuration::GetChannel(
-              replay_factory.configuration(), sender->channel(), "", node);
-          fetchers.emplace_back(replay_loops.back()->MakeRawFetcher(channel));
-        }
       }
 
       std::vector<std::pair<const aos::Node *, std::unique_ptr<RawFetcher>>>
-          remote_fetchers;
-      for (const auto &fetcher : fetchers) {
-        for (auto &loop : replay_loops) {
-          const Connection *connection =
-              configuration::ConnectionToNode(fetcher->channel(), loop->node());
-          if (connection != nullptr) {
-            remote_fetchers.push_back(std::make_pair(
-                loop->node(), loop->MakeRawFetcher(fetcher->channel())));
+          fetchers;
+      for (const auto &node_senders : test_senders) {
+        for (const auto &sender : node_senders.second) {
+          for (auto &loop : replay_loops) {
+            if (configuration::ChannelIsReadableOnNode(sender->channel(),
+                                                       loop->node())) {
+              fetchers.push_back(std::make_pair(
+                  loop->node(),
+                  loop->MakeRawFetcher(configuration::GetChannel(
+                      replay_factory.configuration(), sender->channel(),
+                      loop->name(), loop->node()))));
+            }
           }
         }
       }
 
       replay_factory.Run();
 
-      for (auto &fetcher : fetchers) {
-        EXPECT_TRUE(fetcher->Fetch())
-            << "Failed to log or replay any data on "
-            << configuration::StrippedChannelToString(fetcher->channel());
-      }
-
-      for (auto &pair : remote_fetchers) {
+      for (auto &pair : fetchers) {
         EXPECT_TRUE(pair.second->Fetch())
             << "Failed to log or replay any data on "
             << configuration::StrippedChannelToString(pair.second->channel())
-            << " from remote node " << logger::MaybeNodeName(pair.first) << ".";
+            << " reading from " << logger::MaybeNodeName(pair.first)
+            << " with source node "
+            << (pair.second->channel()->has_source_node()
+                    ? pair.second->channel()->source_node()->string_view()
+                    : "")
+            << ".";
       }
 
       reader.Deregister();
diff --git a/aos/util/config_validator_lib.h b/aos/util/config_validator_lib.h
index 30a956a..aa855e5 100644
--- a/aos/util/config_validator_lib.h
+++ b/aos/util/config_validator_lib.h
@@ -8,6 +8,6 @@
 namespace aos::util {
 
 void ConfigIsValid(const aos::Configuration *config,
-                   const ConfigValidatorConfig *validation_config);
+                   const ConfigValidatorConfig *validation_config_raw);
 }  // namespace aos::util
 #endif  // AOS_UTIL_CONFIG_VALIDATOR_H_
diff --git a/aos/util/config_validator_lib_test.cc b/aos/util/config_validator_lib_test.cc
index ee80ac4..55b723a 100644
--- a/aos/util/config_validator_lib_test.cc
+++ b/aos/util/config_validator_lib_test.cc
@@ -184,7 +184,7 @@
       ]}})json");
         ConfigIsValid(&config.message(), &validator_config.message());
       },
-      R"json(Failed to log or replay any data on { "name": "/test", "type": "aos.examples.Ping" } from remote node pi2)json");
+      R"json(Failed to log or replay any data on { "name": "/test", "type": "aos.examples.Ping" } reading from pi2 with source node pi1)json");
 }
 
 }  // namespace aos::util::testing
diff --git a/aos/util/config_validator_macro.bzl b/aos/util/config_validator_macro.bzl
index cd05bab..989ece9 100644
--- a/aos/util/config_validator_macro.bzl
+++ b/aos/util/config_validator_macro.bzl
@@ -1,4 +1,6 @@
-def config_validator_test(name, config, logger_sets = [{}], check_for_not_logged_channels = False, extension = ".bfbs", visibility = None):
+load("//tools/build_rules:clean_dep.bzl", "clean_dep")
+
+def config_validator_test(name, config, logger_sets = [{}], check_for_not_logged_channels = False, validate_individual_node_loggers = False, extension = ".bfbs", visibility = None):
     '''
     Macro to take a config and pass it to the config validator to validate that it will work on a real system.
 
@@ -7,10 +9,10 @@
         config: config rule that needs to be validated, e.g. "//aos/events:pingpong_config",
     '''
     config_file = config + extension
-    config_json = json.encode({"logging": {"all_channels_logged": check_for_not_logged_channels, "logger_sets": logger_sets}})
+    config_json = json.encode({"logging": {"all_channels_logged": check_for_not_logged_channels, "validate_individual_node_loggers": validate_individual_node_loggers, "logger_sets": logger_sets}})
     native.cc_test(
         name = name,
-        deps = ["//aos/util:config_validator"],
+        deps = [clean_dep("//aos/util:config_validator")],
         args = ["--config=$(location %s)" % config_file, "--validation_config='%s'" % config_json],
         data = [config_file],
         visibility = visibility,
diff --git a/aos/util/test_data/valid_multinode_config_source.json b/aos/util/test_data/valid_multinode_config_source.json
index 0c35251..5849a26 100644
--- a/aos/util/test_data/valid_multinode_config_source.json
+++ b/aos/util/test_data/valid_multinode_config_source.json
@@ -21,6 +21,10 @@
       "type": "aos.message_bridge.Timestamp",
       "source_node": "pi1",
       "frequency": 15,
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": [
+        "pi2"
+      ],
       "max_size": 200,
       "destination_nodes": [
         {
@@ -37,6 +41,10 @@
       "type": "aos.message_bridge.Timestamp",
       "source_node": "pi2",
       "frequency": 15,
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": [
+        "pi1"
+      ],
       "max_size": 200,
       "destination_nodes": [
         {
@@ -109,14 +117,28 @@
       "max_size": 2048
     },
     {
+      "name": "/pi1/test/local",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1"
+    },
+    {
+      "name": "/pi2/test/local",
+      "type": "aos.examples.Ping",
+      "source_node": "pi2"
+    },
+    {
       "name": "/test",
       "type": "aos.examples.Ping",
       "source_node": "pi1",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_nodes": [
+        "pi2"
+      ],
       "destination_nodes": [
         {
           "name": "pi2",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
           "timestamp_logger_nodes": ["pi1"]
         }
       ],