Make web proxy use EventLoop instead of ShmEventLoop

Change-Id: Ibee5190b51fae46ae40c9234cbf5e34b60f77bca
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 2f36854..5b5e84b 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -338,7 +338,6 @@
         "-Wno-unused-parameter",
     ],
     data = [
-        "//aos/events:pingpong_config.json",
         "//aos/network/www:files",
         "//aos/network/www:flatbuffers",
         "//aos/network/www:main_bundle",
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 3e9cd8f..507f73f 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -26,11 +26,35 @@
 
 }  // namespace
 
-WebsocketHandler::WebsocketHandler(
-    ::seasocks::Server *server,
-    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
-    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
-    : server_(server), subscribers_(subscribers), config_(config) {}
+WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
+                                   aos::EventLoop *event_loop)
+    : server_(server),
+      config_(aos::CopyFlatBuffer(event_loop->configuration())) {
+  const bool is_multi_node =
+      aos::configuration::MultiNode(event_loop->configuration());
+  const aos::Node *self =
+      is_multi_node ? aos::configuration::GetMyNode(event_loop->configuration())
+                    : nullptr;
+
+  for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
+    auto channel = event_loop->configuration()->channels()->Get(i);
+    if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
+      auto fetcher = event_loop->MakeRawFetcher(channel);
+      subscribers_.emplace_back(
+          std::make_unique<aos::web_proxy::Subscriber>(std::move(fetcher), i));
+    }
+  }
+
+  TimerHandler *const timer = event_loop->AddTimer([this]() {
+    for (auto &subscriber : subscribers_) {
+      subscriber->RunIteration();
+    }
+  });
+
+  event_loop->OnRun([timer, event_loop]() {
+    timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100));
+  });
+}
 
 void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
   std::unique_ptr<Connection> conn =
@@ -224,11 +248,6 @@
       flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data());
   VLOG(2) << "Got a connect message " << aos::FlatbufferToJson(message);
   for (auto &subscriber : subscribers_) {
-    // Make sure the subscriber is for a channel on this node.
-    if (subscriber.get() == nullptr) {
-      VLOG(2) << ": Null subscriber";
-      continue;
-    }
     bool found_match = false;
     for (auto channel : *message->channels_to_transfer()) {
       if (subscriber->Compare(channel)) {
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 549a7ae..749e8fc 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -2,7 +2,7 @@
 #define AOS_NETWORK_WEB_PROXY_H_
 #include <map>
 #include <set>
-#include "aos/events/shm_event_loop.h"
+#include "aos/events/event_loop.h"
 #include "aos/network/connect_generated.h"
 #include "aos/network/web_proxy_generated.h"
 #include "aos/seasocks/seasocks_logger.h"
@@ -24,10 +24,7 @@
 // websocket closes, it deletes the Connection.
 class WebsocketHandler : public ::seasocks::WebSocket::Handler {
  public:
-  WebsocketHandler(
-      ::seasocks::Server *server,
-      const std::vector<std::unique_ptr<Subscriber>> &subscribers,
-      const aos::FlatbufferDetachedBuffer<aos::Configuration> &config);
+  WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop);
   void onConnect(::seasocks::WebSocket *sock) override;
   void onData(::seasocks::WebSocket *sock, const uint8_t *data,
               size_t size) override;
@@ -36,8 +33,8 @@
  private:
   std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
   ::seasocks::Server *server_;
-  const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
-  const aos::FlatbufferDetachedBuffer<aos::Configuration> &config_;
+  std::vector<std::unique_ptr<Subscriber>> subscribers_;
+  const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
 };
 
 // Seasocks requires that sends happen on the correct thread. This class takes a
@@ -62,6 +59,8 @@
 // Represents a fetcher and all the Connections that care about it.
 // Handles building the message and telling each connection to send it.
 // indexed by location of the channel it handles in the config.
+// TODO(james): Make it so that Subscriber can optionally maintain an infinite
+// backlog of messages.
 class Subscriber {
  public:
   Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index)
diff --git a/aos/network/web_proxy_main.cc b/aos/network/web_proxy_main.cc
index 227f980..2d8b97b 100644
--- a/aos/network/web_proxy_main.cc
+++ b/aos/network/web_proxy_main.cc
@@ -12,49 +12,9 @@
 DEFINE_string(config, "./config.json", "File path of aos configuration");
 DEFINE_string(data_dir, "www", "Directory to serve data files from");
 
-void RunDataThread(
-    std::vector<std::unique_ptr<aos::web_proxy::Subscriber>> *subscribers,
-    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config) {
-  aos::ShmEventLoop event_loop(&config.message());
-  const bool is_multi_node =
-      aos::configuration::MultiNode(event_loop.configuration());
-  const aos::Node *self =
-      is_multi_node ? aos::configuration::GetMyNode(event_loop.configuration())
-                    : nullptr;
-
-  LOG(INFO) << "My node is " << aos::FlatbufferToJson(self);
-
-  for (uint i = 0; i < config.message().channels()->size(); ++i) {
-    auto channel = config.message().channels()->Get(i);
-    if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
-      auto fetcher = event_loop.MakeRawFetcher(channel);
-      subscribers->emplace_back(
-          std::make_unique<aos::web_proxy::Subscriber>(std::move(fetcher), i));
-    } else {
-      subscribers->emplace_back(nullptr);
-    }
-  }
-
-  flatbuffers::FlatBufferBuilder fbb(1024);
-
-  auto timer = event_loop.AddTimer([&]() {
-    for (auto &subscriber : *subscribers) {
-      if (subscriber != nullptr) {
-        subscriber->RunIteration();
-      }
-    }
-  });
-
-  event_loop.OnRun([&]() {
-    timer->Setup(event_loop.monotonic_now(), std::chrono::milliseconds(100));
-  });
-
-  event_loop.Run();
-}
-
 int main(int argc, char **argv) {
-  // Make sure to reference this to force the linker to include it.
   aos::InitGoogle(&argc, &argv);
+  // Make sure to reference this to force the linker to include it.
   findEmbeddedContent("");
 
   aos::InitNRT();
@@ -62,17 +22,16 @@
   aos::FlatbufferDetachedBuffer<aos::Configuration> config =
       aos::configuration::ReadConfig(FLAGS_config);
 
-  std::vector<std::unique_ptr<aos::web_proxy::Subscriber>> subscribers;
-
-  std::thread data_thread{
-      [&subscribers, &config]() { RunDataThread(&subscribers, config); }};
+  aos::ShmEventLoop event_loop(&config.message());
 
   seasocks::Server server(std::shared_ptr<seasocks::Logger>(
       new aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));
 
-  auto websocket_handler = std::make_shared<aos::web_proxy::WebsocketHandler>(
-      &server, subscribers, config);
+  auto websocket_handler =
+      std::make_shared<aos::web_proxy::WebsocketHandler>(&server, &event_loop);
   server.addWebSocketHandler("/ws", websocket_handler);
 
+  std::thread data_thread{[&event_loop]() { event_loop.Run(); }};
+
   server.serve(FLAGS_data_dir.c_str(), 8080);
 }