TCP server for the vision library.
Change-Id: Id73304cabe3f746d72d51cc7dfe5adbe61c20c16
diff --git a/aos/vision/events/BUILD b/aos/vision/events/BUILD
index 609a864..c3400fe 100644
--- a/aos/vision/events/BUILD
+++ b/aos/vision/events/BUILD
@@ -12,13 +12,30 @@
)
cc_library(
+ name = 'intrusive_free_list',
+ hdrs = ['intrusive_free_list.h'],
+)
+
+cc_library(
+ name = 'tcp_server',
+ srcs = ['tcp_server.cc'],
+ hdrs = ['tcp_server.h'],
+ deps = [':epoll_events', ':intrusive_free_list'],
+)
+
+cc_library(
+ name = 'tcp_client',
+ srcs = ['tcp_client.cc'],
+ hdrs = ['tcp_client.h'],
+ deps = [':epoll_events'],
+)
+
+cc_library(
name = 'udp',
- visibility = ['//visibility:public'],
srcs = ['udp.cc'],
hdrs = ['udp.h'],
deps = [
'//aos/common:macros',
- '//aos/common/logging',
'//aos/common:scoped_fd',
],
)
@@ -28,6 +45,6 @@
srcs = ['udp_test.cc'],
deps = [
':udp',
- '//aos/testing:googletest'
+ '//aos/testing:googletest',
],
)
diff --git a/aos/vision/events/epoll_events.cc b/aos/vision/events/epoll_events.cc
index aaecdbf..f191259 100644
--- a/aos/vision/events/epoll_events.cc
+++ b/aos/vision/events/epoll_events.cc
@@ -1,87 +1,72 @@
#include "aos/vision/events/epoll_events.h"
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
#include <fcntl.h>
+#include <string.h>
#include <sys/epoll.h>
-
+#include <sys/socket.h>
+#include <sys/types.h>
#include <vector>
-#include "aos/common/scoped_fd.h"
#include "aos/common/logging/logging.h"
namespace aos {
namespace events {
-class EpollLoop::Impl {
- public:
- Impl() : epoll_fd_(PCHECK(epoll_create1(0))) {}
+EpollLoop::EpollLoop() : epoll_fd_(PCHECK(epoll_create1(0))) {}
- void Add(EpollEvent *event) {
- struct epoll_event temp_event;
- temp_event.data.ptr = static_cast<void *>(event);
- temp_event.events = EPOLLIN;
- PCHECK(epoll_ctl(epoll_fd(), EPOLL_CTL_ADD, event->fd(), &temp_event));
- }
+void EpollLoop::Add(EpollEvent *event) {
+ event->loop_ = this;
+ struct epoll_event temp_event;
+ temp_event.data.ptr = static_cast<void *>(event);
+ temp_event.events = EPOLLIN;
+ PCHECK(epoll_ctl(epoll_fd(), EPOLL_CTL_ADD, event->fd(), &temp_event));
+}
- void Run() {
- while (true) {
- const int timeout = CalculateTimeout();
- static constexpr size_t kNumberOfEvents = 64;
- epoll_event events[kNumberOfEvents];
- const int number_events = PCHECK(
- epoll_wait(epoll_fd(), events, kNumberOfEvents, timeout));
+void EpollLoop::Delete(EpollEvent *event) {
+ PCHECK(epoll_ctl(epoll_fd(), EPOLL_CTL_DEL, event->fd(), NULL));
+}
- for (int i = 0; i < number_events; i++) {
- EpollEvent *event =
- static_cast<EpollEvent *>(events[i].data.ptr);
- if ((events[i].events & ~(EPOLLIN | EPOLLPRI)) != 0) {
- LOG(FATAL, "unexpected epoll events set in %x on %d\n",
- events[i].events, event->fd());
- }
- event->ReadEvent();
+void EpollLoop::Run() {
+ while (true) {
+ const int timeout = CalculateTimeout();
+ static constexpr size_t kNumberOfEvents = 64;
+ epoll_event events[kNumberOfEvents];
+ const int number_events =
+ PCHECK(epoll_wait(epoll_fd(), events, kNumberOfEvents, timeout));
+
+ for (int i = 0; i < number_events; i++) {
+ EpollEvent *event = static_cast<EpollEvent *>(events[i].data.ptr);
+ if ((events[i].events & ~(EPOLLIN | EPOLLPRI)) != 0) {
+ LOG(FATAL, "unexpected epoll events set in %x on %d\n",
+ events[i].events, event->fd());
}
+ event->ReadEvent();
+ }
- for (EpollWatcher *watcher : watchers_) {
- watcher->Wake();
- }
+ for (EpollWatcher *watcher : watchers_) {
+ watcher->Wake();
}
}
+}
- void AddWait(EpollWait *wait) { waits_.push_back(wait); }
- void AddWatcher(EpollWatcher *watcher) { watchers_.push_back(watcher); }
-
- int epoll_fd() { return epoll_fd_.get(); }
-
- private:
- // Calculates the new timeout value to pass to epoll_wait.
- int CalculateTimeout() {
- const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
- int r = -1;
- for (EpollWait *c : waits_) {
- const int new_timeout = c->Recalculate(monotonic_now);
- if (new_timeout >= 0) {
- if (r < 0 || new_timeout < r) {
- r = new_timeout;
- }
- }
- }
- return r;
- }
-
- private:
- ::aos::ScopedFD epoll_fd_;
- ::std::vector<EpollWait *> waits_;
- ::std::vector<EpollWatcher *> watchers_;
-};
-
-EpollLoop::EpollLoop() : impl_(new Impl()) {}
-void EpollLoop::Run() { impl_->Run(); }
-void EpollLoop::Add(EpollEvent *event) { impl_->Add(event); }
-void EpollLoop::AddWait(EpollWait *wait) { impl_->AddWait(wait); }
+void EpollLoop::AddWait(EpollWait *wait) { waits_.push_back(wait); }
void EpollLoop::AddWatcher(EpollWatcher *watcher) {
- impl_->AddWatcher(watcher);
+ watchers_.push_back(watcher);
+}
+
+// Calculates the new timeout value to pass to epoll_wait.
+int EpollLoop::CalculateTimeout() {
+ const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
+ int r = -1;
+ for (EpollWait *c : waits_) {
+ const int new_timeout = c->Recalculate(monotonic_now);
+ if (new_timeout >= 0) {
+ if (r < 0 || new_timeout < r) {
+ r = new_timeout;
+ }
+ }
+ }
+ return r;
}
} // namespace events
diff --git a/aos/vision/events/epoll_events.h b/aos/vision/events/epoll_events.h
index 1cb6842..d7574f9 100644
--- a/aos/vision/events/epoll_events.h
+++ b/aos/vision/events/epoll_events.h
@@ -1,11 +1,12 @@
#ifndef AOS_VISION_EVENTS_EPOLL_EVENTS_H_
#define AOS_VISION_EVENTS_EPOLL_EVENTS_H_
-#include <stdint.h>
#include <limits.h>
-
+#include <stdint.h>
#include <memory>
+#include <vector>
+#include "aos/common/scoped_fd.h"
#include "aos/common/time.h"
namespace aos {
@@ -66,8 +67,12 @@
// loop degrades into a busy loop.
virtual void ReadEvent() = 0;
+ EpollLoop *loop() { return loop_; }
+
private:
const int fd_;
+ friend class EpollLoop;
+ EpollLoop *loop_ = nullptr;
};
// Provides a way for code to be notified every time after events are handled by
@@ -82,11 +87,6 @@
};
// A file descriptor based event loop implemented with epoll.
-//
-// There is currently no way to remove events because that's hard
-// (have to deal with events that come in for a file descriptor after it's
-// removed, which means not closing the fd or destroying the object until the
-// epoll mechanism confirms its removal) and we don't have a use for it.
class EpollLoop {
public:
EpollLoop();
@@ -97,13 +97,28 @@
void Add(EpollEvent *event);
void AddWatcher(EpollWatcher *watcher);
+ // Delete event. Note that there are caveats here as this is
+ // not idiot proof.
+ // ie:
+ // - Do not call from other threads.
+ // - Do not free while the object could still receive events.
+ // - This is safe only because the events are set as edge.
+ // TODO(parker): The thread-safety of this should be investigated and
+ // improved as well as adding support for non-edge events if this is to
+ // be used more generally.
+ void Delete(EpollEvent *event);
+
// Loops forever, handling events.
void Run();
private:
- class Impl;
+ int epoll_fd() { return epoll_fd_.get(); }
- ::std::unique_ptr<Impl> impl_;
+ int CalculateTimeout();
+
+ ::aos::ScopedFD epoll_fd_;
+ ::std::vector<EpollWait *> waits_;
+ ::std::vector<EpollWatcher *> watchers_;
};
} // namespace events
diff --git a/aos/vision/events/intrusive_free_list.h b/aos/vision/events/intrusive_free_list.h
new file mode 100644
index 0000000..0373741
--- /dev/null
+++ b/aos/vision/events/intrusive_free_list.h
@@ -0,0 +1,81 @@
+#ifndef _AOS_VISION_EVENTS_INTRUSIVE_FREE_LIST_H_
+#define _AOS_VISION_EVENTS_INTRUSIVE_FREE_LIST_H_
+
+namespace aos {
+namespace events {
+
+// Hey! Maybe you want a doubly linked list that frees things for you!
+// This allows the entry to delete itself, removing it from the list, or
+// when the list gets destructed all the entries get destructed.
+//
+// class MyType : public intrusive_free_list<MyType>::element {
+// public:
+// MyType(int i, intrusive_free_list<MyType>* list)
+// : element(list, this), i_(i) {}
+// ~MyType() { printf("o%d\n", i_); }
+// private:
+// int i_;
+// };
+//
+// void test_fn() {
+// intrusive_free_list<MyType> free_list;
+// auto o0 = new MyType(0, &free_list);
+// auto o1 = new MyType(1, &free_list);
+// auto o2 = new MyType(2, &free_list);
+// auto o3 = new MyType(2, &free_list);
+//
+// delete o2;
+// delete o1;
+// }
+//
+// // This will print:
+// o2
+// o1
+// o3
+// o0
+//
+// Note that anything that was not manually freed (o0, o3) is
+// freed by the linked list destructor at the end. This ensures everything
+// is always destructed even entities not manually destructed.
+template <class T>
+class intrusive_free_list {
+ public:
+ class element {
+ public:
+ element(intrusive_free_list<T> *list, T *t) : list_(list), prev_(nullptr) {
+ next_ = list_->begin_;
+ if (next_) next_->prev_ = t;
+ list_->begin_ = t;
+ }
+ ~element() {
+ if (next_) next_->prev_ = prev_;
+ if (prev_) {
+ prev_->next_ = next_;
+ } else {
+ list_->begin_ = next_;
+ }
+ }
+ T *next() { return next_; }
+
+ private:
+ friend class intrusive_free_list<T>;
+ // Consider using static casts and a header element to save this pointer.
+ intrusive_free_list<T> *list_;
+ T *next_;
+ T *prev_;
+ };
+ intrusive_free_list() : begin_(nullptr) {}
+ ~intrusive_free_list() {
+ while (begin_) delete begin_;
+ }
+ T *begin() { return begin_; }
+
+ private:
+ friend class element;
+ T *begin_;
+};
+
+} // namespace events
+} // namespace aos
+
+#endif // _AOS_VISION_EVENTS_INTRUSIVE_FREE_LIST_H_
diff --git a/aos/vision/events/tcp_client.cc b/aos/vision/events/tcp_client.cc
new file mode 100644
index 0000000..41485f9
--- /dev/null
+++ b/aos/vision/events/tcp_client.cc
@@ -0,0 +1,67 @@
+#include "aos/vision/events/tcp_client.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <unistd.h>
+
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+namespace events {
+
+namespace {
+int MakeSocketNonBlocking(int sfd) {
+ int flags;
+
+ PCHECK(flags = fcntl(sfd, F_GETFL, 0));
+
+ flags |= O_NONBLOCK;
+ PCHECK(fcntl(sfd, F_SETFL, flags));
+
+ return 0;
+}
+
+int OpenClient(const char *hostname, int portno) {
+ int sockfd;
+ struct sockaddr_in serveraddr;
+ struct hostent *server;
+ /* socket: create the socket */
+ PCHECK(sockfd = socket(AF_INET, SOCK_STREAM, 0));
+
+ /* gethostbyname: get the server's DNS entry */
+ server = gethostbyname(hostname);
+ if (server == NULL) {
+ fprintf(stderr, "ERROR, no such host as %s\n", hostname);
+ exit(-1);
+ }
+
+ /* build the server's Internet address */
+ bzero((char *)&serveraddr, sizeof(serveraddr));
+ serveraddr.sin_family = AF_INET;
+ bcopy((char *)server->h_addr, (char *)&serveraddr.sin_addr.s_addr,
+ server->h_length);
+ serveraddr.sin_port = htons(portno);
+
+ /* connect: create a connection with the server */
+ PCHECK(connect(sockfd, (const struct sockaddr *)&serveraddr,
+ sizeof(serveraddr)));
+ PCHECK(MakeSocketNonBlocking(sockfd));
+
+ return sockfd;
+}
+} // namespace
+
+TcpClient::TcpClient(const char *hostname, int portno)
+ : EpollEvent(OpenClient(hostname, portno)) {}
+
+} // namespace events
+} // namespace aos
diff --git a/aos/vision/events/tcp_client.h b/aos/vision/events/tcp_client.h
new file mode 100644
index 0000000..e7b80a6
--- /dev/null
+++ b/aos/vision/events/tcp_client.h
@@ -0,0 +1,22 @@
+#ifndef _AOS_VISION_DEBUG_TCP_SERVER_H_
+#define _AOS_VISION_DEBUG_TCP_SERVER_H_
+
+#include "aos/vision/events/epoll_events.h"
+
+#include <memory>
+
+namespace aos {
+namespace events {
+
+// Handles the client connection logic to hostname:portno
+class TcpClient : public EpollEvent {
+ public:
+ TcpClient(const char *hostname, int portno);
+
+ // Implement ReadEvent from EpollEvent to use this class.
+};
+
+} // namespace events
+} // namespace aos
+
+#endif // _AOS_VISION_DEBUG_TCP_SERVER_H_
diff --git a/aos/vision/events/tcp_server.cc b/aos/vision/events/tcp_server.cc
new file mode 100644
index 0000000..06f3a41
--- /dev/null
+++ b/aos/vision/events/tcp_server.cc
@@ -0,0 +1,110 @@
+#include "aos/vision/events/tcp_server.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/epoll.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+namespace events {
+
+namespace {
+
+int MakeSocketNonBlocking(int sfd) {
+ int flags;
+
+ PCHECK(flags = fcntl(sfd, F_GETFL, 0));
+
+ flags |= O_NONBLOCK;
+ PCHECK(fcntl(sfd, F_SETFL, flags));
+
+ return 0;
+}
+} // namespace
+
+// This is all copied from somewhere.
+int TCPServerBase::SocketBindListenOnPort(int portno) {
+ int parentfd; /* parent socket */
+ struct sockaddr_in serveraddr; /* server's addr */
+ int optval; /* flag value for setsockopt */
+
+ /*
+ * socket: create the parent socket
+ */
+ PCHECK(parentfd = socket(AF_INET, SOCK_STREAM, 0));
+
+ /* setsockopt: Handy debugging trick that lets
+ * us rerun the server immediately after we kill it;
+ * otherwise we have to wait about 20 secs.
+ * Eliminates "ERROR on binding: Address already in use" error.
+ */
+ optval = 1;
+ PCHECK(setsockopt(parentfd, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval,
+ sizeof(int)));
+
+ /*
+ * build the server's Internet address
+ */
+ bzero((char *)&serveraddr, sizeof(serveraddr));
+
+ /* this is an Internet address */
+ serveraddr.sin_family = AF_INET;
+
+ /* Listen on 0.0.0.0 */
+ serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ /* this is the port we will listen on */
+ serveraddr.sin_port = htons((uint16_t)portno);
+
+ /*
+ * bind: associate the parent socket with a port
+ */
+ PCHECK(bind(parentfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)));
+
+ PCHECK(listen(parentfd, SOMAXCONN));
+
+ LOG(INFO, "connected to port: %d on fd: %d\n", portno, parentfd);
+ return parentfd;
+}
+
+TCPServerBase::~TCPServerBase() { close(fd()); }
+
+void TCPServerBase::ReadEvent() {
+ /* We have a notification on the listening socket, which
+ means one or more incoming connections. */
+ struct sockaddr in_addr;
+ socklen_t in_len;
+ int infd;
+
+ in_len = sizeof in_addr;
+ infd = accept(fd(), &in_addr, &in_len);
+ if (infd == -1) {
+ if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
+ /* We have processed all incoming
+ connections. */
+ return;
+ } else {
+ PLOG(WARNING, "accept");
+ return;
+ }
+ }
+
+ /* Make the incoming socket non-blocking and add it to the
+ list of fds to monitor. */
+ PCHECK(MakeSocketNonBlocking(infd));
+
+ loop()->Add(Construct(infd));
+}
+
+} // namespace events
+} // namespace aos
diff --git a/aos/vision/events/tcp_server.h b/aos/vision/events/tcp_server.h
new file mode 100644
index 0000000..3116f55
--- /dev/null
+++ b/aos/vision/events/tcp_server.h
@@ -0,0 +1,77 @@
+#ifndef _AOS_VISION_EVENTS_TCP_SERVER_H_
+#define _AOS_VISION_EVENTS_TCP_SERVER_H_
+
+#include "aos/vision/events/epoll_events.h"
+#include "aos/vision/events/intrusive_free_list.h"
+
+#include <memory>
+#include <vector>
+
+namespace aos {
+namespace events {
+
+// Non-templatized base class of TCP server.
+// TCPServer implements Construct which specializes the client connection
+// based on the specific use-case.
+template <class T>
+class TCPServer;
+class SocketConnection;
+class TCPServerBase : public EpollEvent {
+ public:
+ TCPServerBase(int fd) : EpollEvent(fd) {}
+ ~TCPServerBase();
+
+ protected:
+ // Listens on port portno. File descriptor to
+ // accept on is returned.
+ static int SocketBindListenOnPort(int portno);
+
+ private:
+ virtual SocketConnection *Construct(int child_fd) = 0;
+ void ReadEvent() override;
+ friend class SocketConnection;
+ template <class T>
+ friend class TCPServer;
+ intrusive_free_list<SocketConnection> free_list;
+};
+
+// Base class for client connections. Clients are responsible for
+// deleting themselves once the connection is broken. This will remove
+// the entry from the free list.
+class SocketConnection : public EpollEvent,
+ public intrusive_free_list<SocketConnection>::element {
+ public:
+ SocketConnection(TCPServerBase *server, int fd)
+ : EpollEvent(fd), element(&server->free_list, this) {}
+};
+
+// T should be a subclass of SocketConnection.
+template <class T>
+class TCPServer : public TCPServerBase {
+ public:
+ TCPServer(int port) : TCPServerBase(SocketBindListenOnPort(port)) {}
+ SocketConnection *Construct(int child_fd) override {
+ return new T(this, child_fd);
+ }
+
+ static std::unique_ptr<TCPServer<T>> Make(int port) {
+ return std::unique_ptr<TCPServer<T>>(new TCPServer<T>(port));
+ }
+
+ // Call blk on each entry of the free-list. This is used to send a message
+ // to all clients.
+ template <typename EachBlock>
+ void Broadcast(const EachBlock &blk) {
+ auto a = free_list.begin();
+ while (a) {
+ auto client = static_cast<T *>(a);
+ blk(client);
+ a = a->next();
+ }
+ }
+};
+
+} // namespace events
+} // namespace aos
+
+#endif // _AOS_VISION_EVENTS_TCP_SERVER_H_