#include "aos/configuration.h"

#include <arpa/inet.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <map>
#include <set>
#include <string>
#include <string_view>
#include <vector>

#include "absl/container/btree_set.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "gflags/gflags.h"
#include "glog/logging.h"

#include "aos/configuration_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/ipc_lib/index.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/network/team_number.h"
#include "aos/unique_malloc_ptr.h"
#include "aos/util/file.h"

DEFINE_uint32(max_queue_size_override, 0,
              "If nonzero, this is the max number of elements in a queue to "
              "enforce.  If zero, use the number that the processor that this "
              "application is compiled for can support.  This is mostly useful "
              "for config validation, and shouldn't be touched.");

namespace aos {
namespace {
namespace chrono = std::chrono;

bool EndsWith(std::string_view str, std::string_view end) {
  if (str.size() < end.size()) {
    return false;
  }
  if (str.substr(str.size() - end.size(), end.size()) != end) {
    return false;
  }
  return true;
}

std::string MaybeReplaceExtension(std::string_view filename,
                                  std::string_view extension,
                                  std::string_view replacement) {
  if (!EndsWith(filename, extension)) {
    return std::string(filename);
  }
  filename.remove_suffix(extension.size());
  return absl::StrCat(filename, replacement);
}

FlatbufferDetachedBuffer<Configuration> ReadConfigFile(std::string_view path,
                                                       bool binary) {
  if (binary) {
    FlatbufferVector<Configuration> config =
        FileToFlatbuffer<Configuration>(path);
    return CopySpanAsDetachedBuffer(config.span());
  }

  flatbuffers::DetachedBuffer buffer = JsonToFlatbuffer(
      util::ReadFileToStringOrDie(path), ConfigurationTypeTable());

  CHECK_GT(buffer.size(), 0u) << ": Failed to parse JSON file: " << path;

  return FlatbufferDetachedBuffer<Configuration>(std::move(buffer));
}

}  // namespace
// Define the compare and equal operators for Channel and Application so we can
// insert them in the btree below.
bool operator<(const FlatbufferDetachedBuffer<Channel> &lhs,
               const FlatbufferDetachedBuffer<Channel> &rhs) {
  int name_compare = lhs.message().name()->string_view().compare(
      rhs.message().name()->string_view());
  if (name_compare == 0) {
    return lhs.message().type()->string_view() <
           rhs.message().type()->string_view();
  } else if (name_compare < 0) {
    return true;
  } else {
    return false;
  }
}

bool operator==(const FlatbufferDetachedBuffer<Channel> &lhs,
                const FlatbufferDetachedBuffer<Channel> &rhs) {
  return lhs.message().name()->string_view() ==
             rhs.message().name()->string_view() &&
         lhs.message().type()->string_view() ==
             rhs.message().type()->string_view();
}

bool operator<(const FlatbufferDetachedBuffer<Connection> &lhs,
               const FlatbufferDetachedBuffer<Connection> &rhs) {
  return lhs.message().name()->string_view() <
         rhs.message().name()->string_view();
}

bool operator==(const FlatbufferDetachedBuffer<Connection> &lhs,
                const FlatbufferDetachedBuffer<Connection> &rhs) {
  return lhs.message().name()->string_view() ==
         rhs.message().name()->string_view();
}

bool operator==(const FlatbufferDetachedBuffer<Application> &lhs,
                const FlatbufferDetachedBuffer<Application> &rhs) {
  return lhs.message().name()->string_view() ==
         rhs.message().name()->string_view();
}

bool operator<(const FlatbufferDetachedBuffer<Application> &lhs,
               const FlatbufferDetachedBuffer<Application> &rhs) {
  return lhs.message().name()->string_view() <
         rhs.message().name()->string_view();
}

bool operator==(const FlatbufferDetachedBuffer<Node> &lhs,
                const FlatbufferDetachedBuffer<Node> &rhs) {
  return lhs.message().name()->string_view() ==
         rhs.message().name()->string_view();
}

bool operator<(const FlatbufferDetachedBuffer<Node> &lhs,
               const FlatbufferDetachedBuffer<Node> &rhs) {
  return lhs.message().name()->string_view() <
         rhs.message().name()->string_view();
}

namespace configuration {
namespace {

template <typename T>
struct FbsContainer {
  FbsContainer(aos::FlatbufferDetachedBuffer<T> table) {
    this->table =
        std::make_unique<FlatbufferDetachedBuffer<T>>(std::move(table));
  }
  std::unique_ptr<FlatbufferDetachedBuffer<T>> table;
  bool operator==(const FbsContainer<T> &other) const {
    return *this->table == *other.table;
  }
  bool operator<(const FbsContainer<T> &other) const {
    return *this->table < *other.table;
  }
};

typedef FbsContainer<Channel> ChannelContainer;
typedef FbsContainer<Connection> ConnectionContainer;
typedef FbsContainer<Application> ApplicationContainer;
typedef FbsContainer<Node> NodeContainer;

// Extracts the folder part of a path.  Returns ./ if there is no path.
std::string_view ExtractFolder(const std::string_view filename) {
  auto last_slash_pos = filename.find_last_of("/\\");

  return last_slash_pos == std::string_view::npos
             ? std::string_view("./")
             : filename.substr(0, last_slash_pos + 1);
}

std::string AbsolutePath(const std::string_view filename) {
  // Uses an std::string so that we know the input will be null-terminated.
  const std::string terminated_file(filename);
  char buffer[PATH_MAX];
  PCHECK(NULL != realpath(terminated_file.c_str(), buffer));
  return buffer;
}

std::string RemoveDotDots(const std::string_view filename) {
  std::vector<std::string> split = absl::StrSplit(filename, '/');
  auto iterator = split.begin();
  while (iterator != split.end()) {
    if (iterator->empty()) {
      iterator = split.erase(iterator);
    } else if (*iterator == ".") {
      iterator = split.erase(iterator);
    } else if (*iterator == "..") {
      CHECK(iterator != split.begin())
          << ": Import path may not start with ..: " << filename;
      auto previous = iterator;
      --previous;
      split.erase(iterator);
      iterator = split.erase(previous);
    } else {
      ++iterator;
    }
  }
  return absl::StrJoin(split, "/");
}

std::optional<FlatbufferDetachedBuffer<Configuration>> MaybeReadConfig(
    const std::string_view path, absl::btree_set<std::string> *visited_paths,
    const std::vector<std::string_view> &extra_import_paths) {
  std::string binary_path = MaybeReplaceExtension(path, ".json", ".bfbs");
  VLOG(1) << "Looking up: " << path << ", starting with: " << binary_path;
  bool binary_path_exists = util::PathExists(binary_path);
  std::string raw_path(path);
  // For each .json file, look and see if we can find a .bfbs file next to it
  // with the same base name.  If we can, assume it is the same and use it
  // instead.  It is much faster to load .bfbs files than .json files.
  if (!binary_path_exists && !util::PathExists(raw_path)) {
    const bool path_is_absolute = raw_path.size() > 0 && raw_path[0] == '/';
    if (path_is_absolute) {
      // Nowhere else to look up an absolute path, so fail now. Note that we
      // always have at least one extra import path based on /proc/self/exe, so
      // warning about those paths existing isn't helpful.
      LOG(ERROR) << ": Failed to find file " << path << ".";
      return std::nullopt;
    }

    bool found_path = false;
    for (const auto &import_path : extra_import_paths) {
      raw_path = std::string(import_path) + "/" + RemoveDotDots(path);
      binary_path = MaybeReplaceExtension(raw_path, ".json", ".bfbs");
      VLOG(1) << "Checking: " << binary_path;
      binary_path_exists = util::PathExists(binary_path);
      if (binary_path_exists) {
        found_path = true;
        break;
      }
      VLOG(1) << "Checking: " << raw_path;
      if (util::PathExists(raw_path)) {
        found_path = true;
        break;
      }
    }
    if (!found_path) {
      LOG(ERROR) << ": Failed to find file " << path << ".";
      return std::nullopt;
    }
  }

  std::optional<FlatbufferDetachedBuffer<Configuration>> config =
      ReadConfigFile(binary_path_exists ? binary_path : raw_path,
                     binary_path_exists);

  // Depth first.  Take the following example:
  //
  // config1.json:
  // {
  //   "channels": [
  //     {
  //       "name": "/foo",
  //       "type": ".aos.bar",
  //       "max_size": 5
  //     }
  //   ],
  //   "imports": [
  //     "config2.json",
  //   ]
  // }
  //
  // config2.json:
  // {
  //   "channels": [
  //     {
  //       "name": "/foo",
  //       "type": ".aos.bar",
  //       "max_size": 7
  //     }
  //   ],
  // }
  //
  // We want the main config (config1.json) to be able to override the imported
  // config.  That means that it needs to be merged into the imported configs,
  // not the other way around.

  const std::string absolute_path =
      AbsolutePath(binary_path_exists ? binary_path : raw_path);
  // Track that we have seen this file before recursing.  Track the path we
  // actually loaded (which should be consistent if imported twice).
  if (!visited_paths->insert(absolute_path).second) {
    for (const auto &visited_path : *visited_paths) {
      LOG(INFO) << "Already visited: " << visited_path;
    }
    LOG(FATAL)
        << "Already imported " << path << " (i.e. " << absolute_path
        << "). See above for the files that have already been processed.";
    return std::nullopt;
  }

  if (config->message().has_imports()) {
    // Capture the imports.
    const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> *v =
        config->message().imports();

    // And then wipe them.  This gets GCed when we merge later.
    config->mutable_message()->clear_imports();

    // Start with an empty configuration to merge into.
    FlatbufferDetachedBuffer<Configuration> merged_config =
        FlatbufferDetachedBuffer<Configuration>::Empty();

    const std::string path_folder(ExtractFolder(path));
    for (const flatbuffers::String *str : *v) {
      const std::string included_config =
          path_folder + "/" + std::string(str->string_view());

      const auto optional_config =
          MaybeReadConfig(included_config, visited_paths, extra_import_paths);
      if (!optional_config.has_value()) {
        return std::nullopt;
      }
      // And them merge everything in.
      merged_config = MergeFlatBuffers(merged_config, *optional_config);
    }

    // Finally, merge this file in.
    config = MergeFlatBuffers(merged_config, *config);
  }
  return config;
}

// Compares (c < p) a channel, and a name, type tuple.
bool CompareChannels(const Channel *c,
                     ::std::pair<std::string_view, std::string_view> p) {
  int name_compare = c->name()->string_view().compare(p.first);
  if (name_compare == 0) {
    return c->type()->string_view() < p.second;
  } else if (name_compare < 0) {
    return true;
  } else {
    return false;
  }
};

// Compares for equality (c == p) a channel, and a name, type tuple.
bool EqualsChannels(const Channel *c,
                    ::std::pair<std::string_view, std::string_view> p) {
  return c->name()->string_view() == p.first &&
         c->type()->string_view() == p.second;
}

// Compares (c < p) an application, and a name;
bool CompareApplications(const Application *a, std::string_view name) {
  return a->name()->string_view() < name;
};

// Compares for equality (c == p) an application, and a name;
bool EqualsApplications(const Application *a, std::string_view name) {
  return a->name()->string_view() == name;
}

void ValidateConfiguration(const Flatbuffer<Configuration> &config) {
  // No imports should be left.
  CHECK(!config.message().has_imports());

  // Check that if there is a node list, all the source nodes are filled out and
  // valid, and all the destination nodes are valid (and not the source).  This
  // is a basic consistency check.
  if (config.message().has_channels()) {
    const Channel *last_channel = nullptr;
    for (const Channel *c : *config.message().channels()) {
      CHECK(c->has_name());
      CHECK(c->has_type());
      if (c->name()->string_view().back() == '/') {
        LOG(FATAL) << "Channel names can't end with '/'";
      }
      if (c->name()->string_view().front() != '/') {
        LOG(FATAL) << "Channel names must start with '/'";
      }
      if (c->name()->string_view().find("//") != std::string_view::npos) {
        LOG(FATAL) << ": Invalid channel name " << c->name()->string_view()
                   << ", can't use //.";
      }
      for (const char data : c->name()->string_view()) {
        if (data >= '0' && data <= '9') {
          continue;
        }
        if (data >= 'a' && data <= 'z') {
          continue;
        }
        if (data >= 'A' && data <= 'Z') {
          continue;
        }
        if (data == '-' || data == '_' || data == '/') {
          continue;
        }
        LOG(FATAL) << "Invalid channel name " << c->name()->string_view()
                   << ", can only use [-a-zA-Z0-9_/]";
      }

      CHECK_LT(QueueSize(&config.message(), c) + QueueScratchBufferSize(c),
               FLAGS_max_queue_size_override != 0
                   ? FLAGS_max_queue_size_override
                   : std::numeric_limits<
                         ipc_lib::QueueIndex::PackedIndexType>::max())
          << ": More messages/second configured than the queue can hold on "
          << CleanedChannelToString(c) << ", " << c->frequency() << "hz for "
          << ChannelStorageDuration(&config.message(), c).count() << "ns";

      if (c->has_logger_nodes()) {
        // Confirm that we don't have duplicate logger nodes.
        absl::btree_set<std::string_view> logger_nodes;
        for (const flatbuffers::String *s : *c->logger_nodes()) {
          logger_nodes.insert(s->string_view());
        }
        CHECK_EQ(static_cast<size_t>(logger_nodes.size()),
                 c->logger_nodes()->size())
            << ": Found duplicate logger_nodes in "
            << CleanedChannelToString(c);
      }

      if (c->has_destination_nodes()) {
        // Confirm that we don't have duplicate timestamp logger nodes.
        for (const Connection *d : *c->destination_nodes()) {
          if (d->has_timestamp_logger_nodes()) {
            absl::btree_set<std::string_view> timestamp_logger_nodes;
            for (const flatbuffers::String *s : *d->timestamp_logger_nodes()) {
              timestamp_logger_nodes.insert(s->string_view());
            }
            CHECK_EQ(static_cast<size_t>(timestamp_logger_nodes.size()),
                     d->timestamp_logger_nodes()->size())
                << ": Found duplicate timestamp_logger_nodes in "
                << CleanedChannelToString(c);
          }
        }

        // There is no good use case today for logging timestamps but not the
        // corresponding data.  Instead of plumbing through all of this on the
        // reader side, let'd just disallow it for now.
        if (c->logger() == LoggerConfig::NOT_LOGGED) {
          for (const Connection *d : *c->destination_nodes()) {
            CHECK(d->timestamp_logger() == LoggerConfig::NOT_LOGGED)
                << ": Logging timestamps without data is not supported.  If "
                   "you have a good use case, let's talk.  "
                << CleanedChannelToString(c);
          }
        }
      }

      // Make sure everything is sorted while we are here...  If this fails,
      // there will be a bunch of weird errors.
      if (last_channel != nullptr) {
        CHECK(CompareChannels(
            last_channel,
            std::make_pair(c->name()->string_view(), c->type()->string_view())))
            << ": Channels not sorted!";
      }
      last_channel = c;
    }
  }

  if (config.message().has_nodes() && config.message().has_channels()) {
    for (const Channel *c : *config.message().channels()) {
      CHECK(c->has_source_node()) << ": Channel " << FlatbufferToJson(c)
                                  << " is missing \"source_node\"";
      CHECK(GetNode(&config.message(), c->source_node()->string_view()) !=
            nullptr)
          << ": Channel " << FlatbufferToJson(c)
          << " has an unknown \"source_node\"";

      if (c->has_destination_nodes()) {
        for (const Connection *connection : *c->destination_nodes()) {
          CHECK(connection->has_name());
          CHECK(GetNode(&config.message(), connection->name()->string_view()) !=
                nullptr)
              << ": Channel " << FlatbufferToJson(c)
              << " has an unknown \"destination_nodes\" "
              << connection->name()->string_view();

          switch (connection->timestamp_logger()) {
            case LoggerConfig::LOCAL_LOGGER:
            case LoggerConfig::NOT_LOGGED:
              CHECK(!connection->has_timestamp_logger_nodes())
                  << ": " << CleanedChannelToString(c);
              break;
            case LoggerConfig::REMOTE_LOGGER:
            case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
              CHECK(connection->has_timestamp_logger_nodes());
              CHECK_GT(connection->timestamp_logger_nodes()->size(), 0u);
              for (const flatbuffers::String *timestamp_logger_node :
                   *connection->timestamp_logger_nodes()) {
                CHECK(GetNode(&config.message(),
                              timestamp_logger_node->string_view()) != nullptr)
                    << ": Channel " << FlatbufferToJson(c)
                    << " has an unknown \"timestamp_logger_node\""
                    << connection->name()->string_view();
              }
              break;
          }

          CHECK_NE(connection->name()->string_view(),
                   c->source_node()->string_view())
              << ": Channel " << FlatbufferToJson(c)
              << " is forwarding data to itself";
        }
      }
    }
  }
}

void HandleReverseMaps(
    const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps,
    std::string_view type, const Node *node, std::set<std::string> *names) {
  for (const Map *map : *maps) {
    CHECK_NOTNULL(map);
    const Channel *const match = CHECK_NOTNULL(map->match());
    const Channel *const rename = CHECK_NOTNULL(map->rename());

    // Handle type specific maps.
    const flatbuffers::String *const match_type_string = match->type();
    if (match_type_string != nullptr &&
        match_type_string->string_view() != type) {
      continue;
    }

    // Now handle node specific maps.
    const flatbuffers::String *const match_source_node_string =
        match->source_node();
    if (node != nullptr && match_source_node_string != nullptr &&
        match_source_node_string->string_view() !=
            node->name()->string_view()) {
      continue;
    }

    const flatbuffers::String *const match_name_string = match->name();
    const flatbuffers::String *const rename_name_string = rename->name();
    if (match_name_string == nullptr || rename_name_string == nullptr) {
      continue;
    }

    const std::string rename_name = rename_name_string->str();
    const std::string_view match_name = match_name_string->string_view();

    std::set<std::string> possible_renames;

    // Check if the current name(s) could have been reached using the provided
    // rename.
    if (match_name.back() == '*') {
      for (const std::string &option : *names) {
        if (option.substr(0, rename_name.size()) == rename_name) {
          possible_renames.insert(
              absl::StrCat(match_name.substr(0, match_name.size() - 1),
                           option.substr(rename_name.size())));
        }
      }
      names->insert(possible_renames.begin(), possible_renames.end());
    } else if (names->count(rename_name) != 0) {
      names->insert(std::string(match_name));
    }
  }
}

}  // namespace

// Maps name for the provided maps.  Modifies name.
//
// This is called many times during startup, and it dereferences a lot of
// pointers. These combine to make it a performance hotspot during many tests
// under msan, so there is some optimizing around caching intermediates instead
// of dereferencing the pointer multiple times.
//
// Deliberately not in an anonymous namespace so that the log-reading code can
// reference it.
void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps,
                std::string *name, std::string_view type, const Node *node) {
  // For the same reason we merge configs in reverse order, we want to process
  // maps in reverse order.  That lets the outer config overwrite channels from
  // the inner configs.
  for (auto i = maps->rbegin(); i != maps->rend(); ++i) {
    const Channel *const match = i->match();
    if (!match) {
      continue;
    }
    const flatbuffers::String *const match_name_string = match->name();
    if (!match_name_string) {
      continue;
    }
    const Channel *const rename = i->rename();
    if (!rename) {
      continue;
    }
    const flatbuffers::String *const rename_name_string = rename->name();
    if (!rename_name_string) {
      continue;
    }

    // Handle normal maps (now that we know that match and rename are filled
    // out).
    const std::string_view match_name = match_name_string->string_view();
    if (match_name != *name) {
      if (match_name.back() == '*' &&
          std::string_view(*name).substr(
              0, std::min(name->size(), match_name.size() - 1)) ==
              match_name.substr(0, match_name.size() - 1)) {
        CHECK_EQ(match_name.find('*'), match_name.size() - 1);
      } else {
        continue;
      }
    }

    // Handle type specific maps.
    const flatbuffers::String *const match_type_string = match->type();
    if (match_type_string && match_type_string->string_view() != type) {
      continue;
    }

    // Now handle node specific maps.
    const flatbuffers::String *const match_source_node_string =
        match->source_node();
    if (node && match_source_node_string &&
        match_source_node_string->string_view() !=
            node->name()->string_view()) {
      continue;
    }

    std::string new_name(rename_name_string->string_view());
    if (match_name.back() == '*') {
      new_name += std::string(name->substr(match_name.size() - 1));
    }
    VLOG(1) << "Renamed \"" << *name << "\" to \"" << new_name << "\"";
    *name = std::move(new_name);
  }
}

std::set<std::string> GetChannelAliases(const Configuration *config,
                                        std::string_view name,
                                        std::string_view type,
                                        const std::string_view application_name,
                                        const Node *node) {
  std::set<std::string> names{std::string(name)};
  if (config->has_maps()) {
    HandleReverseMaps(config->maps(), type, node, &names);
  }
  {
    const Application *application =
        GetApplication(config, node, application_name);
    if (application != nullptr && application->has_maps()) {
      HandleReverseMaps(application->maps(), type, node, &names);
    }
  }
  return names;
}

FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
    const Flatbuffer<Configuration> &config) {
  // auto_merge_config will contain all the fields of the Configuration that are
  // to be passed through unmodified to the result of MergeConfiguration().
  // In the processing below, we mutate auto_merge_config to remove any fields
  // which we do need to alter (hence why we can't use the input config
  // directly), and then merge auto_merge_config back in at the end.
  aos::FlatbufferDetachedBuffer<aos::Configuration> auto_merge_config =
      aos::RecursiveCopyFlatBuffer(&config.message());

  // Store all the channels in a sorted set.  This lets us track channels we
  // have seen before and merge the updates in.
  absl::btree_set<ChannelContainer> channels;

  if (config.message().has_channels()) {
    auto_merge_config.mutable_message()->clear_channels();
    for (const Channel *c : *config.message().channels()) {
      // Ignore malformed entries.
      if (!c->has_name()) {
        continue;
      }
      if (!c->has_type()) {
        continue;
      }

      CHECK_EQ(c->read_method() == ReadMethod::PIN, c->num_readers() != 0)
          << ": num_readers may be set if and only if read_method is PIN,"
             " if you want 0 readers do not set PIN: "
          << CleanedChannelToString(c);

      // Attempt to insert the channel.
      auto result = channels.insert(RecursiveCopyFlatBuffer(c));
      if (!result.second) {
        // Already there, so merge the new table into the original.
        // Schemas merge poorly, so pick the newest one.
        if (result.first->table->message().has_schema() && c->has_schema()) {
          result.first->table->mutable_message()->clear_schema();
        }
        auto merged =
            MergeFlatBuffers(*result.first->table, RecursiveCopyFlatBuffer(c));

        if (merged.message().has_destination_nodes()) {
          absl::btree_set<ConnectionContainer> connections;
          for (const Connection *connection :
               *merged.message().destination_nodes()) {
            auto connection_result =
                connections.insert(RecursiveCopyFlatBuffer(connection));
            if (!connection_result.second) {
              *connection_result.first->table =
                  MergeFlatBuffers(*connection_result.first->table,
                                   RecursiveCopyFlatBuffer(connection));
            }
          }
          if (static_cast<size_t>(connections.size()) !=
              merged.message().destination_nodes()->size()) {
            merged.mutable_message()->clear_destination_nodes();
            flatbuffers::FlatBufferBuilder fbb;
            fbb.ForceDefaults(true);
            std::vector<flatbuffers::Offset<Connection>> connection_offsets;
            for (const ConnectionContainer &connection : connections) {
              connection_offsets.push_back(
                  RecursiveCopyFlatBuffer(&connection.table->message(), &fbb));
            }
            flatbuffers::Offset<
                flatbuffers::Vector<flatbuffers::Offset<Connection>>>
                destination_nodes_offset = fbb.CreateVector(connection_offsets);
            Channel::Builder channel_builder(fbb);
            channel_builder.add_destination_nodes(destination_nodes_offset);
            fbb.Finish(channel_builder.Finish());
            FlatbufferDetachedBuffer<Channel> destinations_channel(
                fbb.Release());
            merged = MergeFlatBuffers(merged, destinations_channel);
          }
        }

        *result.first->table = std::move(merged);
      }
    }
  }

  // Now repeat this for the application list.
  absl::btree_set<ApplicationContainer> applications;
  if (config.message().has_applications()) {
    auto_merge_config.mutable_message()->clear_applications();
    for (const Application *a : *config.message().applications()) {
      if (!a->has_name()) {
        continue;
      }

      auto result = applications.insert(RecursiveCopyFlatBuffer(a));
      if (!result.second) {
        if (a->has_args()) {
          result.first->table->mutable_message()->clear_args();
        }
        *result.first->table =
            MergeFlatBuffers(*result.first->table, RecursiveCopyFlatBuffer(a));
      }
    }
  }

  // Now repeat this for the node list.
  absl::btree_set<NodeContainer> nodes;
  if (config.message().has_nodes()) {
    auto_merge_config.mutable_message()->clear_nodes();
    for (const Node *n : *config.message().nodes()) {
      if (!n->has_name()) {
        continue;
      }

      auto result = nodes.insert(RecursiveCopyFlatBuffer(n));
      if (!result.second) {
        *result.first->table =
            MergeFlatBuffers(*result.first->table, RecursiveCopyFlatBuffer(n));
      }
    }
  }

  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  // Start by building the vectors.  They need to come before the final table.
  // Channels
  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
      channels_offset;
  {
    ::std::vector<flatbuffers::Offset<Channel>> channel_offsets;
    for (const ChannelContainer &c : channels) {
      channel_offsets.emplace_back(
          RecursiveCopyFlatBuffer<Channel>(&c.table->message(), &fbb));
    }
    channels_offset = fbb.CreateVector(channel_offsets);
  }

  // Applications
  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
      applications_offset;
  {
    ::std::vector<flatbuffers::Offset<Application>> applications_offsets;
    for (const ApplicationContainer &a : applications) {
      applications_offsets.emplace_back(
          RecursiveCopyFlatBuffer<Application>(&a.table->message(), &fbb));
    }
    applications_offset = fbb.CreateVector(applications_offsets);
  }

  // Nodes
  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
      nodes_offset;
  {
    ::std::vector<flatbuffers::Offset<Node>> node_offsets;
    for (const NodeContainer &n : nodes) {
      node_offsets.emplace_back(
          RecursiveCopyFlatBuffer<Node>(&n.table->message(), &fbb));
    }
    nodes_offset = fbb.CreateVector(node_offsets);
  }

  // And then build a Configuration with them all.
  ConfigurationBuilder configuration_builder(fbb);
  configuration_builder.add_channels(channels_offset);
  if (config.message().has_applications()) {
    configuration_builder.add_applications(applications_offset);
  }
  if (config.message().has_nodes()) {
    configuration_builder.add_nodes(nodes_offset);
  }

  fbb.Finish(configuration_builder.Finish());

  aos::FlatbufferDetachedBuffer<aos::Configuration> modified_config(
      fbb.Release());

  // Now, validate that if there is a node list, every channel has a source
  // node.
  FlatbufferDetachedBuffer<Configuration> result =
      MergeFlatBuffers(modified_config, auto_merge_config);

  ValidateConfiguration(result);

  return result;
}

std::optional<FlatbufferDetachedBuffer<Configuration>> MaybeReadConfig(
    const std::string_view path,
    const std::vector<std::string_view> &extra_import_paths) {
  // Add the executable directory to the search path.  That makes it so that
  // tools can be run from any directory without hard-coding an absolute path to
  // the config into all binaries.
  std::vector<std::string_view> extra_import_paths_with_exe =
      extra_import_paths;
  char proc_self_exec_buffer[PATH_MAX + 1];
  std::memset(proc_self_exec_buffer, 0, sizeof(proc_self_exec_buffer));
  ssize_t s = readlink("/proc/self/exe", proc_self_exec_buffer, PATH_MAX);
  if (s > 0) {
    // If the readlink call fails, the worst thing that happens is that we don't
    // automatically find the config next to the binary.  VLOG to make it easier
    // to debug.
    std::string_view proc_self_exec(proc_self_exec_buffer);

    extra_import_paths_with_exe.emplace_back(
        proc_self_exec.substr(0, proc_self_exec.rfind("/")));
  } else {
    VLOG(1) << "Failed to read /proc/self/exe";
  }

  // We only want to read a file once.  So track the visited files in a set.
  absl::btree_set<std::string> visited_paths;
  std::optional<FlatbufferDetachedBuffer<Configuration>> read_config =
      MaybeReadConfig(path, &visited_paths, extra_import_paths_with_exe);

  if (read_config == std::nullopt) {
    return read_config;
  }

  // If we only read one file, and it had a .bfbs extension, it has to be a
  // fully formatted config.  Do a quick verification and return it.
  if (visited_paths.size() == 1 && EndsWith(*visited_paths.begin(), ".bfbs")) {
    ValidateConfiguration(*read_config);
    return read_config;
  }

  return MergeConfiguration(*read_config);
}

FlatbufferDetachedBuffer<Configuration> ReadConfig(
    const std::string_view path,
    const std::vector<std::string_view> &extra_import_paths) {
  auto optional_config = MaybeReadConfig(path, extra_import_paths);
  CHECK(optional_config) << "Could not read config. See above errors";
  return std::move(*optional_config);
}

FlatbufferDetachedBuffer<Configuration> MergeWithConfig(
    const Configuration *config, const Flatbuffer<Configuration> &addition) {
  return MergeConfiguration(MergeFlatBuffers(config, &addition.message()));
}

FlatbufferDetachedBuffer<Configuration> MergeWithConfig(
    const Configuration *config, std::string_view json) {
  FlatbufferDetachedBuffer<Configuration> addition =
      JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable());

  return MergeWithConfig(config, addition);
}

const Channel *GetChannel(const Configuration *config, std::string_view name,
                          std::string_view type,
                          std::string_view application_name, const Node *node,
                          bool quiet) {
  if (!config->has_channels()) {
    return nullptr;
  }

  const std::string_view original_name = name;
  std::string mutable_name;
  if (node != nullptr) {
    VLOG(1) << "Looking up { \"name\": \"" << name << "\", \"type\": \"" << type
            << "\" } on " << aos::FlatbufferToJson(node);
  } else {
    VLOG(1) << "Looking up { \"name\": \"" << name << "\", \"type\": \"" << type
            << "\" }";
  }

  // First handle application specific maps.  Only do this if we have a matching
  // application name, and it has maps.
  {
    const Application *application =
        GetApplication(config, node, application_name);
    if (application != nullptr && application->has_maps()) {
      mutable_name = std::string(name);
      HandleMaps(application->maps(), &mutable_name, type, node);
      name = std::string_view(mutable_name);
    }
  }

  // Now do global maps.
  if (config->has_maps()) {
    mutable_name = std::string(name);
    HandleMaps(config->maps(), &mutable_name, type, node);
    name = std::string_view(mutable_name);
  }

  if (original_name != name) {
    VLOG(1) << "Remapped to { \"name\": \"" << name << "\", \"type\": \""
            << type << "\" }";
  }

  // Then look for the channel (note that this relies on the channels being
  // sorted in the config).
  auto channel_iterator =
      std::lower_bound(config->channels()->cbegin(), config->channels()->cend(),
                       std::make_pair(name, type), CompareChannels);

  // Make sure we actually found it, and it matches.
  if (channel_iterator != config->channels()->cend() &&
      EqualsChannels(*channel_iterator, std::make_pair(name, type))) {
    if (VLOG_IS_ON(2)) {
      VLOG(2) << "Found: " << FlatbufferToJson(*channel_iterator);
    } else if (VLOG_IS_ON(1)) {
      VLOG(1) << "Found: " << CleanedChannelToString(*channel_iterator);
    }
    return *channel_iterator;
  } else {
    VLOG(1) << "No match for { \"name\": \"" << name << "\", \"type\": \""
            << type << "\" }";
    if (original_name != name && !quiet) {
      VLOG(1) << "Remapped from {\"name\": \"" << original_name
              << "\", \"type\": \"" << type << "\"}, to {\"name\": \"" << name
              << "\", \"type\": \"" << type
              << "\"}, but no channel by that name exists.";
    }
    return nullptr;
  }
}

size_t ChannelIndex(const Configuration *configuration,
                    const Channel *channel) {
  CHECK(configuration->channels() != nullptr) << ": No channels";

  const auto c = std::lower_bound(
      configuration->channels()->cbegin(), configuration->channels()->cend(),
      std::make_pair(channel->name()->string_view(),
                     channel->type()->string_view()),
      CompareChannels);
  CHECK(c != configuration->channels()->cend())
      << ": Channel pointer not found in configuration()->channels()";
  CHECK(*c == channel)
      << ": Channel pointer not found in configuration()->channels()";

  return std::distance(configuration->channels()->cbegin(), c);
}

std::string CleanedChannelToString(const Channel *channel) {
  FlatbufferDetachedBuffer<Channel> cleaned_channel = CopyFlatBuffer(channel);
  cleaned_channel.mutable_message()->clear_schema();
  return FlatbufferToJson(cleaned_channel);
}

std::string StrippedChannelToString(const Channel *channel) {
  return absl::StrCat("{ \"name\": \"", channel->name()->string_view(),
                      "\", \"type\": \"", channel->type()->string_view(),
                      "\" }");
}

FlatbufferDetachedBuffer<Configuration> MergeConfiguration(
    const Flatbuffer<Configuration> &config,
    const std::vector<aos::FlatbufferVector<reflection::Schema>> &schemas) {
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  // Cache for holding already inserted schemas.
  std::map<std::string_view, flatbuffers::Offset<reflection::Schema>>
      schema_cache;

  CHECK_EQ(Channel::MiniReflectTypeTable()->num_elems, 14u)
      << ": Merging logic needs to be updated when the number of channel "
         "fields changes.";

  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
      channels_offset;
  if (config.message().has_channels()) {
    std::vector<flatbuffers::Offset<Channel>> channel_offsets;
    for (const Channel *c : *config.message().channels()) {
      // Search for a schema with a matching type.
      const aos::FlatbufferVector<reflection::Schema> *found_schema = nullptr;
      for (const aos::FlatbufferVector<reflection::Schema> &schema : schemas) {
        if (schema.message().root_table() != nullptr) {
          if (schema.message().root_table()->name()->string_view() ==
              c->type()->string_view()) {
            found_schema = &schema;
          }
        }
      }

      if (found_schema == nullptr) {
        std::stringstream ss;
        for (const aos::FlatbufferVector<reflection::Schema> &schema :
             schemas) {
          if (schema.message().root_table() == nullptr) {
            continue;
          }
          auto name = schema.message().root_table()->name()->string_view();
          ss << "\n\tname: " << name;
        }
        LOG(FATAL) << ": Failed to find schema for " << FlatbufferToJson(c)
                   << "\n\tThe following schemas were found:\n"
                   << ss.str();
      }

      // Now copy the message manually.
      auto cached_schema = schema_cache.find(c->type()->string_view());
      flatbuffers::Offset<reflection::Schema> schema_offset;
      if (cached_schema != schema_cache.end()) {
        schema_offset = cached_schema->second;
      } else {
        schema_offset = RecursiveCopyFlatBuffer<reflection::Schema>(
            &found_schema->message(), &fbb);
        schema_cache.emplace(c->type()->string_view(), schema_offset);
      }

      flatbuffers::Offset<flatbuffers::String> name_offset =
          fbb.CreateSharedString(c->name()->str());
      flatbuffers::Offset<flatbuffers::String> type_offset =
          fbb.CreateSharedString(c->type()->str());
      flatbuffers::Offset<flatbuffers::String> source_node_offset =
          c->has_source_node() ? fbb.CreateSharedString(c->source_node()->str())
                               : 0;

      flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Connection>>>
          destination_nodes_offset =
              aos::RecursiveCopyVectorTable(c->destination_nodes(), &fbb);

      flatbuffers::Offset<
          flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
          logger_nodes_offset =
              aos::CopyVectorSharedString(c->logger_nodes(), &fbb);

      Channel::Builder channel_builder(fbb);
      channel_builder.add_name(name_offset);
      channel_builder.add_type(type_offset);
      if (c->has_frequency()) {
        channel_builder.add_frequency(c->frequency());
      }
      if (c->has_max_size()) {
        channel_builder.add_max_size(c->max_size());
      }
      if (c->has_num_senders()) {
        channel_builder.add_num_senders(c->num_senders());
      }
      if (c->has_num_watchers()) {
        channel_builder.add_num_watchers(c->num_watchers());
      }
      channel_builder.add_schema(schema_offset);
      if (!source_node_offset.IsNull()) {
        channel_builder.add_source_node(source_node_offset);
      }
      if (!destination_nodes_offset.IsNull()) {
        channel_builder.add_destination_nodes(destination_nodes_offset);
      }
      if (c->has_logger()) {
        channel_builder.add_logger(c->logger());
      }
      if (!logger_nodes_offset.IsNull()) {
        channel_builder.add_logger_nodes(logger_nodes_offset);
      }
      if (c->has_read_method()) {
        channel_builder.add_read_method(c->read_method());
      }
      if (c->has_num_readers()) {
        channel_builder.add_num_readers(c->num_readers());
      }
      if (c->has_channel_storage_duration()) {
        channel_builder.add_channel_storage_duration(
            c->channel_storage_duration());
      }
      channel_offsets.emplace_back(channel_builder.Finish());
    }
    channels_offset = fbb.CreateVector(channel_offsets);
  }

  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Map>>>
      maps_offset =
          aos::RecursiveCopyVectorTable(config.message().maps(), &fbb);

  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Node>>>
      nodes_offset =
          aos::RecursiveCopyVectorTable(config.message().nodes(), &fbb);

  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Application>>>
      applications_offset =
          aos::RecursiveCopyVectorTable(config.message().applications(), &fbb);

  // Now insert everything else in unmodified.
  ConfigurationBuilder configuration_builder(fbb);
  if (config.message().has_channels()) {
    configuration_builder.add_channels(channels_offset);
  }
  if (!maps_offset.IsNull()) {
    configuration_builder.add_maps(maps_offset);
  }
  if (!nodes_offset.IsNull()) {
    configuration_builder.add_nodes(nodes_offset);
  }
  if (!applications_offset.IsNull()) {
    configuration_builder.add_applications(applications_offset);
  }

  if (config.message().has_channel_storage_duration()) {
    configuration_builder.add_channel_storage_duration(
        config.message().channel_storage_duration());
  }

  CHECK_EQ(Configuration::MiniReflectTypeTable()->num_elems, 6u)
      << ": Merging logic needs to be updated when the number of configuration "
         "fields changes.";

  fbb.Finish(configuration_builder.Finish());
  aos::FlatbufferDetachedBuffer<aos::Configuration> modified_config(
      fbb.Release());

  return modified_config;
}

const Node *GetNodeFromHostname(const Configuration *config,
                                std::string_view hostname) {
  for (const Node *node : *config->nodes()) {
    if (node->has_hostname() && node->hostname()->string_view() == hostname) {
      return node;
    }
    if (node->has_hostnames()) {
      for (const auto &candidate : *node->hostnames()) {
        if (candidate->string_view() == hostname) {
          return node;
        }
      }
    }
  }
  return nullptr;
}

std::string_view NodeName(const Configuration *config, size_t node_index) {
  if (!configuration::MultiNode(config)) {
    return "(singlenode)";
  }
  return config->nodes()->Get(node_index)->name()->string_view();
}

const Node *GetMyNode(const Configuration *config) {
  const std::string hostname = (FLAGS_override_hostname.size() > 0)
                                   ? FLAGS_override_hostname
                                   : network::GetHostname();
  const Node *node = GetNodeFromHostname(config, hostname);
  if (node != nullptr) return node;

  LOG(FATAL) << "Unknown node for host: " << hostname
             << ".  Consider using --override_hostname if hostname detection "
                "is wrong.";
  return nullptr;
}

const Node *GetNode(const Configuration *config, const Node *node) {
  if (!MultiNode(config)) {
    CHECK(node == nullptr) << ": Provided a node in a single node world.";
    return nullptr;
  } else {
    CHECK(node != nullptr);
    CHECK(node->has_name());
    return GetNode(config, node->name()->string_view());
  }
}

const Node *GetNode(const Configuration *config, std::string_view name) {
  if (!MultiNode(config)) {
    if (name.empty()) {
      return nullptr;
    }
    LOG(FATAL) << ": Asking for a named node from a single node configuration.";
  }
  for (const Node *node : *config->nodes()) {
    CHECK(node->has_name()) << ": Malformed node " << FlatbufferToJson(node);
    if (node->name()->string_view() == name) {
      return node;
    }
  }
  return nullptr;
}

const Node *GetNode(const Configuration *config, size_t node_index) {
  if (!MultiNode(config)) {
    CHECK_EQ(node_index, 0u) << ": Invalid node in a single node world.";
    return nullptr;
  } else {
    CHECK_LT(node_index, config->nodes()->size());
    return config->nodes()->Get(node_index);
  }
}

const Node *GetNodeOrDie(const Configuration *config, const Node *node) {
  if (!MultiNode(config)) {
    CHECK(node == nullptr) << ": Provided a node in a single node world.";
    return nullptr;
  } else {
    const Node *config_node = GetNode(config, node);
    if (config_node == nullptr) {
      LOG(FATAL) << "Couldn't find node matching " << FlatbufferToJson(node);
    }
    return config_node;
  }
}

namespace {
int GetNodeIndexFromConfig(const Configuration *config, const Node *node) {
  int node_index = 0;
  for (const Node *iterated_node : *config->nodes()) {
    if (iterated_node == node) {
      return node_index;
    }
    ++node_index;
  }
  return -1;
}
}  // namespace

aos::FlatbufferDetachedBuffer<aos::Configuration> AddSchema(
    std::string_view json,
    const std::vector<aos::FlatbufferVector<reflection::Schema>> &schemas) {
  FlatbufferDetachedBuffer<Configuration> addition =
      JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable());
  return MergeConfiguration(addition, schemas);
}

int GetNodeIndex(const Configuration *config, const Node *node) {
  if (!MultiNode(config)) {
    return 0;
  }

  {
    int node_index = GetNodeIndexFromConfig(config, node);
    if (node_index != -1) {
      return node_index;
    }
  }

  const Node *result = GetNode(config, node);
  CHECK(result != nullptr);

  {
    int node_index = GetNodeIndexFromConfig(config, result);
    if (node_index != -1) {
      return node_index;
    }
  }

  LOG(FATAL) << "Node " << FlatbufferToJson(node)
             << " not found in the configuration.";
}

int GetNodeIndex(const Configuration *config, std::string_view name) {
  if (!MultiNode(config)) {
    return 0;
  }

  {
    int node_index = 0;
    for (const Node *iterated_node : *config->nodes()) {
      if (iterated_node->name()->string_view() == name) {
        return node_index;
      }
      ++node_index;
    }
  }
  LOG(FATAL) << "Node " << name << " not found in the configuration.";
}

size_t NodesCount(const Configuration *config) {
  if (!MultiNode(config)) {
    return 1u;
  }

  return config->nodes()->size();
}

std::vector<const Node *> GetNodes(const Configuration *config) {
  std::vector<const Node *> nodes;
  if (MultiNode(config)) {
    for (const Node *node : *config->nodes()) {
      nodes.emplace_back(node);
    }
  } else {
    nodes.emplace_back(nullptr);
  }
  return nodes;
}

std::vector<const Node *> GetNodesWithTag(const Configuration *config,
                                          std::string_view tag) {
  std::vector<const Node *> nodes;
  if (!MultiNode(config)) {
    nodes.emplace_back(nullptr);
  } else {
    for (const Node *node : *config->nodes()) {
      if (!node->has_tags()) {
        continue;
      }
      bool did_found_tag = false;
      for (const flatbuffers::String *found_tag : *node->tags()) {
        if (found_tag->string_view() == tag) {
          did_found_tag = true;
          break;
        }
      }
      if (did_found_tag) {
        nodes.emplace_back(node);
      }
    }
  }
  return nodes;
}

bool NodeHasTag(const Node *node, std::string_view tag) {
  if (node == nullptr) {
    return true;
  }

  if (!node->has_tags()) {
    return false;
  }

  const auto *const tags = node->tags();
  return std::find_if(tags->begin(), tags->end(),
                      [tag](const flatbuffers::String *candidate) {
                        return candidate->string_view() == tag;
                      }) != tags->end();
}

bool MultiNode(const Configuration *config) { return config->has_nodes(); }

bool ChannelIsSendableOnNode(const Channel *channel, const Node *node) {
  if (node == nullptr) {
    return true;
  }
  CHECK(channel->has_source_node()) << FlatbufferToJson(channel);
  CHECK(node->has_name()) << FlatbufferToJson(node);
  return (CHECK_NOTNULL(channel)->source_node()->string_view() ==
          node->name()->string_view());
}

bool ChannelIsReadableOnNode(const Channel *channel, const Node *node) {
  if (node == nullptr) {
    return true;
  }

  if (channel->source_node()->string_view() == node->name()->string_view()) {
    return true;
  }

  if (!channel->has_destination_nodes()) {
    return false;
  }

  for (const Connection *connection : *channel->destination_nodes()) {
    CHECK(connection->has_name());
    if (connection->name()->string_view() == node->name()->string_view()) {
      return true;
    }
  }

  return false;
}

bool ChannelIsForwardedFromNode(const Channel *channel, const Node *node) {
  if (node == nullptr) {
    return false;
  }
  return ChannelIsSendableOnNode(channel, node) &&
         channel->has_destination_nodes() &&
         channel->destination_nodes()->size() > 0u;
}

bool ChannelMessageIsLoggedOnNode(const Channel *channel, const Node *node) {
  if (node == nullptr) {
    // Single node world.  If there is a local logger, then we want to use
    // it.
    if (channel->logger() == LoggerConfig::LOCAL_LOGGER) {
      return true;
    } else if (channel->logger() == LoggerConfig::NOT_LOGGED) {
      return false;
    }
    LOG(FATAL) << "Unsupported logging configuration in a single node world: "
               << CleanedChannelToString(channel);
  }
  return ChannelMessageIsLoggedOnNode(
      channel, CHECK_NOTNULL(node)->name()->string_view());
}

bool ChannelMessageIsLoggedOnNode(const Channel *channel,
                                  std::string_view node_name) {
  switch (channel->logger()) {
    case LoggerConfig::LOCAL_LOGGER:
      return channel->source_node()->string_view() == node_name;
    case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
      CHECK(channel->has_logger_nodes())
          << "Missing logger nodes on " << StrippedChannelToString(channel);
      CHECK_GT(channel->logger_nodes()->size(), 0u)
          << "Missing logger nodes on " << StrippedChannelToString(channel);

      if (channel->source_node()->string_view() == node_name) {
        return true;
      }

      [[fallthrough]];
    case LoggerConfig::REMOTE_LOGGER:
      CHECK(channel->has_logger_nodes())
          << "Missing logger nodes on " << StrippedChannelToString(channel);
      CHECK_GT(channel->logger_nodes()->size(), 0u)
          << "Missing logger nodes on " << StrippedChannelToString(channel);
      for (const flatbuffers::String *logger_node : *channel->logger_nodes()) {
        if (logger_node->string_view() == node_name) {
          return true;
        }
      }

      return false;
    case LoggerConfig::NOT_LOGGED:
      return false;
  }

  LOG(FATAL) << "Unknown logger config " << static_cast<int>(channel->logger());
}

size_t ConnectionCount(const Channel *channel) {
  if (!channel->has_destination_nodes()) {
    return 0;
  }
  return channel->destination_nodes()->size();
}

const Connection *ConnectionToNode(const Channel *channel, const Node *node) {
  if (!channel->has_destination_nodes()) {
    return nullptr;
  }
  for (const Connection *connection : *channel->destination_nodes()) {
    if (connection->name()->string_view() == node->name()->string_view()) {
      return connection;
    }
  }
  return nullptr;
}

bool ConnectionDeliveryTimeIsLoggedOnNode(const Channel *channel,
                                          const Node *node,
                                          const Node *logger_node) {
  return ConnectionDeliveryTimeIsLoggedOnNode(ConnectionToNode(channel, node),
                                              logger_node);
}

bool ConnectionDeliveryTimeIsLoggedOnNode(const Connection *connection,
                                          const Node *node) {
  if (connection == nullptr) {
    return false;
  }
  switch (connection->timestamp_logger()) {
    case LoggerConfig::LOCAL_AND_REMOTE_LOGGER:
      CHECK(connection->has_timestamp_logger_nodes());
      CHECK_GT(connection->timestamp_logger_nodes()->size(), 0u);
      if (connection->name()->string_view() == node->name()->string_view()) {
        return true;
      }

      [[fallthrough]];
    case LoggerConfig::REMOTE_LOGGER:
      CHECK(connection->has_timestamp_logger_nodes());
      CHECK_GT(connection->timestamp_logger_nodes()->size(), 0u);
      for (const flatbuffers::String *timestamp_logger_node :
           *connection->timestamp_logger_nodes()) {
        if (timestamp_logger_node->string_view() ==
            node->name()->string_view()) {
          return true;
        }
      }

      return false;
    case LoggerConfig::LOCAL_LOGGER:
      return connection->name()->string_view() == node->name()->string_view();
    case LoggerConfig::NOT_LOGGED:
      return false;
  }

  LOG(FATAL) << "Unknown logger config "
             << static_cast<int>(connection->timestamp_logger());
}

std::vector<std::string_view> SourceNodeNames(const Configuration *config,
                                              const Node *my_node) {
  std::set<std::string_view> result_set;

  for (const Channel *channel : *config->channels()) {
    if (channel->has_destination_nodes()) {
      for (const Connection *connection : *channel->destination_nodes()) {
        if (connection->name()->string_view() ==
            my_node->name()->string_view()) {
          result_set.insert(channel->source_node()->string_view());
        }
      }
    }
  }

  std::vector<std::string_view> result;
  for (const std::string_view source : result_set) {
    VLOG(1) << "Found a source node of " << source;
    result.emplace_back(source);
  }
  return result;
}

std::vector<std::string_view> DestinationNodeNames(const Configuration *config,
                                                   const Node *my_node) {
  std::vector<std::string_view> result;

  for (const Channel *channel : *config->channels()) {
    if (channel->has_source_node() && channel->source_node()->string_view() ==
                                          my_node->name()->string_view()) {
      if (!channel->has_destination_nodes()) continue;

      if (channel->source_node()->string_view() !=
          my_node->name()->string_view()) {
        continue;
      }

      for (const Connection *connection : *channel->destination_nodes()) {
        if (std::find(result.begin(), result.end(),
                      connection->name()->string_view()) == result.end()) {
          result.emplace_back(connection->name()->string_view());
        }
      }
    }
  }

  for (const std::string_view destination : result) {
    VLOG(1) << "Found a destination node of " << destination;
  }
  return result;
}

std::vector<const Node *> TimestampNodes(const Configuration *config,
                                         const Node *my_node) {
  if (!configuration::MultiNode(config)) {
    CHECK(my_node == nullptr);
    return std::vector<const Node *>{};
  }

  std::set<const Node *> timestamp_logger_nodes;
  for (const Channel *channel : *config->channels()) {
    if (!configuration::ChannelIsSendableOnNode(channel, my_node)) {
      continue;
    }
    if (!channel->has_destination_nodes()) {
      continue;
    }
    for (const Connection *connection : *channel->destination_nodes()) {
      const Node *other_node =
          configuration::GetNode(config, connection->name()->string_view());

      if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
                                                              my_node)) {
        VLOG(1) << "Timestamps are logged from "
                << FlatbufferToJson(other_node);
        timestamp_logger_nodes.insert(other_node);
      }
    }
  }

  std::vector<const Node *> result;
  for (const Node *node : timestamp_logger_nodes) {
    result.emplace_back(node);
  }
  return result;
}

bool ApplicationShouldStart(const Configuration *config, const Node *my_node,
                            const Application *application) {
  if (MultiNode(config)) {
    // Ok, we need
    CHECK(application->has_nodes());
    CHECK(my_node != nullptr);
    for (const flatbuffers::String *str : *application->nodes()) {
      if (str->string_view() == my_node->name()->string_view()) {
        return true;
      }
    }
    return false;
  } else {
    return true;
  }
}

const Application *GetApplication(const Configuration *config,
                                  const Node *my_node,
                                  std::string_view application_name) {
  if (config->has_applications()) {
    auto application_iterator = std::lower_bound(
        config->applications()->cbegin(), config->applications()->cend(),
        application_name, CompareApplications);
    if (application_iterator != config->applications()->cend() &&
        EqualsApplications(*application_iterator, application_name)) {
      if (ApplicationShouldStart(config, my_node, *application_iterator)) {
        return *application_iterator;
      }
    }
  }
  return nullptr;
}

std::vector<size_t> SourceNodeIndex(const Configuration *config) {
  CHECK(config->has_channels());
  std::vector<size_t> result;
  result.resize(config->channels()->size(), 0u);
  if (MultiNode(config)) {
    for (size_t i = 0; i < config->channels()->size(); ++i) {
      result[i] = GetNodeIndex(
          config, config->channels()->Get(i)->source_node()->string_view());
    }
  }
  return result;
}

chrono::nanoseconds ChannelStorageDuration(const Configuration *config,
                                           const Channel *channel) {
  CHECK(channel != nullptr);
  if (channel->has_channel_storage_duration()) {
    return chrono::nanoseconds(channel->channel_storage_duration());
  }
  return chrono::nanoseconds(config->channel_storage_duration());
}

size_t QueueSize(const Configuration *config, const Channel *channel) {
  return QueueSize(channel->frequency(),
                   ChannelStorageDuration(config, channel));
}

size_t QueueSize(size_t frequency,
                 chrono::nanoseconds channel_storage_duration) {
  // Use integer arithmetic and round up at all cost.
  return static_cast<int>(
      (999'999'999 +
       static_cast<int64_t>(frequency) *
           static_cast<int64_t>(channel_storage_duration.count())) /
      static_cast<int64_t>(1'000'000'000));
}

int QueueScratchBufferSize(const Channel *channel) {
  return channel->num_readers() + channel->num_senders();
}

// Searches through configurations for schemas that include a certain type
const reflection::Schema *GetSchema(const Configuration *config,
                                    std::string_view schema_type) {
  if (config->has_channels()) {
    std::vector<flatbuffers::Offset<Channel>> channel_offsets;
    for (const Channel *c : *config->channels()) {
      if (schema_type == c->type()->string_view()) {
        return c->schema();
      }
    }
  }
  return nullptr;
}

// Copy schema reflection into detached flatbuffer
std::optional<FlatbufferDetachedBuffer<reflection::Schema>>
GetSchemaDetachedBuffer(const Configuration *config,
                        std::string_view schema_type) {
  const reflection::Schema *found_schema = GetSchema(config, schema_type);
  if (found_schema == nullptr) {
    return std::nullopt;
  }
  return RecursiveCopyFlatBuffer(found_schema);
}

aos::FlatbufferDetachedBuffer<Configuration> AddChannelToConfiguration(
    const Configuration *config, std::string_view name,
    aos::FlatbufferVector<reflection::Schema> schema, const aos::Node *node,
    ChannelT overrides) {
  overrides.name = name;
  CHECK(schema.message().has_root_table());
  overrides.type = schema.message().root_table()->name()->string_view();
  if (node != nullptr) {
    CHECK(node->has_name());
    overrides.source_node = node->name()->string_view();
  }
  flatbuffers::FlatBufferBuilder fbb;
  // Don't populate fields from overrides that the user doesn't explicitly
  // override.
  fbb.ForceDefaults(false);
  const flatbuffers::Offset<Channel> channel_offset =
      Channel::Pack(fbb, &overrides);
  const flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
      channels_offset = fbb.CreateVector({channel_offset});
  Configuration::Builder config_builder(fbb);
  config_builder.add_channels(channels_offset);
  fbb.Finish(config_builder.Finish());
  FlatbufferDetachedBuffer<Configuration> new_channel_config = fbb.Release();
  new_channel_config = MergeConfiguration(new_channel_config, {schema});
  return MergeWithConfig(config, new_channel_config);
}

FlatbufferDetachedBuffer<Configuration> GetPartialConfiguration(
    const Configuration &configuration,
    std::function<bool(const Channel &)> should_include_channel) {
  // create new_configuration1, containing everything except the `channels`
  // field.
  FlatbufferDetachedBuffer<Configuration> new_configuration1 =
      RecursiveCopyFlatBuffer(&configuration);
  new_configuration1.mutable_message()->clear_channels();

  // create new_configuration2, containing only the `channels` field.
  flatbuffers::FlatBufferBuilder fbb;
  std::vector<flatbuffers::Offset<Channel>> new_channels_vec;
  for (const auto &channel : *configuration.channels()) {
    CHECK_NOTNULL(channel);
    if (should_include_channel(*channel)) {
      new_channels_vec.push_back(RecursiveCopyFlatBuffer(channel, &fbb));
    }
  }
  flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
      new_channels_offset = fbb.CreateVector(new_channels_vec);
  Configuration::Builder new_configuration2_builder(fbb);
  new_configuration2_builder.add_channels(new_channels_offset);
  fbb.Finish(new_configuration2_builder.Finish());
  FlatbufferDetachedBuffer<Configuration> new_configuration2 = fbb.Release();

  // Merge the configuration containing channels with the configuration
  // containing everything else, creating a complete configuration.
  const aos::FlatbufferDetachedBuffer<Configuration> raw_subset_configuration =
      MergeFlatBuffers(&new_configuration1.message(),
                       &new_configuration2.message());

  // Use MergeConfiguration to clean up redundant schemas.
  return configuration::MergeConfiguration(raw_subset_configuration);
}
}  // namespace configuration
}  // namespace aos
