TCP server for the vision library.

Change-Id: Id73304cabe3f746d72d51cc7dfe5adbe61c20c16
diff --git a/aos/vision/events/BUILD b/aos/vision/events/BUILD
index 609a864..c3400fe 100644
--- a/aos/vision/events/BUILD
+++ b/aos/vision/events/BUILD
@@ -12,13 +12,30 @@
 )
 
 cc_library(
+  name = 'intrusive_free_list',
+  hdrs = ['intrusive_free_list.h'],
+)
+
+cc_library(
+  name = 'tcp_server',
+  srcs = ['tcp_server.cc'],
+  hdrs = ['tcp_server.h'],
+  deps = [':epoll_events', ':intrusive_free_list'],
+)
+
+cc_library(
+  name = 'tcp_client',
+  srcs = ['tcp_client.cc'],
+  hdrs = ['tcp_client.h'],
+  deps = [':epoll_events'],
+)
+
+cc_library(
   name = 'udp',
-  visibility = ['//visibility:public'],
   srcs = ['udp.cc'],
   hdrs = ['udp.h'],
   deps = [
     '//aos/common:macros',
-    '//aos/common/logging',
     '//aos/common:scoped_fd',
   ],
 )
@@ -28,6 +45,6 @@
   srcs = ['udp_test.cc'],
   deps = [
     ':udp',
-    '//aos/testing:googletest'
+    '//aos/testing:googletest',
   ],
 )
diff --git a/aos/vision/events/epoll_events.cc b/aos/vision/events/epoll_events.cc
index aaecdbf..f191259 100644
--- a/aos/vision/events/epoll_events.cc
+++ b/aos/vision/events/epoll_events.cc
@@ -1,87 +1,72 @@
 #include "aos/vision/events/epoll_events.h"
 
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
 #include <fcntl.h>
+#include <string.h>
 #include <sys/epoll.h>
-
+#include <sys/socket.h>
+#include <sys/types.h>
 #include <vector>
 
-#include "aos/common/scoped_fd.h"
 #include "aos/common/logging/logging.h"
 
 namespace aos {
 namespace events {
 
-class EpollLoop::Impl {
- public:
-  Impl() : epoll_fd_(PCHECK(epoll_create1(0))) {}
+EpollLoop::EpollLoop() : epoll_fd_(PCHECK(epoll_create1(0))) {}
 
-  void Add(EpollEvent *event) {
-    struct epoll_event temp_event;
-    temp_event.data.ptr = static_cast<void *>(event);
-    temp_event.events = EPOLLIN;
-    PCHECK(epoll_ctl(epoll_fd(), EPOLL_CTL_ADD, event->fd(), &temp_event));
-  }
+void EpollLoop::Add(EpollEvent *event) {
+  event->loop_ = this;
+  struct epoll_event temp_event;
+  temp_event.data.ptr = static_cast<void *>(event);
+  temp_event.events = EPOLLIN;
+  PCHECK(epoll_ctl(epoll_fd(), EPOLL_CTL_ADD, event->fd(), &temp_event));
+}
 
-  void Run() {
-    while (true) {
-      const int timeout = CalculateTimeout();
-      static constexpr size_t kNumberOfEvents = 64;
-      epoll_event events[kNumberOfEvents];
-      const int number_events = PCHECK(
-          epoll_wait(epoll_fd(), events, kNumberOfEvents, timeout));
+void EpollLoop::Delete(EpollEvent *event) {
+  PCHECK(epoll_ctl(epoll_fd(), EPOLL_CTL_DEL, event->fd(), NULL));
+}
 
-      for (int i = 0; i < number_events; i++) {
-        EpollEvent *event =
-            static_cast<EpollEvent *>(events[i].data.ptr);
-        if ((events[i].events & ~(EPOLLIN | EPOLLPRI)) != 0) {
-          LOG(FATAL, "unexpected epoll events set in %x on %d\n",
-              events[i].events, event->fd());
-        }
-        event->ReadEvent();
+void EpollLoop::Run() {
+  while (true) {
+    const int timeout = CalculateTimeout();
+    static constexpr size_t kNumberOfEvents = 64;
+    epoll_event events[kNumberOfEvents];
+    const int number_events =
+        PCHECK(epoll_wait(epoll_fd(), events, kNumberOfEvents, timeout));
+
+    for (int i = 0; i < number_events; i++) {
+      EpollEvent *event = static_cast<EpollEvent *>(events[i].data.ptr);
+      if ((events[i].events & ~(EPOLLIN | EPOLLPRI)) != 0) {
+        LOG(FATAL, "unexpected epoll events set in %x on %d\n",
+            events[i].events, event->fd());
       }
+      event->ReadEvent();
+    }
 
-      for (EpollWatcher *watcher : watchers_) {
-        watcher->Wake();
-      }
+    for (EpollWatcher *watcher : watchers_) {
+      watcher->Wake();
     }
   }
+}
 
-  void AddWait(EpollWait *wait) { waits_.push_back(wait); }
-  void AddWatcher(EpollWatcher *watcher) { watchers_.push_back(watcher); }
-
-  int epoll_fd() { return epoll_fd_.get(); }
-
- private:
-  // Calculates the new timeout value to pass to epoll_wait.
-  int CalculateTimeout() {
-    const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
-    int r = -1;
-    for (EpollWait *c : waits_) {
-      const int new_timeout = c->Recalculate(monotonic_now);
-      if (new_timeout >= 0) {
-        if (r < 0 || new_timeout < r) {
-          r = new_timeout;
-        }
-      }
-    }
-    return r;
-  }
-
- private:
-  ::aos::ScopedFD epoll_fd_;
-  ::std::vector<EpollWait *> waits_;
-  ::std::vector<EpollWatcher *> watchers_;
-};
-
-EpollLoop::EpollLoop() : impl_(new Impl()) {}
-void EpollLoop::Run() { impl_->Run(); }
-void EpollLoop::Add(EpollEvent *event) { impl_->Add(event); }
-void EpollLoop::AddWait(EpollWait *wait) { impl_->AddWait(wait); }
+void EpollLoop::AddWait(EpollWait *wait) { waits_.push_back(wait); }
 void EpollLoop::AddWatcher(EpollWatcher *watcher) {
-  impl_->AddWatcher(watcher);
+  watchers_.push_back(watcher);
+}
+
+// Calculates the new timeout value to pass to epoll_wait.
+int EpollLoop::CalculateTimeout() {
+  const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+  int r = -1;
+  for (EpollWait *c : waits_) {
+    const int new_timeout = c->Recalculate(monotonic_now);
+    if (new_timeout >= 0) {
+      if (r < 0 || new_timeout < r) {
+        r = new_timeout;
+      }
+    }
+  }
+  return r;
 }
 
 }  // namespace events
diff --git a/aos/vision/events/epoll_events.h b/aos/vision/events/epoll_events.h
index 1cb6842..d7574f9 100644
--- a/aos/vision/events/epoll_events.h
+++ b/aos/vision/events/epoll_events.h
@@ -1,11 +1,12 @@
 #ifndef AOS_VISION_EVENTS_EPOLL_EVENTS_H_
 #define AOS_VISION_EVENTS_EPOLL_EVENTS_H_
 
-#include <stdint.h>
 #include <limits.h>
-
+#include <stdint.h>
 #include <memory>
+#include <vector>
 
+#include "aos/common/scoped_fd.h"
 #include "aos/common/time.h"
 
 namespace aos {
@@ -66,8 +67,12 @@
   // loop degrades into a busy loop.
   virtual void ReadEvent() = 0;
 
+  EpollLoop *loop() { return loop_; }
+
  private:
   const int fd_;
+  friend class EpollLoop;
+  EpollLoop *loop_ = nullptr;
 };
 
 // Provides a way for code to be notified every time after events are handled by
@@ -82,11 +87,6 @@
 };
 
 // A file descriptor based event loop implemented with epoll.
-//
-// There is currently no way to remove events because that's hard
-// (have to deal with events that come in for a file descriptor after it's
-// removed, which means not closing the fd or destroying the object until the
-// epoll mechanism confirms its removal) and we don't have a use for it.
 class EpollLoop {
  public:
   EpollLoop();
@@ -97,13 +97,28 @@
   void Add(EpollEvent *event);
   void AddWatcher(EpollWatcher *watcher);
 
+  // Delete event. Note that there are caveats here as this is
+  // not idiot proof.
+  // ie:
+  //  - Do not call from other threads.
+  //  - Do not free while the object could still receive events.
+  //  - This is safe only because the events are set as edge.
+  //  TODO(parker): The thread-safety of this should be investigated and
+  //  improved as well as adding support for non-edge events if this is to
+  //  be used more generally.
+  void Delete(EpollEvent *event);
+
   // Loops forever, handling events.
   void Run();
 
  private:
-  class Impl;
+  int epoll_fd() { return epoll_fd_.get(); }
 
-  ::std::unique_ptr<Impl> impl_;
+  int CalculateTimeout();
+
+  ::aos::ScopedFD epoll_fd_;
+  ::std::vector<EpollWait *> waits_;
+  ::std::vector<EpollWatcher *> watchers_;
 };
 
 }  // namespace events
diff --git a/aos/vision/events/intrusive_free_list.h b/aos/vision/events/intrusive_free_list.h
new file mode 100644
index 0000000..0373741
--- /dev/null
+++ b/aos/vision/events/intrusive_free_list.h
@@ -0,0 +1,81 @@
+#ifndef _AOS_VISION_EVENTS_INTRUSIVE_FREE_LIST_H_
+#define _AOS_VISION_EVENTS_INTRUSIVE_FREE_LIST_H_
+
+namespace aos {
+namespace events {
+
+// Hey! Maybe you want a doubly linked list that frees things for you!
+// This allows the entry to delete itself, removing it from the list, or
+// when the list gets destructed all the entries get destructed.
+//
+// class MyType : public intrusive_free_list<MyType>::element {
+//  public:
+//   MyType(int i, intrusive_free_list<MyType>* list)
+//       : element(list, this), i_(i) {}
+//   ~MyType() { printf("o%d\n", i_); }
+//  private:
+//   int i_;
+// };
+//
+// void test_fn() {
+//   intrusive_free_list<MyType> free_list;
+//   auto o0 = new MyType(0, &free_list);
+//   auto o1 = new MyType(1, &free_list);
+//   auto o2 = new MyType(2, &free_list);
+//   auto o3 = new MyType(2, &free_list);
+//
+//   delete o2;
+//   delete o1;
+// }
+//
+// // This will print:
+// o2
+// o1
+// o3
+// o0
+//
+// Note that anything that was not manually freed (o0, o3) is
+// freed by the linked list destructor at the end. This ensures everything
+// is always destructed even entities not manually destructed.
+template <class T>
+class intrusive_free_list {
+ public:
+  class element {
+   public:
+    element(intrusive_free_list<T> *list, T *t) : list_(list), prev_(nullptr) {
+      next_ = list_->begin_;
+      if (next_) next_->prev_ = t;
+      list_->begin_ = t;
+    }
+    ~element() {
+      if (next_) next_->prev_ = prev_;
+      if (prev_) {
+        prev_->next_ = next_;
+      } else {
+        list_->begin_ = next_;
+      }
+    }
+    T *next() { return next_; }
+
+   private:
+    friend class intrusive_free_list<T>;
+    // Consider using static casts and a header element to save this pointer.
+    intrusive_free_list<T> *list_;
+    T *next_;
+    T *prev_;
+  };
+  intrusive_free_list() : begin_(nullptr) {}
+  ~intrusive_free_list() {
+    while (begin_) delete begin_;
+  }
+  T *begin() { return begin_; }
+
+ private:
+  friend class element;
+  T *begin_;
+};
+
+}  // namespace events
+}  // namespace aos
+
+#endif  // _AOS_VISION_EVENTS_INTRUSIVE_FREE_LIST_H_
diff --git a/aos/vision/events/tcp_client.cc b/aos/vision/events/tcp_client.cc
new file mode 100644
index 0000000..41485f9
--- /dev/null
+++ b/aos/vision/events/tcp_client.cc
@@ -0,0 +1,67 @@
+#include "aos/vision/events/tcp_client.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <unistd.h>
+
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+namespace events {
+
+namespace {
+int MakeSocketNonBlocking(int sfd) {
+  int flags;
+
+  PCHECK(flags = fcntl(sfd, F_GETFL, 0));
+
+  flags |= O_NONBLOCK;
+  PCHECK(fcntl(sfd, F_SETFL, flags));
+
+  return 0;
+}
+
+int OpenClient(const char *hostname, int portno) {
+  int sockfd;
+  struct sockaddr_in serveraddr;
+  struct hostent *server;
+  /* socket: create the socket */
+  PCHECK(sockfd = socket(AF_INET, SOCK_STREAM, 0));
+
+  /* gethostbyname: get the server's DNS entry */
+  server = gethostbyname(hostname);
+  if (server == NULL) {
+    fprintf(stderr, "ERROR, no such host as %s\n", hostname);
+    exit(-1);
+  }
+
+  /* build the server's Internet address */
+  bzero((char *)&serveraddr, sizeof(serveraddr));
+  serveraddr.sin_family = AF_INET;
+  bcopy((char *)server->h_addr, (char *)&serveraddr.sin_addr.s_addr,
+        server->h_length);
+  serveraddr.sin_port = htons(portno);
+
+  /* connect: create a connection with the server */
+  PCHECK(connect(sockfd, (const struct sockaddr *)&serveraddr,
+                 sizeof(serveraddr)));
+  PCHECK(MakeSocketNonBlocking(sockfd));
+
+  return sockfd;
+}
+}  // namespace
+
+TcpClient::TcpClient(const char *hostname, int portno)
+    : EpollEvent(OpenClient(hostname, portno)) {}
+
+}  // namespace events
+}  // namespace aos
diff --git a/aos/vision/events/tcp_client.h b/aos/vision/events/tcp_client.h
new file mode 100644
index 0000000..e7b80a6
--- /dev/null
+++ b/aos/vision/events/tcp_client.h
@@ -0,0 +1,22 @@
+#ifndef _AOS_VISION_DEBUG_TCP_SERVER_H_
+#define _AOS_VISION_DEBUG_TCP_SERVER_H_
+
+#include "aos/vision/events/epoll_events.h"
+
+#include <memory>
+
+namespace aos {
+namespace events {
+
+// Handles the client connection logic to hostname:portno
+class TcpClient : public EpollEvent {
+ public:
+  TcpClient(const char *hostname, int portno);
+
+  // Implement ReadEvent from EpollEvent to use this class.
+};
+
+}  // namespace events
+}  // namespace aos
+
+#endif  // _AOS_VISION_DEBUG_TCP_SERVER_H_
diff --git a/aos/vision/events/tcp_server.cc b/aos/vision/events/tcp_server.cc
new file mode 100644
index 0000000..06f3a41
--- /dev/null
+++ b/aos/vision/events/tcp_server.cc
@@ -0,0 +1,110 @@
+#include "aos/vision/events/tcp_server.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/epoll.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+namespace events {
+
+namespace {
+
+int MakeSocketNonBlocking(int sfd) {
+  int flags;
+
+  PCHECK(flags = fcntl(sfd, F_GETFL, 0));
+
+  flags |= O_NONBLOCK;
+  PCHECK(fcntl(sfd, F_SETFL, flags));
+
+  return 0;
+}
+}  // namespace
+
+// This is all copied from somewhere.
+int TCPServerBase::SocketBindListenOnPort(int portno) {
+  int parentfd;                  /* parent socket */
+  struct sockaddr_in serveraddr; /* server's addr */
+  int optval;                    /* flag value for setsockopt */
+
+  /*
+   * socket: create the parent socket
+   */
+  PCHECK(parentfd = socket(AF_INET, SOCK_STREAM, 0));
+
+  /* setsockopt: Handy debugging trick that lets
+   * us rerun the server immediately after we kill it;
+   * otherwise we have to wait about 20 secs.
+   * Eliminates "ERROR on binding: Address already in use" error.
+   */
+  optval = 1;
+  PCHECK(setsockopt(parentfd, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval,
+                    sizeof(int)));
+
+  /*
+   * build the server's Internet address
+   */
+  bzero((char *)&serveraddr, sizeof(serveraddr));
+
+  /* this is an Internet address */
+  serveraddr.sin_family = AF_INET;
+
+  /* Listen on 0.0.0.0 */
+  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+  /* this is the port we will listen on */
+  serveraddr.sin_port = htons((uint16_t)portno);
+
+  /*
+   * bind: associate the parent socket with a port
+   */
+  PCHECK(bind(parentfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)));
+
+  PCHECK(listen(parentfd, SOMAXCONN));
+
+  LOG(INFO, "connected to port: %d on fd: %d\n", portno, parentfd);
+  return parentfd;
+}
+
+TCPServerBase::~TCPServerBase() { close(fd()); }
+
+void TCPServerBase::ReadEvent() {
+  /* We have a notification on the listening socket, which
+   means one or more incoming connections. */
+  struct sockaddr in_addr;
+  socklen_t in_len;
+  int infd;
+
+  in_len = sizeof in_addr;
+  infd = accept(fd(), &in_addr, &in_len);
+  if (infd == -1) {
+    if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
+      /* We have processed all incoming
+         connections. */
+      return;
+    } else {
+      PLOG(WARNING, "accept");
+      return;
+    }
+  }
+
+  /* Make the incoming socket non-blocking and add it to the
+     list of fds to monitor. */
+  PCHECK(MakeSocketNonBlocking(infd));
+
+  loop()->Add(Construct(infd));
+}
+
+}  // namespace events
+}  // namespace aos
diff --git a/aos/vision/events/tcp_server.h b/aos/vision/events/tcp_server.h
new file mode 100644
index 0000000..3116f55
--- /dev/null
+++ b/aos/vision/events/tcp_server.h
@@ -0,0 +1,77 @@
+#ifndef _AOS_VISION_EVENTS_TCP_SERVER_H_
+#define _AOS_VISION_EVENTS_TCP_SERVER_H_
+
+#include "aos/vision/events/epoll_events.h"
+#include "aos/vision/events/intrusive_free_list.h"
+
+#include <memory>
+#include <vector>
+
+namespace aos {
+namespace events {
+
+// Non-templatized base class of TCP server.
+// TCPServer implements Construct which specializes the client connection
+// based on the specific use-case.
+template <class T>
+class TCPServer;
+class SocketConnection;
+class TCPServerBase : public EpollEvent {
+ public:
+  TCPServerBase(int fd) : EpollEvent(fd) {}
+  ~TCPServerBase();
+
+ protected:
+  // Listens on port portno. File descriptor to
+  // accept on is returned.
+  static int SocketBindListenOnPort(int portno);
+
+ private:
+  virtual SocketConnection *Construct(int child_fd) = 0;
+  void ReadEvent() override;
+  friend class SocketConnection;
+  template <class T>
+  friend class TCPServer;
+  intrusive_free_list<SocketConnection> free_list;
+};
+
+// Base class for client connections. Clients are responsible for
+// deleting themselves once the connection is broken. This will remove
+// the entry from the free list.
+class SocketConnection : public EpollEvent,
+                         public intrusive_free_list<SocketConnection>::element {
+ public:
+  SocketConnection(TCPServerBase *server, int fd)
+      : EpollEvent(fd), element(&server->free_list, this) {}
+};
+
+// T should be a subclass of SocketConnection.
+template <class T>
+class TCPServer : public TCPServerBase {
+ public:
+  TCPServer(int port) : TCPServerBase(SocketBindListenOnPort(port)) {}
+  SocketConnection *Construct(int child_fd) override {
+    return new T(this, child_fd);
+  }
+
+  static std::unique_ptr<TCPServer<T>> Make(int port) {
+    return std::unique_ptr<TCPServer<T>>(new TCPServer<T>(port));
+  }
+
+  // Call blk on each entry of the free-list. This is used to send a message
+  // to all clients.
+  template <typename EachBlock>
+  void Broadcast(const EachBlock &blk) {
+    auto a = free_list.begin();
+    while (a) {
+      auto client = static_cast<T *>(a);
+      blk(client);
+      a = a->next();
+    }
+  }
+};
+
+}  // namespace events
+}  // namespace aos
+
+#endif  // _AOS_VISION_EVENTS_TCP_SERVER_H_