Checking in a jpeg streamer server library for sending jpegs to clients.

Change-Id: Ibc4cabe5e278c834226737815a9deb982f5186bc
diff --git a/aos/vision/events/epoll_events.cc b/aos/vision/events/epoll_events.cc
index d49043c..b0c66fa 100644
--- a/aos/vision/events/epoll_events.cc
+++ b/aos/vision/events/epoll_events.cc
@@ -18,11 +18,12 @@
   event->loop_ = this;
   struct epoll_event temp_event;
   temp_event.data.ptr = static_cast<void *>(event);
-  temp_event.events = EPOLLIN;
+  temp_event.events = event->events();
   PCHECK(epoll_ctl(epoll_fd(), EPOLL_CTL_ADD, event->fd(), &temp_event));
 }
 
 void EpollLoop::Delete(EpollEvent *event) {
+  event->loop_ = nullptr;
   PCHECK(epoll_ctl(epoll_fd(), EPOLL_CTL_DEL, event->fd(), NULL));
 }
 
@@ -35,12 +36,7 @@
         PCHECK(epoll_wait(epoll_fd(), events, kNumberOfEvents, timeout));
 
     for (int i = 0; i < number_events; i++) {
-      EpollEvent *event = static_cast<EpollEvent *>(events[i].data.ptr);
-      if ((events[i].events & ~(EPOLLIN | EPOLLPRI | EPOLLERR)) != 0) {
-        LOG(FATAL, "unexpected epoll events set in %x on %d\n",
-            events[i].events, event->fd());
-      }
-      event->ReadEvent();
+      static_cast<EpollEvent *>(events[i].data.ptr)->DirectEvent(events[i].events);
     }
   }
 }
diff --git a/aos/vision/events/epoll_events.h b/aos/vision/events/epoll_events.h
index 3b7d75a..234d949 100644
--- a/aos/vision/events/epoll_events.h
+++ b/aos/vision/events/epoll_events.h
@@ -5,6 +5,7 @@
 #include <stdint.h>
 #include <memory>
 #include <vector>
+#include <sys/epoll.h>
 
 #include "aos/common/scoped_fd.h"
 #include "aos/common/time.h"
@@ -73,10 +74,27 @@
   // loop degrades into a busy loop.
   virtual void ReadEvent() = 0;
 
+  // Handle Events directly from epoll.
+  virtual void DirectEvent(uint32_t events) {
+    if ((events & ~(EPOLLIN | EPOLLPRI | EPOLLERR)) != 0) {
+      LOG(FATAL, "unexpected epoll events set in %x on %d\n",
+          events, fd());
+    }
+    ReadEvent();
+  }
+
   EpollLoop *loop() { return loop_; }
 
+  void SetEvents(uint32_t events) {
+    events_ |= events;
+    CHECK(!loop_);
+  }
+
+  uint32_t events() const { return events_; }
+
  private:
   const int fd_;
+  uint32_t events_ = EPOLLIN;
   friend class EpollLoop;
   EpollLoop *loop_ = nullptr;
 };
@@ -113,6 +131,7 @@
   int epoll_fd() { return epoll_fd_.get(); }
 
   int CalculateTimeout();
+  friend class EpollEvent;
 
   ::aos::ScopedFD epoll_fd_;
   ::std::vector<EpollWait *> waits_;
diff --git a/y2018/vision/BUILD b/y2018/vision/BUILD
new file mode 100644
index 0000000..11fb7c5
--- /dev/null
+++ b/y2018/vision/BUILD
@@ -0,0 +1,13 @@
+cc_binary(
+  name = "image_streamer",
+  srcs = ["image_streamer.cc"],
+  deps = [
+    "//aos/vision/events:socket_types",
+    '//aos/vision/events:epoll_events',
+    '//aos/vision/image:reader',
+    '//aos/vision/image:image_stream',
+    '//aos/vision/blob:codec',
+    '//aos/common/logging:logging',
+    '//aos/common/logging:implementations',
+  ],
+)
diff --git a/y2018/vision/README_backup_command.txt b/y2018/vision/README_backup_command.txt
new file mode 100644
index 0000000..7cfccf7
--- /dev/null
+++ b/y2018/vision/README_backup_command.txt
@@ -0,0 +1,2 @@
+
+tar -cvzf /backups/20160403_JetsonPracticBot_root_files.tgz root --exclude="logging/*"
diff --git a/y2018/vision/README_jetsonboard_setup.txt b/y2018/vision/README_jetsonboard_setup.txt
new file mode 100644
index 0000000..87177da
--- /dev/null
+++ b/y2018/vision/README_jetsonboard_setup.txt
@@ -0,0 +1,21 @@
+# Put the following lines in the file shown.
+
+echo "Current contents are: "
+cat  /etc/supervisor/conf.d/vision.conf 
+cat > /etc/supervisor/conf.d/vision.conf <<EOF
+[program:vision]
+command=/root/startup.sh
+autorestart=true
+startsecs=3
+startretries=200
+redirect_stderr=true
+EOF
+
+
+# before we switched at svr at 9:47 am, the cameras were.
+# The left camera is right side up and red
+# The right camera is upside down and green
+
+# before we switched at svr at 9:47 am, the cameras were.
+# The left camera is green
+# The right camera is red
diff --git a/y2018/vision/README_usb_to_ethernet.txt b/y2018/vision/README_usb_to_ethernet.txt
new file mode 100644
index 0000000..06bcea2
--- /dev/null
+++ b/y2018/vision/README_usb_to_ethernet.txt
@@ -0,0 +1,25 @@
+
+in ie:/etc# cat sysctl.conf 
+set to 1 the following flag:
+# Uncomment the next line to enable packet forwarding for IPv4
+net.ipv4.ip_forward=1
+
+
+ cat > /etc/iptables.v4
+*nat
+-A POSTROUTING -o eth0 -j MASQUERADE
+COMMIT
+
+END
+
+add to /etc/rc.local:
+iptables-restore < /etc/iptables.v4
+
+
+
+
+create /etc/network/interfaces.d/eth1 with:
+allow-hotplug eth1
+iface eth1 inet static
+        address 11.0.0.179
+        netmask 255.0.0.0
diff --git a/y2018/vision/exposure_2018.sh b/y2018/vision/exposure_2018.sh
new file mode 100755
index 0000000..e021919
--- /dev/null
+++ b/y2018/vision/exposure_2018.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+
+set -e
+
+A=`ls /dev/video*`
+EXPOSURE='2000'
+
+echo $SHELL
+
+echo $A
+
+# Michael added this one to try and have the exposer set correctly sooner.
+sleep 1
+echo Setting exposure again after 1 seconds
+
+v4l2-ctl --set-ctrl="exposure_absolute=$EXPOSURE" -d $A
+
+echo Done setting exposure again after 1 seconds
+
+
+# Michael added this one to try and have the exposer set correctly sooner.
+sleep 5
+echo Setting exposure again after 5 seconds
+v4l2-ctl --set-ctrl="exposure_absolute=$EXPOSURE" -d $A
+
+
+echo Done setting exposure again after 5 seconds
+
+#sleep 15
+#
+#echo Setting exposure again after 20 seconds
+#
+#v4l2-ctl --set-ctrl="exposure_absolute=$EXPOSURE" -d $A
+#
+#echo Done setting exposure again after 20 seconds
diff --git a/y2018/vision/image_streamer.cc b/y2018/vision/image_streamer.cc
new file mode 100644
index 0000000..f4ad36a
--- /dev/null
+++ b/y2018/vision/image_streamer.cc
@@ -0,0 +1,235 @@
+#include "aos/vision/events/socket_types.h"
+#include "aos/vision/image/reader.h"
+#include "aos/vision/image/image_stream.h"
+#include "aos/common/logging/implementations.h"
+#include "aos/common/logging/logging.h"
+#include "aos/vision/blob/codec.h"
+#include <fstream>
+#include <sys/stat.h>
+#include <deque>
+#include <string>
+
+using aos::events::TCPServer;
+using aos::events::DataSocket;
+using aos::vision::Int32Codec;
+using aos::vision::DataRef;
+
+aos::vision::DataRef mjpg_header = "HTTP/1.0 200 OK\r\n"\
+      "Server: YourServerName\r\n"\
+      "Connection: close\r\n"\
+      "Max-Age: 0\r\n"\
+      "Expires: 0\r\n"\
+      "Cache-Control: no-cache, private\r\n"\
+      "Pragma: no-cache\r\n"\
+      "Content-Type: multipart/x-mixed-replace; "\
+      "boundary=--boundary\r\n\r\n";
+
+struct Frame {
+  std::string data;
+};
+
+inline bool FileExist(const std::string &name) {
+  struct stat buffer;
+  return (stat(name.c_str(), &buffer) == 0);
+}
+
+class BlobLog {
+ public:
+  explicit BlobLog(const char *prefix, const char *extension) {
+    int index = 0;
+    while (true) {
+      std::string file = prefix + std::to_string(index) + extension;
+      if (FileExist(file)) {
+        index++;
+      } else {
+        printf("Logging to file (%s)\n", file.c_str());
+        ofst_.open(file);
+        assert(ofst_.is_open());
+        break;
+      }
+    }
+  }
+
+  ~BlobLog() { ofst_.close(); }
+
+  void WriteLogEntry(DataRef data) { ofst_.write(&data[0], data.size()); }
+
+ private:
+  std::ofstream ofst_;
+};
+
+class MjpegDataSocket : public aos::events::SocketConnection {
+ public:
+
+  MjpegDataSocket(aos::events::TCPServerBase *serv, int fd)
+      : aos::events::SocketConnection(serv, fd) {
+    SetEvents(EPOLLOUT | EPOLLET);
+  }
+
+  ~MjpegDataSocket() { printf("Closed connection on descriptor %d\n", fd()); }
+
+  void DirectEvent(uint32_t events) override {
+    if (events & EPOLLOUT) {
+      NewDataToSend();
+      events &= ~EPOLLOUT;
+    }
+    if (events) {
+      aos::events::EpollEvent::DirectEvent(events);
+    }
+  }
+
+  void ReadEvent() override {
+    // Ignore reads, but don't leave them pending.
+    ssize_t count;
+    char buf[512];
+    while (true) {
+      count = read(fd(), &buf, sizeof buf);
+      if (count <= 0) {
+        if (errno != EAGAIN) {
+          CloseConnection();
+          return;
+        }
+        break;
+      } else if (!ready_to_recieve_) {
+        // This 4 should match "\r\n\r\n".length();
+        if (match_i_ >= 4) {
+          printf("reading after last match\n");
+          continue;
+        }
+        for (char c : aos::vision::DataRef(&buf[0], count)) {
+          if (c == "\r\n\r\n"[match_i_]) {
+            ++match_i_;
+            if (match_i_ >= 4) {
+              if (!ready_to_recieve_) {
+                ready_to_recieve_ = true;
+                RasterHeader();
+              }
+            }
+          } else if (match_i_ != 0) {
+            if (c == '\r') match_i_ = 1;
+          }
+        }
+      }
+    }
+  }
+
+  void RasterHeader() {
+    output_buffer_.push_back(mjpg_header);
+    NewDataToSend();
+  }
+
+  void RasterFrame(std::shared_ptr<Frame> frame) {
+    if (!output_buffer_.empty() || !ready_to_recieve_) return;
+    sending_frame_ = frame;
+    aos::vision::DataRef data = frame->data;
+
+    size_t n_written = snprintf(data_header_tmp_, sizeof(data_header_tmp_),
+                                "--boundary\r\n"\
+                                "Content-type: image/jpg\r\n"\
+                                "Content-Length: %zu\r\n\r\n", data.size());
+    // This should never happen because the buffer should be properly sized.
+    if (n_written == sizeof(data_header_tmp_)) {
+      fprintf(stderr, "wrong sized buffer\n");
+      exit(-1);
+    }
+    output_buffer_.push_back(aos::vision::DataRef(data_header_tmp_, n_written));
+    output_buffer_.push_back(data);
+    output_buffer_.push_back("\r\n\r\n");
+    NewDataToSend();
+  }
+
+  void NewFrame(std::shared_ptr<Frame> frame) {
+    RasterFrame(std::move(frame));
+  }
+
+  void NewDataToSend() {
+    while (!output_buffer_.empty()) {
+      auto& data = *output_buffer_.begin();
+
+      while (!data.empty()) {
+        int len = send(fd(), data.data(), data.size(), MSG_NOSIGNAL);
+        if (len == -1) {
+          if (errno == EAGAIN) {
+            // Next thinggy will pick this up.
+            return;
+          } else {
+            CloseConnection();
+            return;
+          }
+        } else {
+          data.remove_prefix(len);
+        }
+      }
+      output_buffer_.pop_front();
+    }
+  }
+
+ private:
+  char data_header_tmp_[512];
+  std::shared_ptr<Frame> sending_frame_;
+  std::deque<aos::vision::DataRef> output_buffer_;
+
+  bool ready_to_recieve_ = false;
+  void CloseConnection() {
+    loop()->Delete(this);
+    close(fd());
+    delete this;
+  }
+  size_t match_i_ = 0;
+};
+
+using namespace aos::vision;
+class CameraStream : public ImageStreamEvent {
+ public:
+  CameraStream(aos::vision::CameraParams params, 
+               const std::string &fname, TCPServer<MjpegDataSocket>* tcp_serv)
+      : ImageStreamEvent(fname, params), tcp_serv_(tcp_serv),
+        log_("./logging/blob_record_", ".dat") {}      
+
+  void ProcessImage(DataRef data, aos::monotonic_clock::time_point tp) {
+    (void)tp;
+    ++sampling;
+    // 20 is the sampling rate.
+    if (sampling == 20) {
+      int tmp_size = data.size() + sizeof(int32_t);
+      char *buf;
+      std::string log_record;
+      log_record.resize(tmp_size, 0);
+      {
+        buf = Int32Codec::Write(&log_record[0], tmp_size);
+        data.copy(buf, data.size());
+      }
+      log_.WriteLogEntry(log_record);
+      sampling = 0;
+    }
+
+    auto frame = std::make_shared<Frame>(Frame{std::string(data)});
+    tcp_serv_->Broadcast([frame](MjpegDataSocket* event) {
+                         event->NewFrame(frame);
+                         });
+  }
+ private:
+  int sampling = 0;
+  TCPServer<MjpegDataSocket>* tcp_serv_;
+  BlobLog log_;
+};
+
+int main() {
+  ::aos::logging::Init();
+  ::aos::logging::AddImplementation(
+      new ::aos::logging::StreamLogImplementation(stderr));
+
+  TCPServer<MjpegDataSocket> tcp_serv_(80);
+  aos::vision::CameraParams params;
+  params.set_exposure(300);
+  params.set_width(320);
+  params.set_height(240);
+  CameraStream camera(params, "/dev/video0", &tcp_serv_);
+
+  aos::events::EpollLoop loop;
+  loop.Add(&tcp_serv_);
+  loop.Add(&camera);
+
+  printf("Running Camera\n");
+  loop.Run();
+}
diff --git a/y2018/vision/startup.sh b/y2018/vision/startup.sh
new file mode 100755
index 0000000..5251380
--- /dev/null
+++ b/y2018/vision/startup.sh
@@ -0,0 +1,54 @@
+#!/bin/bash
+
+set -e
+
+cd /root/
+
+sleep 1
+
+echo performance > /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor
+
+sleep 1
+
+
+# Comp Bot
+# while [ ! -e /dev/v4l/by-id/usb-046d_0825_A17C8DE0-video-index0 ] ; do echo no camera1 && sleep 1 ; done
+# while [ ! -e /dev/v4l/by-id/usb-046d_0825_B914CDE0-video-index0 ] ; do echo no camera2 && sleep 1 ; done
+# Practice Bot
+# while [ ! -e /dev/v4l/by-id/usb-046d_0825_9224CDE0-video-index0 ] ; do echo no camera1 && sleep 1 ; done
+# while [ ! -e /dev/v4l/by-id/usb-046d_0825_B07B8DE0-video-index0 ] ; do echo no camera2 && sleep 1 ; done
+
+# v4l2-ctl --set-ctrl="exposure_auto=1" -d /dev/video0
+# sleep 0.5
+# v4l2-ctl --set-ctrl="exposure_auto=1" -d /dev/video1
+# sleep 2
+# echo All done disabling auto-exposure
+# v4l2-ctl --set-ctrl="exposure_absolute=20" -d /dev/video0
+# sleep 0.5
+# v4l2-ctl --set-ctrl="exposure_absolute=20" -d /dev/video1
+# sleep 1
+# echo All done setting exposure
+
+# echo "Starting target sender now."
+A=`ls /dev/video*`
+
+echo $SHELL
+
+echo $A
+
+v4l2-ctl --set-ctrl="exposure_auto=1" -d $A
+sleep 0.5
+v4l2-ctl --set-ctrl="exposure_absolute=2000" -d $A
+sleep 0.5
+
+PATH="./;$PATH"
+
+/root/camera_primer $A
+
+# Run a script to reset the exposure a few times and exit.
+/root/exposure_2018.sh &
+
+/root/image_streamer
+#exec ./target_sender Practice
+#exec ./target_sender Comp
+#exec ./target_sender Spare