Squashed 'third_party/seasocks/' content from commit 016dc60
Change-Id: I195fa5bfd0c0e3cc66fbbefcc7b5170bafcf7a36
git-subtree-dir: third_party/seasocks
git-subtree-split: 016dc60b247e0d1d563aea6d22a9075e6884ab9f
diff --git a/src/main/c/Server.cpp b/src/main/c/Server.cpp
new file mode 100644
index 0000000..5ad1c30
--- /dev/null
+++ b/src/main/c/Server.cpp
@@ -0,0 +1,568 @@
+// Copyright (c) 2013, Matt Godbolt
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// Redistributions of source code must retain the above copyright notice, this
+// list of conditions and the following disclaimer.
+//
+// Redistributions in binary form must reproduce the above copyright notice,
+// this list of conditions and the following disclaimer in the documentation
+// and/or other materials provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+
+#include "internal/LogStream.h"
+
+#include "seasocks/Connection.h"
+#include "seasocks/Logger.h"
+#include "seasocks/Server.h"
+#include "seasocks/PageHandler.h"
+#include "seasocks/StringUtil.h"
+#include "seasocks/util/Json.h"
+
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/syscall.h>
+
+#include <memory>
+#include <stdexcept>
+#include <string.h>
+#include <unistd.h>
+
+namespace {
+
+struct EventBits {
+ uint32_t bits;
+ explicit EventBits(uint32_t bits) : bits(bits) {}
+};
+
+std::ostream& operator <<(std::ostream& o, const EventBits& b) {
+ uint32_t bits = b.bits;
+#define DO_BIT(NAME) \
+ do { if (bits & (NAME)) { if (bits != b.bits) {o << ", "; } o << #NAME; bits &= ~(NAME); } } while (0)
+ DO_BIT(EPOLLIN);
+ DO_BIT(EPOLLPRI);
+ DO_BIT(EPOLLOUT);
+ DO_BIT(EPOLLRDNORM);
+ DO_BIT(EPOLLRDBAND);
+ DO_BIT(EPOLLWRNORM);
+ DO_BIT(EPOLLWRBAND);
+ DO_BIT(EPOLLMSG);
+ DO_BIT(EPOLLERR);
+ DO_BIT(EPOLLHUP);
+#ifdef EPOLLRDHUP
+ DO_BIT(EPOLLRDHUP);
+#endif
+ DO_BIT(EPOLLONESHOT);
+ DO_BIT(EPOLLET);
+#undef DO_BIT
+ return o;
+}
+
+const int EpollTimeoutMillis = 500; // Twice a second is ample.
+const int DefaultLameConnectionTimeoutSeconds = 10;
+int gettid() {
+ return syscall(SYS_gettid);
+}
+
+}
+
+namespace seasocks {
+
+Server::Server(std::shared_ptr<Logger> logger)
+: _logger(logger), _listenSock(-1), _epollFd(-1), _eventFd(-1),
+ _maxKeepAliveDrops(0),
+ _lameConnectionTimeoutSeconds(DefaultLameConnectionTimeoutSeconds),
+ _nextDeadConnectionCheck(0), _threadId(0), _terminate(false),
+ _expectedTerminate(false) {
+
+ _epollFd = epoll_create(10);
+ if (_epollFd == -1) {
+ LS_ERROR(_logger, "Unable to create epoll: " << getLastError());
+ return;
+ }
+
+ _eventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+ if (_eventFd == -1) {
+ LS_ERROR(_logger, "Unable to create event FD: " << getLastError());
+ return;
+ }
+
+ epoll_event eventWake = { EPOLLIN, { &_eventFd } };
+ if (epoll_ctl(_epollFd, EPOLL_CTL_ADD, _eventFd, &eventWake) == -1) {
+ LS_ERROR(_logger, "Unable to add wake socket to epoll: " << getLastError());
+ return;
+ }
+}
+
+Server::~Server() {
+ LS_INFO(_logger, "Server destruction");
+ shutdown();
+ // Only shut the eventfd and epoll at the very end
+ if (_eventFd != -1) {
+ close(_eventFd);
+ }
+ if (_epollFd != -1) {
+ close(_epollFd);
+ }
+}
+
+void Server::shutdown() {
+ // Stop listening to any further incoming connections.
+ if (_listenSock != -1) {
+ close(_listenSock);
+ _listenSock = -1;
+ }
+ // Disconnect and close any current connections.
+ while (!_connections.empty()) {
+ // Deleting the connection closes it and removes it from 'this'.
+ Connection* toBeClosed = _connections.begin()->first;
+ toBeClosed->setLinger();
+ delete toBeClosed;
+ }
+}
+
+bool Server::makeNonBlocking(int fd) const {
+ int yesPlease = 1;
+ if (ioctl(fd, FIONBIO, &yesPlease) != 0) {
+ LS_ERROR(_logger, "Unable to make FD non-blocking: " << getLastError());
+ return false;
+ }
+ return true;
+}
+
+bool Server::configureSocket(int fd) const {
+ if (!makeNonBlocking(fd)) {
+ return false;
+ }
+ const int yesPlease = 1;
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yesPlease, sizeof(yesPlease)) == -1) {
+ LS_ERROR(_logger, "Unable to set reuse socket option: " << getLastError());
+ return false;
+ }
+ if (_maxKeepAliveDrops > 0) {
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yesPlease, sizeof(yesPlease)) == -1) {
+ LS_ERROR(_logger, "Unable to enable keepalive: " << getLastError());
+ return false;
+ }
+ const int oneSecond = 1;
+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &oneSecond, sizeof(oneSecond)) == -1) {
+ LS_ERROR(_logger, "Unable to set idle probe: " << getLastError());
+ return false;
+ }
+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &oneSecond, sizeof(oneSecond)) == -1) {
+ LS_ERROR(_logger, "Unable to set idle interval: " << getLastError());
+ return false;
+ }
+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &_maxKeepAliveDrops, sizeof(_maxKeepAliveDrops)) == -1) {
+ LS_ERROR(_logger, "Unable to set keep alive count: " << getLastError());
+ return false;
+ }
+ }
+ return true;
+}
+
+void Server::terminate() {
+ _expectedTerminate = true;
+ _terminate = true;
+ uint64_t one = 1;
+ if (_eventFd != -1 && ::write(_eventFd, &one, sizeof(one)) == -1) {
+ LS_ERROR(_logger, "Unable to post a wake event: " << getLastError());
+ }
+}
+
+bool Server::startListening(int port) {
+ return startListening(INADDR_ANY, port);
+}
+
+bool Server::startListening(uint32_t hostAddr, int port) {
+ if (_epollFd == -1 || _eventFd == -1) {
+ LS_ERROR(_logger, "Unable to serve, did not initialize properly.");
+ return false;
+ }
+
+ _listenSock = socket(AF_INET, SOCK_STREAM, 0);
+ if (_listenSock == -1) {
+ LS_ERROR(_logger, "Unable to create listen socket: " << getLastError());
+ return false;
+ }
+ if (!configureSocket(_listenSock)) {
+ return false;
+ }
+ sockaddr_in sock;
+ memset(&sock, 0, sizeof(sock));
+ sock.sin_port = htons(port);
+ sock.sin_addr.s_addr = htonl(hostAddr);
+ sock.sin_family = AF_INET;
+ if (bind(_listenSock, reinterpret_cast<const sockaddr*>(&sock), sizeof(sock)) == -1) {
+ LS_ERROR(_logger, "Unable to bind socket: " << getLastError());
+ return false;
+ }
+ if (listen(_listenSock, 5) == -1) {
+ LS_ERROR(_logger, "Unable to listen on socket: " << getLastError());
+ return false;
+ }
+ epoll_event event = { EPOLLIN, { this } };
+ if (epoll_ctl(_epollFd, EPOLL_CTL_ADD, _listenSock, &event) == -1) {
+ LS_ERROR(_logger, "Unable to add listen socket to epoll: " << getLastError());
+ return false;
+ }
+
+ char buf[1024];
+ ::gethostname(buf, sizeof(buf));
+ LS_INFO(_logger, "Listening on http://" << buf << ":" << port << "/");
+
+ return true;
+}
+
+void Server::handlePipe() {
+ uint64_t dummy;
+ while (::read(_eventFd, &dummy, sizeof(dummy)) != -1) {
+ // Spin, draining the pipe until it returns EWOULDBLOCK or similar.
+ }
+ if (errno != EAGAIN || errno != EWOULDBLOCK) {
+ LS_ERROR(_logger, "Error from wakeFd read: " << getLastError());
+ _terminate = true;
+ }
+ // It's a "wake up" event; this will just cause the epoll loop to wake up.
+}
+
+Server::NewState Server::handleConnectionEvents(Connection* connection, uint32_t events) {
+ if (events & ~(EPOLLIN|EPOLLOUT|EPOLLHUP|EPOLLERR)) {
+ LS_WARNING(_logger, "Got unhandled epoll event (" << EventBits(events) << ") on connection: "
+ << formatAddress(connection->getRemoteAddress()));
+ return Close;
+ } else if (events & EPOLLERR) {
+ LS_INFO(_logger, "Error on socket (" << EventBits(events) << "): "
+ << formatAddress(connection->getRemoteAddress()));
+ return Close;
+ } else if (events & EPOLLHUP) {
+ LS_DEBUG(_logger, "Graceful hang-up (" << EventBits(events) << ") of socket: "
+ << formatAddress(connection->getRemoteAddress()));
+ return Close;
+ } else {
+ if (events & EPOLLOUT) {
+ connection->handleDataReadyForWrite();
+ }
+ if (events & EPOLLIN) {
+ connection->handleDataReadyForRead();
+ }
+ }
+ return KeepOpen;
+}
+
+void Server::checkAndDispatchEpoll(int epollMillis) {
+ const int maxEvents = 256;
+ epoll_event events[maxEvents];
+
+ std::list<Connection*> toBeDeleted;
+ int numEvents = epoll_wait(_epollFd, events, maxEvents, epollMillis);
+ if (numEvents == -1) {
+ if (errno != EINTR) {
+ LS_ERROR(_logger, "Error from epoll_wait: " << getLastError());
+ }
+ return;
+ }
+ if (numEvents == maxEvents) {
+ static time_t lastWarnTime = 0;
+ time_t now = time(NULL);
+ if (now - lastWarnTime >= 60) {
+ LS_WARNING(_logger, "Full event queue; may start starving connections. "
+ "Will warn at most once a minute");
+ lastWarnTime = now;
+ }
+ }
+ for (int i = 0; i < numEvents; ++i) {
+ if (events[i].data.ptr == this) {
+ if (events[i].events & ~EPOLLIN) {
+ LS_SEVERE(_logger, "Got unexpected event on listening socket ("
+ << EventBits(events[i].events) << ") - terminating");
+ _terminate = true;
+ break;
+ }
+ handleAccept();
+ } else if (events[i].data.ptr == &_eventFd) {
+ if (events[i].events & ~EPOLLIN) {
+ LS_SEVERE(_logger, "Got unexpected event on management pipe ("
+ << EventBits(events[i].events) << ") - terminating");
+ _terminate = true;
+ break;
+ }
+ handlePipe();
+ } else {
+ auto connection = reinterpret_cast<Connection*>(events[i].data.ptr);
+ if (handleConnectionEvents(connection, events[i].events) == Close) {
+ toBeDeleted.push_back(connection);
+ }
+ }
+ }
+ // The connections are all deleted at the end so we've processed any other subject's
+ // closes etc before we call onDisconnect().
+ for (auto it = toBeDeleted.begin(); it != toBeDeleted.end(); ++it) {
+ auto connection = *it;
+ if (_connections.find(connection) == _connections.end()) {
+ LS_SEVERE(_logger, "Attempt to delete connection we didn't know about: " << (void*)connection
+ << formatAddress(connection->getRemoteAddress()));
+ _terminate = true;
+ break;
+ }
+ LS_DEBUG(_logger, "Deleting connection: " << formatAddress(connection->getRemoteAddress()));
+ delete connection;
+ }
+}
+
+void Server::setStaticPath(const char* staticPath) {
+ LS_INFO(_logger, "Serving content from " << staticPath);
+ _staticPath = staticPath;
+}
+
+bool Server::serve(const char* staticPath, int port) {
+ setStaticPath(staticPath);
+ if (!startListening(port)) {
+ return false;
+ }
+
+ return loop();
+}
+
+bool Server::loop() {
+ if (_listenSock == -1) {
+ LS_ERROR(_logger, "Server not initialised");
+ return false;
+ }
+
+ // Stash away "the" server thread id.
+ _threadId = gettid();
+
+ while (!_terminate) {
+ // Always process events first to catch start up events.
+ processEventQueue();
+ checkAndDispatchEpoll(EpollTimeoutMillis);
+ }
+ // Reasonable effort to ensure anything enqueued during terminate has a chance to run.
+ processEventQueue();
+ LS_INFO(_logger, "Server terminating");
+ shutdown();
+ return _expectedTerminate;
+}
+
+Server::PollResult Server::poll(int millis) {
+ // Grab the thread ID on the first poll.
+ if (_threadId == 0) _threadId = gettid();
+ if (_threadId != gettid()) {
+ LS_ERROR(_logger, "poll() called from the wrong thread");
+ return PollResult::Error;
+ }
+ if (_listenSock == -1) {
+ LS_ERROR(_logger, "Server not initialised");
+ return PollResult::Error;
+ }
+ processEventQueue();
+ checkAndDispatchEpoll(millis);
+ if (!_terminate) return PollResult::Continue;
+
+ // Reasonable effort to ensure anything enqueued during terminate has a chance to run.
+ processEventQueue();
+ LS_INFO(_logger, "Server terminating");
+ shutdown();
+
+ return _expectedTerminate ? PollResult::Terminated : PollResult::Error;
+}
+
+void Server::processEventQueue() {
+ for (;;) {
+ std::shared_ptr<Runnable> runnable = popNextRunnable();
+ if (!runnable) break;
+ runnable->run();
+ }
+ time_t now = time(NULL);
+ if (now >= _nextDeadConnectionCheck) {
+ std::list<Connection*> toRemove;
+ for (auto it = _connections.cbegin(); it != _connections.cend(); ++it) {
+ time_t numSecondsSinceConnection = now - it->second;
+ auto connection = it->first;
+ if (connection->bytesReceived() == 0 && numSecondsSinceConnection >= _lameConnectionTimeoutSeconds) {
+ LS_INFO(_logger, formatAddress(connection->getRemoteAddress())
+ << " : Killing lame connection - no bytes received after " << numSecondsSinceConnection << "s");
+ toRemove.push_back(connection);
+ }
+ }
+ for (auto it = toRemove.begin(); it != toRemove.end(); ++it) {
+ delete *it;
+ }
+ }
+}
+
+void Server::handleAccept() {
+ sockaddr_in address;
+ socklen_t addrLen = sizeof(address);
+ int fd = ::accept(_listenSock,
+ reinterpret_cast<sockaddr*>(&address),
+ &addrLen);
+ if (fd == -1) {
+ LS_ERROR(_logger, "Unable to accept: " << getLastError());
+ return;
+ }
+ if (!configureSocket(fd)) {
+ ::close(fd);
+ return;
+ }
+ LS_INFO(_logger, formatAddress(address) << " : Accepted on descriptor " << fd);
+ Connection* newConnection = new Connection(_logger, *this, fd, address);
+ epoll_event event = { EPOLLIN, { newConnection } };
+ if (epoll_ctl(_epollFd, EPOLL_CTL_ADD, fd, &event) == -1) {
+ LS_ERROR(_logger, "Unable to add socket to epoll: " << getLastError());
+ delete newConnection;
+ ::close(fd);
+ return;
+ }
+ _connections.insert(std::make_pair(newConnection, time(NULL)));
+}
+
+void Server::remove(Connection* connection) {
+ checkThread();
+ epoll_event event = { 0, { connection } };
+ if (epoll_ctl(_epollFd, EPOLL_CTL_DEL, connection->getFd(), &event) == -1) {
+ LS_ERROR(_logger, "Unable to remove from epoll: " << getLastError());
+ }
+ _connections.erase(connection);
+}
+
+bool Server::subscribeToWriteEvents(Connection* connection) {
+ epoll_event event = { EPOLLIN | EPOLLOUT, { connection } };
+ if (epoll_ctl(_epollFd, EPOLL_CTL_MOD, connection->getFd(), &event) == -1) {
+ LS_ERROR(_logger, "Unable to subscribe to write events: " << getLastError());
+ return false;
+ }
+ return true;
+}
+
+bool Server::unsubscribeFromWriteEvents(Connection* connection) {
+ epoll_event event = { EPOLLIN, { connection } };
+ if (epoll_ctl(_epollFd, EPOLL_CTL_MOD, connection->getFd(), &event) == -1) {
+ LS_ERROR(_logger, "Unable to unsubscribe from write events: " << getLastError());
+ return false;
+ }
+ return true;
+}
+
+void Server::addWebSocketHandler(const char* endpoint, std::shared_ptr<WebSocket::Handler> handler,
+ bool allowCrossOriginRequests) {
+ _webSocketHandlerMap[endpoint] = { handler, allowCrossOriginRequests };
+}
+
+void Server::addPageHandler(std::shared_ptr<PageHandler> handler) {
+ _pageHandlers.emplace_back(handler);
+}
+
+bool Server::isCrossOriginAllowed(const std::string &endpoint) const {
+ auto splits = split(endpoint, '?');
+ auto iter = _webSocketHandlerMap.find(splits[0]);
+ if (iter == _webSocketHandlerMap.end()) {
+ return false;
+ }
+ return iter->second.allowCrossOrigin;
+}
+
+std::shared_ptr<WebSocket::Handler> Server::getWebSocketHandler(const char* endpoint) const {
+ auto splits = split(endpoint, '?');
+ auto iter = _webSocketHandlerMap.find(splits[0]);
+ if (iter == _webSocketHandlerMap.end()) {
+ return std::shared_ptr<WebSocket::Handler>();
+ }
+ return iter->second.handler;
+}
+
+void Server::execute(std::shared_ptr<Runnable> runnable) {
+ std::unique_lock<decltype(_pendingRunnableMutex)> lock(_pendingRunnableMutex);
+ _pendingRunnables.push_back(runnable);
+ lock.unlock();
+
+ uint64_t one = 1;
+ if (_eventFd != -1 && ::write(_eventFd, &one, sizeof(one)) == -1) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ LS_ERROR(_logger, "Unable to post a wake event: " << getLastError());
+ }
+ }
+}
+
+std::shared_ptr<Server::Runnable> Server::popNextRunnable() {
+ std::lock_guard<decltype(_pendingRunnableMutex)> lock(_pendingRunnableMutex);
+ std::shared_ptr<Runnable> runnable;
+ if (!_pendingRunnables.empty()) {
+ runnable = _pendingRunnables.front();
+ _pendingRunnables.pop_front();
+ }
+ return runnable;
+}
+
+std::string Server::getStatsDocument() const {
+ std::ostringstream doc;
+ doc << "clear();" << std::endl;
+ for (auto it = _connections.begin(); it != _connections.end(); ++it) {
+ doc << "connection({";
+ auto connection = it->first;
+ jsonKeyPairToStream(doc,
+ "since", EpochTimeAsLocal(it->second),
+ "fd", connection->getFd(),
+ "id", reinterpret_cast<uint64_t>(connection),
+ "uri", connection->getRequestUri(),
+ "addr", formatAddress(connection->getRemoteAddress()),
+ "user", connection->credentials() ?
+ connection->credentials()->username : "(not authed)",
+ "input", connection->inputBufferSize(),
+ "read", connection->bytesReceived(),
+ "output", connection->outputBufferSize(),
+ "written", connection->bytesSent()
+ );
+ doc << "});" << std::endl;
+ }
+ return doc.str();
+}
+
+void Server::setLameConnectionTimeoutSeconds(int seconds) {
+ LS_INFO(_logger, "Setting lame connection timeout to " << seconds);
+ _lameConnectionTimeoutSeconds = seconds;
+}
+
+void Server::setMaxKeepAliveDrops(int maxKeepAliveDrops) {
+ LS_INFO(_logger, "Setting max keep alive drops to " << maxKeepAliveDrops);
+ _maxKeepAliveDrops = maxKeepAliveDrops;
+}
+
+void Server::checkThread() const {
+ auto thisTid = gettid();
+ if (thisTid != _threadId) {
+ std::ostringstream o;
+ o << "seasocks called on wrong thread : " << thisTid << " instead of " << _threadId;
+ LS_SEVERE(_logger, o.str());
+ throw std::runtime_error(o.str());
+ }
+}
+
+std::shared_ptr<Response> Server::handle(const Request &request) {
+ for (auto handler : _pageHandlers) {
+ auto result = handler->handle(request);
+ if (result != Response::unhandled()) return result;
+ }
+ return Response::unhandled();
+}
+
+} // namespace seasocks