Checking in a jpeg streamer server library for sending jpegs to clients.

Change-Id: Ibc4cabe5e278c834226737815a9deb982f5186bc
diff --git a/y2018/vision/image_streamer.cc b/y2018/vision/image_streamer.cc
new file mode 100644
index 0000000..f4ad36a
--- /dev/null
+++ b/y2018/vision/image_streamer.cc
@@ -0,0 +1,235 @@
+#include "aos/vision/events/socket_types.h"
+#include "aos/vision/image/reader.h"
+#include "aos/vision/image/image_stream.h"
+#include "aos/common/logging/implementations.h"
+#include "aos/common/logging/logging.h"
+#include "aos/vision/blob/codec.h"
+#include <fstream>
+#include <sys/stat.h>
+#include <deque>
+#include <string>
+
+using aos::events::TCPServer;
+using aos::events::DataSocket;
+using aos::vision::Int32Codec;
+using aos::vision::DataRef;
+
+aos::vision::DataRef mjpg_header = "HTTP/1.0 200 OK\r\n"\
+      "Server: YourServerName\r\n"\
+      "Connection: close\r\n"\
+      "Max-Age: 0\r\n"\
+      "Expires: 0\r\n"\
+      "Cache-Control: no-cache, private\r\n"\
+      "Pragma: no-cache\r\n"\
+      "Content-Type: multipart/x-mixed-replace; "\
+      "boundary=--boundary\r\n\r\n";
+
+struct Frame {
+  std::string data;
+};
+
+inline bool FileExist(const std::string &name) {
+  struct stat buffer;
+  return (stat(name.c_str(), &buffer) == 0);
+}
+
+class BlobLog {
+ public:
+  explicit BlobLog(const char *prefix, const char *extension) {
+    int index = 0;
+    while (true) {
+      std::string file = prefix + std::to_string(index) + extension;
+      if (FileExist(file)) {
+        index++;
+      } else {
+        printf("Logging to file (%s)\n", file.c_str());
+        ofst_.open(file);
+        assert(ofst_.is_open());
+        break;
+      }
+    }
+  }
+
+  ~BlobLog() { ofst_.close(); }
+
+  void WriteLogEntry(DataRef data) { ofst_.write(&data[0], data.size()); }
+
+ private:
+  std::ofstream ofst_;
+};
+
+class MjpegDataSocket : public aos::events::SocketConnection {
+ public:
+
+  MjpegDataSocket(aos::events::TCPServerBase *serv, int fd)
+      : aos::events::SocketConnection(serv, fd) {
+    SetEvents(EPOLLOUT | EPOLLET);
+  }
+
+  ~MjpegDataSocket() { printf("Closed connection on descriptor %d\n", fd()); }
+
+  void DirectEvent(uint32_t events) override {
+    if (events & EPOLLOUT) {
+      NewDataToSend();
+      events &= ~EPOLLOUT;
+    }
+    if (events) {
+      aos::events::EpollEvent::DirectEvent(events);
+    }
+  }
+
+  void ReadEvent() override {
+    // Ignore reads, but don't leave them pending.
+    ssize_t count;
+    char buf[512];
+    while (true) {
+      count = read(fd(), &buf, sizeof buf);
+      if (count <= 0) {
+        if (errno != EAGAIN) {
+          CloseConnection();
+          return;
+        }
+        break;
+      } else if (!ready_to_recieve_) {
+        // This 4 should match "\r\n\r\n".length();
+        if (match_i_ >= 4) {
+          printf("reading after last match\n");
+          continue;
+        }
+        for (char c : aos::vision::DataRef(&buf[0], count)) {
+          if (c == "\r\n\r\n"[match_i_]) {
+            ++match_i_;
+            if (match_i_ >= 4) {
+              if (!ready_to_recieve_) {
+                ready_to_recieve_ = true;
+                RasterHeader();
+              }
+            }
+          } else if (match_i_ != 0) {
+            if (c == '\r') match_i_ = 1;
+          }
+        }
+      }
+    }
+  }
+
+  void RasterHeader() {
+    output_buffer_.push_back(mjpg_header);
+    NewDataToSend();
+  }
+
+  void RasterFrame(std::shared_ptr<Frame> frame) {
+    if (!output_buffer_.empty() || !ready_to_recieve_) return;
+    sending_frame_ = frame;
+    aos::vision::DataRef data = frame->data;
+
+    size_t n_written = snprintf(data_header_tmp_, sizeof(data_header_tmp_),
+                                "--boundary\r\n"\
+                                "Content-type: image/jpg\r\n"\
+                                "Content-Length: %zu\r\n\r\n", data.size());
+    // This should never happen because the buffer should be properly sized.
+    if (n_written == sizeof(data_header_tmp_)) {
+      fprintf(stderr, "wrong sized buffer\n");
+      exit(-1);
+    }
+    output_buffer_.push_back(aos::vision::DataRef(data_header_tmp_, n_written));
+    output_buffer_.push_back(data);
+    output_buffer_.push_back("\r\n\r\n");
+    NewDataToSend();
+  }
+
+  void NewFrame(std::shared_ptr<Frame> frame) {
+    RasterFrame(std::move(frame));
+  }
+
+  void NewDataToSend() {
+    while (!output_buffer_.empty()) {
+      auto& data = *output_buffer_.begin();
+
+      while (!data.empty()) {
+        int len = send(fd(), data.data(), data.size(), MSG_NOSIGNAL);
+        if (len == -1) {
+          if (errno == EAGAIN) {
+            // Next thinggy will pick this up.
+            return;
+          } else {
+            CloseConnection();
+            return;
+          }
+        } else {
+          data.remove_prefix(len);
+        }
+      }
+      output_buffer_.pop_front();
+    }
+  }
+
+ private:
+  char data_header_tmp_[512];
+  std::shared_ptr<Frame> sending_frame_;
+  std::deque<aos::vision::DataRef> output_buffer_;
+
+  bool ready_to_recieve_ = false;
+  void CloseConnection() {
+    loop()->Delete(this);
+    close(fd());
+    delete this;
+  }
+  size_t match_i_ = 0;
+};
+
+using namespace aos::vision;
+class CameraStream : public ImageStreamEvent {
+ public:
+  CameraStream(aos::vision::CameraParams params, 
+               const std::string &fname, TCPServer<MjpegDataSocket>* tcp_serv)
+      : ImageStreamEvent(fname, params), tcp_serv_(tcp_serv),
+        log_("./logging/blob_record_", ".dat") {}      
+
+  void ProcessImage(DataRef data, aos::monotonic_clock::time_point tp) {
+    (void)tp;
+    ++sampling;
+    // 20 is the sampling rate.
+    if (sampling == 20) {
+      int tmp_size = data.size() + sizeof(int32_t);
+      char *buf;
+      std::string log_record;
+      log_record.resize(tmp_size, 0);
+      {
+        buf = Int32Codec::Write(&log_record[0], tmp_size);
+        data.copy(buf, data.size());
+      }
+      log_.WriteLogEntry(log_record);
+      sampling = 0;
+    }
+
+    auto frame = std::make_shared<Frame>(Frame{std::string(data)});
+    tcp_serv_->Broadcast([frame](MjpegDataSocket* event) {
+                         event->NewFrame(frame);
+                         });
+  }
+ private:
+  int sampling = 0;
+  TCPServer<MjpegDataSocket>* tcp_serv_;
+  BlobLog log_;
+};
+
+int main() {
+  ::aos::logging::Init();
+  ::aos::logging::AddImplementation(
+      new ::aos::logging::StreamLogImplementation(stderr));
+
+  TCPServer<MjpegDataSocket> tcp_serv_(80);
+  aos::vision::CameraParams params;
+  params.set_exposure(300);
+  params.set_width(320);
+  params.set_height(240);
+  CameraStream camera(params, "/dev/video0", &tcp_serv_);
+
+  aos::events::EpollLoop loop;
+  loop.Add(&tcp_serv_);
+  loop.Add(&camera);
+
+  printf("Running Camera\n");
+  loop.Run();
+}