Initial message_bridge client and server

These will forward data, and track what made it across and what didn't
when configured correctly.  This should be off if nothing is requested
to be logged remotely.

It implements ttl, reconnects, and has a basic smoke test.

We still need to handle forwarding data for logging.

Change-Id: I7daebe8cef54029a5733b7f81ee6b68367c80d82
diff --git a/aos/events/BUILD b/aos/events/BUILD
index e40ab99..79200bd 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -121,6 +121,8 @@
     flatbuffers = [
         ":ping_fbs",
         ":pong_fbs",
+        "//aos/network:message_bridge_client_fbs",
+        "//aos/network:message_bridge_server_fbs",
     ],
     deps = [":config"],
 )
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 97d5783..b2be972 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1,4 +1,5 @@
 load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
 
 flatbuffer_cc_library(
     name = "logger_fbs",
@@ -84,11 +85,21 @@
     ],
 )
 
+aos_config(
+    name = "multinode_pingpong_config",
+    src = "multinode_pingpong.json",
+    flatbuffers = [
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+    ],
+    deps = ["//aos/events:config"],
+)
+
 cc_test(
     name = "logger_test",
     srcs = ["logger_test.cc"],
     data = [
-        "//aos/events:multinode_pingpong_config.json",
+        ":multinode_pingpong_config.json",
         "//aos/events:pingpong_config.json",
     ],
     deps = [
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 91190e7..70c520d 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -144,7 +144,7 @@
  public:
   MultinodeLoggerTest()
       : config_(aos::configuration::ReadConfig(
-            "aos/events/multinode_pingpong_config.json")),
+            "aos/events/logging/multinode_pingpong_config.json")),
         event_loop_factory_(&config_.message(), "pi1"),
         ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
         ping_(ping_event_loop_.get()) {}
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
new file mode 100644
index 0000000..f85a4e1
--- /dev/null
+++ b/aos/events/logging/multinode_pingpong.json
@@ -0,0 +1,93 @@
+{
+  "channels": [
+    /* Logged on pi1 locally */
+    {
+      "name": "/aos/pi1",
+      "type": "aos.timing.Report",
+      "source_node": "pi1",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.timing.Report",
+      "source_node": "pi2",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    /* Forwarded to pi2.
+     * Doesn't matter where timestamps are logged for the test.
+     */
+    {
+      "name": "/test",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_node": "pi1",
+          "time_to_live": 5000000
+        }
+      ]
+    },
+    /* Forwarded back to pi1.
+     * The message is logged both on the sending node and the receiving node
+     * (to make it easier to look at the results for now).
+     *
+     * The timestamps are logged on the receiving node.
+     */
+    {
+      "name": "/test",
+      "type": "aos.examples.Pong",
+      "source_node": "pi2",
+      "logger": "LOCAL_AND_REMOTE_LOGGER",
+      "logger_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi1",
+          "priority": 1,
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
+        }
+      ]
+    }
+  ],
+  "maps": [
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    }
+  ],
+  "nodes": [
+    {
+      "name": "pi1",
+      "hostname": "raspberrypi",
+      "port": 9971
+    },
+    {
+      "name": "pi2",
+      "hostname": "raspberrypi2",
+      "port": 9971
+    }
+  ]
+}
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index f0e532e..c0c5087 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -2,6 +2,54 @@
   "channels": [
     {
       "name": "/aos/pi1",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi3",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "pi3",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/roborio",
+      "type": "aos.message_bridge.ServerStatistics",
+      "source_node": "roborio",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi1",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi2",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi2",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi3",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "pi3",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/roborio",
+      "type": "aos.message_bridge.ClientStatistics",
+      "source_node": "roborio",
+      "frequency": 2
+    },
+    {
+      "name": "/aos/pi1",
       "type": "aos.timing.Report",
       "source_node": "pi1",
       "frequency": 50,
@@ -25,6 +73,14 @@
       "max_size": 2048
     },
     {
+      "name": "/aos/roborio",
+      "type": "aos.timing.Report",
+      "source_node": "roborio",
+      "frequency": 50,
+      "num_senders": 20,
+      "max_size": 2048
+    },
+    {
       "name": "/test",
       "type": "aos.examples.Ping",
       "source_node": "pi1",
@@ -32,8 +88,8 @@
         {
           "name": "pi2",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     },
@@ -41,14 +97,12 @@
       "name": "/test",
       "type": "aos.examples.Pong",
       "source_node": "pi2",
-      "logger": "LOCAL_AND_REMOTE_LOGGER",
-      "logger_node": "pi1",
       "destination_nodes": [
         {
           "name": "pi1",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     },
@@ -60,8 +114,8 @@
         {
           "name": "pi3",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     },
@@ -69,14 +123,12 @@
       "name": "/test2",
       "type": "aos.examples.Pong",
       "source_node": "pi3",
-      "logger": "LOCAL_AND_REMOTE_LOGGER",
-      "logger_node": "pi1",
       "destination_nodes": [
         {
           "name": "pi1",
           "priority": 1,
-          "timestamp_logger": "REMOTE_LOGGER",
-          "timestamp_logger_node": "pi1"
+          "timestamp_logger": "LOCAL_LOGGER",
+          "time_to_live": 5000000
         }
       ]
     }
@@ -111,6 +163,96 @@
       "rename": {
         "name": "/aos/pi3"
       }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.timing.Report",
+        "source_node": "roborio"
+      },
+      "rename": {
+        "name": "/aos/roborio"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "pi3"
+      },
+      "rename": {
+        "name": "/aos/pi3"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ServerStatistics",
+        "source_node": "roborio"
+      },
+      "rename": {
+        "name": "/aos/roborio"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "pi1"
+      },
+      "rename": {
+        "name": "/aos/pi1"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "pi2"
+      },
+      "rename": {
+        "name": "/aos/pi2"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "pi3"
+      },
+      "rename": {
+        "name": "/aos/pi3"
+      }
+    },
+    {
+      "match": {
+        "name": "/aos",
+        "type": "aos.message_bridge.ClientStatistics",
+        "source_node": "roborio"
+      },
+      "rename": {
+        "name": "/aos/roborio"
+      }
     }
   ],
   "nodes": [
@@ -128,6 +270,11 @@
       "name": "pi3",
       "hostname": "raspberrypi3",
       "port": 9971
+    },
+    {
+      "name": "roborio",
+      "hostname": "roboRIO-6971-FRC",
+      "port": 9971
     }
   ],
   "applications": [
@@ -156,6 +303,32 @@
           }
         }
       ]
+    },
+    {
+      "name": "ping3",
+      "maps": [
+        {
+          "match": {
+            "name": "/test"
+          },
+          "rename": {
+            "name": "/test3"
+          }
+        }
+      ]
+    },
+    {
+      "name": "pong3",
+      "maps": [
+        {
+          "match": {
+            "name": "/test"
+          },
+          "rename": {
+            "name": "/test3"
+          }
+        }
+      ]
     }
   ]
 }
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 3ced933..bdee4f2 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -21,10 +21,26 @@
 #include "aos/util/phased_loop.h"
 #include "glog/logging.h"
 
+namespace {
+
+// Returns the portion of the path after the last /.  This very much assumes
+// that the application name is null terminated.
+const char *Filename(const char *path) {
+  const std::string_view path_string_view = path;
+  auto last_slash_pos = path_string_view.find_last_of("/");
+
+  return last_slash_pos == std::string_view::npos ? path
+                                                  : path + last_slash_pos + 1;
+}
+
+}  // namespace
+
 DEFINE_string(shm_base, "/dev/shm/aos",
               "Directory to place queue backing mmaped files in.");
 DEFINE_uint32(permissions, 0770,
               "Permissions to make shared memory files and folders.");
+DEFINE_string(application_name, Filename(program_invocation_name),
+              "The application name");
 
 namespace aos {
 
@@ -135,15 +151,6 @@
 
 namespace {
 
-// Returns the portion of the path after the last /.
-std::string_view Filename(std::string_view path) {
-  auto last_slash_pos = path.find_last_of("/");
-
-  return last_slash_pos == std::string_view::npos
-             ? path
-             : path.substr(last_slash_pos + 1, path.size());
-}
-
 const Node *MaybeMyNode(const Configuration *configuration) {
   if (!configuration->has_nodes()) {
     return nullptr;
@@ -158,7 +165,7 @@
 
 ShmEventLoop::ShmEventLoop(const Configuration *configuration)
     : EventLoop(configuration),
-      name_(Filename(program_invocation_name)),
+      name_(FLAGS_application_name),
       node_(MaybeMyNode(configuration)) {
   if (configuration->has_nodes()) {
     CHECK(node_ != nullptr) << ": Couldn't find node in config.";
@@ -802,15 +809,16 @@
   }
 
   SignalHandler::global()->Unregister(this);
+
+  // Trigger any remaining senders or fetchers to be cleared before destroying
+  // the event loop so the book keeping matches.  Do this in the thread that
+  // created the timing reporter.
+  timing_report_sender_.reset();
 }
 
 void ShmEventLoop::Exit() { epoll_.Quit(); }
 
 ShmEventLoop::~ShmEventLoop() {
-  // Trigger any remaining senders or fetchers to be cleared before destroying
-  // the event loop so the book keeping matches.
-  timing_report_sender_.reset();
-
   // Force everything with a registered fd with epoll to be destroyed now.
   timers_.clear();
   phased_loops_.clear();
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index d10989e..bb4ad77 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -71,6 +71,8 @@
 
   int priority() const override { return priority_; }
 
+  internal::EPoll *epoll() { return &epoll_; }
+
  private:
   friend class internal::WatcherState;
   friend class internal::TimerHandlerState;