Merge Connections as well as Channels

Someone tried to make a config with duplicate Connections.  The end
result was message_gateway reconnecting continually, which isn't the
right behavior.  Dedup these.

Change-Id: I26f5d5ee34d7811df8a95dbe80341fb014a4f270
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/configuration.cc b/aos/configuration.cc
index a00c8e7..87abf15 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -91,6 +91,18 @@
              rhs.message().type()->string_view();
 }
 
+bool operator<(const FlatbufferDetachedBuffer<Connection> &lhs,
+               const FlatbufferDetachedBuffer<Connection> &rhs) {
+  return lhs.message().name()->string_view() <
+         rhs.message().name()->string_view();
+}
+
+bool operator==(const FlatbufferDetachedBuffer<Connection> &lhs,
+                const FlatbufferDetachedBuffer<Connection> &rhs) {
+  return lhs.message().name()->string_view() ==
+         rhs.message().name()->string_view();
+}
+
 bool operator==(const FlatbufferDetachedBuffer<Application> &lhs,
                 const FlatbufferDetachedBuffer<Application> &rhs) {
   return lhs.message().name()->string_view() ==
@@ -495,8 +507,45 @@
       auto result = channels.insert(RecursiveCopyFlatBuffer(c));
       if (!result.second) {
         // Already there, so merge the new table into the original.
-        *result.first =
+        auto merged =
             MergeFlatBuffers(*result.first, RecursiveCopyFlatBuffer(c));
+
+        if (merged.message().has_destination_nodes()) {
+          absl::btree_set<FlatbufferDetachedBuffer<Connection>> connections;
+          for (const Connection *connection :
+               *merged.message().destination_nodes()) {
+            auto connection_result =
+                connections.insert(RecursiveCopyFlatBuffer(connection));
+            if (!connection_result.second) {
+              *connection_result.first =
+                  MergeFlatBuffers(*connection_result.first,
+                                   RecursiveCopyFlatBuffer(connection));
+            }
+          }
+          if (static_cast<size_t>(connections.size()) !=
+              merged.message().destination_nodes()->size()) {
+            merged.mutable_message()->clear_destination_nodes();
+            flatbuffers::FlatBufferBuilder fbb;
+            fbb.ForceDefaults(true);
+            std::vector<flatbuffers::Offset<Connection>> connection_offsets;
+            for (const FlatbufferDetachedBuffer<Connection> &connection :
+                 connections) {
+              connection_offsets.push_back(
+                  RecursiveCopyFlatBuffer(&connection.message(), &fbb));
+            }
+            flatbuffers::Offset<
+                flatbuffers::Vector<flatbuffers::Offset<Connection>>>
+                destination_nodes_offset = fbb.CreateVector(connection_offsets);
+            Channel::Builder channel_builder(fbb);
+            channel_builder.add_destination_nodes(destination_nodes_offset);
+            fbb.Finish(channel_builder.Finish());
+            FlatbufferDetachedBuffer<Channel> destinations_channel(
+                fbb.Release());
+            merged = MergeFlatBuffers(merged, destinations_channel);
+          }
+        }
+
+        *result.first = std::move(merged);
       }
     }
   }