switched from fitpc/atom to prime/linux
Also removed a few old things that had nothing reasonable to be changed
to.
diff --git a/aos/linux_code/README.txt b/aos/linux_code/README.txt
new file mode 100644
index 0000000..ebaf40e
--- /dev/null
+++ b/aos/linux_code/README.txt
@@ -0,0 +1,15 @@
+see ../README.txt for stuff affecting crio and linux code
+
+The folder is called linux_code because it mainly deals with code that uses the queue system, which only works under GNU/Linux for a variety of reasons, some fundamental (futexes) and some because nobody bothers to fix them.
+The layout is designed with multiple linux boxes in mind.
+
+The one that talks to the cRIO etc is called the prime. We have multiple explanations for that name:
+ It is the primary controller.
+ PRIME/Primary Robot Intelligent Management Entity
+ Represents Optimus Prime, one of the good transformers who battle the bad ones that 254 names robots after.
+ It is easy to type and doesn't conflict with anything else common for tab-completion.
+ It's not hardware-specific.
+
+[NOTES]
+Any code should call aos::Init() (or aos::InitNRT() for processes that don't need to be realtime) before making any calls to any of the aos functions.
+Making calls to any of the aos functions (including aos::Init()) from more than 1 thread per process is not supported, but using fork(2) after some aos functions have been called and then continuing to make aos function calls (without calling one of the exec(3) functions) in both processes is supported.
diff --git a/aos/linux_code/camera/Buffers.cpp b/aos/linux_code/camera/Buffers.cpp
new file mode 100644
index 0000000..e1d22b6
--- /dev/null
+++ b/aos/linux_code/camera/Buffers.cpp
@@ -0,0 +1,158 @@
+#include "aos/linux_code/camera/Buffers.h"
+
+#include <sys/mman.h>
+
+#include "aos/linux_code/camera/V4L2.h"
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+namespace camera {
+
+// One mmap()ed v4l2 buffer: the mapping's address and size as stored by MMap().
+struct Buffers::Buffer {
+ void *start; // address returned by mmap; given back to munmap
+ size_t length; // for munmap
+};
+const std::string Buffers::kFDServerName("/tmp/aos_fd_server");
+const std::string Buffers::kQueueName("CameraBufferQueue");
+
+int Buffers::CreateSocket(int (*bind_connect)(int, const sockaddr *, socklen_t)) {
+  // Makes an AF_UNIX stream socket and calls bind_connect on it with kFDServerName.
+  union af_unix_sockaddr {
+    sockaddr_un un;
+    sockaddr addr;
+  } addr;
+  const int r = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (r == -1) {
+    LOG(FATAL, "socket(AF_UNIX, SOCK_STREAM, 0) failed with %d: %s\n", errno, strerror(errno));
+  }
+  memset(&addr, 0, sizeof(addr));
+  addr.un.sun_family = AF_UNIX;
+  strncpy(addr.un.sun_path, kFDServerName.c_str(), sizeof(addr.un.sun_path) - 1);  // keeps the '\0'
+  if (bind_connect(r, &addr.addr, sizeof(addr.un)) == -1) {
+    LOG(FATAL, "bind_connect(=%p)(%d, %p, %zu) failed with %d: %s\n",
+        bind_connect, r, &addr.addr, sizeof(addr.un), errno, strerror(errno));
+  }
+  return r;
+}
+
+void Buffers::MMap() {
+  // Maps all kNumBuffers v4l2 buffers of fd_ into this process (fills in buffers_).
+  buffers_ = new Buffer[kNumBuffers];
+  v4l2_buffer buf;
+  for (unsigned int n = 0; n < kNumBuffers; ++n) {
+    memset(&buf, 0, sizeof(buf));
+    buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+    buf.memory = V4L2_MEMORY_MMAP;
+    buf.index = n;
+    if (xioctl(fd_, VIDIOC_QUERYBUF, &buf) == -1) {
+      LOG(FATAL, "ioctl VIDIOC_QUERYBUF(%d, %p) failed with %d: %s\n",
+          fd_, &buf, errno, strerror(errno));
+    }
+    buffers_[n].length = buf.length;  // size_t copy; this is what gets logged with %zu
+    buffers_[n].start = mmap(NULL, buffers_[n].length, PROT_READ | PROT_WRITE,
+                             MAP_SHARED, fd_, buf.m.offset);
+    if (buffers_[n].start == MAP_FAILED) {
+      LOG(FATAL, "mmap(NULL, %zu, PROT_READ | PROT_WRITE, MAP_SHARED, %d, %jd)"
+          " failed with %d: %s\n", buffers_[n].length, fd_,
+          static_cast<intmax_t>(buf.m.offset), errno, strerror(errno));
+    }
+  }
+}
+
+void Buffers::Release() {
+  // Nothing to hand back unless GetNext() left a message checked out.
+  if (message_ == NULL) return;
+  queue_->FreeMessage(message_);
+  message_ = NULL;
+}
+const void *Buffers::GetNext(bool block,
+ uint32_t *bytesused, timeval *timestamp, uint32_t *sequence) {
+ Release(); // give back the message from the previous call, if any
+
+ // TODO(brians) make sure the camera reader process hasn't died
+ do {
+ if (block) {
+ message_ = static_cast<const Message *>(queue_->ReadMessage(
+ RawQueue::kPeek | RawQueue::kBlock));
+ } else {
+ static int index = 0; // NOTE(review): function-static, so shared by every Buffers in the process
+ message_ = static_cast<const Message *>(queue_->ReadMessageIndex(
+ RawQueue::kBlock, &index)); // NOTE(review): kBlock is passed on the non-blocking path too -- confirm
+ }
+ } while (block && message_ == NULL); // only the blocking path retries after a NULL
+ if (message_ != NULL) {
+ if (bytesused != NULL) memcpy(bytesused, &message_->bytesused, sizeof(*bytesused));
+ if (timestamp != NULL) memcpy(timestamp, &message_->timestamp, sizeof(*timestamp));
+ if (sequence != NULL) memcpy(sequence, &message_->sequence, sizeof(*sequence));
+ return buffers_[message_->index].start; // the mapping made by MMap() for that buffer
+ } else {
+ return NULL;
+ }
+}
+
+int Buffers::FetchFD() {
+  // Receives the fd that the fd server passes over server_ as SCM_RIGHTS data.
+  int myfds[Buffers::kNumFDs];  // where to retrieve the fds into
+  char buf[CMSG_SPACE(sizeof(myfds))];  // ancillary data buffer
+  iovec data;
+  memset(&data, 0, sizeof(data));
+  char dummy;  // receives the 1 byte of regular data that carries the control message
+  data.iov_base = &dummy;
+  data.iov_len = sizeof(dummy);
+  msghdr msg;
+  memset(&msg, 0, sizeof(msg));
+  msg.msg_iov = &data;
+  msg.msg_iovlen = 1;
+  msg.msg_control = buf;
+  msg.msg_controllen = sizeof(buf);
+
+  switch (recvmsg(server_, &msg, 0)) {
+    case 0:  // "the peer has performed an orderly shutdown"
+      LOG(FATAL, "the fd server shut down (connected on %d)\n", server_);
+    case -1:
+      LOG(FATAL, "recvmsg(server_(=%d), %p, 0) failed with %d: %s\n",
+          server_, &msg, errno, strerror(errno));
+  }
+  const cmsghdr *const cmsg = CMSG_FIRSTHDR(&msg);
+  if (cmsg == NULL) {
+    LOG(FATAL, "no headers in message\n");
+  }
+  if (cmsg->cmsg_len != CMSG_LEN(sizeof(myfds))) {  // both sides are cast to size_t to match %zu
+    LOG(FATAL, "got wrong size. got %zu but expected %zu\n",
+        static_cast<size_t>(cmsg->cmsg_len), static_cast<size_t>(CMSG_LEN(sizeof(myfds))));
+  }
+  if (cmsg->cmsg_level != SOL_SOCKET) {
+    LOG(FATAL, "cmsg_level=%d. expected SOL_SOCKET(=%d)\n", cmsg->cmsg_level, SOL_SOCKET);
+  }
+  if (cmsg->cmsg_type != SCM_RIGHTS) {
+    LOG(FATAL, "cmsg_type=%d. expected SCM_RIGHTS(=%d)\n", cmsg->cmsg_type, SCM_RIGHTS);
+  }
+  memcpy(myfds, CMSG_DATA(cmsg), sizeof(myfds));
+
+  return myfds[0];
+}
+Buffers::Buffers() : server_(CreateSocket(connect)), fd_(FetchFD()), message_(NULL) {
+ MMap(); // uses fd_, which the initializer list fetched through server_
+ queue_ = RawQueue::Fetch(kQueueName.c_str(), sizeof(Message), 971, 1); // same name and size that Reader fetches
+}
+
+Buffers::~Buffers() {
+  Release();  // hand back the current message, if any
+  for (unsigned i = 0; i < kNumBuffers; ++i) {
+    if (munmap(buffers_[i].start, buffers_[i].length) == -1) {
+      LOG(WARNING, "munmap(%p, %zu) for destruction failed with %d: %s\n",
+          buffers_[i].start, buffers_[i].length, errno, strerror(errno));
+    }
+  }
+  delete[] buffers_;
+  if (close(fd_) == -1) {
+    LOG(WARNING, "close(%d) for destruction failed with %d: %s\n", fd_, errno, strerror(errno));
+  }
+  if (server_ >= 0 && close(server_) == -1) {  // used to be leaked; negative means nothing to close
+    LOG(WARNING, "close(%d) for destruction failed with %d: %s\n", server_, errno, strerror(errno));
+  }
+}
+
+} // namespace camera
+} // namespace aos
diff --git a/aos/linux_code/camera/Buffers.h b/aos/linux_code/camera/Buffers.h
new file mode 100644
index 0000000..b447468
--- /dev/null
+++ b/aos/linux_code/camera/Buffers.h
@@ -0,0 +1,92 @@
+#ifndef AOS_LINUX_CODE_CAMERA_CAMERA_BUFFERS_H_
+#define AOS_LINUX_CODE_CAMERA_CAMERA_BUFFERS_H_
+
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <string>
+
+#include "aos/linux_code/ipc_lib/queue.h"
+#include "aos/common/type_traits.h"
+
+namespace aos {
+namespace camera {
+
+class Reader;
+class Buffers {
+ // Reader has to do a lot of the same things as the other users of this class,
+ // but it gets the information from different places (some of it gets sent out by it).
+ friend class Reader;
+ // Not an abstract name so that an existing one can just be unlinked without
+ // disturbing it if necessary (like with shm_link).
+ static const std::string kFDServerName;
+ // How many v4l2 buffers and other things that depend on that.
+ static const unsigned int kNumBuffers = 10;
+ // How many fds to transfer from the fd server.
+ // Used to make it clear which 1s are 1 and which are this.
+ static const size_t kNumFDs = 1;
+ // Creates an AF_UNIX socket and calls bind_connect (either bind or connect) on it.
+ // Returns the bound/connected socket. NOTE(review): Buffers.cpp logs every failure
+ // at FATAL, so the -1 that callers check for may never be returned -- confirm.
+ static int CreateSocket(int (*bind_connect)(int, const sockaddr *, socklen_t));
+
+ // File descriptor connected to the fd server. Used for detecting if the
+ // camera reading process has died.
+ // A value of -2 means don't check.
+ const int server_;
+ // Gets the fd (using server_).
+ int FetchFD();
+ // File descriptor for the v4l2 device (that's valid in this process of
+ // course).
+ const int fd_;
+
+ struct Buffer;
+ // Buffer[kNumBuffers]
+ Buffer *buffers_;
+ struct Message {
+ uint32_t index; // which entry of buffers_ holds the image
+ uint32_t bytesused; // bytesused, timestamp and sequence are copied from the
+ timeval timestamp; // v4l2_buffer by Reader::ReadFrame
+ uint32_t sequence;
+ };
+ static_assert(shm_ok<Message>::value, "it's going through queues");
+ // The message behind the image most recently returned by GetNext(). NULL when none is held.
+ const Message *message_;
+ static const std::string kQueueName;
+ // NULL for the Reader one.
+ RawQueue *queue_;
+ // Make the actual mmap calls.
+ // Called by Buffers() automatically.
+ void MMap();
+ public:
+ Buffers();
+ // Will clean everything up.
+ // So that HTTPStreamer can create/destroy one for each client to make it
+ // simpler.
+ ~Buffers();
+
+ // Retrieves the next image. Will return the current one if it hasn't yet.
+ // Calls Release() at the beginning.
+ // NOTE: this means that the caller can't keep using references to the old
+ // return value after calling this function again.
+ // block is whether to wait for a new one (true) or return NULL (false).
+ // The last 3 output parameters will be filled in straight from the
+ // v4l2_buffer if they're not NULL
+ // (see <http://v4l2spec.bytesex.org/spec/x5953.htm#V4L2-BUFFER> for details).
+ // NOTE: guaranteed to return a valid pointer if block is true.
+ const void *GetNext(bool block,
+ uint32_t *bytesused, timeval *timestamp, uint32_t *sequence);
+ // Releases the most recent frame buffer. Automatically called by GetNext and
+ // the destructor. Safe to call multiple times without getting frames in
+ // between.
+ void Release();
+
+ // How big images are.
+ static const int32_t kWidth = 320, kHeight = 240;
+};
+
+} // namespace camera
+} // namespace aos
+
+#endif
+
diff --git a/aos/linux_code/camera/HTTPStreamer.cpp b/aos/linux_code/camera/HTTPStreamer.cpp
new file mode 100644
index 0000000..0b0c65f
--- /dev/null
+++ b/aos/linux_code/camera/HTTPStreamer.cpp
@@ -0,0 +1,394 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <malloc.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/mman.h>
+#include <netinet/ip.h>
+#include <sys/socket.h>
+#include <inttypes.h>
+
+#include <vector>
+
+#include "aos/common/network_port.h"
+#include "aos/linux_code/init.h"
+#include "aos/linux_code/camera/Buffers.h"
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+namespace camera {
+
+namespace {
+
+// JPEG DHT segment (it starts with the 0xff 0xc4 marker). Kept at namespace scope because it doesn't like being a static class member.
+static const unsigned char dht_table[] = {
+ 0xff, 0xc4, 0x01, 0xa2, 0x00, 0x00, 0x01, 0x05, 0x01, 0x01, 0x01, 0x01,
+ 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02,
+ 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x01, 0x00, 0x03,
+ 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09,
+ 0x0a, 0x0b, 0x10, 0x00, 0x02, 0x01, 0x03, 0x03, 0x02, 0x04, 0x03, 0x05,
+ 0x05, 0x04, 0x04, 0x00, 0x00, 0x01, 0x7d, 0x01, 0x02, 0x03, 0x00, 0x04,
+ 0x11, 0x05, 0x12, 0x21, 0x31, 0x41, 0x06, 0x13, 0x51, 0x61, 0x07, 0x22,
+ 0x71, 0x14, 0x32, 0x81, 0x91, 0xa1, 0x08, 0x23, 0x42, 0xb1, 0xc1, 0x15,
+ 0x52, 0xd1, 0xf0, 0x24, 0x33, 0x62, 0x72, 0x82, 0x09, 0x0a, 0x16, 0x17,
+ 0x18, 0x19, 0x1a, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2a, 0x34, 0x35, 0x36,
+ 0x37, 0x38, 0x39, 0x3a, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x4a,
+ 0x53, 0x54, 0x55, 0x56, 0x57, 0x58, 0x59, 0x5a, 0x63, 0x64, 0x65, 0x66,
+ 0x67, 0x68, 0x69, 0x6a, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, 0x79, 0x7a,
+ 0x83, 0x84, 0x85, 0x86, 0x87, 0x88, 0x89, 0x8a, 0x92, 0x93, 0x94, 0x95,
+ 0x96, 0x97, 0x98, 0x99, 0x9a, 0xa2, 0xa3, 0xa4, 0xa5, 0xa6, 0xa7, 0xa8,
+ 0xa9, 0xaa, 0xb2, 0xb3, 0xb4, 0xb5, 0xb6, 0xb7, 0xb8, 0xb9, 0xba, 0xc2,
+ 0xc3, 0xc4, 0xc5, 0xc6, 0xc7, 0xc8, 0xc9, 0xca, 0xd2, 0xd3, 0xd4, 0xd5,
+ 0xd6, 0xd7, 0xd8, 0xd9, 0xda, 0xe1, 0xe2, 0xe3, 0xe4, 0xe5, 0xe6, 0xe7,
+ 0xe8, 0xe9, 0xea, 0xf1, 0xf2, 0xf3, 0xf4, 0xf5, 0xf6, 0xf7, 0xf8, 0xf9,
+ 0xfa, 0x11, 0x00, 0x02, 0x01, 0x02, 0x04, 0x04, 0x03, 0x04, 0x07, 0x05,
+ 0x04, 0x04, 0x00, 0x01, 0x02, 0x77, 0x00, 0x01, 0x02, 0x03, 0x11, 0x04,
+ 0x05, 0x21, 0x31, 0x06, 0x12, 0x41, 0x51, 0x07, 0x61, 0x71, 0x13, 0x22,
+ 0x32, 0x81, 0x08, 0x14, 0x42, 0x91, 0xa1, 0xb1, 0xc1, 0x09, 0x23, 0x33,
+ 0x52, 0xf0, 0x15, 0x62, 0x72, 0xd1, 0x0a, 0x16, 0x24, 0x34, 0xe1, 0x25,
+ 0xf1, 0x17, 0x18, 0x19, 0x1a, 0x26, 0x27, 0x28, 0x29, 0x2a, 0x35, 0x36,
+ 0x37, 0x38, 0x39, 0x3a, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x4a,
+ 0x53, 0x54, 0x55, 0x56, 0x57, 0x58, 0x59, 0x5a, 0x63, 0x64, 0x65, 0x66,
+ 0x67, 0x68, 0x69, 0x6a, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, 0x79, 0x7a,
+ 0x82, 0x83, 0x84, 0x85, 0x86, 0x87, 0x88, 0x89, 0x8a, 0x92, 0x93, 0x94,
+ 0x95, 0x96, 0x97, 0x98, 0x99, 0x9a, 0xa2, 0xa3, 0xa4, 0xa5, 0xa6, 0xa7,
+ 0xa8, 0xa9, 0xaa, 0xb2, 0xb3, 0xb4, 0xb5, 0xb6, 0xb7, 0xb8, 0xb9, 0xba,
+ 0xc2, 0xc3, 0xc4, 0xc5, 0xc6, 0xc7, 0xc8, 0xc9, 0xca, 0xd2, 0xd3, 0xd4,
+ 0xd5, 0xd6, 0xd7, 0xd8, 0xd9, 0xda, 0xe2, 0xe3, 0xe4, 0xe5, 0xe6, 0xe7,
+ 0xe8, 0xe9, 0xea, 0xf2, 0xf3, 0xf4, 0xf5, 0xf6, 0xf7, 0xf8, 0xf9, 0xfa
+};
+
+const char kFirstHeader[] = "HTTP/1.0 200 OK\r\n"
+"Connection: close\r\n"
+"Server: AOS/0.0 Camera\r\n"
+"Cache-Control: no-store, no-cache, must-revalidate, pre-check=0, "
+"post-check=0, max-age=0\r\n" // this and above from mjpg-streamer
+"Pragma: no-cache\r\n"
+"Expires: Mon, 3 Jan 2000 12:34:56 GMT\r\n" // also from mjpg-streamer
+"Content-Type: multipart/x-mixed-replace; boundary=boundarydonotcross\r\n"; // same boundary string that the kWriteBoundary state writes
+
+} // namespace
+
+class HTTPStreamer {
+ // Represents a single client. Handles all reading and writing of sockets and
+ // queues.
+ class Client {
+ enum class State {
+ kReadHeaders,
+ kWriteHeaders,
+ kWriteBoundary, // these last 2 loop to each other
+ kWriteImage,
+ kWriteDHT, // happens in the middle of kWriteImage
+ };
+ const int sock_;
+ State state_;
+ inline fd_set *GetFDSetForCurrentState(fd_set *read_fds,
+                                        fd_set *write_fds) {
+   // Only kReadHeaders waits for input; every other state waits for the
+   // socket to become writable.
+   const bool reading = (state_ == State::kReadHeaders);
+   fd_set *const wanted = reading ? read_fds : write_fds;
+   return wanted;
+ }
+ // MUST BE LONG ENOUGH TO HOLD kBoundaryText WITH A BIGGISH # IN IT
+ char scratch_[4096];
+ int to_write_;
+ int zero_reads_;
+ static const int kMaxZeroReads = 2000;
+ size_t pos_, dht_pos_, dht_start_;
+ Buffers buffers_;
+ const void *current_;
+ uint32_t size_;
+
+ Client(const Client &);
+ void operator=(const Client &);
+
+ public:
+ explicit Client(int sock) : sock_(sock), state_(State::kReadHeaders), to_write_(0),
+     zero_reads_(0), pos_(0), dht_pos_(0), dht_start_(0), current_(NULL), size_(0) {}  // no scalar member starts out indeterminate
+ ~Client() {
+   // Closing is best-effort: a failure is only reported, never fatal.
+   LOG(DEBUG, "closing socket %d\n", sock_);
+   const int result = close(sock_);
+   if (result != -1) return;
+   LOG(INFO, "closing socket %d for destruction failed with %d: %s\n", sock_, errno, strerror(errno));
+ }
+ // Adds sock_ to whichever of the 2 sets the current state is waiting on (read_fds only while reading headers).
+ void FDSet(fd_set *read_fds, fd_set *write_fds) {
+ FD_SET(sock_, GetFDSetForCurrentState(read_fds, write_fds));
+ }
+ // Does one read or write for whatever the current state is waiting for on sock_.
+ // The arguments are the same as the last FDSet call (after a successful select).
+ // Return value is whether or not to keep this one around.
+ bool Process(fd_set *read_fds, fd_set *write_fds) {
+   // if the socket we're waiting on isn't ready
+   if (!FD_ISSET(sock_, GetFDSetForCurrentState(read_fds, write_fds))) {
+     return true;
+   }
+
+   ssize_t ret;
+   switch (state_) {
+     case State::kReadHeaders:
+       if (pos_ >= sizeof(scratch_) - 1) {  // the last byte is reserved for the '\0'
+         LOG(WARNING, "read too many bytes of headers on sock %d."
+             " somebody should increase the size of scratch_\n", sock_);
+         return false;
+       }
+       if (zero_reads_ > kMaxZeroReads) {
+         LOG(WARNING, "read 0 bytes %d times on sock %d. giving up\n",
+             zero_reads_, sock_);
+         return false;
+       }
+       ret = read(sock_, scratch_ + pos_, sizeof(scratch_) - 1 - pos_);
+       if (ret == -1) {
+         LOG(WARNING, "read(%d, %p, %zu) failed with %d: %s\n",
+             sock_, scratch_ + pos_, sizeof(scratch_) - 1 - pos_,
+             errno, strerror(errno));
+         return false;
+       }
+       pos_ += ret;
+       // if we just received \r\n\r\n (the end of the headers)
+       if (pos_ >= 4 && scratch_[pos_ - 4] == '\r' && scratch_[pos_ - 3] == '\n' &&
+           scratch_[pos_ - 2] == '\r' && scratch_[pos_ - 1] == '\n') {
+         LOG(INFO, "entering state kWriteHeaders"
+             " after %zu bytes of headers read\n", pos_ - 1);
+         pos_ = 0;
+         state_ = State::kWriteHeaders;
+       }
+       scratch_[pos_] = '\0';
+       if (ret == 0) {
+         ++zero_reads_;
+       } else {
+         zero_reads_ = 0;
+         LOG(DEBUG, "read %zd bytes of headers scratch_=%s\n",
+             ret, scratch_);
+       }
+       break;
+     case State::kWriteHeaders:
+       // the "- 1"s are what keep the terminating \0 of the string from being written
+       ret = write(sock_, kFirstHeader + pos_, sizeof(kFirstHeader) - 1 - pos_);
+       if (ret == -1) {
+         LOG(WARNING, "write(%d, %p, %zu) failed with %d: %s\n", sock_,
+             kFirstHeader + pos_, sizeof(kFirstHeader) - 1 - pos_, errno, strerror(errno));
+         return false;  // like every other state does when its write fails
+       } else {
+         pos_ += ret;
+         if (pos_ >= sizeof(kFirstHeader) - 1) {
+           current_ = NULL;
+           state_ = State::kWriteBoundary;
+         }
+       }
+       break;
+     case State::kWriteBoundary:
+       if (current_ == NULL) {
+         timeval timestamp;
+         current_ = buffers_.GetNext(false, &size_, &timestamp, NULL);
+         if (current_ == NULL) break;  // no frame: size_ and timestamp were not filled in
+         /*static int skip = 0;
+         if (current_ != NULL) skip = (skip + 1) % 30;
+         if (!skip) current_ = NULL;
+         if (current_ == NULL) break;*/
+
+#if 0
+         // set pos_ to where the first header starts
+         for (pos_ = 0; static_cast<const uint8_t *>(current_)[pos_] != 0xFF;
+             ++pos_);
+#else
+         pos_ = 0;
+#endif
+#if 0
+         // go through the frame looking for the start of frame marker
+         for (dht_start_ = 0;
+             static_cast<const uint8_t *>(current_)[dht_start_ + 0] !=
+             0xFF &&
+             static_cast<const uint8_t *>(current_)[dht_start_ + 1] !=
+             0xC0 &&
+             dht_start_ < size_; ++dht_start_)
+           printf("[%zd]=%"PRIx8" ", dht_start_,
+               static_cast<const uint8_t *>(current_)[dht_start_]);
+         if (dht_start_ >= size_) {
+           LOG(WARNING, "couldn't find start of frame marker\n");
+           return false;
+         }
+#else
+         dht_start_ = 0;
+#endif
+         dht_pos_ = 0;
+
+         // aos.ChannelImageGetter depends on the exact format of this
+         to_write_ = snprintf(scratch_, sizeof(scratch_),
+             "\r\n--boundarydonotcross\r\n"
+             "Content-Type: image/jpeg\r\n"
+             "Content-Length: %" PRId32 "\r\n"
+             "X-Timestamp: %ld.%06ld\r\n"
+             "\r\n",
+             size_,
+             static_cast<long>(timestamp.tv_sec), static_cast<long>(timestamp.tv_usec));
+       }
+       ret = write(sock_, scratch_ + pos_, to_write_ - pos_);
+       if (ret == -1) {
+         LOG(WARNING, "write(%d, %p, %zu) failed with %d: %s\n",
+             sock_, scratch_ + pos_, to_write_ - pos_,
+             errno, strerror(errno));
+         return false;
+       } else {
+         pos_ += ret;
+         if (static_cast<ssize_t>(pos_) >= to_write_) {
+           pos_ = 0;
+           state_ = State::kWriteImage;
+         }
+       }
+       break;
+     case State::kWriteImage:
+       ret = write(sock_, static_cast<const char *>(current_) + pos_,
+                   ((dht_start_ == 0) ? size_ : dht_start_) - pos_);
+       if (ret == -1) {
+         LOG(WARNING, "write(%d, %p, %zu) failed with %d: %s\n",
+             sock_, static_cast<const char *>(current_) + pos_,
+             ((dht_start_ == 0) ? size_ : dht_start_) - pos_,
+             errno, strerror(errno));
+         return false;
+       } else {
+         pos_ += ret;
+         if (dht_start_ == 0) {
+           if (pos_ >= size_) {
+             buffers_.Release();
+             current_ = NULL;
+             state_ = State::kWriteBoundary;
+           }
+         } else {
+           if (pos_ >= dht_start_) {
+             dht_start_ = 0;
+             state_ = State::kWriteDHT;
+           }
+         }
+       }
+       break;
+     case State::kWriteDHT:
+       ret = write(sock_, dht_table + dht_pos_,
+                   sizeof(dht_table) - dht_pos_);
+       if (ret == -1) {
+         LOG(WARNING, "write(%d, %p, %zu) failed with %d: %s\n",
+             sock_, dht_table + dht_pos_, sizeof(dht_table) - dht_pos_,
+             errno, strerror(errno));
+         return false;
+       } else {
+         dht_pos_ += ret;
+         if (dht_pos_ >= sizeof(dht_table)) {
+           state_ = State::kWriteImage;
+         }
+       }
+       break;
+     default:
+       LOG(FATAL, "something weird happened\n");
+   }
+   return true;
+ }
+ };
+
+ const int bind_socket_;
+
+ public:
+ HTTPStreamer() : bind_socket_(socket(AF_INET, SOCK_STREAM, 0)) {
+   // Sets up a non-blocking TCP socket listening on NetworkPort::kCameraStreamer.
+   if (bind_socket_ < 0) {
+     LOG(FATAL, "socket(AF_INET, SOCK_STREAM, 0) failed with %d: %s\n",
+         errno, strerror(errno));
+   }
+   union {
+     sockaddr_in in;
+     sockaddr addr;
+   } bind_sockaddr;
+   memset(&bind_sockaddr, 0, sizeof(bind_sockaddr));
+   bind_sockaddr.in.sin_family = AF_INET;
+   bind_sockaddr.in.sin_port = htons(static_cast<uint16_t>(aos::NetworkPort::kCameraStreamer));
+   bind_sockaddr.in.sin_addr.s_addr = htonl(INADDR_ANY);
+   int optval = 1;
+   if (setsockopt(bind_socket_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {  // not fatal
+     LOG(WARNING, "setsockopt(%d, SOL_SOCKET, SO_REUSEADDR) failed with %d: %s\n", bind_socket_, errno, strerror(errno));
+   }
+   if (bind(bind_socket_, &bind_sockaddr.addr,
+            sizeof(bind_sockaddr.addr)) == -1) {
+     LOG(FATAL, "bind(%d, %p) failed because of %d: %s\n",
+         bind_socket_, &bind_sockaddr.addr, errno, strerror(errno));
+   }
+   if (listen(bind_socket_, 10) == -1) {
+     LOG(FATAL, "listen(%d, 10) failed because of %d: %s\n", bind_socket_,
+         errno, strerror(errno));
+   }
+   const int flags = fcntl(bind_socket_, F_GETFL, 0);
+   if (flags == -1) {
+     LOG(FATAL, "fcntl(%d, F_GETFL, 0) failed because of %d: %s\n",
+         bind_socket_, errno, strerror(errno));
+   }
+   if (fcntl(bind_socket_, F_SETFL, flags | O_NONBLOCK) == -1) {
+     LOG(FATAL, "fcntl(%d, F_SETFL, %x) failed because of %d: %s\n",
+         bind_socket_, flags | O_NONBLOCK, errno, strerror(errno));
+   }
+ }
+ void Run() {
+   // Accepts clients and services all of them from one select() loop. Never returns.
+   signal(SIGPIPE, SIG_IGN);
+   std::vector<Client *> clients;
+   fd_set read_fds, write_fds;
+   while (true) {
+     FD_ZERO(&read_fds);
+     FD_ZERO(&write_fds);
+     FD_SET(bind_socket_, &read_fds);
+     for (auto it = clients.begin(); it != clients.end(); ++it) {
+       (*it)->FDSet(&read_fds, &write_fds);
+     }
+     switch (select(FD_SETSIZE, &read_fds, &write_fds,
+                    NULL,  // err
+                    NULL)) {  // timeout
+       case -1:
+         LOG(ERROR, "select(FD_SETSIZE(=%d), %p, %p, NULL, NULL) failed"
+             " because of %d: %s\n", FD_SETSIZE, &read_fds, &write_fds,
+             errno, strerror(errno));
+         continue;
+       case 0:
+         LOG(ERROR, "select with NULL timeout timed out...\n");
+         continue;
+     }
+
+     if (FD_ISSET(bind_socket_, &read_fds)) {
+       const int sock = accept4(bind_socket_, NULL, NULL, SOCK_NONBLOCK);
+       if (sock == -1) {
+         LOG(ERROR, "accept4(%d, NULL, NULL, SOCK_NONBLOCK(=%d)) failed because of %d: %s\n",
+             bind_socket_, SOCK_NONBLOCK, errno, strerror(errno));
+       } else if (sock >= FD_SETSIZE) {  // FD_SET/FD_ISSET on it would be out of bounds
+         LOG(WARNING, "sock %d is too big for an fd_set. dropping it\n", sock);
+         close(sock);
+       } else {
+         clients.push_back(new Client(sock));
+       }
+     }
+
+     for (auto it = clients.begin(); it != clients.end();) {  // erase() hands back the next valid iterator
+       if ((*it)->Process(&read_fds, &write_fds)) {
+         ++it;
+       } else {
+         delete *it;
+         LOG(INFO, "removing client\n");
+         it = clients.erase(it);
+       }
+     }
+   }
+ }
+};
+
+} // namespace camera
+} // namespace aos
+
+int main() {
+ ::aos::InitNRT(); // the non-realtime flavor of aos::Init()
+ ::aos::camera::HTTPStreamer streamer; // binds and listens in its constructor
+ streamer.Run(); // loops in while (true); no return path is visible in this file
+ ::aos::Cleanup(); // only reached if Run() ever returns
+}
diff --git a/aos/linux_code/camera/Reader.cpp b/aos/linux_code/camera/Reader.cpp
new file mode 100644
index 0000000..cf77548
--- /dev/null
+++ b/aos/linux_code/camera/Reader.cpp
@@ -0,0 +1,419 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <malloc.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/mman.h>
+
+#include <string>
+#include <inttypes.h>
+
+#include "aos/linux_code/init.h"
+#include "aos/linux_code/camera/V4L2.h"
+#include "aos/linux_code/camera/Buffers.h"
+#include "aos/common/logging/logging.h"
+#include "aos/linux_code/ipc_lib/queue.h"
+
+#define CLEAR(x) memset(&(x), 0, sizeof(x))
+
+namespace aos {
+namespace camera {
+
+class Reader {
+ static const char *const dev_name;
+
+ // of the camera
+ int fd_;
+ // the bound socket listening for fd requests
+ int server_fd_;
+
+ RawQueue *queue_, *recycle_queue_;
+ // the number of buffers currently queued in v4l2
+ uint32_t queued_;
+ public:
+ Reader() {
+ struct stat st;
+ if (stat(dev_name, &st) == -1) {
+ LOG(FATAL, "Cannot identify '%s' because of %d: %s\n",
+ dev_name, errno, strerror(errno));
+ }
+ if (!S_ISCHR(st.st_mode)) { // dev_name has to be a character device
+ LOG(FATAL, "%s is no device\n", dev_name);
+ }
+
+ fd_ = open(dev_name, O_RDWR /* required */ | O_NONBLOCK, 0);
+ if (fd_ == -1) {
+ LOG(FATAL, "Cannot open '%s' because of %d: %s\n",
+ dev_name, errno, strerror(errno));
+ }
+
+ queue_ = RawQueue::Fetch(Buffers::kQueueName.c_str(),
+ sizeof(Buffers::Message), 971, 1,
+ 1, Buffers::kNumBuffers, &recycle_queue_); // this overload also hands back recycle_queue_
+ // read off any existing recycled messages
+ while (recycle_queue_->ReadMessage(RawQueue::kNonBlock) != NULL); // NOTE(review): these are not passed to FreeMessage -- confirm that's OK
+ queued_ = 0;
+
+ InitServer(); // binds and listens on the fd server socket
+ Init(); // configures the camera and requests the buffers
+ }
+ private:
+ void InitServer() {
+ if (unlink(Buffers::kFDServerName.c_str()) == -1 && errno != ENOENT) { // a missing socket file is fine
+ LOG(WARNING, "unlink(kFDServerName(='%s')) failed with %d: %s\n",
+ Buffers::kFDServerName.c_str(), errno, strerror(errno));
+ }
+ if ((server_fd_ = Buffers::CreateSocket(bind)) == -1) { // passing bind makes this the listening end
+ LOG(FATAL, "creating the IPC socket failed\n");
+ }
+ if (listen(server_fd_, 10) == -1) {
+ LOG(FATAL, "listen(%d, 10) failed with %d: %s\n",
+ server_fd_, errno, strerror(errno));
+ }
+ }
+ void SendFD(const int sock) {
+   // Passes fd_ to the peer connected on sock as SCM_RIGHTS ancillary data.
+   int myfds[Buffers::kNumFDs]; /* Contains the file descriptors to pass. */
+   myfds[0] = fd_;
+   char buf[CMSG_SPACE(sizeof(myfds))]; /* ancillary data buffer */
+   iovec data;
+   memset(&data, 0, sizeof(data));
+   char dummy = 'A';
+   data.iov_base = &dummy;
+   data.iov_len = sizeof(dummy);
+   msghdr msg;
+   memset(&msg, 0, sizeof(msg));
+   msg.msg_iov = &data;
+   msg.msg_iovlen = 1;
+   msg.msg_control = buf;
+   msg.msg_controllen = sizeof(buf);
+   cmsghdr *const cmsg = CMSG_FIRSTHDR(&msg);
+   cmsg->cmsg_level = SOL_SOCKET;
+   cmsg->cmsg_type = SCM_RIGHTS;
+   cmsg->cmsg_len = CMSG_LEN(sizeof(myfds));
+   /* Initialize the payload: */
+   memcpy(CMSG_DATA(cmsg), myfds, sizeof(myfds));
+   if (sendmsg(sock, &msg, 0) == -1) {
+     LOG(ERROR, "sendmsg(%d, %p, 0) failed with %d: %s\n", sock, &msg, errno, strerror(errno));
+     close(sock);  // nothing was handed over, so don't leak the connection
+   }
+   // on success, leave it open so that the other end can tell if this process dies
+ }
+
+#if 0
+ // if we ever do want to do any of these things, this is how
+ void Stop() {
+ const v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+ if (xioctl(fd_, VIDIOC_STREAMOFF, &type) == -1) {
+ errno_exit("VIDIOC_STREAMOFF");
+ }
+ }
+ void Close() {
+ if (close(fd_) == -1)
+ errno_exit("close");
+ fd_ = -1;
+ }
+#endif
+
+ void QueueBuffer(v4l2_buffer *buf) {
+   // Hands *buf to the driver (VIDIOC_QBUF) and counts it in queued_ on success.
+   if (xioctl(fd_, VIDIOC_QBUF, buf) == -1) {
+     LOG(WARNING, "ioctl VIDIOC_QBUF(%d, %p) failed with %d: %s."
+         " losing buf #%" PRIu32 "\n", fd_, buf, errno, strerror(errno), buf->index);
+     return;
+   }
+   LOG(DEBUG, "put buf #%" PRIu32 " into driver's queue\n", buf->index);
+   ++queued_;
+ }
+ void ReadFrame() {
+ v4l2_buffer buf;
+ CLEAR(buf);
+ buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+ buf.memory = V4L2_MEMORY_MMAP;
+
+ const Buffers::Message *read;
+ do {
+ read = static_cast<const Buffers::Message *>(
+ // we block waiting for one if we can't dequeue one without leaving
+ // the driver <= 2 (to be safe)
+ recycle_queue_->ReadMessage((queued_ <= 2) ?
+ RawQueue::kBlock : RawQueue::kNonBlock));
+ if (read != NULL) {
+ buf.index = read->index;
+ recycle_queue_->FreeMessage(read);
+ QueueBuffer(&buf);
+ }
+ } while (read != NULL);
+
+ if (xioctl(fd_, VIDIOC_DQBUF, &buf) == -1) {
+ if (errno != EAGAIN) {
+ LOG(ERROR, "ioctl VIDIOC_DQBUF(%d, %p) failed with %d: %s\n",
+ fd_, &buf, errno, strerror(errno));
+ }
+ return;
+ }
+ --queued_;
+ if (buf.index >= Buffers::kNumBuffers) {
+ LOG(ERROR, "buf.index (%" PRIu32 ") is >= kNumBuffers (%u)\n",
+ buf.index, Buffers::kNumBuffers);
+ return;
+ }
+
+ Buffers::Message *const msg = static_cast<Buffers::Message *>(
+ queue_->GetMessage());
+ if (msg == NULL) {
+ LOG(WARNING,
+ "couldn't get a message to send buf #%" PRIu32 " from queue %p."
+ " re-queueing now\n", buf.index, queue_);
+ QueueBuffer(&buf);
+ return;
+ }
+ msg->index = buf.index;
+ msg->bytesused = buf.bytesused;
+ memcpy(&msg->timestamp, &buf.timestamp, sizeof(msg->timestamp));
+ msg->sequence = buf.sequence;
+ if (!queue_->WriteMessage(msg, RawQueue::kOverride)) {
+ LOG(WARNING,
+ "sending message %p with buf #%" PRIu32 " to queue %p failed."
+ " re-queueing now\n", msg, buf.index, queue_);
+ QueueBuffer(&buf);
+ return;
+ } else {
+ LOG(DEBUG, "sent message off to queue %p with buffer #%" PRIu32 "\n",
+ queue_, buf.index);
+ }
+ }
+
+ void init_mmap() {
+ v4l2_requestbuffers req;
+ CLEAR(req);
+ req.count = Buffers::kNumBuffers; // ask the driver for kNumBuffers mmap-able capture buffers
+ req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+ req.memory = V4L2_MEMORY_MMAP;
+ if (xioctl(fd_, VIDIOC_REQBUFS, &req) == -1) {
+ if (EINVAL == errno) {
+ LOG(FATAL, "%s does not support memory mapping\n", dev_name);
+ } else {
+ LOG(FATAL, "ioctl VIDIOC_REQBUFS(%d, %p) failed with %d: %s\n",
+ fd_, &req, errno, strerror(errno));
+ }
+ }
+ queued_ = Buffers::kNumBuffers; // NOTE(review): Start() also goes through QueueBuffer(), which increments queued_ again -- confirm
+ if (req.count < Buffers::kNumBuffers) { // req.count is compared after the ioctl, i.e. as the driver left it
+ LOG(FATAL, "Insufficient buffer memory on %s\n", dev_name);
+ }
+ }
+
+ // Sets one of the camera's user-control values.
+ // Prints the old and new values.
+ // Just prints a message if the camera doesn't support this control or value.
+ bool SetCameraControl(uint32_t id, const char *name, int value) {
+ struct v4l2_control getArg = {id, 0U};
+ int r = xioctl(fd_, VIDIOC_G_CTRL, &getArg);
+ if (r == 0) {
+ if (getArg.value == value) {
+ printf("Camera control %s was already %d\n", name, getArg.value);
+ return true;
+ }
+ } else if (errno == EINVAL) {
+ printf("Camera control %s is invalid\n", name);
+ errno = 0;
+ return false;
+ }
+
+ struct v4l2_control setArg = {id, value};
+ r = xioctl(fd_, VIDIOC_S_CTRL, &setArg);
+ if (r == 0) {
+ printf("Set camera control %s from %d to %d\n", name, getArg.value, value);
+ return true;
+ }
+
+ printf("Couldn't set camera control %s to %d: %s (errno %d)\n",
+ name, value, strerror(errno), errno);
+ errno = 0;
+ return false;
+ }
+
+ void Init() {
+ v4l2_capability cap;
+ if (xioctl(fd_, VIDIOC_QUERYCAP, &cap) == -1) {
+ if (EINVAL == errno) {
+ LOG(FATAL, "%s is no V4L2 device\n",
+ dev_name);
+ } else {
+ LOG(FATAL, "ioctl VIDIOC_QUERYCAP(%d, %p) failed with %d: %s\n",
+ fd_, &cap, errno, strerror(errno));
+ }
+ }
+ if (!(cap.capabilities & V4L2_CAP_VIDEO_CAPTURE)) {
+ LOG(FATAL, "%s is no video capture device\n",
+ dev_name);
+ }
+ if (!(cap.capabilities & V4L2_CAP_STREAMING)) {
+ LOG(FATAL, "%s does not support streaming i/o\n",
+ dev_name);
+ }
+
+ /* Select video input, video standard and tune here. */
+
+ v4l2_cropcap cropcap;
+ CLEAR(cropcap);
+ cropcap.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+ if (xioctl(fd_, VIDIOC_CROPCAP, &cropcap) == 0) {
+ v4l2_crop crop;
+ crop.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+ crop.c = cropcap.defrect; /* reset to default */
+
+ if (xioctl(fd_, VIDIOC_S_CROP, &crop) == -1) {
+ switch (errno) {
+ case EINVAL:
+ /* Cropping not supported. */
+ break;
+ default:
+ /* Errors ignored. */
+ break;
+ }
+ }
+ } else {
+ /* Errors ignored. */
+ }
+
+ v4l2_format fmt;
+ CLEAR(fmt);
+ fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+ fmt.fmt.pix.width = Buffers::kWidth;
+ fmt.fmt.pix.height = Buffers::kHeight;
+ fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_MJPEG;
+ fmt.fmt.pix.field = V4L2_FIELD_ANY;
+ //fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_YUYV;
+ //fmt.fmt.pix.field = V4L2_FIELD_INTERLACED;
+ if (xioctl(fd_, VIDIOC_S_FMT, &fmt) == -1) {
+ LOG(FATAL, "ioctl VIDIC_S_FMT(%d, %p) failed with %d: %s\n",
+ fd_, &fmt, errno, strerror(errno));
+ }
+ /* Note VIDIOC_S_FMT may change width and height. */
+
+ /* Buggy driver paranoia. */
+ unsigned int min = fmt.fmt.pix.width * 2;
+ if (fmt.fmt.pix.bytesperline < min)
+ fmt.fmt.pix.bytesperline = min;
+ min = fmt.fmt.pix.bytesperline * fmt.fmt.pix.height;
+ if (fmt.fmt.pix.sizeimage < min)
+ fmt.fmt.pix.sizeimage = min;
+
+ if (!SetCameraControl(V4L2_CID_EXPOSURE_AUTO,
+ "V4L2_CID_EXPOSURE_AUTO", V4L2_EXPOSURE_MANUAL)) {
+ LOG(FATAL, "Failed to set exposure\n");
+ }
+
+ if (!SetCameraControl(V4L2_CID_EXPOSURE_ABSOLUTE,
+ "V4L2_CID_EXPOSURE_ABSOLUTE", 65)) {
+ LOG(FATAL, "Failed to set exposure\n");
+ }
+
+ if (!SetCameraControl(V4L2_CID_BRIGHTNESS, "V4L2_CID_BRIGHTNESS", 128)) {
+ LOG(FATAL, "Failed to set up camera\n");
+ }
+
+ if (!SetCameraControl(V4L2_CID_GAIN, "V4L2_CID_GAIN", 0)) {
+ LOG(FATAL, "Failed to set up camera\n");
+ }
+
+#if 0
+ // set framerate
+ struct v4l2_streamparm *setfps;
+ setfps = (struct v4l2_streamparm *) calloc(1, sizeof(struct v4l2_streamparm));
+ memset(setfps, 0, sizeof(struct v4l2_streamparm));
+ setfps->type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+ setfps->parm.capture.timeperframe.numerator = 1;
+ setfps->parm.capture.timeperframe.denominator = 20;
+ if (xioctl(fd_, VIDIOC_S_PARM, setfps) == -1) {
+ LOG(ERROR, "ioctl VIDIOC_S_PARM(%d, %p) failed with %d: %s\n",
+ fd_, setfps, errno, strerror(errno));
+ exit(EXIT_FAILURE);
+ }
+ LOG(INFO, "framerate ended up at %d/%d\n",
+ setfps->parm.capture.timeperframe.numerator,
+ setfps->parm.capture.timeperframe.denominator);
+#endif
+
+ init_mmap();
+ }
+
+ void Start() {
+ LOG(DEBUG, "queueing buffers for the first time\n");
+ v4l2_buffer buf;
+ for (unsigned int i = 0; i < Buffers::kNumBuffers; ++i) {
+ CLEAR(buf);
+ buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+ buf.memory = V4L2_MEMORY_MMAP;
+ buf.index = i;
+ QueueBuffer(&buf);
+ }
+ LOG(DEBUG, "done with first queue\n");
+
+ v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+ if (xioctl(fd_, VIDIOC_STREAMON, &type) == -1) {
+ LOG(FATAL, "ioctl VIDIOC_STREAMON(%d, %p) failed with %d: %s\n",
+ fd_, &type, errno, strerror(errno));
+ }
+ }
+
+ public:
+ void Run() {
+ Start();
+
+ fd_set fds;
+ timeval tv;
+ while (true) {
+ // HAVE TO DO THIS EACH TIME THROUGH THE LOOP
+ tv.tv_sec = 2;
+ tv.tv_usec = 0;
+
+ FD_ZERO(&fds);
+ FD_SET(fd_, &fds);
+ FD_SET(server_fd_, &fds);
+ switch (select(std::max(fd_, server_fd_) + 1, &fds, NULL, NULL, &tv)) {
+ case -1:
+ if (errno != EINTR) {
+ LOG(ERROR, "select(%d, %p, NULL, NULL, %p) failed with %d: %s\n",
+ std::max(fd_, server_fd_) + 1, &fds, &tv, errno, strerror(errno));
+ }
+ continue;
+ case 0:
+ LOG(WARNING, "select timed out\n");
+ continue;
+ }
+
+ if (FD_ISSET(fd_, &fds)) {
+ LOG(DEBUG, "Got a frame\n");
+ ReadFrame();
+ }
+ if (FD_ISSET(server_fd_, &fds)) {
+ const int sock = accept4(server_fd_, NULL, NULL, SOCK_NONBLOCK);
+ if (sock == -1) {
+ LOG(ERROR, "accept4(%d, NULL, NULL, SOCK_NONBLOCK(=%d) failed with %d: %s\n",
+ server_fd_, SOCK_NONBLOCK, errno, strerror(errno));
+ } else {
+ SendFD(sock);
+ }
+ }
+ }
+ }
+};
+const char *const Reader::dev_name = "/dev/video0";
+
+} // namespace camera
+} // namespace aos
+
+int main() {
+ ::aos::InitNRT(); // the non-realtime flavor of aos::Init()
+ ::aos::camera::Reader reader; // opens the camera and the fd server socket in its constructor
+ reader.Run(); // loops in while (true); no return path is visible in this file
+ ::aos::Cleanup(); // only reached if Run() ever returns
+}
diff --git a/aos/linux_code/camera/V4L2.h b/aos/linux_code/camera/V4L2.h
new file mode 100644
index 0000000..e018bea
--- /dev/null
+++ b/aos/linux_code/camera/V4L2.h
@@ -0,0 +1,27 @@
+#ifndef AOS_LINUX_CODE_CAMERA_V4L2_H_
+#define AOS_LINUX_CODE_CAMERA_V4L2_H_
+
+// This file handles including everything needed to use V4L2 and has some
+// utility functions.
+
+#include <errno.h>
+#include <sys/ioctl.h>
+
+#include <asm/types.h> /* for videodev2.h */
+#include <linux/videodev2.h>
+
+namespace aos {
+namespace camera {
+
+// Wrapper around ioctl(2) that retries for as long as the call fails with
+// EINTR.
+// request is an unsigned long (the type glibc's ioctl declares) so that
+// request codes with the top bit set are not narrowed to a negative int.
+// arg is passed through to ioctl unchanged.
+// Returns the result of the last ioctl call (-1 with errno set on failure).
+static inline int xioctl(int fd, unsigned long request, void *arg) {
+  int r;
+  do {
+    r = ioctl(fd, request, arg);
+  } while (r == -1 && errno == EINTR);
+  return r;
+}
+
+}  // namespace camera
+}  // namespace aos
+
+#endif  // AOS_LINUX_CODE_CAMERA_V4L2_H_
+
diff --git a/aos/linux_code/camera/aos.jar_manifest b/aos/linux_code/camera/aos.jar_manifest
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/aos/linux_code/camera/aos.jar_manifest
@@ -0,0 +1 @@
+
diff --git a/aos/linux_code/camera/camera.gyp b/aos/linux_code/camera/camera.gyp
new file mode 100644
index 0000000..e4f1e04
--- /dev/null
+++ b/aos/linux_code/camera/camera.gyp
@@ -0,0 +1,81 @@
+# Build targets for the camera code: the JNI library and its jar, the shared
+# buffers library, and the CameraHTTPStreamer/CameraReader executables.
+{
+ 'targets': [
+ # Loadable module built from jni.cpp.
+ {
+ 'target_name': 'aos_camera',
+ 'type': 'loadable_module',
+ 'sources': [
+ 'jni.cpp',
+ ],
+ 'dependencies': [
+ '<(AOS)/common/network/network.gyp:socket_so',
+ '<(AOS)/common/common.gyp:timing_so',
+ 'private_aos_camera_jar',
+ '<(EXTERNALS):libjpeg',
+ ],
+ 'export_dependent_settings': [
+ '<(AOS)/common/network/network.gyp:socket_so',
+ '<(AOS)/common/common.gyp:timing_so',
+ 'private_aos_camera_jar',
+ ],
+ },
+ # Java sources under java/, built via build/java.gypi. Dependents get
+ # aos_camera added to their jni_libs.
+ {
+ 'target_name': 'private_aos_camera_jar',
+ 'dependencies': [
+ '<(EXTERNALS):javacv',
+ ],
+ 'variables': {
+ 'srcdirs': ['java'],
+ 'gen_headers': ['aos.Natives'],
+ },
+ 'export_dependent_settings': [
+ '<(EXTERNALS):javacv',
+ ],
+ 'direct_dependent_settings': {
+ 'variables': {
+ 'jni_libs': ['aos_camera'],
+ },
+ },
+ 'includes': ['../../build/java.gypi'],
+ 'hard_dependency': 1,
+ },
+ # Static library built from Buffers.cpp; used by both executables below.
+ {
+ 'target_name': 'buffers',
+ 'type': 'static_library',
+ 'sources': [
+ 'Buffers.cpp',
+ ],
+ 'dependencies': [
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:queue',
+ '<(AOS)/build/aos.gyp:logging',
+ ],
+ 'export_dependent_settings': [
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:queue',
+ ],
+ },
+ # Executable built from HTTPStreamer.cpp.
+ {
+ 'target_name': 'CameraHTTPStreamer',
+ 'type': 'executable',
+ 'sources': [
+ 'HTTPStreamer.cpp',
+ ],
+ 'dependencies': [
+ 'buffers',
+ '<(AOS)/linux_code/linux_code.gyp:init',
+ '<(AOS)/build/aos.gyp:logging',
+ ],
+ },
+ # Executable built from Reader.cpp.
+ {
+ 'target_name': 'CameraReader',
+ 'type': 'executable',
+ 'sources': [
+ 'Reader.cpp',
+ ],
+ 'dependencies': [
+ 'buffers',
+ '<(AOS)/linux_code/linux_code.gyp:init',
+ '<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:queue',
+ ],
+ },
+ ],
+}
diff --git a/aos/linux_code/camera/java/aos/CameraProcessor.java b/aos/linux_code/camera/java/aos/CameraProcessor.java
new file mode 100644
index 0000000..4f6c68d
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/CameraProcessor.java
@@ -0,0 +1,67 @@
+package aos;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.googlecode.javacv.cpp.opencv_core;
+
+/**
+ * Makes implementing code that processes frames from a camera easy.
+ * Subclasses implement {@link #RunIteration()} and call {@link #Run()}, which repeatedly
+ * fetches a frame into {@link #start} and then calls {@link #RunIteration()}.
+ */
+public abstract class CameraProcessor {
+  private static final Logger LOG = Logger.getLogger(CameraProcessor.class.getName());
+  /** Where frames come from. Chosen by the constructor based on the arguments. */
+  protected final ImageGetter getter;
+  /** The 3-channel 8-bit image that each new frame is written into. */
+  protected final ServableImage start = new ServableImage(ImageGetter.width, ImageGetter.height, opencv_core.IPL_DEPTH_8U, 3);
+
+  /**
+   * Parses any arguments it recognizes out of {@code args} and initializes stuff appropriately.
+   * This includes using {@link QueueLogHandler} for all exceptions and {@link Thread#setDefaultUncaughtExceptionHandler}ing.
+   * <p>{@code --host <name>} makes frames get read from a socket connected to port 9714 on that host.
+   * Without it, a {@link QueueImageGetter} is used. Unrecognized arguments are reported on stderr and ignored.</p>
+   * @param args from {@code main}
+   * @throws IllegalArgumentException if {@code --host} is not followed by a host name
+   */
+  protected CameraProcessor(String[] args) throws UnknownHostException, IOException {
+    QueueLogHandler.UseForAll();
+    ReadableByteChannel channel = null;
+    for (int i = 0; i < args.length; ++i) {
+      final String c = args[i];
+      if (c.equals("--host")) {
+        // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
+        if (i + 1 >= args.length) {
+          throw new IllegalArgumentException("--host requires a host name argument");
+        }
+        final String host = args[++i];
+        final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(Inet4Address.getByName(host), 9714));
+        socketChannel.write(ByteBuffer.wrap(new byte[] {'\r', '\n', '\r', '\n'})); // get it past the read headers stage
+        channel = socketChannel;
+      } else {
+        System.err.println("CameraProcessor: warning: unrecognized argument '" + c + "'. ignoring");
+      }
+    }
+
+    if (channel != null) {
+      getter = new ChannelImageGetter(channel);
+    } else {
+      System.out.println("creating QueueImageGetter");
+      getter = new QueueImageGetter();
+      System.out.println("done");
+    }
+
+    LOG.log(Level.INFO, "CameraProcessor is up");
+    System.err.println("CameraProcessor is up (on stderr)");
+  }
+
+  /**
+   * Called by {@link #Run()} once for each frame that was successfully retrieved into {@link #start}.
+   */
+  protected abstract void RunIteration();
+
+  /**
+   * Loops forever: gets a frame into {@link #start}, calls {@link #RunIteration()}, and then releases the image.
+   * Frames that fail to be retrieved are logged and skipped.
+   */
+  protected void Run() {
+    while (true) {
+      // NOTE(review): a failed get() skips start.releaseImage(); confirm that
+      // ServableImage#getImage() does not need to be paired with a release.
+      if (!getter.get(start.getImage())) {
+        LOG.log(Level.WARNING, "getting image failed");
+        continue;
+      }
+      RunIteration();
+      start.releaseImage();
+    }
+  }
+}
diff --git a/aos/linux_code/camera/java/aos/ChannelImageGetter.java b/aos/linux_code/camera/java/aos/ChannelImageGetter.java
new file mode 100644
index 0000000..511b55b
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/ChannelImageGetter.java
@@ -0,0 +1,158 @@
+package aos;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Retrieves images from a {@link InputChannel}. Expects the images in mjpg form.
+ * For now, only accepts streams formatted pretty closely to how aos::camera::HTTPStreamer does it.
+ */
+public class ChannelImageGetter extends JPEGImageGetter {
+ /**
+ * What to multiply each length by when it needs to allocate a larger buffer to fit an image.
+ */
+ private static final double extraLength = 1.2;
+
+ private static final Logger LOG = Logger.getLogger(ChannelImageGetter.class
+ .getName());
+ private final ReadableByteChannel channel;
+ private final Selector selector = Selector.open();
+ private String separator = "--boundarydonotcross\r\n";
+ private ByteBuffer current;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocateDirect(30);
+ private final Map<String, String> headers = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+
+ public ChannelImageGetter(ReadableByteChannel channel) throws IOException {
+ this.channel = channel;
+ if (channel instanceof SelectableChannel) {
+ ((SelectableChannel)channel).configureBlocking(false);
+ }
+ }
+
+ @Override
+ public ByteBuffer getJPEG() {
+ try {
+ if (!parseHeaders()) {
+ return null;
+ }
+ LOG.log(Level.FINE, "parsed headers " + headers.toString());
+ try {
+ final int length = Integer.parseInt(headers.get("Content-Length"));
+ if (current == null || current.capacity() < length) {
+ LOG.log(Level.INFO, "allocating a new direct buffer of length " + length * extraLength);
+ current = ByteBuffer.allocateDirect((int) (length * extraLength));
+ } else {
+ current.rewind();
+ current.limit(length);
+ }
+ } catch (NumberFormatException e) {
+ LOG.log(Level.WARNING, "couldn't parse '" + headers.get("Content-Length") + "' as a number");
+ return null;
+ }
+ current.put(headerBuffer); // copy out any of the image that got buffered with the headers
+ while (current.hasRemaining()) {
+ channel.read(current);
+ }
+ current.flip();
+ } catch (IOException e) {
+ LOG.log(Level.WARNING, "reading the headers and/or image failed", e);
+ return null;
+ }
+ return current;
+ }
+ // returns success
+ private boolean parseHeaders() throws IOException {
+ // Reads chunks into headerBuffer and parses out headers.
+ // Looks for separator first.
+
+ headerBuffer.clear();
+ headers.clear();
+ final byte[] separatorBytes = separator.getBytes();
+ int separatorIndex = 0; // how much of the separator has been matched
+ while (headerBuffer.hasRemaining() || headerBuffer.limit() < headerBuffer.capacity()) {
+ if (channel instanceof SelectableChannel) {
+ ((SelectableChannel)channel).register(selector, SelectionKey.OP_READ);
+ selector.select();
+ }
+ headerBuffer.limit(headerBuffer.capacity());
+ channel.read(headerBuffer);
+ headerBuffer.flip();
+ if (separatorIndex < separatorBytes.length) {
+ // make sure we don't get part of the way through
+ while (headerBuffer.remaining() >= (separatorBytes.length - separatorIndex)) {
+ final byte c = headerBuffer.get();
+ if (separatorBytes[separatorIndex++] != c) {
+ separatorIndex = 0;
+ }
+ if (separatorIndex == separatorBytes.length) {
+ break;
+ }
+ }
+ headerBuffer.compact();
+ } else {
+ int keyEnd = 0, valueStart = 0;
+ boolean foundEndR = false; // found the end \r
+ while (headerBuffer.hasRemaining()) {
+ final byte c = headerBuffer.get();
+ if (foundEndR) {
+ if (c != '\n') {
+ LOG.log(Level.WARNING, "found \r\n\r but no \n afterwards");
+ } else {
+ return true;
+ }
+ } else if (keyEnd == 0) {
+ if (c == ':') {
+ keyEnd = headerBuffer.position() - 1;
+ } else if (c == '\r') {
+ foundEndR = true;
+ }
+ } else if (valueStart == 0) {
+ if (c != ' ') {
+ valueStart = headerBuffer.position() - 1;
+ }
+ } else {
+ if (c == '\r') {
+ final int valueEnd = headerBuffer.position();
+ final byte[] key = new byte[keyEnd];
+ headerBuffer.position(0);
+ headerBuffer.get(key);
+ final byte[] value = new byte[valueEnd - valueStart - 1];
+ headerBuffer.position(valueStart);
+ headerBuffer.get(value);
+ headers.put(new String(key), new String(value));
+
+ headerBuffer.get(); // get the \r
+ headerBuffer.get(); // get the \n
+
+ headerBuffer.compact();
+ headerBuffer.flip();
+
+ keyEnd = valueStart = 0;
+ }
+ }
+ }
+ }
+ }
+ // if we got here, then it doesn't have space left and we haven't finished
+ LOG.log(Level.WARNING, "got a header that was too long. headerBuffer should be made bigger");
+ return false;
+ }
+
+ @Override
+ public double getTimestamp() {
+ if (headers.containsKey("X-Timestamp")) {
+ return Double.parseDouble(headers.get("X-Timestamp"));
+ } else {
+ throw new UnsupportedOperationException("source stream doesn't have X-Timestamp headers");
+ }
+ }
+
+}
diff --git a/aos/linux_code/camera/java/aos/DebugServer.java b/aos/linux_code/camera/java/aos/DebugServer.java
new file mode 100644
index 0000000..398cb11
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/DebugServer.java
@@ -0,0 +1,357 @@
+package aos;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.googlecode.javacv.cpp.opencv_core.IplImage;
+
+/**
+ * A server that serves {@link ServableImage}s.
+ * Each client sends an HTTP GET for the name an image was added under (optionally with an
+ * {@code ?i=} index) and then receives a multipart stream of BMP images.
+ * The constructor starts a thread which runs the server loop.
+ */
+public class DebugServer {
+ // Written once to each client before any images.
+ private static final String initialHeaderString = "HTTP/1.0 200 OK\r\n"
+ + "Connection: close\r\n"
+ + "Server: AOS/0.0 Vision Code Debug\r\n"
+ + "Cache-Control: no-store, no-cache, must-revalidate, pre-check=0, post-check=0, max-age=0\r\n"
+ + "Pragma: no-cache\r\n"
+ + "Expires: Mon, 3 Jan 2000 12:34:56 GMT\r\n"
+ + "Content-Type: multipart/x-mixed-replace; boundary=boundarydonotcross\r\n";
+ // Format for the part headers written before each image. The arguments are
+ // the content length and the timestamp.
+ private static final String intermediateHeaderFormat = "\r\n--boundarydonotcross\r\n"
+ + "Content-Type: image/bmp\r\n"
+ + "Content-Length: %d\r\n"
+ + "X-Timestamp: %f\r\n"
+ + "\r\n";
+ private static final ByteBuffer initialHeader;
+ // Matches a GET request. Group 1 is the path and group 2 is the value of the
+ // optional "i" query parameter.
+ private static final Pattern headerPattern;
+ static {
+ initialHeader = ByteBuffer.wrap(initialHeaderString.getBytes())
+ .asReadOnlyBuffer();
+ headerPattern = Pattern.compile("^GET ([^? ]+)(?:\\?i=(\\S*))? HTTP/.*\r\n.*$",
+ Pattern.DOTALL);
+ }
+
+ private static final Logger LOG = Logger.getLogger(DebugServer.class
+ .getName());
+ private final ServerSocketChannel server;
+ private final Collection<Client> clients = new ArrayList<Client>();
+ // Both of these are keyed by the name passed to addImage, which is compared
+ // against the path that clients request.
+ private final Map<String, ServableImage> images = new HashMap<String, ServableImage>();
+ private final Map<String, Palette> palettes = new HashMap<String, Palette>();
+ private double timestamp;
+
+ /**
+ * A list of colors to go with an image that is served with a palette.
+ */
+ public static class Palette {
+ private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ private int entries = 0;
+
+ /**
+ * Adds a new color. All 4 arguments are unsigned bytes.
+ * @param r red
+ * @param g green
+ * @param b blue
+ * @param a alpha (doesn't seem to work)
+ * @return this
+ */
+ public Palette add(int r, int g, int b, int a) {
+ // stored in blue, green, red, alpha order
+ out.write(b);
+ out.write(g);
+ out.write(r);
+ out.write(a);
+ ++entries;
+ return this;
+ }
+
+ private int entries() {
+ return entries;
+ }
+ private int bytes() {
+ return entries * 4;
+ }
+ private void writeTo(ByteBuffer buffer) {
+ buffer.put(out.toByteArray());
+ }
+ }
+
+ /**
+ * The state for 1 connection: reading the request, then writing the initial header,
+ * then writing images.
+ */
+ private class Client {
+ private final int bmpHeaderSize;
+ private int bmpType;
+ private Palette palette;
+
+ private final SocketChannel channel;
+ // null until a valid request for a known image has been read
+ private ServableImage image = null;
+ private IplImage img;
+ private int index;
+ private final ByteBuffer[] buffers;
+ private final ByteBuffer initial = initialHeader.duplicate();
+ private final ByteBuffer read = ByteBuffer.allocate(1024);
+
+ public Client(SocketChannel channel) throws IOException {
+ this.channel = channel;
+ channel.configureBlocking(false);
+
+ // bmpType has not been assigned yet at this point, so it is still 0 and
+ // bmpHeaderSize always ends up as 54.
+ if (bmpType == 3) {
+ bmpHeaderSize = 122;
+ } else if (bmpType == 0) {
+ bmpHeaderSize = 54;
+ } else {
+ throw new AssertionError("unknown bmpType value " + bmpType);
+ }
+ // [0] gets filled in by createBmpHeader which writes the header into [1]
+ // [2] gets set to the image buffer
+ buffers = new ByteBuffer[] { null, ByteBuffer.allocate(2048), null };
+ }
+
+ /**
+ * Registers the channel with {@code selector} for reading until an image has been
+ * chosen and for writing afterwards. This object is the attachment.
+ */
+ public void register(Selector selector) throws ClosedChannelException {
+ channel.register(
+ selector,
+ (image == null) ? SelectionKey.OP_READ
+ : SelectionKey.OP_WRITE, this);
+ }
+
+ /**
+ * Turns debugging back off on the image (if one was chosen) and closes the channel.
+ */
+ public void close() {
+ if (image != null) {
+ image.setDebugging(false);
+ }
+ if (channel != null) {
+ try {
+ channel.close();
+ } catch (IOException e) {
+ LOG.log(Level.WARNING,
+ "encountered error when closing channel", e);
+ }
+ }
+ }
+
+ /**
+ * Fills in buffers[0] with the part headers and writes the BMP header (and palette)
+ * into {@code buffer}, leaving it ready to be read from the beginning.
+ */
+ private void createBmpHeader(ByteBuffer buffer) {
+ // <http://en.wikipedia.org/wiki/BMP_file_format#File_structure>
+ // explains what these all are
+ // signed/unsigned numbers don't matter because they'd better not be
+ // that big
+ final int paletteSize = (palette == null) ? 0 : palette.bytes();
+ buffers[0] = ByteBuffer.wrap(String.format(intermediateHeaderFormat,
+ bmpHeaderSize + paletteSize + image.imageSize(), timestamp).getBytes());
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ buffer.put((byte) 'B').put((byte) 'M');
+ buffer.putInt(bmpHeaderSize + paletteSize + image.imageSize());
+ buffer.putInt(0); // skip ahead 4 bytes
+ buffer.putInt(bmpHeaderSize + paletteSize); // offset to start of image data
+ buffer.putInt(bmpHeaderSize - 14); // size of the rest of the header
+ // BMPs expect image data bottom to top, so -height to fix that
+ buffer.putInt(ImageGetter.width).putInt(-ImageGetter.height);
+ buffer.putShort((short) 1).putShort(image.bitsPerPixel());
+ buffer.putInt(bmpType);
+ buffer.putInt(image.imageSize()); // number of bytes in the actual
+ // image
+ buffer.putInt(2835).putInt(2835); // physical resolution; don't
+ // think it matters
+ buffer.putInt((palette == null) ? 0 : palette.entries());
+ buffer.putInt(0); // # of important colors (0 means all)
+ if (palette != null) {
+ palette.writeTo(buffer);
+ }
+ final int expected;
+ if (bmpType == 0) { // BI_RGB
+ expected = bmpHeaderSize + paletteSize;
+ } else if (bmpType == 3) { // BI_BITFIELDS
+ buffer.putInt(0x0000FF00).putInt(0x00FF0000).putInt(0xFF000000)
+ .putInt(0); // RGBA bitmasks
+ buffer.putInt(0x57696E20); // LCS_WINDOWS_COLOR_SPACE
+ expected = bmpHeaderSize - 48;
+ } else {
+ throw new AssertionError("unknown bmpType value " + bmpType);
+ }
+ // sanity check that the right number of bytes got written above
+ if (buffer.position() != expected) {
+ throw new AssertionError(
+ "header ended up the wrong size. expected "
+ + expected + " but got "
+ + buffer.position());
+ }
+ buffer.limit(bmpHeaderSize + paletteSize);
+ buffer.rewind();
+ }
+
+ /**
+ * Does anything that this one can right now.
+ *
+ * @return whether or not to {@link #close()} and remove this one
+ */
+ public boolean run() throws IOException {
+ if (image == null) {
+ // still waiting for a complete request
+ final int bytesRead = channel.read(read);
+ final String readString = new String(read.array(), 0,
+ read.position());
+ LOG.log(Level.INFO, "read " + bytesRead
+ + " header bytes position=" + read.position()
+ + " string='" + readString + "'");
+
+ final Matcher matcher = headerPattern.matcher(readString);
+ if (matcher.matches()) {
+ final String url = matcher.group(1);
+ image = images.get(url);
+ if (image == null) {
+ LOG.log(Level.INFO, "couldn't find an image for url '"
+ + url + "'. dropping client");
+ return true;
+ } else {
+ LOG.log(Level.INFO, "found an image for url '"
+ + url + "'");
+ }
+ palette = palettes.get(url);
+ bmpType = 0; // could change this in the future
+ createBmpHeader(buffers[1]);
+ image.setDebugging(true);
+ final String indexString = matcher.group(2);
+ if (indexString != null) {
+ index = Integer.parseInt(indexString);
+ } else {
+ index = 0;
+ }
+ LOG.log(Level.INFO, "using index " + index);
+ } else if (!read.hasRemaining()) {
+ read.flip();
+ LOG.log(Level.WARNING,
+ "ran out of buffer space reading the header. currently have '"
+ + readString + "'. dropping connection");
+ return true;
+ } else if (bytesRead == -1) {
+ read.flip();
+ LOG.log(Level.WARNING,
+ "reached end of stream for headers without getting anything valid. currently have "
+ + read.limit()
+ + " bytes ='"
+ + readString
+ + "'. dropping connection");
+ return true;
+ }
+ } else if (initial.hasRemaining()) {
+ channel.write(initial);
+ } else {
+ // buffers[2] == null means that a new snapshot is needed
+ if (buffers[2] == null) {
+ img = image.getSnapshot(index);
+ if (img == null) {
+ return false;
+ } else {
+ buffers[2] = img.getByteBuffer();
+ LOG.log(Level.FINE, "got " + buffers[2]
+ + " from the image");
+ }
+ }
+ channel.write(buffers);
+ boolean remaining = false;
+ for (ByteBuffer c : buffers) {
+ if (c.hasRemaining()) {
+ remaining = true;
+ }
+ }
+ if (!remaining) {
+ // everything got written, so get ready to send the next image
+ for (ByteBuffer c : buffers) {
+ c.rewind();
+ }
+ buffers[2] = null;
+ image.releaseSnapshot(index, img);
+ }
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Binds to {@code port} and starts a thread named "DebugServer" that runs the server loop.
+ * If that loop throws anything, it gets logged and then {@link System#exit} is called.
+ * @param port the port to listen on
+ */
+ public DebugServer(int port) throws IOException {
+ server = ServerSocketChannel.open();
+ server.configureBlocking(false);
+ server.socket().bind(new InetSocketAddress(port), 10);
+ new Thread("DebugServer") {
+ @Override
+ public void run() {
+ try {
+ loop();
+ } catch (Throwable e) {
+ LOG.log(Level.SEVERE,
+ "trouble running the server loop", e);
+ System.exit(1);
+ }
+ }
+ }.start();
+ }
+
+ /**
+ * Makes a 24-bit image available to clients.
+ * @param name what clients have to request to get this image
+ * @param image the image to serve
+ * @throws IllegalArgumentException if {@code image} is not 24 bits per pixel
+ */
+ public void addImage(String name, ServableImage image) {
+ if (image.bitsPerPixel() != 24) {
+ throw new IllegalArgumentException("only 24-bit images are supported");
+ // could support 16 and 32 bpp images by using bmpType of 3
+ }
+ images.put(name, image);
+ }
+ /**
+ * Makes an 8-bit image with a palette available to clients.
+ * @param name what clients have to request to get this image
+ * @param image the image to serve
+ * @param palette the colors to send along with the image
+ * @throws IllegalArgumentException if {@code image} is not 8 bits per pixel or
+ * {@code palette} has more entries than that allows
+ */
+ public void addImage(String name, ServableImage image, Palette palette) {
+ if (image.bitsPerPixel() != 8) {
+ throw new IllegalArgumentException("only 8-bit images are supported");
+ // everything should work for 1, 2, and 4 bpp ones too except for padding etc
+ }
+ if (palette.entries() > (1 << image.bitsPerPixel())) {
+ throw new IllegalArgumentException("too many colors in the palette");
+ }
+ images.put(name, image);
+ palettes.put(name, palette);
+ }
+ /**
+ * This timestamp is written out in the debugging images.
+ * @param timestamp the current timestamp
+ */
+ public void setTimestamp(double timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * The server loop: accepts new clients and calls {@link Client#run()} on the ones that are
+ * ready. Clients whose run() returns true or throws get closed and removed.
+ */
+ private void loop() throws IOException {
+ final Selector selector = Selector.open();
+ server.register(selector, SelectionKey.OP_ACCEPT);
+ while (true) {
+ try {
+ // re-register every time because what each client is waiting for changes
+ for (Client c : clients) {
+ c.register(selector);
+ }
+ selector.select();
+ for (final Iterator<SelectionKey> i = selector.selectedKeys()
+ .iterator(); i.hasNext();) {
+ final SelectionKey c = i.next();
+ if (c.isAcceptable()) {
+ // there's only 1 socket there that can accept
+ final SocketChannel channel = server.accept();
+ if (channel != null) {
+ clients.add(new Client(channel));
+ }
+ } else {
+ final Client client = (Client) c.attachment();
+ try {
+ if (client.run()) {
+ client.close();
+ clients.remove(client);
+ }
+ } catch (Exception e) {
+ LOG.log(Level.INFO, "dropping client " + client
+ + " because it's run() threw an exception",
+ e);
+ client.close();
+ clients.remove(client);
+ }
+ }
+ i.remove();
+ }
+ } catch (IOException e) {
+ LOG.log(Level.WARNING, "trouble running the server loop", e);
+ }
+ }
+ }
+}
diff --git a/aos/linux_code/camera/java/aos/ImageGetter.java b/aos/linux_code/camera/java/aos/ImageGetter.java
new file mode 100644
index 0000000..f0f1063
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/ImageGetter.java
@@ -0,0 +1,24 @@
+package aos;
+
+import com.googlecode.javacv.cpp.opencv_core;
+import com.googlecode.javacv.cpp.opencv_core.IplImage;
+
+/**
+ * A source of images.
+ */
+public interface ImageGetter {
+  // Interface fields are implicitly public, static, and final.
+  /** The width of the images. */
+  int width = 640;
+  /** The height of the images. */
+  int height = 480;
+
+  /**
+   * Retrieves one image.
+   * @param out The destination for the image. Has to be a 3-channel {@link opencv_core#IPL_DEPTH_8U} image.
+   * @return {@code true} if an image was retrieved and {@code false} if not
+   */
+  boolean get(IplImage out);
+
+  /**
+   * May only be called once {@link #get(IplImage)} has succeeded.
+   * @return The timestamp of the most recent frame, in seconds with at least ms accuracy.
+   */
+  double getTimestamp();
+}
+
diff --git a/aos/linux_code/camera/java/aos/JPEGDecoder.java b/aos/linux_code/camera/java/aos/JPEGDecoder.java
new file mode 100644
index 0000000..63d12fd
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/JPEGDecoder.java
@@ -0,0 +1,29 @@
+package aos;
+
+import java.nio.ByteBuffer;
+
+import com.googlecode.javacv.cpp.opencv_core;
+import com.googlecode.javacv.cpp.opencv_core.IplImage;
+
+/**
+ * Decodes JPEG images from {@link ByteBuffer}s into {@link IplImage}s using native code.
+ * An instance must not be used from more than one thread.
+ * Some largish buffers get allocated the first time an instance is used and they are never freed.
+ */
+public class JPEGDecoder {
+  /** Handed to the native code, which keeps its state for this instance in it. */
+  private final long[] state = new long[1];
+
+  /**
+   * @param in The JPEG data. Has to be a direct buffer. Only the bytes up to its {@link ByteBuffer#limit()} are used.
+   * @param out The destination for the decoded image, in the RGB color space.
+   *     Has to be a 3-channel {@link opencv_core#IPL_DEPTH_8U} image.
+   * @return {@code true} on success. On failure, the contents of {@code out} are undefined.
+   * @throws IllegalArgumentException if {@code out} is not a 3-channel 8-bit unsigned image
+   */
+  public boolean decode(ByteBuffer in, IplImage out) {
+    final boolean validOut = out.nChannels() == 3 && out.depth() == opencv_core.IPL_DEPTH_8U;
+    if (!validOut) {
+      throw new IllegalArgumentException("out is invalid");
+    }
+    final int inBytes = in.limit();
+    final ByteBuffer decoded = out.getByteBuffer();
+    return Natives.decodeJPEG(state, in, inBytes, decoded);
+  }
+}
+
diff --git a/aos/linux_code/camera/java/aos/JPEGImageGetter.java b/aos/linux_code/camera/java/aos/JPEGImageGetter.java
new file mode 100644
index 0000000..84296e7
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/JPEGImageGetter.java
@@ -0,0 +1,26 @@
+package aos;
+
+import java.nio.ByteBuffer;
+
+import com.googlecode.javacv.cpp.opencv_core.IplImage;
+
+/**
+ * Helper class for {@link ImageGetter}s that return JPEG images.
+ * Subclasses provide the JPEG data through {@link #getJPEG()} and this class decodes it.
+ */
+public abstract class JPEGImageGetter implements ImageGetter {
+
+  private final JPEGDecoder decoder = new JPEGDecoder();
+
+  /**
+   * Gets a JPEG from {@link #getJPEG()} and decodes it into {@code out}.
+   * {@link #release()} is called once decoding is finished, even if decoding throws.
+   * @return {@code false} if there was no JPEG or decoding it failed
+   */
+  @Override
+  public boolean get(IplImage out) {
+    final ByteBuffer jpeg = getJPEG();
+    if (jpeg == null) return false;
+    try {
+      return decoder.decode(jpeg, out);
+    } finally {
+      // Always give the JPEG back so that an exception from decode (for
+      // example for an invalid out) doesn't leak it.
+      release();
+    }
+  }
+
+  /**
+   * @return the next JPEG image or {@code null} if one could not be retrieved
+   */
+  protected abstract ByteBuffer getJPEG();
+  /**
+   * Called after the result of a successful {@link #getJPEG()} is no longer needed.
+   * Does nothing by default.
+   */
+  protected void release() {}
+
+}
diff --git a/aos/linux_code/camera/java/aos/JavaCVImageGetter.java b/aos/linux_code/camera/java/aos/JavaCVImageGetter.java
new file mode 100644
index 0000000..67398f0
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/JavaCVImageGetter.java
@@ -0,0 +1,56 @@
+package aos;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.googlecode.javacv.FrameGrabber;
+import com.googlecode.javacv.cpp.opencv_core.IplImage;
+
+
+/**
+ * Adapts between the JavaCV {@link FrameGrabber} and {@link ImageGetter}.
+ * There is (at least) 1 extra copy involved, so this shouldn't be used if you care about speed.
+ */
+public class JavaCVImageGetter implements ImageGetter {
+  private static final Logger LOG = Logger.getLogger(JavaCVImageGetter.class
+      .getName());
+  private final FrameGrabber grabber;
+
+  /**
+   * @param grabber where to get frames from
+   * @throws IllegalArgumentException if {@code grabber} reports a size other than
+   *     {@link ImageGetter#width}x{@link ImageGetter#height} (0x0 is tolerated with a warning)
+   */
+  public JavaCVImageGetter(FrameGrabber grabber) {
+    this.grabber = grabber;
+    if (grabber.getImageWidth() != width || grabber.getImageHeight() != height) {
+      if (grabber.getImageWidth() == 0 && grabber.getImageHeight() == 0) {
+        LOG.log(Level.WARNING, "grabber says it will give 0x0 images at the start. ignoring");
+      } else {
+        throw new IllegalArgumentException("grabber says it will give images that are the wrong size!!");
+      }
+    }
+  }
+
+  /**
+   * Grabs a frame and copies it into {@code out}.
+   * @return {@code false} if grabbing failed, no frame was returned, or the sizes don't match
+   */
+  @Override
+  public boolean get(IplImage out) {
+    try {
+      final IplImage frame = grabber.grab();
+      // Don't dereference a missing frame below.
+      if (frame == null) {
+        LOG.log(Level.WARNING, "grabber.grab() returned null");
+        return false;
+      }
+      if (grabber.getImageWidth() != width || grabber.getImageHeight() != height) {
+        LOG.log(Level.SEVERE, "grabber says it will give the wrong size images");
+        return false;
+      }
+      if (out.imageSize() != frame.imageSize()) {
+        LOG.log(Level.SEVERE, "the grabber gave a " + frame.imageSize() + "-byte image " +
+            "but a " + out.imageSize() + "-byte image was passed in");
+        return false;
+      }
+      out.getByteBuffer().put(frame.getByteBuffer());
+      return true;
+    } catch (FrameGrabber.Exception e) {
+      LOG.log(Level.WARNING, "grabber.grab() threw an exception", e);
+      return false;
+    }
+  }
+
+  @Override
+  public double getTimestamp() {
+    // grabber.getTimestamp seems to be in ms (all the implementations are at least)
+    return grabber.getTimestamp() / 1000.0;
+  }
+}
diff --git a/aos/linux_code/camera/java/aos/NativeBufferError.java b/aos/linux_code/camera/java/aos/NativeBufferError.java
new file mode 100644
index 0000000..41c794d
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/NativeBufferError.java
@@ -0,0 +1,22 @@
+package aos;
+
+/**
+ * A more specific kind of {@link NativeError}.
+ * NOTE(review): presumably for problems with buffers passed to or from native code -- confirm
+ * against the code that throws it.
+ */
+public class NativeBufferError extends NativeError {
+  private static final long serialVersionUID = -5298480149664213316L;
+
+  /** Creates one with neither a message nor a cause. */
+  public NativeBufferError() {
+    super();
+  }
+
+  /** @param message the detail message */
+  public NativeBufferError(String message) {
+    super(message);
+  }
+
+  /** @param cause what caused this error */
+  public NativeBufferError(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message the detail message
+   * @param cause what caused this error
+   */
+  public NativeBufferError(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
diff --git a/aos/linux_code/camera/java/aos/NativeError.java b/aos/linux_code/camera/java/aos/NativeError.java
new file mode 100644
index 0000000..d410234
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/NativeError.java
@@ -0,0 +1,25 @@
+package aos;
+
+/**
+ * An {@link Error} that represents a failure in native code.
+ */
+public class NativeError extends Error {
+  private static final long serialVersionUID = 7394872852984037261L;
+
+  /** Creates one with neither a message nor a cause. */
+  public NativeError() {
+    super();
+  }
+
+  /** @param message the detail message */
+  public NativeError(String message) {
+    super(message);
+  }
+
+  /** @param cause what caused this error */
+  public NativeError(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message the detail message
+   * @param cause what caused this error
+   */
+  public NativeError(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
diff --git a/aos/linux_code/camera/java/aos/NativeLoader.java b/aos/linux_code/camera/java/aos/NativeLoader.java
new file mode 100644
index 0000000..d4fe7a8
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/NativeLoader.java
@@ -0,0 +1,14 @@
+package aos;
+
+/**
+ * Helper for loading native libraries.
+ */
+public class NativeLoader {
+  /**
+   * Loads {@code lib<name>.so} out of the {@code so_libs} folder inside of the directory
+   * named by the {@code one-jar.expand.dir} system property.
+   * @param name the name of the gyp shared_library or loadable_module target to load
+   */
+  public static void load(String name) {
+    final String expandDir = System.getProperty("one-jar.expand.dir");
+    final String libraryPath = expandDir + "/so_libs/lib" + name + ".so";
+    System.load(libraryPath);
+  }
+}
diff --git a/aos/linux_code/camera/java/aos/Natives.java b/aos/linux_code/camera/java/aos/Natives.java
new file mode 100644
index 0000000..8ea4e01
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/Natives.java
@@ -0,0 +1,82 @@
+package aos;
+
+import com.googlecode.javacv.cpp.opencv_core;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+
+/**
+ * <p>Package-private class that has all of the native functions in it to make them easier to implement.</p>
+ * <p>WARNING: The raw native functions are <b>NOT</b> thread-safe!!!!! Any java functions that need to be thread safe MUST be synchronized in JAVA!</p>
+ * <p>Loading this class loads the {@code aos_camera} native library and initializes it with the
+ * {@link ImageGetter} image size.</p>
+ */
+class Natives {
+ static {
+ NativeLoader.load("aos_camera");
+ nativeInit(ImageGetter.width, ImageGetter.height);
+ }
+ /**
+ * Called once from the static initializer, right after the native library is loaded.
+ * @param width {@link ImageGetter#width}
+ * @param height {@link ImageGetter#height}
+ */
+ private static native void nativeInit(int width, int height);
+ /**
+ * Empty function to make sure the class gets loaded (which means loading the native library).
+ */
+ public static void ensureLoaded() {}
+
+ /**
+ * Decodes a JPEG from in into out. Both buffers must be direct.
+ * @param state a long[1] for storing thread-local state
+ * @param in the JPEG to decode
+ * @param inBytes how many bytes long the JPEG is
+ * @param out the buffer to write the decoded image into
+ * @return Whether or not it succeeded. If not, {@code out} is undefined.
+ */
+ public static native boolean decodeJPEG(long[] state, ByteBuffer in, int inBytes, ByteBuffer out);
+
+ /**
+ * Thresholds in into out. Both buffers must be direct.
+ * All of the short arguments should be unsigned bytes. The min and max parameters specify what colors to accept.
+ * NOTE(review): only {@code hoffset} is declared as a short; the min and max parameters are chars.
+ * @param in The image to threshold. Must be a 3-channel {@link opencv_core#IPL_DEPTH_8U} image buffer in the HSV color space.
+ * @param out Where to write the thresholded image to. Must be a 1-channel {@link opencv_core#IPL_DEPTH_8U} image buffer.
+ * @param hoffset An offset to be added to the hue value before comparing it to {@code hmin} and {@code hmax}.
+ * The addition will be performed to a {@code uint8_t}, which will wrap around. This means that it must be positive.
+ * Useful for finding red values.
+ */
+ public static native void threshold(ByteBuffer in, ByteBuffer out, short hoffset, char hmin, char hmax,
+ char smin, char smax, char vmin, char vmax);
+
+ /**
+ * Converts the colors from in to the format required for dumping them into a BMP image.
+ * @param in The image to convert. Must be a 3-channel {@link opencv_core#IPL_DEPTH_8U} image buffer in the regular (BGR I think...) color space.
+ * @param out Where to write the converted image to. Must be a 3-channel {@link opencv_core#IPL_DEPTH_8U} image buffer.
+ */
+ public static native void convertBGR2BMP(ByteBuffer in, ByteBuffer out);
+
+ /**
+ * Retrieves a JPEG image from the queue system. Will block until a new one is ready.
+ * @param id from {@link #queueInit()}
+ * @return Will be direct. This buffer <b>must not EVER</b> be written to.
+ */
+ public static native ByteBuffer queueGetJPEG(long id);
+ /**
+ * Retrieves the latest frame timestamp from the queue system. Must only be called between {@link #queueGetJPEG} and {@link #queueReleaseJPEG}.
+ * @param id from {@link #queueInit()}
+ * @return a timestamp
+ */
+ public static native double queueGetTimestamp(long id);
+ /**
+ * Releases the last image retrieved from the queue system. The result of the last {@link #queueGetJPEG(long)} will now be invalid.
+ * @param id from {@link #queueInit()}
+ */
+ public static native void queueReleaseJPEG(long id);
+ /**
+ * Prepares to start retrieving JPEGs from the queues.
+ * @return the ID to pass to the other queue functions
+ */
+ public static native long queueInit();
+
+ /**
+ * Puts the given message into the logging framework.
+ * @param message the complete log message
+ * @param level the level (from {@link Level#intValue()})
+ */
+ public static native void LOG(String message, int level);
+}
+
diff --git a/aos/linux_code/camera/java/aos/QueueImageGetter.java b/aos/linux_code/camera/java/aos/QueueImageGetter.java
new file mode 100644
index 0000000..7ed2f65
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/QueueImageGetter.java
@@ -0,0 +1,34 @@
+package aos;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link JPEGImageGetter} that retrieves its images from the queue system.<br>
+ * {@link #getTimestamp()} passes on the value from the v4l2 driver, which is a
+ * CLOCK_MONOTONIC time for most (all?) of them and definitely for uvcvideo.
+ */
+public class QueueImageGetter extends JPEGImageGetter {
+  /** The ID from {@link Natives#queueInit()} that all of the other calls need. */
+  private final long nativeID = Natives.queueInit();
+
+  public QueueImageGetter() {}
+
+  /** @return a read-only view of the next image or {@code null} if the native code returned none */
+  @Override
+  public ByteBuffer getJPEG() {
+    final ByteBuffer image = Natives.queueGetJPEG(nativeID);
+    return image == null ? null : image.asReadOnlyBuffer();
+  }
+
+  /** Releases the image that the last {@link #getJPEG()} call retrieved. */
+  @Override
+  public void release() {
+    Natives.queueReleaseJPEG(nativeID);
+  }
+
+  /** @return the timestamp that goes with the image from the last {@link #getJPEG()} call */
+  @Override
+  public double getTimestamp() {
+    return Natives.queueGetTimestamp(nativeID);
+  }
+}
diff --git a/aos/linux_code/camera/java/aos/QueueLogHandler.java b/aos/linux_code/camera/java/aos/QueueLogHandler.java
new file mode 100644
index 0000000..3eb8938
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/QueueLogHandler.java
@@ -0,0 +1,119 @@
+package aos;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.logging.Formatter;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.logging.SimpleFormatter;
+
+/**
+ * <p>Sends messages to the AOS queue-based logging system. Also sends anything that's at least a {@link Level#WARNING} to {@link System#err}.</p>
+ * <p>Writes out each stack frame of exceptions as a separate line after the first one with a \t at the beginning.</p>
+ */
+public class QueueLogHandler extends Handler {
+  /** Created lazily for when no formatter has been set on this handler. */
+  private Formatter defaultFormatter;
+
+  /**
+   * Sets up the logging framework to use an instance of this class for all logging and returns the newly created instance.
+   * Also installs a default uncaught exception handler that logs the exception as {@link Level#SEVERE} and then exits.
+   */
+  public static QueueLogHandler UseForAll() {
+    Natives.ensureLoaded();
+    final Logger top = Logger.getLogger("");
+    // Remove all of the existing handlers so that everything only goes through the new one.
+    for (Handler c : top.getHandlers()) {
+      top.removeHandler(c);
+    }
+    QueueLogHandler instance = new QueueLogHandler();
+    top.addHandler(instance);
+    top.setLevel(Level.ALL);
+
+    // Anything that kills a thread should show up in the logs and take the whole process down.
+    Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread thread, Throwable e) {
+        top.log(Level.SEVERE, "uncaught exception in thread " + thread, e);
+        System.exit(1);
+      }
+    });
+
+    return instance;
+  }
+
+  /** Does nothing because this class holds no resources of its own. */
+  @Override
+  public void close() throws SecurityException {
+  }
+
+  /** Does nothing because {@link #publish} never buffers anything. */
+  @Override
+  public void flush() {
+  }
+
+  /** @return the formatter set on this handler or, if there is none, a lazily created {@link SimpleFormatter} */
+  private Formatter findFormatter() {
+    final Formatter r = getFormatter();
+    if (r != null) {
+      return r;
+    }
+    if (defaultFormatter == null) {
+      defaultFormatter = new SimpleFormatter();
+    }
+    return defaultFormatter;
+  }
+
+  /**
+   * Passes the record's message and then its exception (if any) to {@link Natives#LOG}.
+   * Records that are at least {@link Level#WARNING} also get printed to {@link System#err}.
+   */
+  @Override
+  public void publish(LogRecord record) {
+    final Throwable thrown = record.getThrown();
+    // Most records don't have an exception attached, so thrown has to be checked for null
+    // before looking at its cause.
+    if (thrown instanceof UnsatisfiedLinkError ||
+        (thrown != null && thrown.getCause() instanceof UnsatisfiedLinkError)) {
+      record.setThrown(new UnsatisfiedLinkError("are you running a JVM of the correct bitness?").initCause(thrown));
+    }
+    final int level = record.getLevel().intValue();
+    Natives.LOG(record.getSourceClassName() + ": " + record.getSourceMethodName() + ": " +
+        findFormatter().formatMessage(record), level);
+    if (record.getThrown() != null) {
+      logException(record.getThrown(), level, false);
+    }
+
+    if (level >= Level.WARNING.intValue()) {
+      System.err.println(findFormatter().format(record));
+    }
+  }
+
+  /**
+   * Logs the exception's class name and message and then each of its stack frames on a separate
+   * line starting with a tab. Does the same for its cause (if any) afterwards.
+   *
+   * @param e the exception to log
+   * @param level the level value to pass to {@link Natives#LOG} with every line
+   * @param caused_by whether the first line should start with "Caused by: "
+   */
+  private void logException(Throwable e, int level, boolean caused_by) {
+    final StringBuilder thrownString = new StringBuilder();
+    if (caused_by) {
+      thrownString.append("Caused by: ");
+    }
+    thrownString.append(e.getClass().getName()).append(": ").append(e.getLocalizedMessage());
+    Natives.LOG(thrownString.toString(), level);
+    for (StackTraceElement c : e.getStackTrace()) {
+      thrownString.setLength(0);  // reuse the builder for every line
+      thrownString.append("\t").append(c.getClassName()).append('.').append(c.getMethodName());
+      thrownString.append('(').append(c.getFileName()).append(':').append(c.getLineNumber()).append(')');
+      Natives.LOG(thrownString.toString(), level);
+    }
+    if (e.getCause() != null) {
+      logException(e.getCause(), level, true);
+    }
+  }
+
+}
diff --git a/aos/linux_code/camera/java/aos/ServableImage.java b/aos/linux_code/camera/java/aos/ServableImage.java
new file mode 100644
index 0000000..c5db252
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/ServableImage.java
@@ -0,0 +1,145 @@
+package aos;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Queue;
+import java.util.logging.Logger;
+
+import com.googlecode.javacv.cpp.opencv_core.IplImage;
+
+/**
+ * Provides {@link IplImage}s that can be used with a {@link DebugServer}.
+ */
+public class ServableImage {
+  @SuppressWarnings("unused")
+  private static final Logger LOG = Logger.getLogger(ServableImage.class.getName());
+  private final int width, height, depth, channels;
+  // For each snapshot index, the released images that are available for reuse.
+  private final ArrayList<Queue<IplImage>> queues = new ArrayList<Queue<IplImage>>();
+  // For each snapshot index, the latest snapshot that nobody has retrieved yet (or null).
+  private final ArrayList<IplImage> snapshots = new ArrayList<IplImage>();
+  private final IplImage current;
+  // Number of setDebugging(true) calls minus the number of setDebugging(false) calls.
+  private int debugging = 0;
+
+  public ServableImage(int width, int height, int depth, int channels) {
+    this.width = width;
+    this.height = height;
+    this.depth = depth;
+    this.channels = channels;
+
+    current = IplImage.create(width, height, depth, channels);
+  }
+
+  /** @return the number of bytes in each image */
+  public int imageSize() {
+    return width * height * depth * channels / 8;
+  }
+
+  /** @return the number of bits in each pixel */
+  public short bitsPerPixel() {
+    return (short) (depth * channels);
+  }
+
+  /**
+   * Retrieves an image that should be used for debugging. It clears the value
+   * when called. {@link #releaseSnapshot} MUST be called with the result.
+   *
+   * @param i Which snapshot to retrieve. 0 means the most recent final image.
+   * @return The most recent image at this index. {@code null} if there isn't
+   *         a new one since last time this function was called. Will be in
+   *         the correct color space to dump into a BMP image if this is a
+   *         3-channel image.
+   */
+  public synchronized IplImage getSnapshot(int i) {
+    if (snapshots.size() <= i) {
+      return null;
+    }
+    // Hand ownership to the caller. Leaving the image in the list would let
+    // recordSnapshot recycle it while the caller is still using it.
+    return snapshots.set(i, null);
+  }
+
+  /**
+   * Gives back an image retrieved with {@link #getSnapshot} so that it can be reused.
+   *
+   * @param i the index that was passed to {@link #getSnapshot}
+   * @param image the image that {@link #getSnapshot} returned
+   */
+  public synchronized void releaseSnapshot(int i, IplImage image) {
+    queues.get(i).add(image);
+  }
+
+  /**
+   * This function will return the same image if called repeatedly until
+   * {@link #releaseImage} is called.
+   *
+   * @return the current image
+   */
+  public synchronized IplImage getImage() {
+    return current;
+  }
+
+  /**
+   * Releases the current image (to be potentially sent out for debugging and
+   * then reused).
+   */
+  public synchronized void releaseImage() {
+    recordSnapshot(0);
+  }
+
+  /**
+   * Records a copy of the current image for debugging. It will be accessible
+   * at <{@code <base path>?i=<the value>}>. Does nothing unless
+   * {@link #isDebugging()}. This method <i>should</i> get called regardless
+   * of {@link #isDebugging()} to avoid outputting old debugging images. Note:
+   * 0 is not a valid snapshot number.
+   *
+   * @param i which snapshot this is
+   */
+  public synchronized void recordSnapshot(int i) {
+    while (queues.size() <= i) {
+      queues.add(null);
+    }
+    if (queues.get(i) == null) {
+      queues.set(i, new ArrayDeque<IplImage>());
+    }
+    while (snapshots.size() <= i) {
+      snapshots.add(null);
+    }
+    if (snapshots.get(i) != null) {  // nobody retrieved the previous one, so recycle it
+      releaseSnapshot(i, snapshots.get(i));
+    }
+    if (isDebugging()) {
+      IplImage snapshot = queues.get(i).poll();
+      if (snapshot == null) {
+        snapshot = IplImage.create(width, height, depth, channels);
+      }
+      if (channels == 3) {
+        Natives.convertBGR2BMP(current.getByteBuffer(),
+            snapshot.getByteBuffer());
+      } else {
+        snapshot.getByteBuffer().put(current.getByteBuffer());
+      }
+      snapshots.set(i, snapshot);
+    } else {
+      snapshots.set(i, null);
+    }
+  }
+
+  /** @return whether or not to do extra debug work with the current image */
+  public synchronized boolean isDebugging() {
+    return debugging > 0;
+  }
+
+  /**
+   * Whoever turns this on should turn it off when they're done.
+   */
+  synchronized void setDebugging(boolean debugging) {
+    if (debugging) {
+      ++this.debugging;
+    } else {
+      --this.debugging;
+    }
+  }
+}
diff --git a/aos/linux_code/camera/java/aos/Thresholder.java b/aos/linux_code/camera/java/aos/Thresholder.java
new file mode 100644
index 0000000..adf1b49
--- /dev/null
+++ b/aos/linux_code/camera/java/aos/Thresholder.java
@@ -0,0 +1,28 @@
+package aos;
+
+import com.googlecode.javacv.cpp.opencv_core;
+import com.googlecode.javacv.cpp.opencv_core.IplImage;
+
+public class Thresholder {
+  /**
+   * Thresholds {@code in} into {@code out} using the native implementation.
+   * All of the int arguments should be unsigned bytes except hoffset, which should be a signed byte. The min and max parameters specify what colors to accept.
+   * @param in The image to threshold. Must be a 3-channel {@link opencv_core#IPL_DEPTH_8U} image in the HSV color space.
+   * @param out Where to write the thresholded image to. Must be a 1-channel {@link opencv_core#IPL_DEPTH_8U} image.
+   * @param hoffset An offset to be added to the hue value before comparing it to {@code hmin} and {@code hmax}. It gets converted to a {@code uint8_t} and the addition wraps around. Useful for finding red values.
+   * @throws IllegalArgumentException if either image has the wrong number of channels or the wrong depth
+   */
+  public static void threshold(IplImage in, IplImage out, int hoffset, int hmin, int hmax,
+      int smin, int smax, int vmin, int vmax) {
+    final boolean inValid = in.nChannels() == 3 && in.depth() == opencv_core.IPL_DEPTH_8U;
+    if (!inValid) {
+      throw new IllegalArgumentException("in is invalid");
+    }
+    final boolean outValid = out.nChannels() == 1 && out.depth() == opencv_core.IPL_DEPTH_8U;
+    if (!outValid) {
+      throw new IllegalArgumentException("out is invalid");
+    }
+    Natives.threshold(in.getByteBuffer(), out.getByteBuffer(), (short) hoffset,
+        (char) hmin, (char) hmax, (char) smin, (char) smax, (char) vmin, (char) vmax);
+  }
+}
diff --git a/aos/linux_code/camera/jni.cpp b/aos/linux_code/camera/jni.cpp
new file mode 100644
index 0000000..c547a9c
--- /dev/null
+++ b/aos/linux_code/camera/jni.cpp
@@ -0,0 +1,259 @@
+#include <setjmp.h>
+
+#include "jni/aos_Natives.h"
+#include "aos/linux_code/camera/Buffers.h"
+#include "aos/externals/libjpeg/include/jpeglib.h"
+#include "aos/common/logging/logging_impl.h"
+#include "aos/linux_code/init.h"
+
+using aos::camera::Buffers;
+
+namespace {
+
+jclass nativeError, bufferError, outOfMemoryError;
+// Looks up name and stores a global reference to it in *out. Returns true on failure.
+bool findClass(JNIEnv *env, const char *name, jclass *out) {
+  jclass local = env->FindClass(name);
+  if (local == NULL) return true;  // FindClass leaves a Java exception pending
+  *out = static_cast<jclass>(env->NewGlobalRef(local));
+  env->DeleteLocalRef(local);  // not needed any more whether or not that worked
+  return *out == NULL;
+}
+
+// Checks that the size is correct and retrieves the address.
+// An expected_size of 0 means don't check it.
+// If this function returns NULL, a java exception will already have been
+// thrown.
+void *getBufferAddress(JNIEnv *env, jobject obj, jlong expected_size) {
+  if (obj == NULL) {
+    env->ThrowNew(nativeError, "null buffer");
+    return NULL;
+  }
+  const jlong actual_size = env->GetDirectBufferCapacity(obj);
+  if (expected_size != 0 && expected_size != actual_size) {
+    char *str;
+    // jlong isn't long long everywhere, so cast to what the format expects.
+    if (asprintf(&str, "wrong size. expected %lld but got %lld",
+                 static_cast<long long>(expected_size),
+                 static_cast<long long>(actual_size)) < 0) {
+      env->ThrowNew(bufferError, "creating message failed");
+      return NULL;
+    }
+    env->ThrowNew(bufferError, str);
+    free(str);
+    return NULL;
+  }
+  void *const r = env->GetDirectBufferAddress(obj);
+  if (r == NULL) env->ThrowNew(bufferError, "couldn't get address");
+  return r;
+}
+
+const int kImagePixels = Buffers::kWidth * Buffers::kHeight;
+
+void jpeg_log_message(jpeg_common_struct *cinfo, log_level level) {  // logs libjpeg's latest message at level
+ char buf[LOG_MESSAGE_LEN];  // NOTE(review): libjpeg wants JMSG_LENGTH_MAX bytes here - confirm LOG_MESSAGE_LEN is big enough
+ cinfo->err->format_message(cinfo, buf);
+ log_do(level, "libjpeg: %s\n", buf);
+}
+void jpeg_error_exit(jpeg_common_struct *cinfo) __attribute__((noreturn));
+void jpeg_error_exit(jpeg_common_struct *cinfo) {  // installed as error_exit by decodeJPEG
+ jpeg_log_message(cinfo, ERROR);
+ longjmp(*static_cast<jmp_buf *>(cinfo->client_data), 1);  // back to the setjmp in decodeJPEG, which then fails
+}
+void jpeg_emit_message(jpeg_common_struct *cinfo, int msg_level) {  // installed as emit_message by decodeJPEG
+ if (msg_level < 0) {  // negative levels get logged as warnings and abort the decode
+ jpeg_log_message(cinfo, WARNING);
+ longjmp(*static_cast<jmp_buf *>(cinfo->client_data), 2);  // back to the setjmp in decodeJPEG, which then fails
+ }
+ // this spews a lot of messages out
+ //jpeg_log_message(cinfo, DEBUG);
+}
+
+// The structure used to hold all of the state for the functions that deal with
+// a Buffers. A pointer to this structure is stored java-side.
+struct BuffersHolder {
+  Buffers buffers;
+  timeval timestamp;  // of the last frame retrieved (all 0 before the first one)
+  BuffersHolder() : buffers(), timestamp() {}
+};
+
+} // namespace
+
+// Looks up the exception classes, initializes aos, and checks the dimensions.
+void Java_aos_Natives_nativeInit(JNIEnv *env, jclass, jint width, jint height) {
+  if (findClass(env, "aos/NativeError", &nativeError) ||
+      findClass(env, "aos/NativeBufferError", &bufferError) ||
+      findClass(env, "java/lang/OutOfMemoryError", &outOfMemoryError)) return;
+
+  aos::InitNRT();
+
+  if (!(width == Buffers::kWidth && height == Buffers::kHeight)) {
+    env->ThrowNew(nativeError, "dimensions mismatch");
+    return;
+  }
+  LOG(INFO, "nativeInit finished\n");
+}
+
+static_assert(sizeof(jlong) >= sizeof(void *),
+ "can't stick pointers into jlongs");
+
+// Decodes the JPEG in the first inLength bytes of inobj into outobj. The
+// decompressor is created on the first call and kept in stateArray[0].
+jboolean Java_aos_Natives_decodeJPEG(JNIEnv *env, jclass, jlongArray stateArray,
+                                     jobject inobj, jint inLength,
+                                     jobject outobj) {
+  unsigned char *const in = static_cast<unsigned char *>(
+      getBufferAddress(env, inobj, 0));
+  if (in == NULL) return false;
+  // A negative length would become a huge unsigned one in jpeg_mem_src.
+  if (inLength < 0 || env->GetDirectBufferCapacity(inobj) < inLength) {
+    env->ThrowNew(bufferError, "inLength is negative or bigger than in");
+    return false;
+  }
+  unsigned char *const out = static_cast<unsigned char *>(
+      getBufferAddress(env, outobj, kImagePixels * 3));
+  if (out == NULL) return false;
+  jpeg_decompress_struct *volatile cinfo;  // volatile because of the setjmp call
+  jlong state;
+  env->GetLongArrayRegion(stateArray, 0, 1, &state);
+  if (env->ExceptionCheck()) return false;
+  if (state == 0) {
+    cinfo = static_cast<jpeg_decompress_struct *>(malloc(sizeof(*cinfo)));
+    jpeg_error_mgr *const err =
+        static_cast<jpeg_error_mgr *>(malloc(sizeof(*err)));
+    void *const jump_buffer = malloc(sizeof(jmp_buf));
+    if (cinfo == NULL || err == NULL || jump_buffer == NULL) {
+      free(cinfo); free(err); free(jump_buffer);  // free(NULL) is fine
+      env->ThrowNew(outOfMemoryError, "malloc for jpeg_decompress_struct");
+      return false;
+    }
+    cinfo->err = jpeg_std_error(err);
+    cinfo->client_data = jump_buffer;  // the error handlers longjmp to this
+    cinfo->err->error_exit = jpeg_error_exit;
+    cinfo->err->emit_message = jpeg_emit_message;
+    jpeg_create_decompress(cinfo);
+    state = reinterpret_cast<intptr_t>(cinfo);
+    env->SetLongArrayRegion(stateArray, 0, 1, &state);
+    if (env->ExceptionCheck()) return false;
+  } else {
+    cinfo = reinterpret_cast<jpeg_decompress_struct *>(state);
+  }
+
+  // set up the jump buffer; this has to happen each time
+  if (setjmp(*static_cast<jmp_buf *>(cinfo->client_data))) {
+    jpeg_abort_decompress(cinfo);
+    return false;
+  }
+
+  jpeg_mem_src(cinfo, in, inLength);
+  jpeg_read_header(cinfo, TRUE);
+  if (cinfo->image_width != static_cast<unsigned int>(Buffers::kWidth) ||
+      cinfo->image_height != static_cast<unsigned int>(Buffers::kHeight)) {
+    LOG(WARNING, "got (%ux%u) image but expected (%dx%d)\n", cinfo->image_width,
+        cinfo->image_height, Buffers::kWidth, Buffers::kHeight);
+    jpeg_abort_decompress(cinfo);
+    return false;
+  }
+  cinfo->out_color_space = JCS_RGB;
+  jpeg_start_decompress(cinfo);
+  if (cinfo->output_components != 3) {
+    LOG(WARNING, "libjpeg wants to return %d color components instead of 3\n",
+        cinfo->output_components);
+    jpeg_abort_decompress(cinfo);
+    return false;
+  }
+  if (cinfo->output_width != static_cast<unsigned int>(Buffers::kWidth) ||
+      cinfo->output_height != static_cast<unsigned int>(Buffers::kHeight)) {
+    LOG(WARNING, "libjpeg wants to return a (%ux%u) image but need (%dx%d)\n",
+        cinfo->output_width, cinfo->output_height,
+        Buffers::kWidth, Buffers::kHeight);
+    jpeg_abort_decompress(cinfo);
+    return false;
+  }
+
+  unsigned char *buffers[Buffers::kHeight];
+  for (int i = 0; i < Buffers::kHeight; ++i) {
+    buffers[i] = &out[i * Buffers::kWidth * 3];
+  }
+  while (cinfo->output_scanline < cinfo->output_height) {
+    jpeg_read_scanlines(cinfo, &buffers[cinfo->output_scanline],
+                        Buffers::kHeight - cinfo->output_scanline);
+  }
+
+  jpeg_finish_decompress(cinfo);
+  return true;
+}
+
+// Sets out[i] to whether every channel of in's pixel i is strictly in range.
+void Java_aos_Natives_threshold(JNIEnv *env, jclass, jobject inobj,
+                                jobject outobj, jshort hoffset, jchar hmin,
+                                jchar hmax, jchar smin, jchar smax, jchar vmin,
+                                jchar vmax) {
+  const unsigned char *__restrict__ const in = static_cast<unsigned char *>(
+      getBufferAddress(env, inobj, kImagePixels * 3));
+  if (in == NULL) return;
+  char *__restrict__ const out = static_cast<char *>(
+      getBufferAddress(env, outobj, kImagePixels));
+  if (out == NULL) return;
+  for (int pixel = 0; pixel < kImagePixels; ++pixel) {
+    const unsigned char *const px = in + pixel * 3;
+    const uint8_t hue = px[0] + static_cast<uint8_t>(hoffset);  // wraps around
+    out[pixel] = hmin < hue && hue < hmax && smin < px[1] && px[1] < smax &&
+                 vmin < px[2] && px[2] < vmax;
+  }
+}
+// Copies in to out with the first and third byte of every pixel swapped.
+void Java_aos_Natives_convertBGR2BMP(JNIEnv *env, jclass,
+                                     jobject inobj, jobject outobj) {
+  const char *__restrict__ const in = static_cast<char *>(
+      getBufferAddress(env, inobj, kImagePixels * 3));
+  if (in == NULL) return;
+  char *__restrict__ const out = static_cast<char *>(
+      getBufferAddress(env, outobj, kImagePixels * 3));
+  if (out == NULL) return;
+  for (int offset = 0; offset < kImagePixels * 3; offset += 3) {
+    out[offset] = in[offset + 2];
+    out[offset + 1] = in[offset + 1];
+    out[offset + 2] = in[offset];
+  }
+}
+
+jlong Java_aos_Natives_queueInit(JNIEnv *, jclass) {  // creates the state that the other queue functions take
+ return reinterpret_cast<intptr_t>(new BuffersHolder());  // the "ID" java sees is really this pointer
+}
+void Java_aos_Natives_queueReleaseJPEG(JNIEnv *, jclass, jlong ptr) {  // ptr is the value from queueInit
+ reinterpret_cast<BuffersHolder *>(ptr)->buffers.Release();
+}
+// Gets the next frame (storing its timestamp) and wraps it in a ByteBuffer.
+jobject Java_aos_Natives_queueGetJPEG(JNIEnv *env, jclass, jlong ptr) {
+  BuffersHolder &h = *reinterpret_cast<BuffersHolder *>(ptr);
+  uint32_t size;
+  const void *const frame = h.buffers.GetNext(true, &size, &h.timestamp, NULL);
+  if (frame == NULL) return NULL;
+  return env->NewDirectByteBuffer(const_cast<void *>(frame), size);
+}
+jdouble Java_aos_Natives_queueGetTimestamp(JNIEnv *, jclass, jlong ptr) {  // ptr is the value from queueInit
+ const BuffersHolder *const holder = reinterpret_cast<BuffersHolder *>(ptr);
+ return holder->timestamp.tv_sec + holder->timestamp.tv_usec / 1000000.0;  // seconds, with the microseconds as the fraction
+}
+
+// Passes message on to log_do.
+// jlevel is the java-side level value, which gets mapped to one of the
+// log_levels first.
+void Java_aos_Natives_LOG(JNIEnv *env, jclass, jstring message, jint jlevel) {
+  // Can't use Get/ReleaseStringCritical because log_do might block waiting to
+  // put its message into the queue.
+  const char *const message_chars = env->GetStringUTFChars(message, NULL);
+  if (message_chars == NULL) return;
+
+  // Level.SEVERE, WARNING, and INFO are 1000, 900, and 800 respectively.
+  // Don't want to use FATAL because the uncaught java exception that is
+  // likely to come next will be useful.
+  const log_level level = jlevel >= 1000 ? ERROR
+                          : jlevel >= 900 ? WARNING
+                          : jlevel >= 800 ? INFO
+                                          : DEBUG;
+
+  log_do(level, "%s\n", message_chars);
+  env->ReleaseStringUTFChars(message, message_chars);
+}
diff --git a/aos/linux_code/configuration.cc b/aos/linux_code/configuration.cc
new file mode 100644
index 0000000..6c39a93
--- /dev/null
+++ b/aos/linux_code/configuration.cc
@@ -0,0 +1,105 @@
+#include "aos/linux_code/configuration.h"
+
+#include <string.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <ifaddrs.h>
+#include <unistd.h>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/unique_malloc_ptr.h"
+#include "aos/common/once.h"
+
+namespace aos {
+namespace configuration {
+namespace {
+
+// Including the terminating '\0'.
+const size_t kMaxAddrLength = 18;
+
+// TODO(brians): Don't hard-code this.
+const char *const kLinuxNetInterface = "eth0";
+// Finds the AF_INET address of the interface named kLinuxNetInterface.
+const in_addr *DoGetOwnIPAddress() {
+  ifaddrs *addrs;
+  if (getifaddrs(&addrs) != 0) {
+    LOG(FATAL, "getifaddrs(%p) failed with %d: %s\n", &addrs,
+        errno, strerror(errno));
+  }
+  // Smart pointers don't work very well for iterating through a linked list,
+  // but it does do a very nice job of making sure that addrs gets freed.
+  unique_c_ptr<ifaddrs, freeifaddrs> addrs_deleter(addrs);
+  for (; addrs != NULL; addrs = addrs->ifa_next) {
+    // getifaddrs(3) says ifa_addr may be NULL, so check before looking inside.
+    if (addrs->ifa_addr != NULL && addrs->ifa_addr->sa_family == AF_INET &&
+        strcmp(kLinuxNetInterface, addrs->ifa_name) == 0) {
+      static const in_addr r =
+          reinterpret_cast<sockaddr_in *>(__builtin_assume_aligned(
+              addrs->ifa_addr, alignof(sockaddr_in)))->sin_addr;
+      return &r;
+    }
+  }
+  LOG(FATAL, "couldn't find an AF_INET interface named \"%s\"\n",
+      kLinuxNetInterface);
+}
+
+// Returns the directory containing the executable (from /proc/self/exe).
+const char *DoGetRootDirectory() {
+  ssize_t size = 0;
+  char *r = NULL;
+  while (true) {
+    delete[] r;  // it came from new[] (and deleting NULL is fine)
+    size += 256;
+    r = new char[size];
+    ssize_t ret = readlink("/proc/self/exe", r, size);
+    if (ret < 0) {
+      if (ret != -1) {
+        LOG(WARNING, "it returned %zd, not -1\n", ret);
+      }
+      LOG(FATAL, "readlink(\"/proc/self/exe\", %p, %zd) failed with %d: %s\n",
+          r, size, errno, strerror(errno));
+    }
+    if (ret < size) {  // otherwise it might have been truncated, so try again
+      void *last_slash = memrchr(r, '/', ret);  // readlink doesn't add a '\0'
+      if (last_slash == NULL) {
+        r[ret] = '\0';
+        LOG(FATAL, "couldn't find a '/' in \"%s\"\n", r);
+      }
+      *static_cast<char *>(last_slash) = '\0';
+      LOG(INFO, "got a root dir of \"%s\"\n", r);
+      return r;
+    }
+  }
+}
+
+// Returns a new[]ed string made of GetRootDirectory() followed by kSuffix.
+const char *DoGetLoggingDirectory() {
+  static const char kSuffix[] = "/../../tmp/robot_logs";
+  const char *const root = GetRootDirectory();
+  // sizeof includes the '\0' from kSuffix, so this is exactly big enough.
+  char *const result = new char[strlen(root) + sizeof(kSuffix)];
+  return strcat(strcpy(result, root), kSuffix);
+}
+
+} // namespace
+
+const char *GetRootDirectory() {
+ static aos::Once<const char> once(DoGetRootDirectory);  // presumably only calls DoGetRootDirectory the first time - confirm in once.h
+ return once.Get();
+}
+
+const char *GetLoggingDirectory() {
+ static aos::Once<const char> once(DoGetLoggingDirectory);  // presumably only calls DoGetLoggingDirectory the first time - confirm in once.h
+ return once.Get();
+}
+
+const in_addr &GetOwnIPAddress() {
+ static aos::Once<const in_addr> once(DoGetOwnIPAddress);  // presumably only calls DoGetOwnIPAddress the first time - confirm in once.h
+ return *once.Get();
+}
+
+} // namespace configuration
+} // namespace aos
diff --git a/aos/linux_code/configuration.h b/aos/linux_code/configuration.h
new file mode 100644
index 0000000..3568f28
--- /dev/null
+++ b/aos/linux_code/configuration.h
@@ -0,0 +1,31 @@
+#ifndef AOS_LINUX_CODE_CONFIGURATION_H_
+#define AOS_LINUX_CODE_CONFIGURATION_H_
+
+#include <stdint.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+namespace aos {
+
+// Holds global configuration data. All of the functions are safe to call
+// from wherever.
+namespace configuration {
+
+// Returns "our" IP address.
+const in_addr &GetOwnIPAddress();
+
+// Returns the "root directory" for this run. Under linux, this is the
+// directory where the executable is located (from /proc/self/exe)
+// The return value will always be to a static string, so no freeing is
+// necessary.
+const char *GetRootDirectory();
+// Returns the directory where logs get written. Relative to GetRootDirectory().
+// The return value will always be to a static string, so no freeing is
+// necessary.
+const char *GetLoggingDirectory();
+
+} // namespace configuration
+} // namespace aos
+
+#endif // AOS_LINUX_CODE_CONFIGURATION_H_
diff --git a/aos/linux_code/core/BinaryLogReader.cpp b/aos/linux_code/core/BinaryLogReader.cpp
new file mode 100644
index 0000000..74cc95c
--- /dev/null
+++ b/aos/linux_code/core/BinaryLogReader.cpp
@@ -0,0 +1,116 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <time.h>
+#include <string.h>
+#include <string>
+#include <unistd.h>
+#include <sys/types.h>
+#include <pwd.h>
+#include <fcntl.h>
+
+#include <map>
+
+#include "aos/linux_code/logging/linux_logging.h"
+#include "aos/linux_code/core/LogFileCommon.h"
+#include "aos/linux_code/init.h"
+#include "aos/linux_code/configuration.h"
+
+namespace aos {
+namespace logging {
+namespace linux_code {
+namespace {
+
+// Writes every message returned by ReadNext() to a new aos_log-<time> file in
+// the logging directory and points the aos_log-current symlink at it.
+int BinaryLogReaderMain() {
+  InitNRT();
+
+  const char *folder = configuration::GetLoggingDirectory();
+  if (access(folder, R_OK | W_OK) == -1) {
+    LOG(FATAL, "folder '%s' does not exist. please create it\n", folder);
+  }
+  LOG(INFO, "logging to folder '%s'\n", folder);
+
+  const intmax_t now = time(NULL);  // %jd needs an intmax_t
+  char *tmp;
+  if (asprintf(&tmp, "%s/aos_log-%jd", folder, now) == -1) {
+    fprintf(stderr,
+            "BinaryLogReader: couldn't create final name because of %d (%s)."
+            " exiting\n", errno, strerror(errno));
+    return EXIT_FAILURE;
+  }
+  char *tmp2;
+  if (asprintf(&tmp2, "%s/aos_log-current", folder) == -1) {
+    fprintf(stderr,
+            "BinaryLogReader: couldn't create symlink name because of %d (%s)."
+            " not creating current symlink\n", errno, strerror(errno));
+  } else {
+    if (unlink(tmp2) == -1 && (errno != EROFS && errno != ENOENT)) {
+      fprintf(stderr,
+              "BinaryLogReader: warning: unlink('%s') failed"
+              " because of %d (%s)\n", tmp2, errno, strerror(errno));
+    }
+    if (symlink(tmp, tmp2) == -1) {
+      fprintf(stderr, "BinaryLogReader: warning: symlink('%s', '%s') failed"
+              " because of %d (%s)\n", tmp, tmp2, errno, strerror(errno));
+    }
+    free(tmp2);
+  }
+  int fd = open(tmp, O_SYNC | O_APPEND | O_RDWR | O_CREAT,
+                S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
+  if (fd == -1) {
+    fprintf(stderr,
+            "BinaryLogReader: couldn't open file '%s' because of %d (%s)."
+            " exiting\n", tmp, errno, strerror(errno));
+    free(tmp);
+    return EXIT_FAILURE;
+  }
+  free(tmp);  // only now that nothing uses the name any more
+  LogFileAccessor writer(fd, true);
+
+  struct timespec timespec;
+  time_t last_sec = 0;
+  while (true) {
+    clock_gettime(CLOCK_MONOTONIC, &timespec);
+    if (last_sec != timespec.tv_sec) {
+      LOG(INFO, "msyncing output\n");
+      last_sec = timespec.tv_sec;
+      writer.Sync();
+    }
+
+    const LogMessage *const msg = ReadNext();
+    if (msg == NULL) continue;
+
+    // add 1 for terminating '\0'
+    size_t name_size = strlen(msg->name) + 1;
+    size_t message_size = strlen(msg->message) + 1;
+
+    LogFileMessageHeader *const output = writer.GetWritePosition(
+        sizeof(LogFileMessageHeader) + name_size + message_size);
+    char *output_strings = reinterpret_cast<char *>(output) + sizeof(*output);
+    output->name_size = name_size;
+    output->message_size = message_size;
+    output->source = msg->source;
+    output->level = msg->level;
+    output->time_sec = msg->seconds;
+    output->time_nsec = msg->nseconds;
+    output->sequence = msg->sequence;
+    memcpy(output_strings, msg->name, name_size);
+    memcpy(output_strings + name_size, msg->message, message_size);
+    futex_set(&output->marker);  // last, so readers only see finished messages
+    logging::linux_code::Free(msg);
+  }
+
+  Cleanup();
+  return 0;
+}
+
+} // namespace
+} // namespace linux_code
+} // namespace logging
+} // namespace aos
+
+int main() {  // just runs BinaryLogReaderMain and uses its result as the exit status
+ return ::aos::logging::linux_code::BinaryLogReaderMain();
+}
diff --git a/aos/linux_code/core/LogDisplayer.cpp b/aos/linux_code/core/LogDisplayer.cpp
new file mode 100644
index 0000000..3a98dd4
--- /dev/null
+++ b/aos/linux_code/core/LogDisplayer.cpp
@@ -0,0 +1,170 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <getopt.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <errno.h>
+
+#include "aos/linux_code/core/LogFileCommon.h"
+#include "aos/common/logging/logging_impl.h"
+
+namespace {
+
+const char *kArgsHelp = "[OPTION]... [FILE]\n"
+ "Display log file FILE (created by BinaryLogReader) to stdout.\n"
+ "FILE is \"aos_log-current\" by default.\n"
+ "\n"
+ " -n, --name NAME only display entries from processes named NAME\n"
+ " -l, --level LEVEL "
+ "only display log entries at least as important as LEVEL\n"
+ " // -p, --pid PID only display log entries from process PID\n"
+ " -f, --follow "
+ "wait when the end of the file is reached (implies --end)\n"
+ " -t, --terminate stop when the end of file is reached (default)\n"
+ " -b, --beginning start at the beginning of the file (default)\n"
+ " -e, --end start at the end of the file\n"
+ " -s, --skip NUMBER skip NUMBER matching logs\n"
+ " // -m, --max NUMBER only display up to NUMBER logs\n"
+ " // -o, --format FORMAT use FORMAT to display log entries\n"
+ " -h, --help display this help and exit\n"
+ "\n"
+ "LEVEL must be DEBUG, INFO, WARNING, ERROR, or FATAL.\n"
+ " It defaults to INFO.\n"
+ "\n"
+ "TODO(brians) implement the commented out ones and changing FILE\n";
+
+// Prints the usage message to stderr and then exits with EXIT_SUCCESS.
+void PrintHelpAndExit() {
+ fprintf(stderr, "Usage: %s %s", program_invocation_name, kArgsHelp);
+ exit(EXIT_SUCCESS);
+}
+
+} // namespace
+
+// Prints the messages from a log file that the options select (see kArgsHelp).
+int main(int argc, char **argv) {
+  const char *filter_name = NULL;
+  log_level filter_level = INFO;
+  bool follow = false, start_at_beginning = true;
+  unsigned long skip = 0;  // how many matching messages still have to be skipped
+  const char *filename = "aos_log-current";
+
+  while (true) {
+    static struct option long_options[] = {
+      {"name", required_argument, NULL, 'n'},
+      {"level", required_argument, NULL, 'l'},
+      {"pid", required_argument, NULL, 'p'},
+      {"follow", no_argument, NULL, 'f'},
+      {"terminate", no_argument, NULL, 't'},
+      {"beginning", no_argument, NULL, 'b'},
+      {"end", no_argument, NULL, 'e'},
+      {"skip", required_argument, NULL, 's'},
+      {"max", required_argument, NULL, 'm'},
+      {"format", required_argument, NULL, 'o'},
+      {"help", no_argument, NULL, 'h'},
+      {0, 0, 0, 0}
+    };
+    int option_index = 0;
+    // The short options have to be listed here too or getopt rejects them.
+    const int c = getopt_long(argc, argv, "n:l:p:ftbes:m:o:h",
+                              long_options, &option_index);
+    if (c == -1) break;  // if we're at the end
+    switch (c) {
+      case 0:
+        fprintf(stderr, "LogDisplayer: got a 0 option but didn't set up any\n");
+        abort();
+      case 'n':
+        filter_name = optarg;
+        break;
+      case 'l':
+        filter_level = ::aos::logging::str_log(optarg);
+        if (filter_level == LOG_UNKNOWN) {
+          fprintf(stderr, "LogDisplayer: unknown log level '%s'\n", optarg);
+          exit(EXIT_FAILURE);
+        }
+        break;
+      case 'p':
+      case 'm':
+      case 'o':
+        abort();  // not implemented yet (see kArgsHelp)
+      case 'f':
+        follow = true;
+        start_at_beginning = false;
+        break;
+      case 't':
+        follow = false;
+        break;
+      case 'b':
+        start_at_beginning = true;
+        break;
+      case 'e':
+        start_at_beginning = false;
+        break;
+      case 's':
+        skip = strtoul(optarg, NULL, 10);
+        break;
+      case 'h':
+        PrintHelpAndExit();
+        break;
+      case '?':
+        break;
+      default:
+        fprintf(stderr, "LogDisplayer: in a bad spot (%s: %d)\n",
+                __FILE__, __LINE__);
+        abort();
+    }
+  }
+
+  fprintf(stderr, "displaying down to level %s from file '%s'\n",
+          ::aos::logging::log_str(filter_level), filename);
+  if (optind < argc) {
+    fprintf(stderr, "non-option ARGV-elements: ");
+    while (optind < argc) {
+      fprintf(stderr, "%s\n", argv[optind++]);
+    }
+  }
+
+  int fd = open(filename, O_RDONLY);
+  if (fd == -1) {
+    fprintf(stderr, "error: couldn't open file '%s' for reading because of %s\n",
+            filename, strerror(errno));
+    exit(EXIT_FAILURE);
+  }
+  ::aos::logging::LogFileAccessor accessor(fd, false);
+  if (!start_at_beginning) {
+    accessor.MoveToEnd();
+  }
+  const ::aos::logging::LogFileMessageHeader *msg;
+  ::aos::logging::LogMessage log_message;
+  do {
+    msg = accessor.ReadNextMessage(follow);
+    if (msg == NULL) continue;
+    if (::aos::logging::log_gt_important(filter_level, msg->level)) continue;
+    if (filter_name != NULL &&
+        strcmp(filter_name,
+               reinterpret_cast<const char *>(msg) + sizeof(*msg)) != 0) {
+      continue;
+    }
+    if (skip > 0) {
+      --skip;
+      continue;
+    }
+    log_message.source = msg->source;
+    log_message.sequence = msg->sequence;
+    log_message.level = msg->level;
+    log_message.seconds = msg->time_sec;
+    log_message.nseconds = msg->time_nsec;
+    strncpy(log_message.name,
+            reinterpret_cast<const char *>(msg) + sizeof(*msg),
+            sizeof(log_message.name));
+    log_message.name[sizeof(log_message.name) - 1] = '\0';  // strncpy might not
+    strncpy(log_message.message,
+            reinterpret_cast<const char *>(msg) + sizeof(*msg) +
+            msg->name_size,
+            sizeof(log_message.message));
+    log_message.message[sizeof(log_message.message) - 1] = '\0';
+    ::aos::logging::internal::PrintMessage(stdout, log_message);
+  } while (msg != NULL);
+}
diff --git a/aos/linux_code/core/LogFileCommon.h b/aos/linux_code/core/LogFileCommon.h
new file mode 100644
index 0000000..23ddc62
--- /dev/null
+++ b/aos/linux_code/core/LogFileCommon.h
@@ -0,0 +1,199 @@
+#ifndef AOS_LINUX_CODE_CORE_LOG_FILE_COMMON_H_
+#define AOS_LINUX_CODE_CORE_LOG_FILE_COMMON_H_
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <algorithm>
+
+#include "aos/common/logging/logging_impl.h"
+
+namespace aos {
+namespace logging {
+
+// File format: {
+// LogFileMessageHeader header;
+// char *name; // of the process that wrote the message
+// char *message;
+// } not crossing kPageSize boundaries into the file.
+//
+// Field sizes designed to fit the various values from LogMessage even on
+// other machines (hopefully) because they're baked into the files.
+//
+// A lot of the fields don't have comments because they're the same as the
+// identically named fields in LogMessage.
+struct __attribute__((aligned)) LogFileMessageHeader {
+  // Gets futex_set once this message has been completely written, so that
+  // readers keeping up with a live writer know when they can look at it.
+  //
+  // Starts out as 0 because ftruncate zero-fills the newly extended file.
+  //
+  // The slot after the last message on a "page" gets set to 2 instead (with
+  // futex_set_value) to indicate that the next message is on the next page.
+  mutex marker;
+  static_assert(sizeof(marker) == 4, "mutex changed size!");
+  log_level level;
+  static_assert(sizeof(level) == 1, "log_level changed size!");
+
+  uint32_t time_sec;
+  static_assert(sizeof(time_sec) >= sizeof(LogMessage::seconds),
+                "tv_sec won't fit");
+  uint32_t time_nsec;
+  static_assert(sizeof(time_nsec) >= sizeof(LogMessage::nseconds),
+                "tv_nsec won't fit");
+
+  int32_t source;
+  static_assert(sizeof(source) >= sizeof(LogMessage::source), "PIDs won't fit");
+  uint16_t sequence;
+  static_assert(sizeof(sequence) == sizeof(LogMessage::sequence),
+                "something changed");
+
+  // Both sizes include the terminating '\0'.
+  uint32_t name_size;
+  uint32_t message_size;
+};
+static_assert(std::is_pod<LogFileMessageHeader>::value,
+              "LogFileMessageHeader will get dumped to a file");
+
+// Handles the mmapping and munmapping for reading and writing log files.
+class LogFileAccessor {
+ private:
+  // The size of the chunks that get mmaped/munmapped together. Large enough so
+  // that not too much space is wasted and it's hopefully bigger than and a
+  // multiple of the system page size but small enough so that really large
+  // chunks of memory don't have to get mapped at the same time.
+  static const size_t kPageSize = 32768;
+  // What to align messages to. Necessary for futexes to work.
+  static const size_t kAlignment = 64;
+  static_assert(kAlignment >= __alignof__(mutex), "futexes will complain");
+
+  const int fd_;
+  const bool writable_;
+
+  off_t offset_;  // into the file. will be aligned to kPageSize
+  char *current_;  // start of the chunk that is currently mapped
+  size_t position_;  // offset of the next message within current_
+
+  inline unsigned long SystemPageSize() {
+    static unsigned long r = sysconf(_SC_PAGESIZE);
+    return r;
+  }
+  void MapNextPage() {
+    if (writable_) {
+      if (ftruncate(fd_, offset_ + kPageSize) == -1) {
+        fprintf(stderr, "ftruncate(%d, %jd) failed with %d: %s. aborting\n",
+                fd_, static_cast<intmax_t>(offset_ + kPageSize), errno,
+                strerror(errno));
+        printf("see stderr\n");
+        abort();
+      }
+    }
+    current_ = static_cast<char *>(mmap(NULL, kPageSize,
+                                        PROT_READ | (writable_ ? PROT_WRITE : 0),
+                                        MAP_SHARED, fd_, offset_));
+    if (current_ == MAP_FAILED) {
+      fprintf(stderr, "mmap(NULL, %zu, PROT_READ | PROT_WRITE, MAP_SHARED, %d, %jd)"
+              " failed with %d: %s. aborting\n", kPageSize, fd_,
+              static_cast<intmax_t>(offset_), errno, strerror(errno));
+      printf("see stderr\n");
+      abort();
+    }
+    offset_ += kPageSize;
+  }
+  void Unmap(void *location) {
+    if (munmap(location, kPageSize) == -1) {
+      fprintf(stderr, "munmap(%p, %zu) failed with %d: %s. aborting\n",
+              location, kPageSize, errno, strerror(errno));
+      printf("see stderr\n");
+      abort();
+    }
+  }
+ public:
+  LogFileAccessor(int fd, bool writable) : fd_(fd), writable_(writable),
+    offset_(0), current_(0), position_(0) {
+    // check to make sure that mmap will allow mmaping in chunks of kPageSize
+    if (SystemPageSize() > kPageSize || (kPageSize % SystemPageSize()) != 0) {
+      fprintf(stderr, "LogFileCommon: system page size (%lu)"
+              " not compatible with kPageSize (%zu). aborting\n",
+              SystemPageSize(), kPageSize);
+      printf("see stderr\n");
+      abort();
+    }
+
+    MapNextPage();
+  }
+  // message_size is the total byte count for the message, header included.
+  LogFileMessageHeader *GetWritePosition(size_t message_size) {
+    if (position_ + message_size + (kAlignment - (message_size % kAlignment)) +
+        sizeof(mutex) > kPageSize) {
+      char *const temp = current_;
+      MapNextPage();
+      if (futex_set_value(static_cast<mutex *>(static_cast<void *>(
+                      &temp[position_])), 2) == -1) {
+        fprintf(stderr,
+                "LogFileCommon: futex_set_value(%p, 2) failed with %d: %s."
+                " readers will hang\n",
+                &temp[position_], errno, strerror(errno));
+      }
+      Unmap(temp);
+      position_ = 0;
+    }
+    LogFileMessageHeader *const r = static_cast<LogFileMessageHeader *>(
+        static_cast<void *>(&current_[position_]));
+    position_ += message_size;
+    // keep it aligned for next time
+    position_ += kAlignment - (position_ % kAlignment);
+    return r;
+  }
+  // may only return NULL if wait is false
+  const LogFileMessageHeader *ReadNextMessage(bool wait) {
+    LogFileMessageHeader *r;
+    do {
+      r = static_cast<LogFileMessageHeader *>(
+          static_cast<void *>(&current_[position_]));
+      if (wait) {
+        if (futex_wait(&r->marker) != 0) continue;
+      }
+      if (r->marker == 2) {
+        Unmap(current_);
+        MapNextPage();
+        position_ = 0;
+        r = static_cast<LogFileMessageHeader *>(static_cast<void *>(current_));
+      }
+    } while (wait && r->marker == 0);
+    if (r->marker == 0) {
+      return NULL;
+    }
+    position_ += sizeof(LogFileMessageHeader) + r->name_size + r->message_size;
+    // keep it aligned for next time
+    position_ += kAlignment - (position_ % kAlignment);
+    return r;
+  }
+
+  // Asynchronously flushes the currently mapped chunk to the file.
+  void Sync() { msync(current_, kPageSize, MS_ASYNC | MS_INVALIDATE); }
+
+  void MoveToEnd() {
+    Unmap(current_);
+    struct stat info;
+    if (fstat(fd_, &info) == -1) {
+      fprintf(stderr, "LogFileCommon: fstat(%d, %p) failed with %d: %s\n",
+              fd_, &info, errno, strerror(errno));
+      printf("see stderr\n");
+      abort();
+    }
+    offset_ = info.st_size - kPageSize;
+    position_ = 0;  // the old position is meaningless in the newly mapped chunk
+    MapNextPage();
+  }
+};
+
+} // namespace logging
+} // namespace aos
+
+#endif
diff --git a/aos/linux_code/core/LogStreamer.cpp b/aos/linux_code/core/LogStreamer.cpp
new file mode 100644
index 0000000..d570489
--- /dev/null
+++ b/aos/linux_code/core/LogStreamer.cpp
@@ -0,0 +1,53 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <time.h>
+#include <string.h>
+#include <string>
+#include <unistd.h>
+#include <sys/types.h>
+#include <pwd.h>
+#include <fcntl.h>
+#include <inttypes.h>
+
+#include "aos/linux_code/logging/linux_logging.h"
+#include "aos/linux_code/core/LogFileCommon.h"
+#include "aos/linux_code/init.h"
+#include "aos/linux_code/ipc_lib/queue.h"
+#include "aos/common/logging/logging_impl.h"
+#include "aos/common/time.h"
+
+namespace aos {
+namespace logging {
+namespace linux_code {
+namespace {
+
+int LogStreamerMain() {
+  InitNRT();
+
+  const time::Time start_time = time::Time::Now();
+  printf("starting at %" PRId32 "s%" PRId32 "ns-----------------------------\n",
+         start_time.sec(), start_time.nsec());
+
+  int queue_index = 0;
+  for (;;) {
+    const LogMessage *const message = ReadNext(RawQueue::kBlock, &queue_index);
+    if (message != NULL) {
+      internal::PrintMessage(stdout, *message);
+      logging::linux_code::Free(message);
+    }
+  }
+
+  // Never reached: the loop above has no way out.
+  Cleanup();
+  return 0;
+}
+
+} // namespace
+} // namespace linux_code
+} // namespace logging
+} // namespace aos
+
+int main() {
+ return ::aos::logging::linux_code::LogStreamerMain();
+}
diff --git a/aos/linux_code/core/core.cc b/aos/linux_code/core/core.cc
new file mode 100644
index 0000000..c8cec67
--- /dev/null
+++ b/aos/linux_code/core/core.cc
@@ -0,0 +1,26 @@
+#include <sys/select.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+
+#include <string>
+
+#include "aos/linux_code/init.h"
+
+// Initializes shared memory. This is the only file that will create the shared
+// memory file if it doesn't already exist (and set everything up).
+
+int main(int argc, char **argv) {
+ aos::InitCreate();
+
+ if (argc > 1) { // optional argument: a file to touch after InitCreate() returns
+ if (system((std::string("touch '") + argv[1] + "'").c_str()) != 0) { // NOTE(review): argv[1] goes through the shell; a ' in it breaks the quoting
+ fprintf(stderr, "`touch '%s'` failed\n", argv[1]);
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ select(0, NULL, NULL, NULL, NULL); // wait forever (no fds and no timeout)
+ aos::Cleanup();
+}
diff --git a/aos/linux_code/core/core.gyp b/aos/linux_code/core/core.gyp
new file mode 100644
index 0000000..d68168b
--- /dev/null
+++ b/aos/linux_code/core/core.gyp
@@ -0,0 +1,50 @@
+{
+ 'targets': [
+ {
+ 'target_name': 'core',
+ 'type': 'executable',
+ 'sources': [
+ 'core.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/linux_code/linux_code.gyp:init',
+ ],
+ },
+ {
+ 'target_name': 'BinaryLogReader',
+ 'type': 'executable',
+ 'sources': [
+ 'BinaryLogReader.cpp',
+ ],
+ 'dependencies': [
+ '<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/linux_code/linux_code.gyp:init',
+ '<(AOS)/linux_code/linux_code.gyp:configuration',
+ ],
+ },
+ {
+ 'target_name': 'LogStreamer',
+ 'type': 'executable',
+ 'sources': [
+ 'LogStreamer.cpp',
+ ],
+ 'dependencies': [
+ '<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/linux_code/linux_code.gyp:init',
+ '<(AOS)/common/common.gyp:time',
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:queue',
+ ],
+ },
+ {
+ 'target_name': 'LogDisplayer',
+ 'type': 'executable',
+ 'sources': [
+ 'LogDisplayer.cpp',
+ ],
+ 'dependencies': [
+ '<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/linux_code/linux_code.gyp:init',
+ ],
+ },
+ ],
+}
diff --git a/aos/linux_code/init.cc b/aos/linux_code/init.cc
new file mode 100644
index 0000000..3c0704e
--- /dev/null
+++ b/aos/linux_code/init.cc
@@ -0,0 +1,115 @@
+#include "aos/linux_code/init.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <errno.h>
+#include <sched.h>
+#include <sys/resource.h>
+#include <asm-generic/resource.h> // for RLIMIT_RTTIME
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdint.h>
+
+#include "aos/common/die.h"
+#include "aos/linux_code/logging/linux_logging.h"
+#include "aos/linux_code/ipc_lib/shared_mem.h"
+
+namespace aos {
+
+namespace {
+
+void SetSoftRLimit(int resource, rlim64_t soft, bool set_for_root) {
+  const bool running_as_root = (getuid() == 0);
+  if (running_as_root && !set_for_root) return;
+  // Fetch the existing limits first so only the soft value gets replaced.
+  struct rlimit64 limits;
+  if (getrlimit64(resource, &limits) == -1) {
+    Die("%s-init: getrlimit64(%d) failed with %d (%s)\n",
+        program_invocation_short_name, resource, errno, strerror(errno));
+  }
+  limits.rlim_cur = soft;
+  if (setrlimit64(resource, &limits) == -1) {
+    Die("%s-init: setrlimit64(%d, {cur=%ju,max=%ju})"
+        " failed with %d (%s)\n", program_invocation_short_name,
+        resource, (uintmax_t)limits.rlim_cur, (uintmax_t)limits.rlim_max,
+        errno, strerror(errno));
+  }
+}
+
+// Common stuff that needs to happen at the beginning of both the realtime and
+// non-realtime initialization sequences. May be called twice.
+void InitStart() {
+ // Allow locking as much as we want into RAM.
+ SetSoftRLimit(RLIMIT_MEMLOCK, RLIM_INFINITY, false);
+
+ // Do create core files of unlimited size.
+ SetSoftRLimit(RLIMIT_CORE, RLIM_INFINITY, true);
+}
+
+int LockAllMemory() {
+  InitStart();
+  if (mlockall(MCL_CURRENT | MCL_FUTURE) == -1) {
+    Die("%s-init: mlockall failed with %d (%s)\n",
+        program_invocation_short_name, errno, strerror(errno));
+  }
+  // Forces the memory pages for all the stack space that we're ever going to
+  // use to be loaded into memory (so it can be locked there).
+  uint8_t data[4096 * 8];
+  // Not 0 because linux might optimize that to a 0-filled page.
+  memset(data, 1, sizeof(data));
+  // data is never read, so without this barrier the optimizer is free to
+  // delete the memset as a dead store and the stack pages never get touched.
+  __asm__ __volatile__("" : : "r"(data) : "memory");
+  return 0;
+}
+
+// Do the initialization code that is necessary for both realtime and
+// non-realtime processes.
+void DoInitNRT(aos_core_create create) {
+ InitStart();
+ if (aos_core_create_shared_mem(create)) {
+ Die("%s-init: creating shared memory reference failed\n",
+ program_invocation_short_name);
+ }
+ logging::linux_code::Register();
+}
+
+const char *const kNoRealtimeEnvironmentVariable = "AOS_NO_REALTIME";
+
+} // namespace
+
+void InitNRT() { DoInitNRT(aos_core_create::reference); }
+void InitCreate() { DoInitNRT(aos_core_create::create); }
+void Init(int relative_priority) {
+  if (getenv(kNoRealtimeEnvironmentVariable) == NULL) {  // if nobody set it
+    LockAllMemory();
+    // Only let rt processes run for 1 second straight.
+    SetSoftRLimit(RLIMIT_RTTIME, 1000000, true);
+    // Allow rt processes up to priority 40.
+    SetSoftRLimit(RLIMIT_RTPRIO, 40, false);
+    // Run as SCHED_FIFO at priority 30, shifted by relative_priority.
+    struct sched_param param;
+    param.sched_priority = 30 + relative_priority;
+    if (sched_setscheduler(0, SCHED_FIFO, &param) != 0) {
+      Die("%s-init: setting SCHED_FIFO failed with %d (%s)\n",
+          program_invocation_short_name, errno, strerror(errno));
+    }
+  } else {
+    fprintf(stderr, "%s not doing realtime initialization because environment"
+            " variable %s is set\n", program_invocation_short_name,
+            kNoRealtimeEnvironmentVariable);
+    printf("no realtime for %s. see stderr\n", program_invocation_short_name);
+  }
+
+  InitNRT();
+}
+
+void Cleanup() {
+ if (aos_core_free_shared_mem()) {
+ Die("%s-init: freeing shared mem failed\n",
+ program_invocation_short_name);
+ }
+}
+
+} // namespace aos
diff --git a/aos/linux_code/init.h b/aos/linux_code/init.h
new file mode 100644
index 0000000..0042533
--- /dev/null
+++ b/aos/linux_code/init.h
@@ -0,0 +1,21 @@
+#ifndef AOS_LINUX_CODE_INIT_H_
+#define AOS_LINUX_CODE_INIT_H_
+
+namespace aos {
+
+// Does the non-realtime parts of the initialization process.
+void InitNRT();
+// Initializes everything, including the realtime stuff.
+// relative_priority adjusts the priority of this process relative to all of the
+// other ones (positive for higher priority).
+void Init(int relative_priority = 0);
+// Same as InitNRT, except will remove an existing shared memory file and create
+// a new one.
+void InitCreate();
+// Cleans up (probably not going to get called very often because few things can
+// exit gracefully).
+void Cleanup();
+
+} // namespace aos
+
+#endif // AOS_LINUX_CODE_INIT_H_
diff --git a/aos/linux_code/ipc_lib/aos_sync.c b/aos/linux_code/ipc_lib/aos_sync.c
new file mode 100644
index 0000000..52ebed1
--- /dev/null
+++ b/aos/linux_code/ipc_lib/aos_sync.c
@@ -0,0 +1,203 @@
+#include "aos/linux_code/ipc_lib/aos_sync.h"
+
+#include <stdio.h>
+#include <linux/futex.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <errno.h>
+#include <stdint.h>
+#include <limits.h>
+#include <string.h>
+#include <inttypes.h>
+
+// TODO(brians): Inline these in the new PI version.
+#define cmpxchg(ptr, o, n) __sync_val_compare_and_swap(ptr, o, n)
+static inline uint32_t xchg(mutex *pointer, uint32_t value) {
+ uint32_t result;
+ __atomic_exchange(pointer, &value, &result, __ATOMIC_SEQ_CST);
+ return result;
+}
+
+// this code is based on something that appears to be based on <http://www.akkadia.org/drepper/futex.pdf>, which also has a lot of useful information
+// should probably use <http://lxr.linux.no/linux+v2.6.34/Documentation/robust-futexes.txt> once it becomes available
+// (sys_set_robust_list appears to be the function name)
+// <http://locklessinc.com/articles/futex_cheat_sheet/> and
+// <http://locklessinc.com/articles/mutex_cv_futex/> are useful
+// <http://lwn.net/Articles/360699/> has a nice overview of futexes in late 2009 (fairly recent compared to everything else...)
+// can't use PRIVATE futex operations because they use the pid (or something) as part of the hash
+//
+// Remember that EAGAIN and EWOULDBLOCK are the same! (ie if you get EAGAIN from
+// FUTEX_WAIT, the docs call it EWOULDBLOCK...)
+//
+// Values for a mutex:
+// 0 = unlocked
+// 1 = locked, not contended
+// 2 = locked, probably contended
+// Values for a "futex":
+// 0 = unset
+// 1 = set
+
+static inline int sys_futex(mutex *addr1, int op, int val1,
+ const struct timespec *timeout, void *addr2, int val3) {
+ return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+static inline int sys_futex_requeue(mutex *addr1, int op, int num_wake,
+ int num_requeue, mutex *m) {
+ return syscall(SYS_futex, addr1, op, num_wake, num_requeue, m);
+}
+static inline int sys_futex_op(mutex *addr1, int op, int num_waiters1,
+ int num_waiters2, mutex *addr2, int op_args_etc) {
+ return syscall(SYS_futex, addr1, op, num_waiters1,
+ num_waiters2, addr2, op_args_etc);
+}
+
+static inline int mutex_get(mutex *m, uint8_t signals_fail, const
+ struct timespec *timeout) {
+ int c;
+ c = cmpxchg(m, 0, 1);
+ if (!c) return 0;
+ /* Somebody else holds it: mark the mutex as contended (2). */
+ if (c == 1) c = xchg(m, 2);
+ while (c) {
+ /* Wait in the kernel for as long as the value is still 2. */
+ // Gives up with 1 on EINTR (if signals_fail) or 2 on ETIMEDOUT (if timeout).
+ if (sys_futex(m, FUTEX_WAIT, 2, timeout, NULL, 0) == -1) {
+ if (signals_fail && errno == EINTR) {
+ return 1;
+ }
+ if (timeout != NULL && errno == ETIMEDOUT) {
+ return 2;
+ }
+ }
+ // Try again; a previous value of 0 means we now hold it (left marked as 2).
+ c = xchg(m, 2);
+ }
+ return 0;
+}
+int mutex_lock(mutex *m) {
+ return mutex_get(m, 1, NULL);
+}
+int mutex_lock_timeout(mutex *m, const struct timespec *timeout) {
+ return mutex_get(m, 1, timeout);
+}
+int mutex_grab(mutex *m) {
+ return mutex_get(m, 0, NULL);
+}
+
+void mutex_unlock(mutex *m) {
+ /* Unlock, and if not contended then exit. */
+ // The value that was there before the swap says what state the mutex was in.
+ switch (xchg(m, 0)) {
+ case 0:
+ fprintf(stderr, "sync: multiple unlock of %p. aborting\n", m);
+ printf("see stderr\n");
+ abort();
+ case 1:
+ // Locked and not contended: there is nobody to wake up.
+ break;
+ case 2:
+ if (sys_futex(m, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+ fprintf(stderr, "sync: waking 1 from %p failed with %d: %s\n",
+ m, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ } else {
+ break;
+ }
+ default:
+ fprintf(stderr, "sync: got a garbage value from mutex %p. aborting\n",
+ m);
+ printf("see stderr\n");
+ abort();
+ }
+}
+int mutex_trylock(mutex *m) {
+  // Swap 0 (unlocked) for 1 (locked) atomically; the old value comes back.
+  const unsigned previous = cmpxchg(m, 0, 1);
+  // Anything other than 0 means somebody else already had it locked.
+  return (previous == 0) ? 0 : 1;
+}
+
+int futex_wait(mutex *m) {
+  // Already set, so there is nothing to wait for.
+  if (*m != 0) return 0;
+  if (sys_futex(m, FUTEX_WAIT, 0, NULL, NULL, 0) != -1) return 0;
+  switch (errno) {
+    case EINTR:
+      return 1;
+    case EWOULDBLOCK:  // the value was no longer 0 when the kernel checked
+      return 0;
+    default:
+      return -1;
+  }
+}
+int futex_set_value(mutex *m, mutex value) {
+ xchg(m, value);
+ return sys_futex(m, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
+}
+int futex_set(mutex *m) {
+ return futex_set_value(m, 1);
+}
+int futex_unset(mutex *m) {
+ return !xchg(m, 0);
+}
+
+void condition_wait(mutex *c, mutex *m) {
+ const mutex wait_start = *c;
+
+ mutex_unlock(m);
+
+ while (1) {
+ if (sys_futex(c, FUTEX_WAIT, wait_start, NULL, NULL, 0) == -1) {
+ // If it failed for some reason other than somebody else doing a wake
+ // before we actually made it to sleep.
+ if (__builtin_expect(*c == wait_start, 0)) {
+ // Try again if it was because of a signal.
+ if (errno == EINTR) continue;
+ fprintf(stderr, "FUTEX_WAIT(%p, %"PRIu32", NULL, NULL, 0) failed"
+ " with %d: %s\n",
+ c, wait_start, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+ }
+ // Simplified mutex_lock that always leaves it
+ // contended in case anybody else got requeued.
+ while (xchg(m, 2) != 0) {
+ if (sys_futex(m, FUTEX_WAIT, 2, NULL, NULL, 0) == -1) {
+ // Try again if it was because of a signal or somebody else unlocked it
+ // before we went to sleep.
+ if (errno == EINTR || errno == EWOULDBLOCK) continue;
+ fprintf(stderr, "sync: FUTEX_WAIT(%p, 2, NULL, NULL, 0)"
+ " failed with %d: %s\n",
+ m, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+ }
+ return;
+ }
+}
+
+void condition_signal(mutex *c) {
+ __sync_fetch_and_add(c, 1);
+ if (sys_futex(c, FUTEX_WAKE, 1, NULL, NULL, 0) == -1) {
+ fprintf(stderr, "sync: FUTEX_WAKE(%p, 1, NULL, NULL, 0)"
+ " failed with %d: %s\n",
+ c, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+}
+
+void condition_broadcast(mutex *c, mutex *m) {
+ __sync_fetch_and_add(c, 1);
+ // Wake 1 waiter and requeue the rest.
+ if (sys_futex_requeue(c, FUTEX_REQUEUE, 1, INT_MAX, m) == -1) {
+ fprintf(stderr, "sync: FUTEX_REQUEUE(%p, 1, INT_MAX, %p, 0)"
+ " failed with %d: %s\n",
+ c, m, errno, strerror(errno));
+ printf("see stderr\n");
+ abort();
+ }
+}
diff --git a/aos/linux_code/ipc_lib/aos_sync.h b/aos/linux_code/ipc_lib/aos_sync.h
new file mode 100644
index 0000000..7a81ca3
--- /dev/null
+++ b/aos/linux_code/ipc_lib/aos_sync.h
@@ -0,0 +1,96 @@
+#ifndef AOS_LINUX_CODE_IPC_LIB_SYNC_H_
+#define AOS_LINUX_CODE_IPC_LIB_SYNC_H_
+
+#include <stdlib.h>
+#include <signal.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+// TODO(brians) add client requests to make helgrind useful with this code
+// <http://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.client-requests>
+// and <http://www.valgrind.org/docs/manual/drd-manual.html#drd-manual.clientreqs>
+// list the interesting ones
+
+// Have to align structs containing it to sizeof(int).
+// Valid initial values for use with mutex_ functions are 0 (unlocked) and 1 (locked).
+// Valid initial values for use with futex_ functions are 0 (unset) and 1 (set).
+// No initialization is necessary for use as c with the condition_ functions.
+// The value should not be changed after multiple processes have started
+// accessing an instance except through the functions declared in this file.
+typedef volatile uint32_t mutex __attribute__((aligned(sizeof(int))));
+
+// All return -1 for other error (which will be in errno from futex(2)).
+//
+// There is no priority inversion protection.
+// TODO(brians) look at using
+// <http://www.kernel.org/doc/Documentation/pi-futex.txt>
+
+// Returns 1 if interrupted by a signal.
+//
+// One of the highest priority processes blocked on a given mutex will be the
+// one to lock it when it is unlocked.
+int mutex_lock(mutex *m) __attribute__((warn_unused_result));
+// Returns 2 if it timed out or 1 if interrupted by a signal.
+int mutex_lock_timeout(mutex *m, const struct timespec *timeout)
+ __attribute__((warn_unused_result));
+// Ignores signals. Can not fail.
+int mutex_grab(mutex *m);
+// abort(2)s for multiple unlocking.
+void mutex_unlock(mutex *m);
+// Returns 0 when successful in locking the mutex and 1 if somebody else has it
+// locked.
+int mutex_trylock(mutex *m) __attribute__((warn_unused_result));
+
+// The futex_ functions are similar to the mutex_ ones but different.
+// They are designed for signalling when something happens (possibly to
+// multiple listeners). A mutex manipulated with them can only be set or unset.
+//
+// They are different from the condition_ functions in that they do NOT work
+// correctly as standard condition variables. While it is possible to keep
+// track of the "condition" using the value part of the futex_* functions, the
+// obvious implementation has basically the same race condition that condition
+// variables are designed to prevent between somebody else grabbing the mutex
+// and changing whether it's set or not and the futex_ function changing the
+// futex's value.
+
+// Wait for the futex to be set. Will return immediately if it's already set.
+// Returns 0 if successful or it was already set, 1 if interrupted by a signal,
+// or -1.
+int futex_wait(mutex *m) __attribute__((warn_unused_result));
+// Set the futex and wake up anybody waiting on it.
+// Returns the number that were woken or -1.
+//
+// This will always wake up all waiters at the same time and set the value to 1.
+int futex_set(mutex *m);
+// Same as above except lets something other than 1 be used as the final value.
+int futex_set_value(mutex *m, mutex value);
+// Unsets the futex (sets the value to 0).
+// Returns 0 if it was set before and 1 if it wasn't.
+// Can not fail.
+int futex_unset(mutex *m);
+
+// The condition_ functions implement condition variable support. The API is
+// similar to the pthreads api and works the same way. The same m argument must
+// be passed in for all calls to all of the condition_ functions with a given c.
+
+// Wait for the condition variable to be signalled. m will be unlocked
+// atomically with actually starting to wait. m is guaranteed to be locked when
+// this function returns.
+// NOTE: The relocking of m is not atomic with stopping the actual wait and
+// other process(es) may lock (+unlock) the mutex first.
+void condition_wait(mutex *c, mutex *m);
+// If any other processes are condition_waiting on c, wake 1 of them. Does not
+// require m to be locked.
+void condition_signal(mutex *c);
+// Wakes all processes that are condition_waiting on c. Does not require m to be
+// locked.
+void condition_broadcast(mutex *c, mutex *m);
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif // AOS_LINUX_CODE_IPC_LIB_SYNC_H_
diff --git a/aos/linux_code/ipc_lib/condition.cc b/aos/linux_code/ipc_lib/condition.cc
new file mode 100644
index 0000000..b764026
--- /dev/null
+++ b/aos/linux_code/ipc_lib/condition.cc
@@ -0,0 +1,26 @@
+#include "aos/common/condition.h"
+
+#include <inttypes.h>
+
+#include "aos/common/type_traits.h"
+
+namespace aos {
+
+static_assert(shm_ok<Condition>::value, "Condition should work"
+ " in shared memory");
+
+Condition::Condition(Mutex *m) : impl_(), m_(m) {}
+
+void Condition::Wait() {
+ condition_wait(&impl_, &m_->impl_);
+}
+
+void Condition::Signal() {
+ condition_signal(&impl_);
+}
+
+void Condition::Broadcast() {
+ condition_broadcast(&impl_, &m_->impl_);
+}
+
+} // namespace aos
diff --git a/aos/linux_code/ipc_lib/core_lib.c b/aos/linux_code/ipc_lib/core_lib.c
new file mode 100644
index 0000000..cc1ccbb
--- /dev/null
+++ b/aos/linux_code/ipc_lib/core_lib.c
@@ -0,0 +1,44 @@
+#include "aos/linux_code/ipc_lib/core_lib.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "aos/linux_code/ipc_lib/shared_mem.h"
+
+static inline uint8_t aos_8max(uint8_t l, uint8_t r) {
+ return (l > r) ? l : r;
+}
+void *shm_malloc_aligned(size_t length, uint8_t alignment) {
+ // Raise alignment to at least the minimum for an object of this length, from
+ // <http://software.intel.com/en-us/articles/data-alignment-when-migrating-to-64-bit-intel-architecture/>
+ if (length <= 1) {
+ alignment = aos_8max(alignment, 1);
+ } else if (length <= 2) {
+ alignment = aos_8max(alignment, 2);
+ } else if (length <= 4) {
+ alignment = aos_8max(alignment, 4);
+ } else if (length <= 8) {
+ alignment = aos_8max(alignment, 8);
+ } else if (length <= 16) {
+ alignment = aos_8max(alignment, 16);
+ } else {
+ alignment = aos_8max(alignment, (length >= 64) ? 64 : 16);
+ }
+
+ void *msg = NULL;
+ aos_shm_core *shm_core = global_core->mem_struct;
+ mutex_grab(&shm_core->msg_alloc_lock);
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - length;
+ const uint8_t align_extra = (uintptr_t)shm_core->msg_alloc % alignment;
+ shm_core->msg_alloc = (uint8_t *)shm_core->msg_alloc - align_extra;
+ msg = shm_core->msg_alloc;
+ if (msg <= global_core->shared_mem) {
+ fprintf(stderr, "core_lib: RAN OUT OF SHARED MEMORY!!!----------------------------------------------------------\n");
+ printf("if you didn't see the stderr output just then, you should have\n");
+ abort();
+ }
+ // msg_alloc moved down by length, then down again to an alignment boundary.
+ mutex_unlock(&shm_core->msg_alloc_lock);
+ return msg;
+}
+
diff --git a/aos/linux_code/ipc_lib/core_lib.h b/aos/linux_code/ipc_lib/core_lib.h
new file mode 100644
index 0000000..1b0d754
--- /dev/null
+++ b/aos/linux_code/ipc_lib/core_lib.h
@@ -0,0 +1,23 @@
+#ifndef _AOS_CORE_LIB_H_
+#define _AOS_CORE_LIB_H_
+
+#include <stdint.h>
+
+#include "aos/linux_code/ipc_lib/aos_sync.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+void *shm_malloc_aligned(size_t length, uint8_t alignment)
+ __attribute__((alloc_size(1)));
+static void *shm_malloc(size_t length) __attribute__((alloc_size(1)));
+static inline void *shm_malloc(size_t length) {
+ return shm_malloc_aligned(length, 0);
+}
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif
diff --git a/aos/linux_code/ipc_lib/ipc_lib.gyp b/aos/linux_code/ipc_lib/ipc_lib.gyp
new file mode 100644
index 0000000..f947d5e
--- /dev/null
+++ b/aos/linux_code/ipc_lib/ipc_lib.gyp
@@ -0,0 +1,82 @@
+{
+ 'targets': [
+ {
+ 'target_name': 'aos_sync',
+ 'type': 'static_library',
+ 'sources': [
+ 'aos_sync.c',
+ ],
+ },
+ {
+ 'target_name': 'core_lib',
+ 'type': 'static_library',
+ 'sources': [
+ 'core_lib.c',
+ ],
+ 'dependencies': [
+ 'aos_sync',
+ 'shared_mem',
+ ],
+ 'export_dependent_settings': [
+ 'aos_sync',
+ ],
+ },
+ {
+ 'target_name': 'shared_mem',
+ 'type': 'static_library',
+ 'sources': [
+ 'shared_mem.c',
+ ],
+ 'dependencies': [
+ 'aos_sync',
+ ],
+ 'export_dependent_settings': [
+ 'aos_sync',
+ ],
+ },
+ {
+ 'target_name': 'queue',
+ 'type': 'static_library',
+ 'sources': [
+ 'queue.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/common/common.gyp:condition',
+ '<(AOS)/common/common.gyp:mutex',
+ 'core_lib',
+ # TODO(brians): fix this once there's a nice logging interface to use
+ # '<(AOS)/build/aos.gyp:logging',
+ ],
+ },
+ {
+ 'target_name': 'raw_queue_test',
+ 'type': 'executable',
+ 'sources': [
+ 'queue_test.cc',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):gtest',
+ 'queue',
+ '<(AOS)/build/aos.gyp:logging',
+ 'core_lib',
+ '<(AOS)/common/common.gyp:queue_testutils',
+ '<(AOS)/common/common.gyp:time',
+ ],
+ },
+ {
+ 'target_name': 'ipc_stress_test',
+ 'type': 'executable',
+ 'sources': [
+ 'ipc_stress_test.cc',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):gtest',
+ '<(AOS)/common/common.gyp:time',
+ '<(AOS)/common/common.gyp:queue_testutils',
+ '<(AOS)/common/common.gyp:mutex',
+ 'core_lib',
+ '<(AOS)/common/common.gyp:die',
+ ],
+ },
+ ],
+}
diff --git a/aos/linux_code/ipc_lib/ipc_stress_test.cc b/aos/linux_code/ipc_lib/ipc_stress_test.cc
new file mode 100644
index 0000000..1b55e82
--- /dev/null
+++ b/aos/linux_code/ipc_lib/ipc_stress_test.cc
@@ -0,0 +1,248 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <libgen.h>
+#include <assert.h>
+
+#include <vector>
+#include <string>
+
+#include "aos/common/time.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/type_traits.h"
+#include "aos/common/mutex.h"
+#include "aos/linux_code/ipc_lib/core_lib.h"
+#include "aos/common/die.h"
+
+// This runs all of the IPC-related tests in a bunch of parallel processes for a
+// while and makes sure that they don't fail. It also captures the stdout and
+// stderr output from each test run and only prints it out (not interleaved with
+// the output from any other run) if the test fails.
+//
+// It's written in C++ for performance. We need actual OS-level parallelism for
+// this to work, which means that Ruby's out because it doesn't have good
+// support for doing that. My Python implementation ended up pretty heavily disk
+// IO-bound, which is a bad way to test CPU contention.
+
+namespace aos {
+
+// Each test is represented by the name of the test binary and then any
+// arguments to pass to it.
+// Using --gtest_filter is a bad idea because it seems to result in a lot of
+// swapping which causes everything to be disk-bound (at least for me).
+static const ::std::vector< ::std::vector< ::std::string>> kTests = {
+ {"queue_test"},
+ {"condition_test"},
+ {"mutex_test"},
+ {"raw_queue_test"},
+};
+// These arguments get inserted before any per-test arguments.
+static const ::std::vector< ::std::string> kDefaultArgs = {
+ "--gtest_repeat=30",
+ "--gtest_shuffle",
+};
+
+// How many test processes to run at a time.
+static const int kTesters = 100;
+// How long to test for.
+static constexpr time::Time kTestTime = time::Time::InSeconds(30);
+
+// The structure that gets put into shared memory and then referenced by all of
+// the child processes.
+struct Shared {
+  explicit Shared(const time::Time &stop_time)
+      : stop_time(stop_time), total_iterations(0), path(NULL) {}
+
+  // Synchronizes access to stdout/stderr so failure messages don't interleave.
+  Mutex output_mutex;
+
+  // When to stop.
+  time::Time stop_time;
+
+  // The total number of iterations. Updated by each child as it finishes.
+  int total_iterations;
+  // Synchronizes writes to total_iterations.
+  Mutex total_iterations_mutex;
+
+  // Directory holding the test binaries. Main sets it before forking anything.
+  const char *path;
+};
+static_assert(shm_ok<Shared>::value,
+ "it's going to get shared between forked processes");
+
+// Gets called after each child forks to run a test. Points stdout and stderr
+// at pipes[1] and then exec()s test[0] (looked up in shared->path) with
+// kDefaultArgs followed by the rest of test as its arguments. Never returns.
+void __attribute__((noreturn)) DoRunTest(
+    Shared *shared, const ::std::vector< ::std::string> &test, int pipes[2]) {
+  if (close(pipes[0]) == -1) {
+    Die("close(%d) of read end of pipe failed with %d: %s\n",
+        pipes[0], errno, strerror(errno));
+  }
+  if (close(STDIN_FILENO) == -1) {
+    Die("close(STDIN_FILENO(=%d)) failed with %d: %s\n",
+        STDIN_FILENO, errno, strerror(errno));
+  }
+  if (dup2(pipes[1], STDOUT_FILENO) == -1) {
+    Die("dup2(%d, STDOUT_FILENO(=%d)) failed with %d: %s\n",
+        pipes[1], STDOUT_FILENO, errno, strerror(errno));
+  }
+  if (dup2(pipes[1], STDERR_FILENO) == -1) {
+    Die("dup2(%d, STDERR_FILENO(=%d)) failed with %d: %s\n",
+        pipes[1], STDERR_FILENO, errno, strerror(errno));
+  }
+
+  const char **args = new const char *[test.size() + kDefaultArgs.size() + 1];
+  // The actual executable to run.
+  ::std::string executable;
+  size_t count = 0;
+  for (const ::std::string &c : test) {
+    if (count == 0) {
+      executable = ::std::string(shared->path) + "/" + c;
+      args[count++] = executable.c_str();
+      for (const ::std::string &ci : kDefaultArgs) {
+        args[count++] = ci.c_str();
+      }
+    } else {
+      args[count++] = c.c_str();
+    }
+  }
+  // Terminate after everything filled in above (not at test.size()).
+  args[count] = NULL;
+  execv(executable.c_str(), const_cast<char *const *>(args));
+  Die("execv(%s, %p) failed with %d: %s\n",
+      executable.c_str(), args, errno, strerror(errno));
+}
+// Body of each tester process: runs tests one at a time until
+// shared->stop_time. A test's captured output is printed only if it fails.
+void DoRun(Shared *shared) {
+  int iterations = 0;
+  // An iterator pointing to a random one of the tests.
+  auto test = kTests.begin() + (getpid() % kTests.size());
+  int pipes[2];
+  while (time::Time::Now() < shared->stop_time) {
+    if (pipe(pipes) == -1) {
+      Die("pipe(%p) failed with %d: %s\n", &pipes, errno, strerror(errno));
+    }
+    switch (fork()) {
+      case 0:  // in runner
+        DoRunTest(shared, *test, pipes);
+      case -1:
+        Die("fork() failed with %d: %s\n", errno, strerror(errno));
+    }
+
+    if (close(pipes[1]) == -1) {
+      Die("close(%d) of write end of pipe failed with %d: %s\n",
+          pipes[1], errno, strerror(errno));
+    }
+
+    ::std::string output;
+    char buffer[2048];
+    while (true) {
+      ssize_t ret = read(pipes[0], &buffer, sizeof(buffer));
+      if (ret == 0) {  // EOF
+        if (close(pipes[0]) == -1) {
+          Die("close(%d) of pipe at EOF failed with %d: %s\n",
+              pipes[0], errno, strerror(errno));
+        }
+        break;
+      } else if (ret == -1) {
+        if (errno == EINTR) continue;  // Retry, just like for wait() below.
+        Die("read(%d, %p, %zu) failed with %d: %s\n",
+            pipes[0], &buffer, sizeof(buffer), errno, strerror(errno));
+      }
+      output.append(buffer, ret);
+    }
+
+    int status;
+    while (wait(&status) == -1) {
+      if (errno == EINTR) continue;
+      Die("wait(%p) in child failed with %d: %s\n",
+          &status, errno, strerror(errno));
+    }
+    // Only a nonzero exit or death by signal gets its output printed.
+    if (WIFEXITED(status)) {
+      if (WEXITSTATUS(status) != 0) {
+        MutexLocker sync(&shared->output_mutex);
+        fprintf(stderr, "Test %s exited with status %d. output:\n",
+                test->at(0).c_str(), WEXITSTATUS(status));
+        fputs(output.c_str(), stderr);
+      }
+    } else if (WIFSIGNALED(status)) {
+      MutexLocker sync(&shared->output_mutex);
+      fprintf(stderr, "Test %s terminated by signal %d: %s.\n",
+              test->at(0).c_str(),
+              WTERMSIG(status), strsignal(WTERMSIG(status)));
+      fputs(output.c_str(), stderr);
+    } else {
+      assert(WIFSTOPPED(status));
+      Die("Test %s was stopped.\n", test->at(0).c_str());
+    }
+
+    ++test;
+    if (test == kTests.end()) test = kTests.begin();
+    ++iterations;
+  }
+  {
+    MutexLocker sync(&shared->total_iterations_mutex);
+    shared->total_iterations += iterations;
+  }
+}
+
+void Run(Shared *shared) {
+  const pid_t child = fork();  // Every tester runs in a process of its own.
+  if (child == -1) {
+    Die("fork() of child failed with %d: %s\n", errno, strerror(errno));
+  } else if (child == 0) {  // in child
+    DoRun(shared);
+    _exit(EXIT_SUCCESS);
+  }
+}
+
+int Main(int argc, char **argv) {
+  assert(argc >= 1);
+
+  ::aos::common::testing::GlobalCoreInstance global_core;
+
+  Shared *shared = static_cast<Shared *>(shm_malloc(sizeof(Shared)));
+  new (shared) Shared(time::Time::Now() + kTestTime);
+
+  char *temp = strdup(argv[0]);  // dirname(3) may modify its argument.
+  shared->path = strdup(dirname(temp));
+  free(temp);
+
+  for (int i = 0; i < kTesters; ++i) {
+    Run(shared);
+  }
+  // Reap every tester. Any that didn't exit cleanly fails the whole run.
+  bool error = false;
+  for (int i = 0; i < kTesters; ++i) {
+    int status;
+    if (wait(&status) == -1) {
+      if (errno != EINTR) {
+        Die("wait(%p) failed with %d: %s\n", &status, errno, strerror(errno));
+      }
+      --i;  // Nothing was reaped (status is unset), so redo this iteration.
+      continue;
+    }
+    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+      error = true;
+    }
+  }
+
+  printf("Ran a total of %d tests.\n", shared->total_iterations);
+  if (error) {
+    printf("A child had a problem during the test.\n");
+  }
+  return error ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+
+} // namespace aos
+
+int main(int argc, char **argv) {
+ return ::aos::Main(argc, argv);  // all of the real work happens in aos::Main
+}
diff --git a/aos/linux_code/ipc_lib/mutex.cpp b/aos/linux_code/ipc_lib/mutex.cpp
new file mode 100644
index 0000000..47fc92a
--- /dev/null
+++ b/aos/linux_code/ipc_lib/mutex.cpp
@@ -0,0 +1,36 @@
+#include "aos/common/mutex.h"
+
+#include <inttypes.h>
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "aos/common/type_traits.h"
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+
+Mutex::Mutex() : impl_(0) {  // impl_ starts out as 0
+ static_assert(shm_ok<Mutex>::value,
+ "Mutex is not safe for use in shared memory.");
+}
+
+// Lock and TryLock use the return values of mutex_grab/mutex_trylock to
+// determine whether the lock was acquired. Unlock does not check anything.
+
+void Mutex::Lock() {
+ if (mutex_grab(&impl_) != 0) {  // any nonzero result is treated as fatal
+ LOG(FATAL, "mutex_grab(%p(=%" PRIu32 ")) failed because of %d: %s\n",
+ &impl_, impl_, errno, strerror(errno));
+ }
+}
+
+void Mutex::Unlock() {
+ mutex_unlock(&impl_);
+}
+
+bool Mutex::TryLock() {
+ return mutex_trylock(&impl_) == 0;  // true iff mutex_trylock returned 0
+}
+
+} // namespace aos
diff --git a/aos/linux_code/ipc_lib/queue.cc b/aos/linux_code/ipc_lib/queue.cc
new file mode 100644
index 0000000..e67f22c
--- /dev/null
+++ b/aos/linux_code/ipc_lib/queue.cc
@@ -0,0 +1,491 @@
+#include "aos/linux_code/ipc_lib/queue.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <memory>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/type_traits.h"
+#include "aos/linux_code/ipc_lib/core_lib.h"
+
+namespace aos {
+namespace {
+
+static_assert(shm_ok<RawQueue>::value,
+ "RawQueue instances go into shared memory");
+
+const bool kReadDebug = false;
+const bool kWriteDebug = false;
+const bool kRefDebug = false;
+const bool kFetchDebug = false;
+
+// The number of extra messages the pool associated with each queue will be able
+// to hold (for readers who are slow about freeing them or who leak one when
+// they get killed).
+const int kExtraMessages = 20;
+
+} // namespace
+
+const int RawQueue::kPeek;
+const int RawQueue::kFromEnd;
+const int RawQueue::kNonBlock;
+const int RawQueue::kBlock;
+const int RawQueue::kOverride;
+
+struct RawQueue::MessageHeader {  // sits directly in front of each message's data
+ int ref_count;  // DecrementMessageReferenceCount frees the message when this hits 0
+ int index; // in pool_
+ static MessageHeader *Get(const void *msg) {  // maps a message pointer back to its header
+ return reinterpret_cast<MessageHeader *>(__builtin_assume_aligned(
+ static_cast<uint8_t *>(const_cast<void *>(msg)) - sizeof(MessageHeader),
+ alignof(MessageHeader)));
+ }
+ void Swap(MessageHeader *other) {  // exchanges the contents of *this and *other
+ MessageHeader temp;
+ memcpy(&temp, other, sizeof(temp));
+ memcpy(other, this, sizeof(*other));
+ memcpy(this, &temp, sizeof(*this));
+ }
+};
+static_assert(shm_ok<RawQueue::MessageHeader>::value, "the whole point"
+ " is to stick it in shared memory");
+
+struct RawQueue::ReadData {  // state carried from ReadCommonStart to ReadCommonEnd
+ bool writable_start;  // what is_writable() returned when the read started
+};
+
+// TODO(brians) maybe do this with atomic integer instructions so it doesn't
+// have to lock/unlock pool_lock_
+void RawQueue::DecrementMessageReferenceCount(const void *msg) {
+ MutexLocker locker(&pool_lock_);  // held for the whole function, including DoFreeMessage
+ MessageHeader *header = MessageHeader::Get(msg);
+ --header->ref_count;
+ assert(header->ref_count >= 0);  // going negative means somebody released it too often
+ if (kRefDebug) {
+ printf("ref_dec_count: %p count=%d\n", msg, header->ref_count);
+ }
+ if (header->ref_count == 0) {  // that was the last reference
+ DoFreeMessage(msg);
+ }
+}
+
+RawQueue::RawQueue(const char *name, size_t length, int hash, int queue_length)
+    : readable_(&data_lock_), writable_(&data_lock_) {
+  const size_t name_size = strlen(name) + 1;
+  char *temp = static_cast<char *>(shm_malloc(name_size));
+  memcpy(temp, name, name_size);
+  name_ = temp;
+  length_ = length;
+  hash_ = hash;
+  queue_length_ = queue_length;
+  // Fetch links the new queue into the list and sets up any recycle queue.
+  next_ = NULL;
+  recycle_ = NULL;
+
+  if (kFetchDebug) {
+    printf("initializing name=%s, length=%zu, hash=%d, queue_length=%d\n",
+           name, length, hash, queue_length);
+  }
+  // data_ is circular with one slot left empty, so start == end means empty.
+  data_length_ = queue_length + 1;
+  if (data_length_ < 2) {  // TODO(brians) when could this happen?
+    data_length_ = 2;
+  }
+  data_ = static_cast<void **>(shm_malloc(sizeof(void *) * data_length_));
+  data_start_ = 0;
+  data_end_ = 0;
+  messages_ = 0;
+  // The pool starts out empty; GetMessage allocates up to mem_length_ messages.
+  mem_length_ = queue_length + kExtraMessages;
+  pool_length_ = 0;
+  messages_used_ = 0;
+  msg_length_ = length + sizeof(MessageHeader);
+  pool_ = static_cast<MessageHeader **>(
+      shm_malloc(sizeof(MessageHeader *) * mem_length_));
+
+  if (kFetchDebug) {
+    printf("made queue %s\n", name);
+  }
+}
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+                          int queue_length) {
+  if (kFetchDebug) {
+    printf("fetching queue %s\n", name);
+  }
+  // alloc_lock is held while the list of queues is searched and extended.
+  if (mutex_lock(&global_core->mem_struct->queues.alloc_lock) != 0) {
+    return NULL;
+  }
+  // Look for an existing queue whose name, message length, hash, and queue
+  // length all match, remembering the tail of the list for appending.
+  RawQueue *tail = NULL;
+  for (RawQueue *candidate = static_cast<RawQueue *>(
+           global_core->mem_struct->queues.queue_list);
+       candidate != NULL; candidate = candidate->next_) {
+    const bool matches =
+        strcmp(candidate->name_, name) == 0 && candidate->length_ == length &&
+        candidate->hash_ == hash && candidate->queue_length_ == queue_length;
+    if (matches) {
+      mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+      return candidate;
+    }
+    if (kFetchDebug) {
+      printf("rejected queue %s strcmp=%d target=%s\n", candidate->name_,
+             strcmp(candidate->name_, name), name);
+    }
+    tail = candidate;
+  }
+
+  // Nothing matched: construct a new queue and link it in at the end.
+  RawQueue *const created = new (shm_malloc(sizeof(RawQueue)))
+      RawQueue(name, length, hash, queue_length);
+  if (tail == NULL) {  // This is the very first queue.
+    global_core->mem_struct->queues.queue_list = created;
+  } else {
+    tail->next_ = created;
+  }
+  mutex_unlock(&global_core->mem_struct->queues.alloc_lock);
+  return created;
+}
+RawQueue *RawQueue::Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_length, RawQueue **recycle) {
+ RawQueue *r = Fetch(name, length, hash, queue_length);  // NOTE(review): NULL if alloc_lock couldn't be locked; not checked before the use below
+ r->recycle_ = Fetch(name, length, recycle_hash, recycle_length);  // same name and message length, its own hash and queue length
+ if (r == r->recycle_) {  // both lookups resolved to the same queue
+ fprintf(stderr, "queue: r->recycle_(=%p) == r(=%p)\n", r->recycle_, r);
+ printf("see stderr\n");
+ r->recycle_ = NULL;
+ abort();
+ }
+ *recycle = r->recycle_;
+ return r;
+}
+
+void RawQueue::DoFreeMessage(const void *msg) {  // reached from DecrementMessageReferenceCount with pool_lock_ held
+ MessageHeader *header = MessageHeader::Get(msg);
+ if (pool_[header->index] != header) { // if something's messed up
+ fprintf(stderr, "queue: something is very very wrong with queue %p."
+ " pool_(=%p)[header->index(=%d)] != header(=%p)\n",
+ this, pool_, header->index, header);
+ printf("queue: see stderr\n");
+ abort();
+ }
+ if (kRefDebug) {
+ printf("ref free: %p\n", msg);
+ }
+ --messages_used_;  // pool_[messages_used_] is now the last slot that was in use
+
+ if (recycle_ != NULL) {
+ void *const new_msg = recycle_->GetMessage();
+ if (new_msg == NULL) {
+ fprintf(stderr, "queue: couldn't get a message"
+ " for recycle queue %p\n", recycle_);
+ } else {
+ // Take a message from recycle_ and switch its
+ // header with the one being freed, which effectively
+ // switches which queue each message belongs to.
+ MessageHeader *const new_header = MessageHeader::Get(new_msg);
+ // Also switch the messages between the pools.
+ pool_[header->index] = new_header;
+ {
+ MutexLocker locker(&recycle_->pool_lock_);
+ recycle_->pool_[new_header->index] = header;
+ // Swap the information in both headers.
+ header->Swap(new_header);
+ // Don't unlock the other pool until all of its messages are valid.
+ }
+ // use the header for new_msg which is now for this pool
+ header = new_header;
+ if (!recycle_->WriteMessage(const_cast<void *>(msg), kOverride)) {  // the freed message goes onto the recycle queue
+ fprintf(stderr, "queue: %p->WriteMessage(%p, kOverride) failed."
+ " aborting\n", recycle_, msg);
+ printf("see stderr\n");
+ abort();
+ }
+ msg = new_msg;  // from here on, the message being freed into this pool is the swapped-in one
+ }
+ }
+
+ // Where the one we're freeing was.
+ int index = header->index;
+ header->index = -1;  // marks the header as not being at a valid pool_ position
+ if (index != messages_used_) { // if we're not freeing the one on the end
+ // Put the last one where the one we're freeing was.
+ header = pool_[index] = pool_[messages_used_];
+ // Put the one we're freeing at the end.
+ pool_[messages_used_] = MessageHeader::Get(msg);
+ // Update the former last one's index.
+ header->index = index;
+ }
+}
+
+bool RawQueue::WriteMessage(void *msg, int options) {  // returns false only for kNonBlock on a full queue
+ if (kWriteDebug) {
+ printf("queue: %p->WriteMessage(%p, %x)\n", this, msg, options);
+ }
+ if (msg == NULL || msg < reinterpret_cast<void *>(global_core->mem_struct) ||  // must lie within [mem_struct, mem_struct + size]
+ msg > static_cast<void *>((
+ reinterpret_cast<char *>(global_core->mem_struct) +
+ global_core->size))) {
+ fprintf(stderr, "queue: attempt to write bad message %p to %p. aborting\n",
+ msg, this);
+ printf("see stderr\n");
+ abort();
+ }
+ {
+ MutexLocker locker(&data_lock_);
+ bool writable_waited = false;
+
+ int new_end;
+ while (true) {
+ new_end = (data_end_ + 1) % data_length_;
+ // If there is room in the queue right now.
+ if (new_end != data_start_) break;
+ if (options & kNonBlock) {  // checked first, so it wins over kOverride
+ if (kWriteDebug) {
+ printf("queue: not blocking on %p. returning false\n", this);
+ }
+ return false;
+ } else if (options & kOverride) {
+ if (kWriteDebug) {
+ printf("queue: overriding on %p\n", this);
+ }
+ // Avoid leaking the message that we're going to overwrite.
+ DecrementMessageReferenceCount(data_[data_start_]);
+ data_start_ = (data_start_ + 1) % data_length_;
+ } else { // kBlock
+ if (kWriteDebug) {
+ printf("queue: going to wait for writable_ of %p\n", this);
+ }
+ writable_.Wait();
+ writable_waited = true;
+ }
+ }
+ data_[data_end_] = msg;
+ ++messages_;  // running count of messages written; ReadMessageIndex compares it to *index
+ data_end_ = new_end;
+
+ if (kWriteDebug) {
+ printf("queue: broadcasting to readable_ of %p\n", this);
+ }
+ readable_.Broadcast();
+
+ // If we got a signal on writable_ here and it's still writable, then we
+ // need to signal the next person in line (if any).
+ if (writable_waited && is_writable()) {
+ if (kWriteDebug) {
+ printf("queue: resignalling writable_ of %p\n", this);
+ }
+ writable_.Signal();
+ }
+ }
+ if (kWriteDebug) {
+ printf("queue: write returning true on queue %p\n", this);
+ }
+ return true;
+}
+
+void RawQueue::ReadCommonEnd(ReadData *read_data) {
+  if (!is_writable()) return;  // Still full, so there's no writer to wake up.
+  const bool was_writable = read_data->writable_start;  // before this read
+  if (kReadDebug) {
+    printf("queue: %ssignalling writable_ of %p\n",
+           was_writable ? "not " : "", this);
+  }
+  if (!was_writable) writable_.Signal();
+}
+bool RawQueue::ReadCommonStart(int options, int *index, ReadData *read_data) {
+  read_data->writable_start = is_writable();
+  for (;;) {
+    // Ready once there's a message (newer than *index, if given) to read.
+    if (data_start_ != data_end_ && (index == NULL || messages_ > *index)) break;
+    if (options & kNonBlock) {
+      if (kReadDebug) {
+        printf("queue: not going to block waiting on %p\n", this);
+      }
+      return false;
+    }
+    if (kReadDebug) {
+      printf("queue: going to wait for readable_ of %p\n", this);
+    }
+    readable_.Wait();  // kBlock: wait for a message to become readable.
+    if (kReadDebug) {
+      printf("queue: done waiting for readable_ of %p\n", this);
+    }
+  }
+  if (kReadDebug) {
+    printf("queue: %p->read start=%d end=%d\n", this, data_start_, data_end_);
+  }
+  return true;
+}
+void *RawQueue::ReadPeek(int options, int start) {  // returns data_[start], or the newest entry with kFromEnd, plus one reference
+ void *ret;
+ if (options & kFromEnd) {
+ int pos = data_end_ - 1;  // the slot written most recently
+ if (pos < 0) { // if it needs to wrap
+ pos = data_length_ - 1;
+ }
+ if (kReadDebug) {
+ printf("queue: %p reading from line %d: %d\n", this, __LINE__, pos);
+ }
+ ret = data_[pos];
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from line %d: %d\n", this, __LINE__, start);
+ }
+ ret = data_[start];
+ }
+ MessageHeader *const header = MessageHeader::Get(ret);
+ ++header->ref_count;  // the message stays in the queue, so the caller gets a reference of its own
+ if (kRefDebug) {
+ printf("ref inc count: %p\n", ret);
+ }
+ return ret;
+}
+const void *RawQueue::ReadMessage(int options) {  // NULL only when kNonBlock is set and nothing is readable
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessage(%x)\n", this, options);
+ }
+ void *msg = NULL;
+
+ MutexLocker locker(&data_lock_);  // held until this function returns
+
+ ReadData read_data;
+ if (!ReadCommonStart(options, NULL, &read_data)) {
+ if (kReadDebug) {
+ printf("queue: %p common returned false\n", this);
+ }
+ return NULL;
+ }
+
+ if (options & kPeek) {
+ msg = ReadPeek(options, data_start_);  // leaves the queue's contents untouched
+ } else {
+ if (options & kFromEnd) {
+ while (true) {
+ if (kReadDebug) {
+ printf("queue: %p start of c2\n", this);
+ }
+ // This loop pulls each message out of the buffer.
+ const int pos = data_start_;
+ data_start_ = (data_start_ + 1) % data_length_;
+ // If this is the last one.
+ if (data_start_ == data_end_) {
+ if (kReadDebug) {
+ printf("queue: %p reading from c2: %d\n", this, pos);
+ }
+ msg = data_[pos];
+ break;
+ }
+ // This message is not going to be in the queue any more.
+ DecrementMessageReferenceCount(data_[pos]);
+ }
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from d2: %d\n", this, data_start_);
+ }
+ msg = data_[data_start_];  // ref_count is left alone here while the message leaves data_
+ // TODO(brians): Doesn't this need to increment the ref count?
+ data_start_ = (data_start_ + 1) % data_length_;
+ }
+ }
+ ReadCommonEnd(&read_data);
+ if (kReadDebug) {
+ printf("queue: %p read returning %p\n", this, msg);
+ }
+ return msg;
+}
+const void *RawQueue::ReadMessageIndex(int options, int *index) {  // index must not be NULL: it is dereferenced unconditionally
+ if (kReadDebug) {
+ printf("queue: %p->ReadMessageIndex(%x, %p(*=%d))\n",
+ this, options, index, *index);
+ }
+ void *msg = NULL;
+
+ MutexLocker locker(&data_lock_);  // held until this function returns
+
+ ReadData read_data;
+ if (!ReadCommonStart(options, index, &read_data)) {
+ if (kReadDebug) {
+ printf("queue: %p common returned false\n", this);
+ }
+ return NULL;
+ }
+
+ // TODO(parker): Handle integer wrap on the index.
+
+ // How many unread messages we have.
+ const int offset = messages_ - *index;
+ // Where we're going to start reading.
+ int my_start = data_end_ - offset;
+ if (my_start < 0) { // If we want to read off the end of the buffer.
+ // Unwrap it.
+ my_start += data_length_;
+ }
+ if (offset >= data_length_) { // If we're behind the available messages.
+ // Catch index up to the last available message.
+ *index += data_start_ - my_start;  // NOTE(review): my_start was only unwrapped once above -- confirm this for very stale indexes
+ // And that's the one we're going to read.
+ my_start = data_start_;
+ }
+ if (options & kPeek) {
+ msg = ReadPeek(options, my_start);  // *index is not advanced on this path
+ } else {
+ if (options & kFromEnd) {
+ if (kReadDebug) {
+ printf("queue: %p start of c1\n", this);
+ }
+ int pos = data_end_ - 1;
+ if (pos < 0) { // If it wrapped.
+ pos = data_length_ - 1; // Unwrap it.
+ }
+ if (kReadDebug) {
+ printf("queue: %p reading from c1: %d\n", this, pos);
+ }
+ msg = data_[pos];
+ *index = messages_;  // everything written so far now counts as seen
+ } else {
+ if (kReadDebug) {
+ printf("queue: %p reading from d1: %d\n", this, my_start);
+ }
+ msg = data_[my_start];
+ ++(*index);
+ }
+ MessageHeader *const header = MessageHeader::Get(msg);
+ ++header->ref_count;  // data_start_ is not moved by this function, so the caller gets its own reference
+ if (kRefDebug) {
+ printf("ref_inc_count: %p\n", msg);
+ }
+ }
+ ReadCommonEnd(&read_data);
+ return msg;
+}
+
+void *RawQueue::GetMessage() {
+  MutexLocker locker(&pool_lock_);
+  MessageHeader *header;
+  if (messages_used_ < pool_length_) {
+    // Reuse an already allocated message that isn't handed out right now.
+    header = pool_[messages_used_];
+  } else {
+    if (pool_length_ >= mem_length_) {
+      LOG(FATAL, "overused pool of queue %p\n", this);
+    }
+    header = static_cast<MessageHeader *>(shm_malloc(msg_length_));
+    pool_[pool_length_++] = header;
+  }
+  // The message's data sits immediately after its header.
+  void *const data = header + 1;
+  header->ref_count = 1;
+  if (kRefDebug) {
+    printf("%p ref alloc: %p\n", this, data);
+  }
+  header->index = messages_used_;
+  ++messages_used_;
+  return data;
+}
+
+} // namespace aos
diff --git a/aos/linux_code/ipc_lib/queue.h b/aos/linux_code/ipc_lib/queue.h
new file mode 100644
index 0000000..a58b65e
--- /dev/null
+++ b/aos/linux_code/ipc_lib/queue.h
@@ -0,0 +1,171 @@
+#ifndef AOS_LINUX_CODE_IPC_LIB_QUEUE_H_
+#define AOS_LINUX_CODE_IPC_LIB_QUEUE_H_
+
+#include "aos/linux_code/ipc_lib/shared_mem.h"
+#include "aos/common/mutex.h"
+#include "aos/common/condition.h"
+
+// TODO(brians) add valgrind client requests to the queue and shared_mem_malloc
+// code to make checking for leaks work better
+// <http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.mempools>
+// describes how
+
+// Any pointers returned from these functions can be safely passed to other
+// processes because they are all shared memory pointers.
+// IMPORTANT: Any message pointer must be passed back in some way
+// (FreeMessage and WriteMessage are common ones) or the
+// application will leak shared memory.
+// NOTE: Taking a message from ReadMessage and then passing it to WriteMessage
+// might work, but it is not guaranteed to.
+
+namespace aos {
+
+// Queues are the primary way to use shared memory. Basic use consists of
+// calling RawQueue::Fetch and then reading and/or writing messages.
+// Queues (as the name suggests) are FIFOs of messages. Each combination
+// of name and type signature will result in a different queue, which means
+// that if you only recompile some code that uses differently sized messages,
+// it will simply use a different queue than the old code.
+class RawQueue {
+ public:
+ // Retrieves (and creates if necessary) a queue. Each combination of name and
+ // signature refers to a completely independent queue.
+ // length is how large each message will be
+ // hash can differentiate multiple otherwise identical queues
+ // queue_length is how many messages the queue will be able to hold
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
+ int queue_length);
+ // Same as above, except sets up the returned queue so that it will put
+ // messages on *recycle when they are freed (after they have been released by
+ // all other readers/writers and are not in the queue).
+ // recycle_queue_length determines how many freed messages will be kept.
+ // Other code can retrieve the 2 queues separately (the recycle queue has the
+ // same name and length as the main one, with recycle_hash and
+ // recycle_queue_length). However, any frees made using a queue with only
+ // (name,length,hash,queue_length) before the recycle queue has been
+ // associated with it will not go on to the recycle queue.
+ // NOTE: calling this function with the same (name,length,hash,queue_length)
+ // but multiple recycle_queue_lengths will result in each freed message being
+ // put onto an undefined one of the recycle queues.
+ static RawQueue *Fetch(const char *name, size_t length, int hash,
+ int queue_length,
+ int recycle_hash, int recycle_queue_length,
+ RawQueue **recycle);
+
+ // Constants for passing to options arguments.
+ // The non-conflicting ones can be combined with bitwise-or.
+
+ // Causes the returned message to be left in the queue.
+ // For reading only.
+ static const int kPeek = 0x0001;
+ // Reads the last message in the queue instead of just the next one.
+ // NOTE: This removes all of the messages until the last one from the queue
+ // (which means that nobody else will read them). However, kPeek means to not
+ // remove any from the queue, including the ones that are skipped.
+ // For reading only.
+ static const int kFromEnd = 0x0002;
+ // Causes reads to return NULL and writes to fail instead of waiting.
+ // For reading and writing.
+ static const int kNonBlock = 0x0004;
+ // Causes things to block.
+ // IMPORTANT: Has a value of 0 so that it is the default. This has to stay.
+ // For reading and writing.
+ static const int kBlock = 0x0000;
+ // Causes writes to overwrite the oldest message in the queue instead of
+ // blocking.
+ // For writing only.
+ static const int kOverride = 0x0008;
+
+ // Writes a message into the queue.
+ // This function takes ownership of msg.
+ // NOTE: msg must point to a valid message from this queue
+ // Returns true on success.
+ bool WriteMessage(void *msg, int options);
+
+ // Reads a message out of the queue.
+ // The return value will have at least the length of this queue's worth of
+ // valid data where it's pointing to.
+ // The return value is const because other people might be viewing the same
+ // message. Do not cast the const away!
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ const void *ReadMessage(int options);
+ // Exactly the same as ReadMessage, except it will never return the
+ // same message twice with the same index argument. However, it may not
+ // return some messages that pass through the queue.
+ // *index should start as 0. index does not have to be in shared memory, but
+ // it can be
+ const void *ReadMessageIndex(int options, int *index);
+
+ // Retrieves ("allocates") a message that can then be written to the queue.
+ // NOTE: the return value will be completely uninitialized
+ // The return value will have at least the length of this queue's worth of
+ // valid memory where it's pointing to.
+ // Returns NULL for error.
+ // IMPORTANT: The return value (if not NULL) must eventually be passed to
+ // FreeMessage.
+ void *GetMessage();
+
+ // It is ok to call this method with a NULL msg.
+ void FreeMessage(const void *msg) {
+ if (msg != NULL) DecrementMessageReferenceCount(msg);
+ }
+
+ private:
+ struct MessageHeader;
+ struct ReadData;
+
+ bool is_readable() { return data_end_ != data_start_; }
+ bool is_writable() { return ((data_end_ + 1) % data_length_) != data_start_; }
+
+ // These next 4 allow finding the right one.
+ const char *name_;
+ size_t length_;
+ int hash_;
+ int queue_length_;
+ // The next one in the linked list of queues.
+ RawQueue *next_;
+
+ RawQueue *recycle_;  // where freed messages get written, or NULL for none
+
+ Mutex data_lock_; // protects operations on data_ etc
+ // Always gets broadcasted to because different readers might have different
+ // ideas of what "readable" means (ie ones using separated indices).
+ Condition readable_;
+ Condition writable_;
+ int data_length_; // max length into data + 1
+ int data_start_; // is an index into data
+ int data_end_; // is an index into data
+ int messages_; // that have passed through
+ void **data_; // array of messages (with headers)
+
+ Mutex pool_lock_;  // taken by GetMessage and DecrementMessageReferenceCount
+ size_t msg_length_; // sizeof(each message) including the header
+ int mem_length_; // the max number of messages that will ever be allocated
+ int messages_used_;  // how many pool_ entries are currently handed out
+ int pool_length_; // the number of allocated messages
+ MessageHeader **pool_; // array of pointers to messages
+
+ // Actually frees the given message.
+ void DoFreeMessage(const void *msg);
+ // Calls DoFreeMessage if appropriate.
+ void DecrementMessageReferenceCount(const void *msg);
+
+ // Should be called with data_lock_ locked.
+ // *read_data will be initialized.
+ // Returns with a readable message in data_ or false.
+ bool ReadCommonStart(int options, int *index, ReadData *read_data);
+ // Signals writable_ if this read made the queue writable again.
+ // Called with data_lock_ still held (see ReadMessage/ReadMessageIndex).
+ // read_data should be the same thing that was passed in to ReadCommonStart.
+ void ReadCommonEnd(ReadData *read_data);
+ // Handles reading with kPeek.
+ void *ReadPeek(int options, int start);
+
+ // Gets called by Fetch when necessary (with placement new).
+ RawQueue(const char *name, size_t length, int hash, int queue_length);
+};
+
+} // namespace aos
+
+#endif // AOS_LINUX_CODE_IPC_LIB_QUEUE_H_
diff --git a/aos/linux_code/ipc_lib/queue_test.cc b/aos/linux_code/ipc_lib/queue_test.cc
new file mode 100644
index 0000000..c87bd7b
--- /dev/null
+++ b/aos/linux_code/ipc_lib/queue_test.cc
@@ -0,0 +1,424 @@
+#include "aos/common/queue.h"
+
+#include <unistd.h>
+#include <sys/mman.h>
+#include <inttypes.h>
+
+#include <ostream>
+#include <memory>
+#include <map>
+
+#include "gtest/gtest.h"
+
+#include "aos/linux_code/ipc_lib/core_lib.h"
+#include "aos/common/type_traits.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/time.h"
+#include "aos/common/logging/logging.h"
+
+using ::testing::AssertionResult;
+using ::testing::AssertionSuccess;
+using ::testing::AssertionFailure;
+using ::aos::common::testing::GlobalCoreInstance;
+
+namespace aos {
+namespace testing {
+
+class QueueTest : public ::testing::Test {
+ protected:
+ static const size_t kFailureSize = 400;
+ static char *fatal_failure;
+ private:
+ enum class ResultType : uint8_t {
+ NotCalled,
+ Called,
+ Returned,
+ };
+ // Returns a human-readable name for result, for use in failure messages.
+ // Values outside of the enum come back as "unknown(<number>)".
+ const std::string ResultTypeString(volatile const ResultType &result) {
+   switch (result) {
+     case ResultType::Returned: return "Returned";
+     case ResultType::Called: return "Called";
+     case ResultType::NotCalled: return "NotCalled";
+     default:
+       // to_string, because literal + integer is just pointer arithmetic.
+       return "unknown(" + std::to_string(static_cast<int>(result)) + ")";
+   }
+ }
+ static_assert(aos::shm_ok<ResultType>::value,
+ "this will get put in shared memory");
+ template<typename T>
+ struct FunctionToCall {
+ FunctionToCall() : result(ResultType::NotCalled) {
+ started.Lock();
+ }
+
+ volatile ResultType result;
+ bool expected;
+ void (*function)(T*, char*);
+ T *arg;
+ volatile char failure[kFailureSize];
+ Mutex started;
+ };
+ template<typename T>
+ static void Hangs_(FunctionToCall<T> *const to_call) {
+ to_call->started.Unlock();
+ to_call->result = ResultType::Called;
+ to_call->function(to_call->arg, const_cast<char *>(to_call->failure));
+ to_call->result = ResultType::Returned;
+ }
+
+ // How long until a function is considered to have hung.
+ static constexpr time::Time kHangTime = time::Time::InSeconds(0.035);
+ // How long to sleep after forking (for debugging).
+ static constexpr time::Time kForkSleep = time::Time::InSeconds(0);
+
+ // Represents a process that has been forked off. The destructor kills the
+ // process and wait(2)s for it.
+ class ForkedProcess {
+ public:
+ ForkedProcess(pid_t pid, mutex *lock) : pid_(pid), lock_(lock) {};
+ // Sends SIGINT to the child, then wait(2)s; warns if it may still be alive.
+ ~ForkedProcess() {
+   if (kill(pid_, SIGINT) == -1) {
+     if (errno == ESRCH) {
+       printf("process %jd was already dead\n", static_cast<intmax_t>(pid_));
+     } else {
+       fprintf(stderr, "kill(%jd, SIGINT) failed with %d: %s\n",
+               static_cast<intmax_t>(pid_), errno, strerror(errno));
+     }
+     return;
+   }
+   const pid_t ret = wait(NULL);
+   if (ret == -1) {
+     LOG(WARNING, "wait(NULL) failed. child %jd might still be alive\n",
+         static_cast<intmax_t>(pid_));
+   } else if (ret == 0) {
+     LOG(WARNING, "child %jd wasn't waitable. it might still be alive\n",
+         static_cast<intmax_t>(pid_));
+   } else if (ret != pid_) {
+     LOG(WARNING, "child %jd is now confirmed dead"
+         ", but child %jd might still be alive\n",
+         static_cast<intmax_t>(ret), static_cast<intmax_t>(pid_));
+   }
+ }
+
+ enum class JoinResult {
+ Finished, Hung, Error
+ };
+ JoinResult Join(time::Time timeout = kHangTime) {
+ timespec lock_timeout = (kForkSleep + timeout).ToTimespec();
+ switch (mutex_lock_timeout(lock_, &lock_timeout)) {
+ case 2:
+ return JoinResult::Hung;
+ case 0:
+ return JoinResult::Finished;
+ default:
+ return JoinResult::Error;
+ }
+ }
+
+ private:
+ const pid_t pid_;
+ mutex *const lock_;
+ } __attribute__((unused));
+
+ // State for HangsFork and HangsCheck.
+ typedef uint8_t ChildID;
+ static void ReapExitHandler() {
+ for (auto it = children_.begin(); it != children_.end(); ++it) {
+ delete it->second;
+ }
+ }
+ static std::map<ChildID, ForkedProcess *> children_;
+ std::map<ChildID, FunctionToCall<void> *> to_calls_;
+
+ void SetUp() override {
+   ::testing::Test::SetUp();
+   // Writers snprintf up to kFailureSize bytes into this buffer;
+   // sizeof(fatal_failure) would only be the size of the pointer.
+   fatal_failure = static_cast<char *>(shm_malloc(kFailureSize));
+   static bool registered = false;
+   if (!registered) atexit(ReapExitHandler);  // only once per process
+   registered = true;
+ }
+
+ protected:
+ // function gets called with arg in a forked process.
+ // Leaks shared memory.
+ template<typename T> __attribute__((warn_unused_result))
+ std::unique_ptr<ForkedProcess> ForkExecute(void (*function)(T*), T *arg) {
+   mutex *lock = static_cast<mutex *>(shm_malloc_aligned(
+       sizeof(*lock), sizeof(int)));
+   if (mutex_lock(lock) != 0) abort();  // not assert(): must run under NDEBUG
+   const pid_t pid = fork();
+   switch (pid) {
+     case 0:  // child
+       if (kForkSleep != time::Time(0, 0)) {
+         LOG(INFO, "pid %jd sleeping for %ds%dns\n",
+             static_cast<intmax_t>(getpid()),
+             kForkSleep.sec(), kForkSleep.nsec());
+         time::SleepFor(kForkSleep);
+       }
+       ::aos::common::testing::PreventExit();
+       function(arg);
+       mutex_unlock(lock);  // this is the lock that ForkedProcess::Join waits on
+       exit(EXIT_SUCCESS);
+     case -1:  // parent failure
+       LOG(ERROR, "fork() failed with %d: %s\n", errno, strerror(errno));
+       return std::unique_ptr<ForkedProcess>();
+     default:  // parent
+       return std::unique_ptr<ForkedProcess>(new ForkedProcess(pid, lock));
+   }
+ }
+
+ // Checks whether or not the given function hangs.
+ // expected is whether to return success or failure if the function hangs
+ // NOTE: There are other reasons for it to return a failure than the function
+ // doing the wrong thing.
+ // Leaks shared memory.
+ template<typename T>
+ AssertionResult Hangs(void (*function)(T*, char*), T *arg, bool expected) {
+ AssertionResult fork_result(HangsFork<T>(function, arg, expected, 0));
+ if (!fork_result) {
+ return fork_result;
+ }
+ return HangsCheck(0);
+ }
+ // Starts the first part of Hangs.
+ // Use HangsCheck to get the result.
+ // Returns whether the fork succeeded or not, NOT whether or not the hang
+ // check succeeded.
+ template<typename T>
+ AssertionResult HangsFork(void (*function)(T*, char *), T *arg,
+ bool expected, ChildID id) {
+ static_assert(aos::shm_ok<FunctionToCall<T>>::value,
+ "this is going into shared memory");
+ FunctionToCall<T> *const to_call =
+ static_cast<FunctionToCall<T> *>(
+ shm_malloc_aligned(sizeof(*to_call), alignof(FunctionToCall<T>)));
+ new (to_call) FunctionToCall<T>();
+ to_call->function = function;
+ to_call->arg = arg;
+ to_call->expected = expected;
+ to_call->failure[0] = '\0';
+ static_cast<char *>(fatal_failure)[0] = '\0';
+ children_[id] = ForkExecute(Hangs_, to_call).release();
+ if (!children_[id]) return AssertionFailure() << "ForkExecute failed";
+ to_calls_[id] = reinterpret_cast<FunctionToCall<void> *>(to_call);
+ to_call->started.Lock();
+ return AssertionSuccess();
+ }
+ // Checks whether or not a function hung like it was supposed to.
+ // Use HangsFork first.
+ // NOTE: calls to HangsFork and HangsCheck with the same id argument will
+ // correspond, but they do not nest. Also, id 0 is used by Hangs.
+ // Return value is the same as Hangs.
+ AssertionResult HangsCheck(ChildID id) {
+   // Owns the child from here on; ~ForkedProcess kills and reaps it.
+   std::unique_ptr<ForkedProcess> child(children_[id]);
+   children_.erase(id);
+   if (!child) return AssertionFailure() << "no forked child for this id";
+   const FunctionToCall<void> *const to_call = to_calls_[id];
+   const ForkedProcess::JoinResult result = child->Join();
+   if (to_call->failure[0] != '\0') {
+     return AssertionFailure() << "function says: "
+         << const_cast<char *>(to_call->failure);
+   }
+   // A failed join says nothing about hanging, so never count it as a hang.
+   if (result == ForkedProcess::JoinResult::Error) {
+     return AssertionFailure() << "error joining child";
+   }
+   if (result == ForkedProcess::JoinResult::Finished) {
+     return !to_call->expected ? AssertionSuccess() : (AssertionFailure()
+         << "something happened and the test only got to "
+         << ResultTypeString(to_call->result));
+   }
+   // Timed out, so the child is expected to be inside the function (Called).
+   if (to_call->result != ResultType::Called) abort();
+   return to_call->expected ? AssertionSuccess() : AssertionFailure();
+ }
+#define EXPECT_HANGS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, true, EXPECT_TRUE)
+#define EXPECT_RETURNS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, false, EXPECT_TRUE)
+#define EXPECT_RETURNS_FAILS(function, arg) \
+ EXPECT_HANGS_COND(function, arg, false, EXPECT_FALSE)
+#define EXPECT_HANGS_COND(function, arg, hangs, cond) do { \
+ cond(Hangs(function, arg, hangs)); \
+ if (fatal_failure[0] != '\0') { \
+ FAIL() << fatal_failure; \
+ } \
+} while (false)
+
+ struct TestMessage {
+ // Some contents because we don't really want to test empty messages.
+ int16_t data;
+ };
+ struct MessageArgs {
+ RawQueue *const queue;
+ int flags;
+ int16_t data; // -1 means NULL expected
+ };
+ static void WriteTestMessage(MessageArgs *args, char *failure) {
+ TestMessage *msg = static_cast<TestMessage *>(args->queue->GetMessage());
+ if (msg == NULL) {
+ snprintf(fatal_failure, kFailureSize,
+ "couldn't get_msg from %p", args->queue);
+ return;
+ }
+ msg->data = args->data;
+ if (!args->queue->WriteMessage(msg, args->flags)) {
+ snprintf(failure, kFailureSize, "write_msg_free(%p, %p, %d) failed",
+ args->queue, msg, args->flags);
+ }
+ }
+ static void ReadTestMessage(MessageArgs *args, char *failure) {
+ const TestMessage *msg = static_cast<const TestMessage *>(
+ args->queue->ReadMessage(args->flags));
+ if (msg == NULL) {
+ if (args->data != -1) {
+ snprintf(failure, kFailureSize,
+ "expected data of %" PRId16 " but got NULL message",
+ args->data);
+ }
+ } else {
+ if (args->data != msg->data) {
+ snprintf(failure, kFailureSize,
+ "expected data of %" PRId16 " but got %" PRId16 " instead",
+ args->data, msg->data);
+ }
+ args->queue->FreeMessage(msg);
+ }
+ }
+
+ private:
+ GlobalCoreInstance my_core;
+};
+char *QueueTest::fatal_failure;
+std::map<QueueTest::ChildID, QueueTest::ForkedProcess *> QueueTest::children_;
+constexpr time::Time QueueTest::kHangTime;
+constexpr time::Time QueueTest::kForkSleep;
+
+TEST_F(QueueTest, Reading) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, 0, -1};
+
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock | RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.data = 254;
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kPeek | RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ args.data = -1;
+ EXPECT_HANGS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = 0;
+ args.data = 971;
+ EXPECT_RETURNS_FAILS(ReadTestMessage, &args);
+}
+TEST_F(QueueTest, Writing) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, 0, 973};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ EXPECT_HANGS(WriteTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS_FAILS(WriteTestMessage, &args);
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.data = 971;
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kNonBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = 0;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+}
+
+TEST_F(QueueTest, MultiRead) {
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage), 1, 1);
+ MessageArgs args{queue, 0, 1323};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ args.flags = RawQueue::kBlock;
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 1));
+ ASSERT_TRUE(HangsFork(ReadTestMessage, &args, true, 2));
+ EXPECT_TRUE(HangsCheck(1) != HangsCheck(2));
+ // TODO(brians) finish this
+}
+
+TEST_F(QueueTest, Recycle) {
+ // TODO(brians) basic test of recycle queue
+ // include all of the ways a message can get into the recycle queue
+ RawQueue *recycle_queue = reinterpret_cast<RawQueue *>(23);
+ RawQueue *const queue = RawQueue::Fetch("Queue", sizeof(TestMessage),
+ 1, 2, 2, 2, &recycle_queue);
+ ASSERT_NE(reinterpret_cast<RawQueue *>(23), recycle_queue);
+ MessageArgs args{queue, 0, 973}, recycle{recycle_queue, 0, 973};
+
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.data = 254;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.data = 971;
+ args.flags = RawQueue::kOverride;
+ EXPECT_RETURNS(WriteTestMessage, &args);
+ recycle.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ TestMessage *msg = static_cast<TestMessage *>(queue->GetMessage());
+ ASSERT_TRUE(msg != NULL);
+ msg->data = 341;
+ queue->FreeMessage(msg);
+ recycle.data = 341;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+
+ args.data = 254;
+ args.flags = RawQueue::kPeek;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.flags = RawQueue::kBlock;
+ EXPECT_HANGS(ReadTestMessage, &recycle);
+ args.flags = RawQueue::kBlock;
+ EXPECT_RETURNS(ReadTestMessage, &args);
+ recycle.data = 254;
+ EXPECT_RETURNS(ReadTestMessage, &recycle);
+}
+
+} // namespace testing
+} // namespace aos
diff --git a/aos/linux_code/ipc_lib/shared_mem.c b/aos/linux_code/ipc_lib/shared_mem.c
new file mode 100644
index 0000000..4126d65
--- /dev/null
+++ b/aos/linux_code/ipc_lib/shared_mem.c
@@ -0,0 +1,129 @@
+#include "aos/linux_code/ipc_lib/shared_mem.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <errno.h>
+
+#include "aos/linux_code/ipc_lib/core_lib.h"
+
+// the path for the shared memory segment. see shm_open(3) for restrictions
+#define AOS_SHM_NAME "/aos_shared_mem"
+// Size of the shared mem segment.
+// Set to the maximum number that worked. Any bigger than this and the kernel
+// thinks you should be able to access all of it but it doesn't work with the
+// ARM kernel Brian was using on 2013-12-20.
+#define SIZEOFSHMSEG (4096 * 25074)
+
+void init_shared_mem_core(aos_shm_core *shm_core) {
+ clock_gettime(CLOCK_REALTIME, &shm_core->identifier);
+ shm_core->msg_alloc_lock = 0;
+ shm_core->queues.queue_list = NULL;
+ shm_core->queues.alloc_lock = 0;
+}
+
+ptrdiff_t aos_core_get_mem_usage(void) {
+ return global_core->size -
+ ((ptrdiff_t)global_core->mem_struct->msg_alloc -
+ (ptrdiff_t)global_core->mem_struct);
+}
+
+struct aos_core *global_core = NULL;
+
+// Opens the shared memory segment (creating it, and replacing an existing one,
+// when to_create is create) and maps it at SHM_START. Returns 0 or -1.
+int aos_core_create_shared_mem(enum aos_core_create to_create) {
+  static struct aos_core global_core_data;
+  global_core = &global_core_data;
+  int shm;
+before:
+  if (to_create == create) {
+    printf("shared_mem: creating\n");
+    shm = shm_open(AOS_SHM_NAME, O_RDWR | O_CREAT | O_EXCL, 0666);
+    global_core->owner = 1;
+    if (shm == -1 && errno == EEXIST) {
+      printf("shared_mem: going to shm_unlink(" AOS_SHM_NAME ")\n");
+      if (shm_unlink(AOS_SHM_NAME) == 0) goto before;  // stale one gone: retry
+      fprintf(stderr, "shared_mem: shm_unlink(" AOS_SHM_NAME ")"
+              " failed with %d: %s\n", errno, strerror(errno));
+    }
+  } else {
+    printf("shared_mem: not creating\n");
+    shm = shm_open(AOS_SHM_NAME, O_RDWR, 0);
+    global_core->owner = 0;
+  }
+  if (shm == -1) {
+    fprintf(stderr, "shared_mem:"
+            " shm_open(" AOS_SHM_NAME ", O_RDWR [| O_CREAT | O_EXCL, 0|0666)"
+            " failed with %d: %s\n", errno, strerror(errno));
+    return -1;
+  }
+  if (global_core->owner && ftruncate(shm, SIZEOFSHMSEG) == -1) {
+    fprintf(stderr, "shared_mem: ftruncate(%d, 0x%zx) failed with %d: %s\n",
+            shm, (size_t)SIZEOFSHMSEG, errno, strerror(errno));
+    close(shm);  // nothing else will use the descriptor on this error path
+    return -1;
+  }
+  void *shm_address = mmap(
+      (void *)SHM_START, SIZEOFSHMSEG, PROT_READ | PROT_WRITE,
+      MAP_SHARED | MAP_FIXED | MAP_LOCKED | MAP_POPULATE, shm, 0);
+  if (shm_address == MAP_FAILED) {
+    fprintf(stderr, "shared_mem: mmap(%p, 0x%zx, stuff, stuff, %d, 0) failed"
+            " with %d: %s\n", (void *)SHM_START, (size_t)SIZEOFSHMSEG, shm,
+            errno, strerror(errno));  // size cast: SIZEOFSHMSEG is an int
+    close(shm);  // after the message so errno is still mmap's
+    return -1;
+  }
+  printf("shared_mem: shm at: %p\n", shm_address);
+  if (close(shm) == -1) {
+    printf("shared_mem: close(%d(=shm) failed with %d: %s\n",
+           shm, errno, strerror(errno));
+  }
+  if (shm_address != (void *)SHM_START) {
+    fprintf(stderr, "shared_mem: shm isn't at hard-coded %p. at %p instead\n",
+            (void *)SHM_START, shm_address);
+    return -1;
+  }
+  return aos_core_use_address_as_shared_mem(shm_address, SIZEOFSHMSEG);
+}
+
+// Points global_core at the given memory. The owner initializes the shared
+// structure and sets creation_condition; everybody else waits on it.
+int aos_core_use_address_as_shared_mem(void *address, size_t size) {
+  global_core->mem_struct = address;
+  global_core->size = size;
+  global_core->shared_mem = (uint8_t *)address + sizeof(*global_core->mem_struct);
+  if (global_core->owner) {
+    // msg_alloc starts out at the very end of the memory.
+    global_core->mem_struct->msg_alloc = (uint8_t *)address + global_core->size;
+    init_shared_mem_core(global_core->mem_struct);
+    // Setup is finished: set the condition that non-owners wait on below.
+    futex_set(&global_core->mem_struct->creation_condition);
+  } else if (futex_wait(&global_core->mem_struct->creation_condition) != 0) {
+    fprintf(stderr, "waiting on creation_condition failed\n");
+    return -1;
+  }
+  fprintf(stderr, "shared_mem: end of create_shared_mem owner=%d\n",
+          global_core->owner);
+  return 0;
+}
+
+// Unmaps the segment at SHM_START and, in the owner, unlinks its name too.
+// Returns 0 on success and -1 on failure.
+int aos_core_free_shared_mem(void) {
+  if (munmap((void *)SHM_START, SIZEOFSHMSEG) == -1) {
+    // Print what was actually passed to munmap, with types matching the format.
+    fprintf(stderr, "shared_mem: munmap(%p, 0x%zx) failed with %d: %s\n",
+            (void *)SHM_START, (size_t)SIZEOFSHMSEG, errno, strerror(errno));
+    return -1;
+  }
+  if (global_core->owner && shm_unlink(AOS_SHM_NAME)) {
+    fprintf(stderr, "shared_mem: shm_unlink(" AOS_SHM_NAME ")"
+            " failed with %d: %s\n", errno, strerror(errno));
+    return -1;
+  }
+  return 0;
+}
diff --git a/aos/linux_code/ipc_lib/shared_mem.h b/aos/linux_code/ipc_lib/shared_mem.h
new file mode 100644
index 0000000..e5059c4
--- /dev/null
+++ b/aos/linux_code/ipc_lib/shared_mem.h
@@ -0,0 +1,69 @@
+#ifndef _SHARED_MEM_H_
+#define _SHARED_MEM_H_
+
+#include <stddef.h>
+#include <unistd.h>
+#include <time.h>
+
+#include "aos/linux_code/ipc_lib/aos_sync.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern struct aos_core *global_core;
+
+// Where the shared memory segment starts in each process's address space.
+// Has to be the same in all of them so that stuff in shared memory
+// can have regular pointers to other stuff in shared memory.
+#define SHM_START 0x20000000
+
+typedef struct aos_queue_global_t {
+ mutex alloc_lock;
+ void *queue_list; // an aos::Queue* declared in C code
+} aos_queue_global;
+
+typedef struct aos_shm_core_t {
+ // clock_gettime(CLOCK_REALTIME, &identifier) gets called to identify
+ // this shared memory area
+ struct timespec identifier;
+ // gets 0-initialized at the start (as part of shared memory) and
+ // the owner sets as soon as it finishes setting stuff up
+ mutex creation_condition;
+ mutex msg_alloc_lock;
+ void *msg_alloc;
+ aos_queue_global queues;
+} aos_shm_core;
+
+enum aos_core_create {
+ create,
+ reference
+};
+struct aos_core {
+ int owner;
+ void *shared_mem;
+ // How large the chunk of shared memory is.
+ ptrdiff_t size;
+ aos_shm_core *mem_struct;
+};
+
+void init_shared_mem_core(aos_shm_core *shm_core);
+
+ptrdiff_t aos_core_get_mem_usage(void);
+
+// Takes the specified memory address and uses it as the shared memory.
+// address is the memory address, and size is the size of the memory.
+// global_core needs to point to an instance of struct aos_core, and owner
+// should be set correctly there.
+// The owner should verify that the first sizeof(mutex) of data is set to 0
+// before passing the memory to this function.
+int aos_core_use_address_as_shared_mem(void *address, size_t size);
+
+int aos_core_create_shared_mem(enum aos_core_create to_create);
+int aos_core_free_shared_mem(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/aos/linux_code/linux_code.gyp b/aos/linux_code/linux_code.gyp
new file mode 100644
index 0000000..49be0ce
--- /dev/null
+++ b/aos/linux_code/linux_code.gyp
@@ -0,0 +1,27 @@
+{
+ 'targets': [
+ {
+ 'target_name': 'init',
+ 'type': 'static_library',
+ 'sources': [
+ '<(AOS)/linux_code/init.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/linux_code/ipc_lib/ipc_lib.gyp:shared_mem',
+ '<(AOS)/common/common.gyp:die',
+ '<(AOS)/build/aos.gyp:logging',
+ ],
+ },
+ {
+ 'target_name': 'configuration',
+ 'type': 'static_library',
+ 'sources': [
+ 'configuration.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/common/common.gyp:once',
+ '<(AOS)/build/aos.gyp:logging',
+ ],
+ },
+ ],
+}
diff --git a/aos/linux_code/logging/linux_logging.cc b/aos/linux_code/logging/linux_logging.cc
new file mode 100644
index 0000000..faeb04a
--- /dev/null
+++ b/aos/linux_code/logging/linux_logging.cc
@@ -0,0 +1,142 @@
+#include "aos/linux_code/logging/linux_logging.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <sys/types.h>
+#include <errno.h>
+#include <unistd.h>
+#include <limits.h>
+#include <sys/prctl.h>
+
+#include <algorithm>
+
+#include "aos/common/die.h"
+#include "aos/common/logging/logging_impl.h"
+#include "aos/linux_code/thread_local.h"
+#include "aos/linux_code/ipc_lib/queue.h"
+
+namespace aos {
+namespace logging {
+namespace {
+
+using internal::Context;
+
+AOS_THREAD_LOCAL Context *my_context(NULL);
+
+::std::string GetMyName() {
+ // The maximum number of characters that can make up a thread name.
+ // The docs are unclear if it can be 16 characters with no '\0', so we'll be
+ // safe by adding our own where necessary.
+ static const size_t kThreadNameLength = 16;
+
+ ::std::string process_name(program_invocation_short_name);
+
+ char thread_name_array[kThreadNameLength + 1];
+ if (prctl(PR_GET_NAME, thread_name_array) != 0) {
+ Die("prctl(PR_GET_NAME, %p) failed with %d: %s\n",
+ thread_name_array, errno, strerror(errno));
+ }
+ thread_name_array[sizeof(thread_name_array) - 1] = '\0';
+ ::std::string thread_name(thread_name_array);
+
+ // If the first bunch of characters are the same.
+ // We cut off comparing at the shorter of the 2 strings because one or the
+ // other often ends up cut off.
+ if (strncmp(thread_name.c_str(), process_name.c_str(),
+ ::std::min(thread_name.length(), process_name.length())) == 0) {
+ // This thread doesn't have an actual name.
+ return process_name;
+ }
+
+ return process_name + '.' + thread_name;
+}
+
+RawQueue *queue;
+
+} // namespace
+namespace internal {
+
+Context *Context::Get() {
+ if (my_context == NULL) {
+ my_context = new Context();
+ my_context->name = GetMyName();
+ if (my_context->name.size() + 1 > sizeof(LogMessage::name)) {
+ Die("logging: process/thread name '%s' is too long\n",
+ my_context->name.c_str());
+ }
+ my_context->source = getpid();
+ }
+ return my_context;
+}
+
+void Context::Delete() {
+ delete my_context;
+ my_context = NULL;
+}
+
+} // namespace internal
+namespace linux_code {
+namespace {
+
+class linuxQueueLogImplementation : public LogImplementation {
+ virtual void DoLog(log_level level, const char *format, va_list ap) {
+ LogMessage *message = static_cast<LogMessage *>(queue->GetMessage());
+ if (message == NULL) {
+ LOG(FATAL, "queue get message failed\n");
+ }
+
+ internal::FillInMessage(level, format, ap, message);
+
+ Write(message);
+ }
+};
+
+} // namespace
+
+void Register() {
+ Init();
+
+ queue = RawQueue::Fetch("LoggingQueue", sizeof(LogMessage), 1323, 1500);
+ if (queue == NULL) {
+ Die("logging: couldn't fetch queue\n");
+ }
+
+ AddImplementation(new linuxQueueLogImplementation());
+}
+
+const LogMessage *ReadNext(int flags, int *index) {
+ return static_cast<const LogMessage *>(queue->ReadMessageIndex(flags, index));
+}
+
+const LogMessage *ReadNext() {
+ return ReadNext(RawQueue::kBlock);
+}
+
+const LogMessage *ReadNext(int flags) {
+ const LogMessage *r = NULL;
+ do {
+ r = static_cast<const LogMessage *>(queue->ReadMessage(flags));
+ // not blocking means return a NULL if that's what it gets
+ } while ((flags & RawQueue::kBlock) && r == NULL);
+ return r;
+}
+
+LogMessage *Get() {
+ return static_cast<LogMessage *>(queue->GetMessage());
+}
+
+void Free(const LogMessage *msg) {
+ queue->FreeMessage(msg);
+}
+
+void Write(LogMessage *msg) {
+  if (!queue->WriteMessage(msg, RawQueue::kOverride)) {
+    LOG(FATAL, "writing failed\n");  // newline-terminated like every other LOG
+  }
+}
+
+} // namespace linux_code
+} // namespace logging
+} // namespace aos
diff --git a/aos/linux_code/logging/linux_logging.h b/aos/linux_code/logging/linux_logging.h
new file mode 100644
index 0000000..928a480
--- /dev/null
+++ b/aos/linux_code/logging/linux_logging.h
@@ -0,0 +1,30 @@
+#ifndef AOS_LINUX_CODE_LOGGING_LOGGING_H_
+#define AOS_LINUX_CODE_LOGGING_LOGGING_H_
+
+#include "aos/common/logging/logging_impl.h"
+
+namespace aos {
+namespace logging {
+namespace linux_code {
+
+// Calls AddImplementation to register the usual linux logging implementation
+// which sends the messages through a queue. This implementation relies on
+// another process(es) to read the log messages that it puts into the queue.
+// It gets called by aos::Init*.
+void Register();
+
+// Fairly simple wrappers around the raw queue calls.
+
+// This one never returns NULL if flags contains BLOCK.
+const LogMessage *ReadNext(int flags);
+const LogMessage *ReadNext(int flags, int *index);
+const LogMessage *ReadNext();
+LogMessage *Get();
+void Free(const LogMessage *msg);
+void Write(LogMessage *msg);
+
+} // namespace linux_code
+} // namespace logging
+} // namespace aos
+
+#endif
diff --git a/aos/linux_code/logging/logging.gyp b/aos/linux_code/logging/logging.gyp
new file mode 100644
index 0000000..dfb189c
--- /dev/null
+++ b/aos/linux_code/logging/logging.gyp
@@ -0,0 +1,4 @@
+{
+ 'targets': [
+ ],
+}
diff --git a/aos/linux_code/output/HTTPServer.cpp b/aos/linux_code/output/HTTPServer.cpp
new file mode 100644
index 0000000..1703d39
--- /dev/null
+++ b/aos/linux_code/output/HTTPServer.cpp
@@ -0,0 +1,153 @@
+#include "aos/linux_code/output/HTTPServer.h"
+
+#include <inttypes.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <string.h>
+
+#include <memory>
+
+#include "event2/event.h"
+
+#include "aos/common/scoped_fd.h"
+#include "aos/common/unique_malloc_ptr.h"
+
+namespace aos {
+namespace http {
+
+HTTPServer::HTTPServer(const char *directory, uint16_t port) :
+ directory_(directory), base_(event_base_new()), http_(evhttp_new(base_)) {
+ if (base_ == NULL) {
+ LOG(FATAL, "couldn't create an event_base\n");
+ }
+ if (http_ == NULL) {
+ LOG(FATAL, "couldn't create an evhttp\n");
+ }
+ if (evhttp_bind_socket(http_, "0.0.0.0", port) != 0) {
+ LOG(FATAL, "evhttp_bind_socket(%p, \"0.0.0.0\", %" PRIu16 ") failed\n",
+ http_, port);
+ }
+ evhttp_set_gencb(http_, StaticServeFile, this);
+}
+
+void HTTPServer::AddPage(const std::string &path,
+                         void (*handler)(evhttp_request *, void *), void *data) {
+  switch (evhttp_set_cb(http_, path.c_str(), handler, data)) {
+    case 0:
+      LOG(DEBUG, "set callback handler for '%s'\n", path.c_str());
+      break;
+    case -1:
+      // -1 means a callback already existed; libevent keeps it and ignores ours.
+      LOG(WARNING, "'%s' already has a callback handler\n", path.c_str());
+      break;
+    default:
+      LOG(WARNING, "evhttp_set_cb(%p, %s, %p, %p) failed\n", http_, path.c_str(),
+          handler, data);
+  }
+}
+
+void HTTPServer::AddStandardHeaders(evhttp_request *request) {
+ if (evhttp_add_header(evhttp_request_get_output_headers(request),
+ "Server", "aos::HTTPServer/0.0") == -1) {
+ LOG(WARNING, "adding Server header failed\n");
+ }
+}
+
+namespace {
+// All of these functions return false, NULL, or -1 if they fail (and send back
+// an error).
+
+// Returns the path of the file that is being requested.
+const char *GetPath(evhttp_request *request) {
+ // Docs are unclear whether this needs freeing, but it looks like it just
+ // returns an internal field of the request.
+ // Running valgrind with no freeing of uri or path doesn't report anything
+ // related to this code.
+ const evhttp_uri *uri = evhttp_request_get_evhttp_uri(request);
+ const char *path = evhttp_uri_get_path(uri);
+ if (path == NULL) {
+ evhttp_send_error(request, HTTP_BADREQUEST, "need a path");
+ return NULL;
+ }
+ if (strstr(path, "..") != NULL) {
+ evhttp_send_error(request, HTTP_NOTFOUND, "no .. allowed!!");
+ return NULL;
+ }
+ return path;
+}
+// Returns an fd open for reading for the file at "directory/path".
+int OpenFile(evhttp_request *request, const char *path,
+ const char *directory) {
+ char *temp;
+ if (asprintf(&temp, "%s/%s", directory, path) == -1) {
+ LOG(WARNING, "asprintf(%p, \"%%s/%%s\", %p, %p) failed with %d: %s\n",
+ &temp, directory, path, errno, strerror(errno));
+ evhttp_send_error(request, HTTP_INTERNAL, NULL);
+ return -1;
+ }
+ const unique_c_ptr<char> filename(temp);
+ ScopedFD file(open(filename.get(), O_RDONLY));
+ if (!file) {
+ if (errno == ENOENT) {
+ evhttp_send_error(request, HTTP_NOTFOUND, NULL);
+ return -1;
+ }
+ LOG(ERROR, "open('%s', 0) failed with %d: %s\n", filename.get(),
+ errno, strerror(errno));
+ evhttp_send_error(request, HTTP_INTERNAL, NULL);
+ return -1;
+ }
+ return file.release();
+}
+// Returns the size of the file specified by the given fd.
+off_t GetSize(int file) {
+  struct stat info;
+  if (fstat(file, &info) == -1) {  // the message names fstat to match this call
+    LOG(ERROR, "fstat(%d, %p) failed with %d: %s\n", file, &info,
+        errno, strerror(errno));
+    return -1;
+  }
+  return info.st_size;
+}
+bool SendFileResponse(evhttp_request *request, int file_num) {
+ ScopedFD file(file_num);
+ const off_t size = GetSize(file.get());
+ if (size == -1) {
+ evhttp_send_error(request, HTTP_INTERNAL, NULL);
+ return false;
+ }
+ evbuffer *const buf = evhttp_request_get_output_buffer(request);
+ if (evbuffer_add_file(buf, file.get(), 0, size) == -1) {
+ LOG(WARNING, "evbuffer_add_file(%p, %d, 0, %jd) failed\n", buf,
+ file.get(), static_cast<intmax_t>(size));
+ evhttp_send_error(request, HTTP_INTERNAL, NULL);
+ return false;
+ } else {
+ // it succeeded, so evhttp takes ownership
+ file.release();
+ }
+ evhttp_send_reply(request, HTTP_OK, NULL, NULL);
+ return true;
+}
+
+} // namespace
+void HTTPServer::ServeFile(evhttp_request *request) {
+ AddStandardHeaders(request);
+
+ const char *path = GetPath(request);
+ if (path == NULL) return;
+
+ ScopedFD file(OpenFile(request, path, directory_));
+ if (!file) return;
+
+ if (!SendFileResponse(request, file.release())) return;
+}
+
+void HTTPServer::Run() {
+ event_base_dispatch(base_);
+ LOG(FATAL, "event_base_dispatch returned\n");
+}
+
+} // namespace http
+} // namespace aos
diff --git a/aos/linux_code/output/HTTPServer.h b/aos/linux_code/output/HTTPServer.h
new file mode 100644
index 0000000..99eb295
--- /dev/null
+++ b/aos/linux_code/output/HTTPServer.h
@@ -0,0 +1,58 @@
+#include "event2/buffer.h"
+#include "event2/http.h"
+
+#include <string>
+
+namespace aos {
+namespace http {
+
+// An HTTP server that serves files from a directory using libevent.
+// Also allows configuring certain URLs to be dynamically generated.
+class HTTPServer {
+ public:
+ HTTPServer(const char *directory, uint16_t port);
+ // Starts serving pages.
+ // Might not clean up everything before returning.
+ void Run();
+ protected:
+ template<class T> class MemberHandler {
+ public:
+ typedef void (T::*Handler)(evhttp_request *);
+ struct Holder {
+ T *self;
+ Handler handler;
+ };
+ static void Call(evhttp_request *request, void *handler_in) {
+ const Holder *const holder = static_cast<Holder *>(handler_in);
+ AddStandardHeaders(request);
+ ((holder->self)->*(holder->handler))(request);
+ }
+ };
+ void AddPage(const std::string &path, void (*handler)(evhttp_request *, void *),
+ void *data);
+ template<class T> void AddPage(const std::string &path,
+ typename MemberHandler<T>::Handler handler,
+ T *self) {
+ // have to put "typename" in, so the typedef makes it clearer
+ typedef typename MemberHandler<T>::Holder HolderType;
+ AddPage(path, MemberHandler<T>::Call, new HolderType{self, handler});
+ }
+ // This gets set up as the generic handler.
+ // It can also be called separately to serve the file that the request is
+ // requesting from the filesystem.
+ void ServeFile(evhttp_request *request);
+ private:
+ // The directory where files to be served come from.
+ const char *directory_;
+ // The main libevent structure.
+ event_base *const base_;
+ // The libevent HTTP server handle.
+ evhttp *const http_;
+ static void AddStandardHeaders(evhttp_request *request);
+ static void StaticServeFile(evhttp_request *request, void *self) {
+ static_cast<HTTPServer *>(self)->ServeFile(request);
+ }
+};
+
+} // namespace http
+} // namespace aos
diff --git a/aos/linux_code/output/ctemplate_cache.cc b/aos/linux_code/output/ctemplate_cache.cc
new file mode 100644
index 0000000..cf961c6
--- /dev/null
+++ b/aos/linux_code/output/ctemplate_cache.cc
@@ -0,0 +1,24 @@
+#include "aos/linux_code/output/ctemplate_cache.h"
+
+#include "aos/linux_code/configuration.h"
+#include "aos/common/once.h"
+
+namespace aos {
+namespace http {
+
+namespace {
+// Allocates the (never freed) cache and points its template root at
+// configuration::GetRootDirectory().
+ctemplate::TemplateCache *CreateTemplateCache() {
+  ctemplate::TemplateCache *const cache = new ctemplate::TemplateCache();
+  cache->SetTemplateRootDirectory(configuration::GetRootDirectory());
+  return cache;
+}
+} // namespace
+// Hands out the cache that CreateTemplateCache() builds, going through a Once
+// so that every caller gets the same instance.
+ctemplate::TemplateCache *get_template_cache() {
+  static Once<ctemplate::TemplateCache> cache_once(CreateTemplateCache);
+  ctemplate::TemplateCache *const cache = cache_once.Get();
+  return cache;
+}
+
+} // namespace http
+} // namespace aos
diff --git a/aos/linux_code/output/ctemplate_cache.h b/aos/linux_code/output/ctemplate_cache.h
new file mode 100644
index 0000000..7e5dc3d
--- /dev/null
+++ b/aos/linux_code/output/ctemplate_cache.h
@@ -0,0 +1,12 @@
+#ifndef AOS_LINUX_CODE_OUTPUT_CTEMPLATE_CACHE_H_
+#define AOS_LINUX_CODE_OUTPUT_CTEMPLATE_CACHE_H_
+
+#include "ctemplate/template_cache.h"
+
+namespace aos {
+namespace http {
+
+// Retrieves the cache used by all of the aos functions etc.
+// The same cache is returned on every call. Its template root directory is
+// set to configuration::GetRootDirectory() when it is first created.
+ctemplate::TemplateCache *get_template_cache();
+
+}  // namespace http
+}  // namespace aos
+
+#endif  // AOS_LINUX_CODE_OUTPUT_CTEMPLATE_CACHE_H_
diff --git a/aos/linux_code/output/evhttp_ctemplate_emitter.cc b/aos/linux_code/output/evhttp_ctemplate_emitter.cc
new file mode 100644
index 0000000..2301ffb
--- /dev/null
+++ b/aos/linux_code/output/evhttp_ctemplate_emitter.cc
@@ -0,0 +1,18 @@
+#include "aos/linux_code/output/evhttp_ctemplate_emitter.h"
+
+#include "aos/common/logging/logging.h"
+
+namespace aos {
+namespace http {
+
+// Appends the slen bytes starting at s to the evbuffer.
+// Once one append has failed, this (and therefore all of the other Emit
+// overloads, which forward here) silently does nothing so that the buffer never
+// ends up with a gap in the middle of the output.
+void EvhttpCtemplateEmitter::Emit(const char *s, size_t slen) {
+  if (error_) return;
+  if (evbuffer_add(buf_, s, slen) != 0) {
+    // slen is a size_t, so it has to be printed with %zu, and %p wants void
+    // pointers.
+    LOG(ERROR, "evbuffer_add(%p, %p, %zu) failed\n",
+        static_cast<void *>(buf_), static_cast<const void *>(s), slen);
+    error_ = true;
+  }
+}
+
+} // namespace http
+} // namespace aos
diff --git a/aos/linux_code/output/evhttp_ctemplate_emitter.h b/aos/linux_code/output/evhttp_ctemplate_emitter.h
new file mode 100644
index 0000000..5b8e96a
--- /dev/null
+++ b/aos/linux_code/output/evhttp_ctemplate_emitter.h
@@ -0,0 +1,34 @@
+#ifndef AOS_LINUX_CODE_OUTPUT_EVHTTP_CTEMPLATE_EMITTER_H_
+#define AOS_LINUX_CODE_OUTPUT_EVHTTP_CTEMPLATE_EMITTER_H_
+
+#include <string.h>
+
+#include "event2/buffer.h"
+#include "ctemplate/template_emitter.h"
+
+namespace aos {
+namespace http {
+
+// Writes everything directly into an evbuffer*.
+// Handles errors by refusing to write anything else into the buffer and storing
+// the state (which can be retrieved with error()).
+class EvhttpCtemplateEmitter : public ctemplate::ExpandEmitter {
+ public:
+  // buf is where all of the output goes. It is not owned by this object.
+  EvhttpCtemplateEmitter(evbuffer *buf) : buf_(buf), error_(false) {}
+
+  // All of these forward to the (pointer, length) overload, which is the only
+  // one that touches the buffer.
+  virtual void Emit(char c) { Emit(&c, 1); }
+  virtual void Emit(const std::string& s) { Emit(s.data(), s.size()); }
+  virtual void Emit(const char* s) { Emit(s, strlen(s)); }
+  virtual void Emit(const char* s, size_t slen);
+
+  // Retrieves whether or not there has been an error. If true, the error will
+  // already have been logged.
+  bool error() const { return error_; }
+
+ private:
+  // Where the output gets written to.
+  evbuffer *const buf_;
+  // Set after the first failed write; nothing is written after that.
+  bool error_;
+};
+
+} // namespace http
+} // namespace aos
+
+#endif // AOS_LINUX_CODE_OUTPUT_EVHTTP_CTEMPLATE_EMITTER_H_
diff --git a/aos/linux_code/output/output.gyp b/aos/linux_code/output/output.gyp
new file mode 100644
index 0000000..f697630
--- /dev/null
+++ b/aos/linux_code/output/output.gyp
@@ -0,0 +1,24 @@
+{
+ 'targets': [
+ # Static library made up of the HTTP server and the two ctemplate helpers
+ # (the emitter that writes into an evbuffer and the shared template cache).
+ {
+ 'target_name': 'http_server',
+ 'type': 'static_library',
+ 'sources': [
+ 'HTTPServer.cpp',
+ 'evhttp_ctemplate_emitter.cc',
+ 'ctemplate_cache.cc',
+ ],
+ 'dependencies': [
+ '<(EXTERNALS):libevent',
+ '<(EXTERNALS):ctemplate',
+ '<(AOS)/common/common.gyp:once',
+ '<(AOS)/common/common.gyp:scoped_fd',
+ '<(AOS)/build/aos.gyp:logging',
+ ],
+ # The headers of this target include libevent and ctemplate headers, so
+ # targets depending on this one need those settings too.
+ 'export_dependent_settings': [
+ '<(EXTERNALS):libevent',
+ '<(EXTERNALS):ctemplate',
+ ],
+ },
+ ],
+}
diff --git a/aos/linux_code/queue-tmpl.h b/aos/linux_code/queue-tmpl.h
new file mode 100644
index 0000000..15b8608
--- /dev/null
+++ b/aos/linux_code/queue-tmpl.h
@@ -0,0 +1,272 @@
+namespace aos {
+
+// Timestamps the message and writes it to the queue with
+// RawQueue::kOverride. Either way, this object no longer refers to the message
+// afterwards. Returns whatever WriteMessage reported.
+template <class T>
+bool ScopedMessagePtr<T>::Send() {
+  assert(msg_ != NULL);
+  msg_->SetTimeToNow();
+  assert(queue_ != NULL);
+  const bool written = queue_->WriteMessage(msg_, RawQueue::kOverride);
+  // The queue has the message now, so forget about it without freeing it.
+  msg_ = NULL;
+  return written;
+}
+
+// Same as Send() except that the message is written with RawQueue::kBlock
+// instead of RawQueue::kOverride.
+template <class T>
+bool ScopedMessagePtr<T>::SendBlocking() {
+  assert(msg_ != NULL);
+  msg_->SetTimeToNow();
+  assert(queue_ != NULL);
+  const bool written = queue_->WriteMessage(msg_, RawQueue::kBlock);
+  // The queue has the message now, so forget about it without freeing it.
+  msg_ = NULL;
+  return written;
+}
+
+// Switches to managing msg. The message that was being managed before (if
+// there was one and there is a queue to give it back to) gets freed first.
+template <class T>
+void ScopedMessagePtr<T>::reset(T *msg) {
+  const bool have_old_message = (msg_ != NULL);
+  const bool have_queue = (queue_ != NULL);
+  if (have_queue && have_old_message) {
+    queue_->FreeMessage(msg_);
+  }
+  msg_ = msg;
+}
+
+// A SafeScopedMessagePtr<> manages a message that lives on the normal heap
+// (allocated with new) instead of in the queue's memory.
+// It deletes the message when it goes out of scope or after it has been sent.
+// By design, there is no way to get it to release the message pointer.
+// When the message gets sent, it allocates a queue message, copies the data
+// into it, and then sends that. Copying one of these copies the message.
+template <class T>
+class SafeScopedMessagePtr {
+ public:
+  // Returns a pointer to the message (NULL if there isn't one any more).
+  // This stays valid until Send or the destructor have been called.
+  T *get() { return msg_; }
+
+  T &operator*() {
+    T *msg = get();
+    assert(msg != NULL);
+    return *msg;
+  }
+
+  T *operator->() {
+    T *msg = get();
+    assert(msg != NULL);
+    return msg;
+  }
+
+#ifndef SWIG
+  // True if there is a message.
+  operator bool() {
+    return msg_ != NULL;
+  }
+
+  const T *get() const { return msg_; }
+
+  const T &operator*() const {
+    const T *msg = get();
+    assert(msg != NULL);
+    return *msg;
+  }
+
+  const T *operator->() const {
+    const T *msg = get();
+    assert(msg != NULL);
+    return msg;
+  }
+#endif  // SWIG
+
+  // Sends the message and removes our reference to it.
+  // If the queue is full, over-rides the oldest message in it with our new
+  // message.
+  // Returns true on success, and false otherwise.
+  // The message will be freed, unless no queue message could be allocated to
+  // copy it into. In that case false is returned and the message is kept.
+  bool Send() {
+    assert(msg_ != NULL);
+    assert(queue_ != NULL);
+    msg_->SetTimeToNow();
+    T *shm_msg = static_cast<T *>(queue_->GetMessage());
+    if (shm_msg == NULL) {
+      return false;
+    }
+    *shm_msg = *msg_;
+    bool return_value = queue_->WriteMessage(shm_msg, RawQueue::kOverride);
+    reset();
+    return return_value;
+  }
+
+  // Sends the message and removes our reference to it.
+  // If the queue is full, blocks until it is no longer full.
+  // Returns true on success, and false otherwise.
+  // Frees the message, unless no queue message could be allocated to copy it
+  // into. In that case false is returned and the message is kept.
+  bool SendBlocking() {
+    assert(msg_ != NULL);
+    assert(queue_ != NULL);
+    msg_->SetTimeToNow();
+    T *shm_msg = static_cast<T *>(queue_->GetMessage());
+    if (shm_msg == NULL) {
+      return false;
+    }
+    *shm_msg = *msg_;
+    bool return_value = queue_->WriteMessage(shm_msg, RawQueue::kBlock);
+    reset();
+    return return_value;
+  }
+
+  // Frees the contained message.
+  ~SafeScopedMessagePtr() {
+    reset();
+  }
+
+#ifndef SWIG
+  // Implements a move constructor to take the message pointer from the
+  // temporary object to save work.
+  SafeScopedMessagePtr(SafeScopedMessagePtr<T> &&ptr)
+      : queue_(ptr.queue_),
+        msg_(ptr.msg_) {
+    ptr.msg_ = NULL;
+  }
+#endif  // SWIG
+
+  // Copy constructor actually copies the data.
+  // Copying from one that doesn't have a message any more (already sent or
+  // moved from) gives another one without a message instead of dereferencing
+  // NULL.
+  SafeScopedMessagePtr(const SafeScopedMessagePtr<T> &ptr)
+      : queue_(ptr.queue_),
+        msg_(ptr.msg_ != NULL ? new T(*ptr.msg_) : NULL) {}
+#ifndef SWIG
+  // Equal operator copies the data.
+  // The copy is made before the old message is deleted, so self-assignment is
+  // OK. Assigning from one without a message leaves this one without a
+  // message too.
+  void operator=(const SafeScopedMessagePtr<T> &ptr) {
+    queue_ = ptr.queue_;
+    reset(ptr.msg_ != NULL ? new T(*ptr.msg_) : NULL);
+  }
+#endif  // SWIG
+
+ private:
+  // Provide access to private constructor.
+  friend class aos::Queue<typename std::remove_const<T>::type>;
+  friend class aos::SafeMessageBuilder<T>;
+
+  // Only Queue should be able to build a message pointer.
+  SafeScopedMessagePtr(RawQueue *queue)
+      : queue_(queue), msg_(new T()) {}
+
+  // Sets the pointer to msg, freeing the old value if it was there.
+  // This is private because nobody should be able to get a pointer to a message
+  // that needs to be scoped without using the queue.
+  void reset(T *msg = NULL) {
+    // delete is a no-op on NULL.
+    delete msg_;
+    msg_ = msg;
+  }
+
+  // Sets the queue that owns this message.
+  void set_queue(RawQueue *queue) { queue_ = queue; }
+
+  // The queue that the message is a part of.
+  RawQueue *queue_;
+  // The message or NULL.
+  T *msg_;
+};
+
+// Fetches the underlying RawQueue the first time it is needed and tells
+// queue_msg_ about it. Does nothing if that has already happened.
+template <class T>
+void Queue<T>::Init() {
+  if (queue_ != NULL) {
+    return;
+  }
+  queue_ = RawQueue::Fetch(queue_name_, sizeof(T), static_cast<int>(T::kHash),
+                           T::kQueueLength);
+  queue_msg_.set_queue(queue_);
+}
+
+// Undoes Init(): lets go of the current message and forgets the RawQueue.
+// Does nothing if there is no RawQueue.
+template <class T>
+void Queue<T>::Clear() {
+  if (queue_ == NULL) {
+    return;
+  }
+  // The message has to be released while queue_msg_ still knows the queue.
+  queue_msg_.reset();
+  queue_ = NULL;
+  queue_msg_.set_queue(NULL);
+}
+
+// Tries (without blocking) to read the next message. Returns true and switches
+// to it if there was one. Otherwise keeps the current message and returns
+// false.
+template <class T>
+bool Queue<T>::FetchNext() {
+  Init();
+  // TODO(aschuh): Use RawQueue::ReadMessageIndex so that multiple readers
+  // reading don't randomly get only part of the messages.
+  // Document here the tradeoffs that are part of each method.
+  const T *const next =
+      static_cast<const T *>(queue_->ReadMessage(RawQueue::kNonBlock));
+  if (next == NULL) {
+    // Nothing new, so leave the internal pointer alone.
+    return false;
+  }
+  queue_msg_.reset(next);
+  return true;
+}
+
+// Reads the next message with RawQueue::kBlock and switches to it.
+// Always returns true; not getting a message trips the assert.
+template <class T>
+bool Queue<T>::FetchNextBlocking() {
+  Init();
+  const T *const next =
+      static_cast<const T *>(queue_->ReadMessage(RawQueue::kBlock));
+  queue_msg_.reset(next);
+  assert(next != NULL);
+  return true;
+}
+
+// Peeks (without blocking) at the message at the end of the queue. Returns true
+// and switches to it if it is a different one from the current message.
+// Otherwise returns false.
+template <class T>
+bool Queue<T>::FetchLatest() {
+  Init();
+  const T *const latest = static_cast<const T *>(queue_->ReadMessage(
+      RawQueue::kFromEnd | RawQueue::kNonBlock | RawQueue::kPeek));
+  const bool is_new = (latest != NULL) && (latest != queue_msg_.get());
+  if (!is_new) {
+    // We aren't going to keep what we just read, so it has to be given back
+    // (RawQueue::FreeMessage is ok to call on NULL).
+    queue_->FreeMessage(latest);
+    return false;
+  }
+  queue_msg_.reset(latest);
+  return true;
+}
+
+// Creates a zeroed message that is managed by a SafeScopedMessagePtr.
+template <class T>
+SafeScopedMessagePtr<T> Queue<T>::SafeMakeMessage() {
+  Init();
+  SafeScopedMessagePtr<T> message(queue_);
+  // Start every message out in a known state.
+  message->Zero();
+  return message;
+}
+
+// Gets a new message from the queue and wraps it in a ScopedMessagePtr.
+template <class T>
+ScopedMessagePtr<T> Queue<T>::MakeMessage() {
+  Init();
+  T *const raw_message = MakeRawMessage();
+  return ScopedMessagePtr<T>(queue_, raw_message);
+}
+
+// Gets a new message from the queue. Asserts that this worked.
+// Unlike the other Make* methods, this one doesn't call Init() itself.
+template <class T>
+T *Queue<T>::MakeRawMessage() {
+  T *const message = static_cast<T *>(queue_->GetMessage());
+  assert(message != NULL);
+  return message;
+}
+
+// Gets a new message from the queue and hands it to a MessageBuilder.
+template <class T>
+aos::MessageBuilder<T> Queue<T>::MakeWithBuilder() {
+  Init();
+  T *const raw_message = MakeRawMessage();
+  return aos::MessageBuilder<T>(queue_, raw_message);
+}
+
+
+// This builder uses the safe message pointer so that it can be safely copied
+// and used by SWIG or in places where it could be leaked.
+template <class T>
+class SafeMessageBuilder {
+ public:
+ // The type of message that this builder is for.
+ typedef T Message;
+ // NOTE(review): only declared here. Queue<T>::SafeMakeWithBuilder() constructs
+ // one of these from a RawQueue*, which this primary template can't do, so
+ // presumably each message type provides its own specialization; confirm.
+ bool Send();
+};
+
+// Creates a SafeMessageBuilder for this queue.
+template <class T>
+aos::SafeMessageBuilder<T> Queue<T>::SafeMakeWithBuilder() {
+  Init();
+  aos::SafeMessageBuilder<T> builder(queue_);
+  return builder;
+}
+
+
+} // namespace aos
diff --git a/aos/linux_code/starter/netconsole.cc b/aos/linux_code/starter/netconsole.cc
new file mode 100644
index 0000000..e8e76c7
--- /dev/null
+++ b/aos/linux_code/starter/netconsole.cc
@@ -0,0 +1,209 @@
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <sys/stat.h>
+#include <assert.h>
+
+#include "aos/common/logging/logging_impl.h"
+#include "aos/common/util.h"
+#include "aos/linux_code/configuration.h"
+
+namespace aos {
+namespace {
+
+// The argument for FDCopyThread: which file descriptor to copy from and which
+// one to copy to.
+struct FDsToCopy {
+ // The file descriptor to read from.
+ const int input;
+ // The file descriptor to write to.
+ const int output;
+
+ // If this is not NULL, input is read with recvmsg(2) and only data whose
+ // IP_PKTINFO ipi_spec_dst matches this address gets copied.
+ // If it is NULL, input is read with read(2) and everything gets copied.
+ const struct sockaddr_in *const interface_address;
+};
+
+// The function for a thread which copies data from one file descriptor to
+// another until it gets an EOF.
+// to_copy_in must point to a FDsToCopy which stays valid for as long as this
+// thread is running.
+// If interface_address is set, then the input is read with recvmsg(2) and
+// packets whose IP_PKTINFO says they were not sent to that address are dropped.
+// Returns NULL when reading (with read(2)) returns EOF. Any failure besides
+// EINTR is LOG(FATAL)ed.
+void *FDCopyThread(void *to_copy_in) {
+  FDsToCopy *to_copy = static_cast<FDsToCopy *>(to_copy_in);
+
+  char buffer[32768];
+  // How many bytes at the beginning of buffer still have to be written out.
+  ssize_t position = 0;
+  while (true) {
+    assert(position >= 0);
+    assert(position <= static_cast<ssize_t>(sizeof(buffer)));
+    if (position != static_cast<ssize_t>(sizeof(buffer))) {
+      // How much room is left after the data that is already buffered.
+      const size_t space_left = sizeof(buffer) - position;
+      ssize_t read_bytes;
+      bool good_data = true;
+      if (to_copy->interface_address != NULL) {
+        char control_buffer[0x100];
+        struct msghdr header;
+        memset(static_cast<void *>(&header), 0, sizeof(header));
+        header.msg_control = control_buffer;
+        header.msg_controllen = sizeof(control_buffer);
+        struct iovec iovecs[1];
+        iovecs[0].iov_base = buffer + position;
+        iovecs[0].iov_len = space_left;
+        header.msg_iov = iovecs;
+        header.msg_iovlen = sizeof(iovecs) / sizeof(iovecs[0]);
+        read_bytes = recvmsg(to_copy->input, &header, 0);
+        if (read_bytes != -1) {
+          for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&header);
+               cmsg != NULL;
+               cmsg = CMSG_NXTHDR(&header, cmsg)) {
+            if (cmsg->cmsg_level == IPPROTO_IP &&
+                cmsg->cmsg_type == IP_PKTINFO) {
+              // The data is not necessarily aligned correctly, so it has to
+              // get copied out instead of accessed in place.
+              struct in_pktinfo pktinfo;
+              memcpy(&pktinfo, CMSG_DATA(cmsg), sizeof(pktinfo));
+              good_data = pktinfo.ipi_spec_dst.s_addr ==
+                  to_copy->interface_address->sin_addr.s_addr;
+            }
+          }
+        }
+      } else {
+        read_bytes = read(to_copy->input, buffer + position, space_left);
+      }
+      if (read_bytes == -1) {
+        if (errno != EINTR) {
+          LOG(FATAL, "read(%d, %p, %zu) failed with %d: %s\n",
+              to_copy->input, buffer + position, space_left,
+              errno, strerror(errno));
+        }
+        // Nothing was read, so position must not change. It will get retried
+        // the next time around.
+      } else if (read_bytes == 0 && to_copy->interface_address == NULL) {
+        // read(2) says that this means EOF
+        return NULL;
+      } else if (good_data) {
+        position += read_bytes;
+      }
+    }
+
+    assert(position >= 0);
+    assert(position <= static_cast<ssize_t>(sizeof(buffer)));
+    if (position > 0) {
+      ssize_t sent_bytes = write(to_copy->output, buffer, position);
+      if (sent_bytes == -1) {
+        if (errno != EINTR) {
+          LOG(FATAL, "write(%d, %p, %zd) failed with %d: %s\n",
+              to_copy->output, buffer, position, errno, strerror(errno));
+        }
+      } else if (sent_bytes != 0) {
+        if (sent_bytes == position) {
+          position = 0;
+        } else {
+          // Move what didn't get written to the front so that it goes out
+          // first the next time.
+          memmove(buffer, buffer + sent_bytes, position - sent_bytes);
+          position -= sent_bytes;
+        }
+      }
+    }
+  }
+}
+
+// Copies what the cRIO sends to UDP port 6666 to the output and (if there is an
+// input) copies the input to UDP port 6668 on the cRIO.
+// With an argument, output goes to that file (which gets truncated) and there
+// is no input. Without one, output goes to stdout and input comes from stdin.
+// Never returns: it exit(3)s once the input reaches EOF (or, without an input,
+// if the output thread ever finishes).
+int NetconsoleMain(int argc, char **argv) {
+  logging::Init();
+
+  int input, output;
+  if (argc > 1) {
+    output = open(argv[1], O_APPEND | O_CREAT | O_WRONLY | O_TRUNC, 0666);
+    if (output == -1) {
+      // These are most likely the user's fault, so they get a nicer message.
+      if (errno == EACCES || errno == ELOOP || errno == ENOSPC ||
+          errno == ENOTDIR || errno == EROFS || errno == ETXTBSY) {
+        fprintf(stderr, "Opening output file '%s' failed because of %s.\n",
+                argv[1], strerror(errno));
+        exit(EXIT_FAILURE);
+      }
+      LOG(FATAL, "open('%s', stuff, 0666) failed with %d: %s\n", argv[1],
+          errno, strerror(errno));
+    }
+    fprintf(stderr, "Writing output to '%s'.\n", argv[1]);
+    input = -1;
+    fprintf(stderr, "Not taking any input.\n");
+  } else {
+    output = STDOUT_FILENO;
+    fprintf(stderr, "Writing output to stdout.\n");
+    input = STDIN_FILENO;
+    fprintf(stderr, "Reading stdin.\n");
+  }
+
+  int on = 1;
+
+  int from_crio = socket(AF_INET, SOCK_DGRAM, 0);
+  if (from_crio == -1) {
+    LOG(FATAL, "socket(AF_INET, SOCK_DGRAM, 0) failed with %d: %s\n",
+        errno, strerror(errno));
+  }
+  if (setsockopt(from_crio, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
+    LOG(FATAL, "SOL_SOCKET::SO_REUSEADDR=%d(%d) failed with %d: %s\n",
+        on, from_crio, errno, strerror(errno));
+  }
+  if (setsockopt(from_crio, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on)) == -1) {
+    LOG(FATAL, "SOL_SOCKET::SO_BROADCAST=%d(%d) failed with %d: %s\n",
+        on, from_crio, errno, strerror(errno));
+  }
+  // Needed so that FDCopyThread can tell which address packets were sent to.
+  if (setsockopt(from_crio, IPPROTO_IP, IP_PKTINFO, &on, sizeof(on)) == -1) {
+    LOG(FATAL, "IPROTO_IP::IP_PKTINFO=%d(%d) failed with %d: %s\n",
+        on, from_crio, errno, strerror(errno));
+  }
+  union {
+    struct sockaddr_in in;
+    struct sockaddr addr;
+  } address;
+  // Make sure that the parts which don't get set explicitly (ie sin_zero)
+  // aren't random stack garbage.
+  memset(&address, 0, sizeof(address));
+  address.in.sin_family = AF_INET;
+  address.in.sin_port = htons(6666);
+  address.in.sin_addr.s_addr = INADDR_ANY;
+  if (bind(from_crio, &address.addr, sizeof(address)) == -1) {
+    LOG(FATAL, "bind(%d, %p, %zu) failed with %d: %s\n",
+        from_crio, &address.addr, sizeof(address), errno, strerror(errno));
+  }
+
+  pthread_t input_thread, output_thread;
+
+  address.in.sin_addr = ::aos::configuration::GetOwnIPAddress();
+  ::aos::util::SetLastSegment(&address.in.sin_addr, NetworkAddress::kCRIO);
+  fprintf(stderr, "Using cRIO IP %s.\n",
+          inet_ntoa(address.in.sin_addr));
+
+  int to_crio = -1;
+  if (input != -1) {
+    to_crio = socket(AF_INET, SOCK_DGRAM, 0);
+    if (to_crio == -1) {
+      LOG(FATAL, "socket(AF_INET, SOCK_DGRAM, 0) failed with %d: %s\n",
+          errno, strerror(errno));
+    }
+    if (setsockopt(to_crio, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
+      LOG(FATAL, "SOL_SOCKET::SO_REUSEADDR=%d(%d) failed with %d: %s\n",
+          on, to_crio, errno, strerror(errno));
+    }
+    address.in.sin_port = htons(6668);
+    if (connect(to_crio, &address.addr, sizeof(address)) == -1) {
+      LOG(FATAL, "connect(%d, %p, %zu) failed with %d: %s\n",
+          to_crio, &address.addr, sizeof(address), errno, strerror(errno));
+    }
+  }
+  // This has to live at function scope (not inside of the if) because the
+  // thread keeps using the pointer to it for as long as it runs.
+  FDsToCopy input_fds{input, to_crio, NULL};
+  if (input != -1) {
+    // The pthread functions return the error number instead of setting errno.
+    const int result =
+        pthread_create(&input_thread, NULL, FDCopyThread, &input_fds);
+    if (result != 0) {
+      LOG(FATAL, "pthread_create(%p, NULL, %p, %p) failed with %d: %s\n",
+          &input_thread, FDCopyThread, &input_fds, result, strerror(result));
+    }
+  }
+
+  address.in.sin_addr = ::aos::configuration::GetOwnIPAddress();
+  FDsToCopy output_fds{from_crio, output, &address.in};
+  {
+    const int result =
+        pthread_create(&output_thread, NULL, FDCopyThread, &output_fds);
+    if (result != 0) {
+      LOG(FATAL, "pthread_create(%p, NULL, %p, %p) failed with %d: %s\n",
+          &output_thread, FDCopyThread, &output_fds, result, strerror(result));
+    }
+  }
+
+  // input_thread will finish when stdin gets an EOF
+  const int join_result =
+      pthread_join((input == -1) ? output_thread : input_thread, NULL);
+  if (join_result != 0) {
+    LOG(FATAL, "pthread_join(a_thread, NULL) failed with %d: %s\n",
+        join_result, strerror(join_result));
+  }
+  exit(EXIT_SUCCESS);
+}
+
+} // namespace
+} // namespace aos
+
+// Just hands everything over to the real implementation in the aos namespace.
+int main(int argc, char **argv) {
+  const int status = ::aos::NetconsoleMain(argc, argv);
+  return status;
+}
diff --git a/aos/linux_code/starter/starter.cc b/aos/linux_code/starter/starter.cc
new file mode 100644
index 0000000..187acc3
--- /dev/null
+++ b/aos/linux_code/starter/starter.cc
@@ -0,0 +1,786 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <sys/inotify.h>
+#include <sys/stat.h>
+#include <sys/ioctl.h>
+#include <assert.h>
+#include <signal.h>
+#include <stdint.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/wait.h>
+#include <inttypes.h>
+
+#include <map>
+#include <functional>
+#include <deque>
+#include <fstream>
+#include <queue>
+#include <list>
+#include <string>
+#include <vector>
+#include <memory>
+
+#include <event2/event.h>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/logging/logging_impl.h"
+#include "aos/linux_code/init.h"
+#include "aos/common/unique_malloc_ptr.h"
+#include "aos/common/time.h"
+#include "aos/common/once.h"
+
+// This is the main piece of code that starts all of the rest of the code and
+// restarts it when the binaries are modified.
+//
+// Throughout, the code is not terribly concerned with thread safety because
+// there is only 1 thread. It does some setup and then lets inotify run things
+// when appropriate.
+//
+// NOTE: This program should never exit nicely. It catches all nice attempts to
+// exit, forwards them to all of the children that it has started, waits for
+// them to exit nicely, and then SIGKILLs anybody left (which will always
+// include itself).
+
+using ::std::unique_ptr;
+
+namespace aos {
+namespace starter {
+
+// TODO(brians): split out the c++ libevent wrapper stuff into its own file(s)
+// A deleter for unique_ptr which releases an event_base with
+// event_base_free. NULL is ignored.
+class EventBaseDeleter {
+ public:
+  void operator()(event_base *base) {
+    if (base != NULL) {
+      event_base_free(base);
+    }
+  }
+};
+typedef unique_ptr<event_base, EventBaseDeleter> EventBaseUniquePtr;
+EventBaseUniquePtr libevent_base;
+
+// A deleter for unique_ptr which stops and then frees an event that was
+// allocated by event_new (or evtimer_new). NULL is ignored.
+class EventDeleter {
+ public:
+  void operator()(event *evt) {
+    if (evt == NULL) return;
+    if (event_del(evt) != 0) {
+      LOG(WARNING, "event_del(%p) failed\n", evt);
+    }
+    // event_del only makes the event non-pending. The memory still has to be
+    // given back or it will be leaked.
+    event_free(evt);
+  }
+};
+typedef unique_ptr<event, EventDeleter> EventUniquePtr;
+
+// Watches a file path for modifications. Once created, keeps watching until
+// destroyed or RemoveWatch() is called.
+// TODO(brians): split this out into its own file + tests
+class FileWatch {
+ public:
+  // Will call callback(value) when filename is modified.
+  // If value is NULL, then a pointer to this object will be passed instead.
+  //
+  // Watching for file creations is slightly different. To do that, pass true
+  // as create, the directory where the file will be created for filename, and
+  // the name of the file (without directory name) for check_filename.
+  //
+  // LOG(FATAL)s if the watch can not be set up.
+  FileWatch(std::string filename,
+            std::function<void(void *)> callback,
+            void *value,
+            bool create = false,
+            std::string check_filename = "")
+      : filename_(filename),
+        callback_(callback),
+        value_(value),
+        create_(create),
+        check_filename_(check_filename),
+        watch_(-1) {
+    init_once.Get();
+
+    CreateWatch();
+  }
+  // Cleans up everything.
+  ~FileWatch() {
+    if (watch_ != -1) {
+      RemoveWatch();
+    }
+  }
+
+  // After calling this method, this object won't really be doing much of
+  // anything besides possibly running its callback or something.
+  void RemoveWatch() {
+    assert(watch_ != -1);
+
+    if (inotify_rm_watch(notify_fd, watch_) == -1) {
+      LOG(WARNING, "inotify_rm_watch(%d, %d) failed with %d: %s\n",
+          notify_fd, watch_, errno, strerror(errno));
+    }
+
+    RemoveWatchFromMap();
+  }
+
+ private:
+  // Performs the static initialization. Called by init_once from the
+  // constructor.
+  // Creates the inotify file descriptor and gets libevent to call
+  // INotifyReadable whenever it is readable. LOG(FATAL)s if any of that fails
+  // because nothing would ever get noticed otherwise.
+  static void *Init() {
+    notify_fd = inotify_init1(IN_CLOEXEC);
+    if (notify_fd == -1) {
+      LOG(FATAL, "inotify_init1(IN_CLOEXEC) failed with %d: %s\n",
+          errno, strerror(errno));
+    }
+    EventUniquePtr notify_event(event_new(libevent_base.get(), notify_fd,
+                                          EV_READ | EV_PERSIST,
+                                          FileWatch::INotifyReadable, NULL));
+    if (notify_event.get() == NULL) {
+      LOG(FATAL, "event_new(%p, %d, EV_READ | EV_PERSIST, ...) failed\n",
+          libevent_base.get(), notify_fd);
+    }
+    if (event_add(notify_event.get(), NULL) != 0) {
+      LOG(FATAL, "event_add(%p, NULL) failed\n", notify_event.get());
+    }
+    // libevent is using it now and it should stay around forever.
+    notify_event.release();
+    return NULL;
+  }
+
+  // Takes this object out of the map of watchers and forgets the watch ID.
+  void RemoveWatchFromMap() {
+    // Using operator[] here would insert a NULL entry if the ID isn't in the
+    // map, which INotifyReadable would then dereference.
+    const std::map<int, FileWatch *>::iterator it = watchers.find(watch_);
+    if (it == watchers.end() || it->second != this) {
+      LOG(WARNING, "watcher for %s (%p) didn't find itself in the map\n",
+          filename_.c_str(), this);
+    } else {
+      watchers.erase(it);
+    }
+    LOG(DEBUG, "removed watch ID %d\n", watch_);
+    watch_ = -1;
+  }
+
+  // Adds the inotify watch for filename_ and puts this object into the map of
+  // watchers under the new watch ID. LOG(FATAL)s if adding the watch fails.
+  void CreateWatch() {
+    assert(watch_ == -1);
+    watch_ = inotify_add_watch(notify_fd, filename_.c_str(),
+                               create_ ? IN_CREATE : (IN_ATTRIB |
+                                                     IN_MODIFY |
+                                                     IN_DELETE_SELF |
+                                                     IN_MOVE_SELF));
+    if (watch_ == -1) {
+      LOG(FATAL, "inotify_add_watch(%d, %s,"
+          " %s ? IN_CREATE : (IN_ATTRIB | IN_MODIFY)) failed with %d: %s\n",
+          notify_fd, filename_.c_str(), create_ ? "true" : "false",
+          errno, strerror(errno));
+    }
+    watchers[watch_] = this;
+    LOG(DEBUG, "watch for %s is %d\n", filename_.c_str(), watch_);
+  }
+
+  // This gets set up as the callback for EV_READ on the inotify file
+  // descriptor. It calls FileNotified on the appropriate instance.
+  static void INotifyReadable(int /*fd*/, short /*events*/, void *) {
+    unsigned int to_read;
+    // Use FIONREAD to figure out how many bytes there are to read.
+    if (ioctl(notify_fd, FIONREAD, &to_read) < 0) {
+      LOG(FATAL, "FIONREAD(%d, %p) failed with %d: %s\n",
+          notify_fd, &to_read, errno, strerror(errno));
+    }
+    inotify_event *notifyevt = static_cast<inotify_event *>(malloc(to_read));
+    const char *end = reinterpret_cast<char *>(notifyevt) + to_read;
+    aos::unique_c_ptr<inotify_event> freer(notifyevt);
+
+    ssize_t ret = read(notify_fd, notifyevt, to_read);
+    if (ret < 0) {
+      LOG(FATAL, "read(%d, %p, %u) failed with %d: %s\n",
+          notify_fd, notifyevt, to_read, errno, strerror(errno));
+    }
+    if (static_cast<size_t>(ret) != to_read) {
+      LOG(ERROR, "read(%d, %p, %u) returned %zd instead of %u\n",
+          notify_fd, notifyevt, to_read, ret, to_read);
+      return;
+    }
+
+    // Keep looping through until we get to the end because inotify does return
+    // multiple events at once.
+    while (reinterpret_cast<char *>(notifyevt) < end) {
+      const std::map<int, FileWatch *>::iterator it =
+          watchers.find(notifyevt->wd);
+      if (it == watchers.end()) {
+        LOG(WARNING, "couldn't find whose watch ID %d is\n", notifyevt->wd);
+      } else {
+        // The calls below can change the map, so don't use it after them.
+        FileWatch *const watch = it->second;
+        LOG(DEBUG, "mask=%" PRIu32 "\n", notifyevt->mask);
+        // If it was something that means the file got deleted.
+        if (notifyevt->mask & (IN_MOVE_SELF | IN_DELETE_SELF | IN_IGNORED)) {
+          watch->WatchDeleted();
+        } else {
+          watch->FileNotified((notifyevt->len > 0) ? notifyevt->name : NULL);
+        }
+      }
+
+      // The next event starts right after the name of this one. The alignment
+      // that matters is the one of the structure (not of a pointer to it).
+      notifyevt = reinterpret_cast<inotify_event *>(
+          __builtin_assume_aligned(reinterpret_cast<char *>(notifyevt) +
+                                   sizeof(*notifyevt) + notifyevt->len,
+                                   alignof(inotify_event)));
+    }
+  }
+
+  // INotifyReadable calls this method whenever the watch for our file gets
+  // removed somehow.
+  // Sets up a new watch for the same path (which LOG(FATAL)s if that fails).
+  void WatchDeleted() {
+    LOG(DEBUG, "watch for %s deleted\n", filename_.c_str());
+    RemoveWatchFromMap();
+    CreateWatch();
+  }
+
+  // INotifyReadable calls this method whenever the watch for our file triggers.
+  // filename is the name from the event or NULL if it didn't have one.
+  void FileNotified(const char *filename) {
+    assert(watch_ != -1);
+    LOG(DEBUG, "got a notification for %s\n", filename_.c_str());
+
+    // When watching a directory, only events for the right file count.
+    if (!check_filename_.empty()) {
+      if (filename == NULL) {
+        return;
+      }
+      if (std::string(filename) != check_filename_) {
+        return;
+      }
+    }
+
+    callback_((value_ == NULL) ? this : value_);
+  }
+
+  // To make sure that Init gets called exactly once.
+  static ::aos::Once<void> init_once;
+
+  const std::string filename_;
+  const std::function<void(void *)> callback_;
+  void *const value_;
+  const bool create_;
+  std::string check_filename_;
+
+  // The watch descriptor or -1 if we don't have one any more.
+  int watch_;
+
+  // Map from watch IDs to instances of this class.
+  // <https://patchwork.kernel.org/patch/73192/> ("inotify: do not reuse watch
+  // descriptors") says they won't get reused, but that shouldn't be counted on
+  // because we might have a modified/different version/whatever kernel.
+  static std::map<int, FileWatch *> watchers;
+  // The inotify(7) file descriptor.
+  static int notify_fd;
+
+  DISALLOW_COPY_AND_ASSIGN(FileWatch);
+};
+::aos::Once<void> FileWatch::init_once(FileWatch::Init);
+std::map<int, FileWatch *> FileWatch::watchers;
+int FileWatch::notify_fd;
+
+// Runs the given command and returns its first line of output (not including
+// the \n). LOG(FATAL)s if the command has an exit status other than 0 or does
+// not print out an entire line.
+// Runs command with popen(3) and returns everything it prints before its first
+// \n. LOG(FATAL)s if the command can't be run, its output can't be read, it
+// finishes without printing a \n, or it has an exit status other than 0.
+std::string RunCommand(std::string command) {
+  // popen(3) might fail and not set it.
+  errno = 0;
+  FILE *pipe = popen(command.c_str(), "r");
+  if (pipe == NULL) {
+    LOG(FATAL, "popen(\"%s\", \"r\") failed with %d: %s\n",
+        command.c_str(), errno, strerror(errno));
+  }
+
+  // result_size is how many bytes result is currently allocated to.
+  size_t result_size = 128, read = 0;
+  unique_c_ptr<char> result(static_cast<char *>(malloc(result_size)));
+  if (result.get() == NULL) {
+    LOG(FATAL, "malloc(%zu) failed\n", result_size);
+  }
+  // Points at the first \n in result once one has been read.
+  const char *newline = NULL;
+  while (true) {
+    // If we filled up the buffer, then realloc(3) it bigger.
+    if (read == result_size) {
+      result_size *= 2;
+      void *new_result = realloc(result.get(), result_size);
+      if (new_result == NULL) {
+        LOG(FATAL, "realloc(%p, %zu) failed because of %d: %s\n",
+            result.get(), result_size, errno, strerror(errno));
+      } else {
+        // realloc(3) already took care of the old one.
+        result.release();
+        result = unique_c_ptr<char>(static_cast<char *>(new_result));
+      }
+    }
+
+    size_t ret = fread(result.get() + read, 1, result_size - read, pipe);
+    // If the read didn't fill up the whole buffer, check to see if it was
+    // because of an error.
+    if (ret < result_size - read) {
+      if (ferror(pipe)) {
+        LOG(FATAL, "couldn't finish reading output of \"%s\"\n",
+            command.c_str());
+      }
+    }
+    // Everything before this has already been checked, so only look through
+    // the new part. The \n can be anywhere in it (not just at the end) if the
+    // command prints more than 1 line.
+    newline = static_cast<const char *>(
+        memchr(result.get() + read, '\n', ret));
+    read += ret;
+    if (newline != NULL) {
+      break;
+    }
+
+    if (feof(pipe)) {
+      LOG(FATAL, "`%s` failed. didn't print a whole line\n", command.c_str());
+    }
+  }
+
+  // Only keep what comes before the first \n.
+  const std::string first_line(result.get(), newline - result.get());
+
+  int child_status = pclose(pipe);
+  if (child_status == -1) {
+    LOG(FATAL, "pclose(%p) failed with %d: %s\n", pipe,
+        errno, strerror(errno));
+  }
+
+  if (child_status != 0) {
+    LOG(FATAL, "`%s` failed. return %d\n", command.c_str(), child_status);
+  }
+
+  return first_line;
+}
+
+// Will call callback(arg) after time.
+// Will call callback(-1, EV_TIMEOUT, arg) once after time has passed.
+// LOG(FATAL)s if the timer can't be set up, because then the callback would
+// silently never run.
+// NOTE(review): nothing frees the event after it has fired.
+void Timeout(time::Time time, void (*callback)(int, short, void *), void *arg) {
+  EventUniquePtr timeout(evtimer_new(libevent_base.get(), callback, arg));
+  if (timeout.get() == NULL) {
+    LOG(FATAL, "evtimer_new(%p, callback, %p) failed\n",
+        libevent_base.get(), arg);
+  }
+  struct timeval time_timeval = time.ToTimeval();
+  if (evtimer_add(timeout.get(), &time_timeval) != 0) {
+    LOG(FATAL, "evtimer_add(%p, %p) failed\n",
+        timeout.get(), &time_timeval);
+  }
+  // libevent is using it now, so it can't be deleted when this returns.
+  timeout.release();
+}
+
+// Represents a child process. It will take care of restarting itself etc.
+class Child {
+ public:
+ // command is the (space-separated) command to run and its arguments.
+ Child(const std::string &command) : pid_(-1),
+ restart_timeout_(
+ evtimer_new(libevent_base.get(), StaticDoRestart, this)),
+ stat_at_start_valid_(false) {
+ const char *start, *end;
+ start = command.c_str();
+ while (true) {
+ end = strchrnul(start, ' ');
+ args_.push_back(std::string(start, end - start));
+ start = end + 1;
+ if (*end == '\0') {
+ break;
+ }
+ }
+
+ original_binary_ = RunCommand("which " + args_[0]);
+ binary_ = original_binary_ + ".stm";
+
+ watcher_ = unique_ptr<FileWatch>(
+ new FileWatch(original_binary_, StaticFileModified, this));
+
+ Start();
+ }
+
+ pid_t pid() { return pid_; }
+
+ // This gets called whenever the actual process dies and should (probably) be
+ // restarted.
+ void ProcessDied() {
+ pid_ = -1;
+ restarts_.push(time::Time::Now());
+ if (restarts_.size() > kMaxRestartsNumber) {
+ time::Time oldest = restarts_.front();
+ restarts_.pop();
+ if ((time::Time::Now() - oldest) <= kMaxRestartsTime) {
+ LOG(WARNING, "process %s getting restarted too often\n", name());
+ Timeout(kResumeWait, StaticStart, this);
+ return;
+ }
+ }
+ Start();
+ }
+
+ // Returns a name for logging purposes.
+ const char *name() {
+ return args_[0].c_str();
+ }
+
+ private:
+ struct CheckDiedStatus {
+ Child *self;
+ pid_t old_pid;
+ };
+
+ // How long to wait for a child to die nicely.
+ static constexpr time::Time kProcessDieTime = time::Time::InSeconds(0.75);
+
+ // How long to wait after the file is modified to restart it.
+ // This is important because some programs like modifying the binaries by
+ // writing them in little bits, which results in attempting to start partial
+ // binaries without this.
+ static constexpr time::Time kRestartWaitTime = time::Time::InSeconds(1.5);
+
+ // Only kMaxRestartsNumber restarts will be allowed in kMaxRestartsTime.
+ static constexpr time::Time kMaxRestartsTime = time::Time::InSeconds(4);
+ static const size_t kMaxRestartsNumber = 3;
+ // How long to wait if it gets restarted too many times.
+ static constexpr time::Time kResumeWait = time::Time::InSeconds(5);
+
+ static void StaticFileModified(void *self) {
+ static_cast<Child *>(self)->FileModified();
+ }
+
+ void FileModified() {
+ LOG(DEBUG, "file for %s modified\n", name());
+ struct timeval restart_time_timeval = kRestartWaitTime.ToTimeval();
+ // This will reset the timeout again if it hasn't run yet.
+ if (evtimer_add(restart_timeout_.get(), &restart_time_timeval) != 0) {
+ LOG(FATAL, "evtimer_add(%p, %p) failed\n",
+ restart_timeout_.get(), &restart_time_timeval);
+ }
+ }
+
+ static void StaticDoRestart(int, short, void *self) {
+ static_cast<Child *>(self)->DoRestart();
+ }
+
+ // Called after somebody else has finished modifying the file.
+ void DoRestart() {
+ if (stat_at_start_valid_) {
+ struct stat current_stat;
+ if (stat(original_binary_.c_str(), ¤t_stat) == -1) {
+ LOG(FATAL, "stat(%s, %p) failed with %d: %s\n",
+ original_binary_.c_str(), ¤t_stat, errno, strerror(errno));
+ }
+ if (current_stat.st_mtime == stat_at_start_.st_mtime) {
+ LOG(DEBUG, "ignoring trigger for %s because mtime didn't change\n",
+ name());
+ return;
+ }
+ }
+
+ if (pid_ != -1) {
+ LOG(DEBUG, "sending SIGTERM to child %d to restart it\n", pid_);
+ if (kill(pid_, SIGTERM) == -1) {
+ LOG(WARNING, "kill(%d, SIGTERM) failed with %d: %s\n",
+ pid_, errno, strerror(errno));
+ }
+ CheckDiedStatus *status = new CheckDiedStatus();
+ status->self = this;
+ status->old_pid = pid_;
+ Timeout(kProcessDieTime, StaticCheckDied, status);
+ } else {
+ LOG(WARNING, "%s restart attempted but not running\n", name());
+ }
+ }
+
+ static void StaticCheckDied(int, short, void *status_in) {
+ CheckDiedStatus *status = static_cast<CheckDiedStatus *>(status_in);
+ status->self->CheckDied(status->old_pid);
+ delete status;
+ }
+
+ // Sends SIGKILL to old_pid if that is still the PID of our current child.
+ void CheckDied(pid_t old_pid) {
+   // A different pid_ means that child is no longer the one we're tracking.
+   if (pid_ != old_pid) return;
+   LOG(WARNING, "child %d refused to die\n", old_pid);
+   const int kill_result = kill(old_pid, SIGKILL);
+   if (kill_result != -1) return;
+   LOG(WARNING, "kill(%d, SIGKILL) failed with %d: %s\n",
+       old_pid, errno, strerror(errno));
+ }
+
+ static void StaticStart(int, short, void *self) {  // self is the Child.
+ static_cast<Child *>(self)->Start();  // The first 2 args are unused.
+ }
+
+ // Actually starts the child: relinks binary_, records its stat, fork+execs.
+ void Start() {
+ if (pid_ != -1) {  // -1 means that no child is currently running.
+ LOG(WARNING, "calling Start() but already have child %d running\n",
+ pid_);
+ if (kill(pid_, SIGKILL) == -1) {
+ LOG(WARNING, "kill(%d, SIGKILL) failed with %d: %s\n",
+ pid_, errno, strerror(errno));
+ return;  // Gives up on starting if the old child couldn't be killed.
+ }
+ pid_ = -1;
+ }
+
+ // Remove the name that we run from (ie from a previous execution) and then
+ // hard link the real filename to it.
+ if (unlink(binary_.c_str()) != 0 && errno != ENOENT) {  // Missing is fine.
+ LOG(FATAL, "removing %s failed because of %d: %s\n",
+ binary_.c_str(), errno, strerror(errno));
+ }
+ if (link(original_binary_.c_str(), binary_.c_str()) != 0) {
+ LOG(FATAL, "link('%s', '%s') failed because of %d: %s\n",
+ original_binary_.c_str(), binary_.c_str(), errno, strerror(errno));
+ }
+
+ if (stat(original_binary_.c_str(), &stat_at_start_) == -1) {
+ LOG(FATAL, "stat(%s, %p) failed with %d: %s\n",
+ original_binary_.c_str(), &stat_at_start_, errno, strerror(errno));
+ }
+ stat_at_start_valid_ = true;  // DoRestart() compares mtimes against this.
+
+ if ((pid_ = fork()) == 0) {  // This branch runs in the new child process.
+ ssize_t args_size = args_.size();
+ const char **argv = new const char *[args_size + 1];  // +1 for the NULL.
+ for (int i = 0; i < args_size; ++i) {
+ argv[i] = args_[i].c_str();
+ }
+ argv[args_size] = NULL;
+ // The const_cast is safe because no code that might care if it gets
+ // modified can run afterwards.
+ execv(binary_.c_str(), const_cast<char **>(argv));
+ LOG(FATAL, "execv(%s, %p) failed with %d: %s\n",
+ binary_.c_str(), argv, errno, strerror(errno));
+ _exit(EXIT_FAILURE);  // Only reachable if execv failed and LOG returned.
+ }
+ if (pid_ == -1) {  // fork() itself failed.
+ LOG(FATAL, "forking to run \"%s\" failed with %d: %s\n",
+ binary_.c_str(), errno, strerror(errno));
+ }
+ LOG(DEBUG, "started \"%s\" successfully\n", binary_.c_str());
+ }
+
+ // A history of the times that this process has been restarted.
+ std::queue<time::Time, std::list<time::Time>> restarts_;
+
+ // The currently running child's PID or -1 if there isn't one.
+ pid_t pid_;
+
+ // All of the arguments (including the name of the binary).
+ std::deque<std::string> args_;
+
+ // The name of the real binary that we were told to run.
+ std::string original_binary_;
+ // The name of the file that we're actually running.
+ std::string binary_;
+
+ // Watches original_binary_.
+ unique_ptr<FileWatch> watcher_;
+
+ // An event that restarts after kRestartWaitTime.
+ EventUniquePtr restart_timeout_;
+
+ // Captured from the original file when we most recently started a new child
+ // process. Used to see if it actually changes or not.
+ struct stat stat_at_start_;
+ bool stat_at_start_valid_;
+
+ DISALLOW_COPY_AND_ASSIGN(Child);
+};
+
+constexpr time::Time Child::kProcessDieTime;
+constexpr time::Time Child::kRestartWaitTime;
+constexpr time::Time Child::kMaxRestartsTime;
+constexpr time::Time Child::kResumeWait;
+
+// This is where all of the Child instances except core live.
+std::vector<unique_ptr<Child>> children;
+// A global place to hold on to which child is core.
+unique_ptr<Child> core;
+
+// Terminates our entire process group, this process included.
+// If try_nice, everybody gets SIGTERM and a second to exit before the SIGKILL.
+void KillChildren(bool try_nice) {
+  if (try_nice) {
+    static const int kNiceStopSignal = SIGTERM;
+    static const time::Time kNiceWaitTime = time::Time::InSeconds(1);
+    // Block the nice signal here so that we don't just nicely stop ourself...
+    sigset_t blocked_signals;
+    sigemptyset(&blocked_signals);
+    sigaddset(&blocked_signals, kNiceStopSignal);
+    sigprocmask(SIG_BLOCK, &blocked_signals, NULL);
+    // A negative PID makes kill(2) signal that whole process group.
+    kill(-getpid(), kNiceStopSignal);
+    // Flush all output streams (presumably so nothing is lost to the SIGKILL).
+    fflush(NULL);
+    time::SleepFor(kNiceWaitTime);
+  }
+
+  // Whoever is still running (us for sure, maybe more too) gets forcibly
+  // terminated by sending SIGKILL to our whole process group.
+  kill(-getpid(), SIGKILL);
+}
+
+void ExitHandler() {  // Registered with atexit() in Main().
+ KillChildren(true);  // Lets everybody try to stop nicely first.
+}
+
+void KillChildrenSignalHandler(int signum) {
+  // Only these signals mean some form of "nicely stop"; for anything else
+  // (SIGSEGV etc) who knows what's happening, so kill everybody immediately.
+  const bool nicely = signum == SIGTERM || signum == SIGINT ||
+      signum == SIGHUP || signum == SIGQUIT || signum == SIGABRT ||
+      signum == SIGPIPE || signum == SIGXCPU;
+  KillChildren(nicely);
+}
+
+// Looks up the Child whose currently running process has PID pid.
+// Returns a reference to an empty unique_ptr if none of them match.
+const unique_ptr<Child> &FindChild(pid_t pid) {
+  // What gets returned when nobody matches.
+  static const unique_ptr<Child> kNothing;
+
+  // The normal children get checked first...
+  for (const unique_ptr<Child> &candidate : children) {
+    if (candidate->pid() == pid) return candidate;
+  }
+  // ...and then core, which lives separately from all of the others.
+  if (core->pid() == pid) return core;
+
+  return kNothing;
+}
+
+// Gets set up as a libevent handler for SIGCHLD.
+// Handles calling Child::ProcessDied() on the appropriate one.
+void SigCHLDReceived(int /*fd*/, short /*events*/, void *) {
+  // In a while loop in case we miss any SIGCHLDs.
+  while (true) {
+    siginfo_t infop;
+    infop.si_pid = 0;  // Stays 0 if WNOHANG finds nothing to report.
+    if (waitid(P_ALL, 0, &infop, WEXITED | WSTOPPED | WNOHANG) != 0) {
+      const int waitid_errno = errno;  // Saved in case LOG changes errno.
+      LOG(WARNING, "waitid failed with %d: %s\n",
+          waitid_errno, strerror(waitid_errno));
+      // Retrying anything but EINTR (ie ECHILD) would just spin here forever.
+      if (waitid_errno == EINTR) continue;
+      return;
+    }
+    // If there are no more child process deaths to process.
+    if (infop.si_pid == 0) return;
+
+    const pid_t pid = infop.si_pid;
+    const int status = infop.si_status;
+    const unique_ptr<Child> &child = FindChild(pid);
+    if (!child) {
+      LOG(WARNING, "couldn't find a Child for pid %d\n", pid);
+      continue;  // Other children might still be waiting to get reaped.
+    }
+    switch (infop.si_code) {
+      case CLD_EXITED:
+        LOG(WARNING, "child %d (%s) exited with status %d\n",
+            pid, child->name(), status);
+        break;
+      case CLD_DUMPED:
+        LOG(INFO, "child %d actually dumped core. "
+            "falling through to killed by signal case\n", pid);  // No break.
+      case CLD_KILLED:
+        // If somebody (possibly us) sent it SIGTERM that means that they just
+        // want it to stop, so it stopping isn't a WARNING.
+        LOG((status == SIGTERM) ? DEBUG : WARNING,
+            "child %d (%s) was killed by signal %d (%s)\n",
+            pid, child->name(), status, strsignal(status));
+        break;
+      case CLD_STOPPED:
+        LOG(WARNING, "child %d (%s) was stopped by signal %d "
+            "(giving it a SIGCONT(%d))\n",
+            pid, child->name(), status, SIGCONT);
+        kill(pid, SIGCONT);
+        continue;  // It didn't die, so skip ProcessDied().
+      default:
+        LOG(WARNING, "something happened to child %d (%s) (killing it)\n",
+            pid, child->name());
+        kill(pid, SIGKILL);
+        continue;  // ProcessDied() is not called for this report.
+    }
+
+    if (child == core) {
+      LOG(FATAL, "core died\n");
+    }
+    child->ProcessDied();
+  }
+}
+
+// This is used for communicating the name of the file to read processes to
+// start from main to Run.
+const char *child_list_file;
+
+void Run(void *watch);
+void Main() {
+  logging::Init();
+  // TODO(brians): tell logging that using the root logger from here until we
+  // bring up shm is ok
+
+  // Put ourself in a process group whose ID is our PID; KillChildren signals
+  // that whole group via kill(-getpid(), ...).
+  if (setpgid(0 /*self*/, 0 /*make PGID the same as PID*/) != 0) {
+    LOG(FATAL, "setpgid(0, 0) failed with %d: %s\n", errno, strerror(errno));
+  }
+
+  // Make sure that we kill all children when we exit.
+  atexit(ExitHandler);
+  // Do it on some signals too (ones that we otherwise tend to receive and then
+  // leave all of our children going).
+  static const int kKillChildrenSignals[] = {
+      SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGABRT, SIGFPE,
+      SIGSEGV, SIGPIPE, SIGTERM, SIGBUS, SIGXCPU};
+  for (const int signum : kKillChildrenSignals) {
+    signal(signum, KillChildrenSignalHandler);
+  }
+
+  libevent_base = EventBaseUniquePtr(event_base_new());
+
+  // The path of this file is passed to core as an argument, and the watch on
+  // it is what calls Run.
+  std::string core_touch_file =
+      "/tmp/starter." + std::to_string(static_cast<intmax_t>(getpid())) +
+      ".core_touch_file";
+  const std::string touch_command = "touch '" + core_touch_file + "'";
+  if (system(touch_command.c_str()) != 0) {
+    LOG(FATAL, "running `touch '%s'` failed\n", core_touch_file.c_str());
+  }
+  FileWatch core_touch_file_watch(core_touch_file, Run, NULL);
+  core = unique_ptr<Child>(new Child("core " + core_touch_file));
+
+  FILE *pid_file = fopen("/tmp/starter.pid", "w");
+  if (pid_file == NULL) {
+    LOG(FATAL, "fopen(\"/tmp/starter.pid\", \"w\") failed with %d: %s\n",
+        errno, strerror(errno));
+  } else {
+    if (fprintf(pid_file, "%d", core->pid()) == -1) {
+      LOG(WARNING, "fprintf(%p, \"%%d\", %d) failed with %d: %s\n",
+          pid_file, core->pid(), errno, strerror(errno));
+    }
+    fclose(pid_file);
+  }
+
+  LOG(INFO, "waiting for %s to appear\n", core_touch_file.c_str());
+
+  // Runs the event loop; coming back from it is treated as a fatal error.
+  event_base_dispatch(libevent_base.get());
+  LOG(FATAL, "event_base_dispatch(%p) returned\n", libevent_base.get());
+}
+
+// This is the callback for when core creates the file indicating that it has
+// started. Creates a Child for each line of child_list_file and then begins
+// handling SIGCHLD. watch is the FileWatch that this was registered with.
+void Run(void *watch) {
+  // Make it so it doesn't keep on seeing random changes in /tmp.
+  static_cast<FileWatch *>(watch)->RemoveWatch();
+
+  // It's safe now because core is up.
+  aos::InitNRT();
+
+  std::ifstream list_file(child_list_file);
+  std::string child_name;
+  // Looping on getline's result (instead of checking eofbit right after it)
+  // keeps a final line that has no trailing newline from being dropped.
+  while (std::getline(list_file, child_name)) {
+    children.push_back(unique_ptr<Child>(new Child(child_name)));
+  }
+  // Stopping before EOF (or failing to open the file) is an error.
+  if (!list_file.eof()) {
+    LOG(FATAL, "reading input file %s failed\n", child_list_file);
+  }
+
+  // Released on purpose: the event has to outlive this function.
+  EventUniquePtr sigchld(event_new(libevent_base.get(), SIGCHLD,
+                                   EV_SIGNAL | EV_PERSIST,
+                                   SigCHLDReceived, NULL));
+  event_add(sigchld.release(), NULL);
+}
+
+const char *kArgsHelp = "[OPTION]... START_LIST\n"
+ "Start all of the robot code binaries in START_LIST.\n"
+ "\n"
+ "START_LIST is the file to read binaries (looked up on PATH) to run.\n"
+ " --help display this help and exit\n";
+void PrintHelp() {  // Writes the usage message to stderr.
+ fprintf(stderr, "Usage: %s %s", program_invocation_name, kArgsHelp);
+}
+
+} // namespace starter
+} // namespace aos
+
+int main(int argc, char *argv[]) {
+  // Exactly one argument is accepted: either --help or the START_LIST file.
+  const bool wrong_arg_count = (argc != 2);
+  if (wrong_arg_count || strcmp(argv[1], "--help") == 0) {
+    aos::starter::PrintHelp();
+    // Asking for help is a success; a malformed command line is not.
+    exit(wrong_arg_count ? EXIT_FAILURE : EXIT_SUCCESS);
+  }
+
+  // Run reads the names of the children to start from this file.
+  aos::starter::child_list_file = argv[1];
+
+  aos::starter::Main();
+}
diff --git a/aos/linux_code/starter/starter.gyp b/aos/linux_code/starter/starter.gyp
new file mode 100644
index 0000000..2419b33
--- /dev/null
+++ b/aos/linux_code/starter/starter.gyp
@@ -0,0 +1,39 @@
+{
+ 'targets': [
+ {
+ 'target_name': 'netconsole',
+ 'type': 'executable',
+ 'sources': [
+ 'netconsole.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/linux_code/linux_code.gyp:configuration',
+ '<(AOS)/common/common.gyp:util',
+ ],
+ },
+ {
+ 'target_name': 'starter_exe',
+ 'type': 'executable',
+ 'sources': [
+ 'starter.cc',
+ ],
+ 'dependencies': [
+ '<(AOS)/linux_code/linux_code.gyp:init',
+ '<(EXTERNALS):libevent',
+ '<(AOS)/build/aos.gyp:logging',
+ '<(AOS)/common/common.gyp:once',
+ '<(AOS)/common/common.gyp:time',
+ ],
+ 'copies': [
+ {
+ 'destination': '<(rsync_dir)',
+ 'files': [
+ 'starter.sh',
+ 'starter_loop.sh',
+ ],
+ },
+ ],
+ },
+ ],
+}
diff --git a/aos/linux_code/starter/starter.sh b/aos/linux_code/starter/starter.sh
new file mode 100755
index 0000000..33906c3
--- /dev/null
+++ b/aos/linux_code/starter/starter.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Sets the core dump file pattern and then launches the background loops.
+echo '/home/driver/tmp/robot_logs/%e-%s-%p-%t.coredump' > /proc/sys/kernel/core_pattern
+
+#echo $$ > /var/run/`basename $0`.pid IT FORKS AFTER THIS!!!!
+#insmod /home/driver/robot_code/bin/aos_module.ko
+#chrt -p 45 `pidof sshd`
+chrt -o 0 bash -c "export PATH=$PATH:/home/driver/robot_code/bin; starter_loop.sh $*" &
+#chrt -o 0 bash -c "while true; do cd /home/driver/mjpg-streamer2; ./server.sh; sleep 5; done" &
+
+# Log everything from the serial port...
+#SERIAL_LOG_FILE=$(date "/home/driver/tmp/robot_logs/serial_log.%F_%H-%M-%S")
+#chrt -o 0 bash -c "( stty -echo -echoe -echok 9600; cat > ${SERIAL_LOG_FILE} ) < /dev/ttyUSB0" &
+
+# Wireshark _everything_ we can see...
+#DUMPCAP_LOG_FILE=$(date "/home/driver/tmp/robot_logs/dumpcap.%F_%H-%M-%S")
+#DUMPCAP_STDOUT_FILE=$(date "/home/driver/tmp/robot_logs/stdout_dumpcap.%F_%H-%M-%S")
+#chrt -o 0 bash -c "dumpcap -i eth0 -w ${DUMPCAP_LOG_FILE} -f 'not port 8080 and not net 10.9.71.13' > ${DUMPCAP_STDOUT_FILE}" &
+
+chrt -o 0 bash -c 'while true; do /home/driver/robot_code/bin/netconsole /home/driver/tmp/robot_logs/netconsole-`date +%s`; done' &
diff --git a/aos/linux_code/starter/starter_loop.sh b/aos/linux_code/starter/starter_loop.sh
new file mode 100755
index 0000000..83d34cc
--- /dev/null
+++ b/aos/linux_code/starter/starter_loop.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+# Runs starter_exe forever, giving each run its own numbered output files.
+for ((i=1; 1; i++)); do
+	starter_exe "$@" 1>/tmp/starter${i}_stdout 2>/tmp/starter${i}_stderr
+	sleep 2
+done
diff --git a/aos/linux_code/starter/testing_list.txt b/aos/linux_code/starter/testing_list.txt
new file mode 100644
index 0000000..96d412d
--- /dev/null
+++ b/aos/linux_code/starter/testing_list.txt
@@ -0,0 +1,3 @@
+../bin/LogReader
+../bin/JoystickCode
+../bin/AutoMode
diff --git a/aos/linux_code/thread_local.h b/aos/linux_code/thread_local.h
new file mode 100644
index 0000000..503e18c
--- /dev/null
+++ b/aos/linux_code/thread_local.h
@@ -0,0 +1,13 @@
+#ifndef AOS_LINUX_CODE_THREAD_LOCAL_H_
+#define AOS_LINUX_CODE_THREAD_LOCAL_H_
+
+// The storage class to use when declaring thread-local variables. This provides
+// a single place to change it if/when we want to switch to something standard.
+//
+// Example: AOS_THREAD_LOCAL void *bla; // at namespace (aka global) scope
+//
+// C++11 has thread_local, but it's not clear whether Clang supports that as of
+// 12/18/12.
+#define AOS_THREAD_LOCAL __thread
+
+#endif // AOS_LINUX_CODE_THREAD_LOCAL_H_