Parker Schuh | c1975fc | 2018-04-07 15:27:07 -0700 | [diff] [blame] | 1 | #include "aos/vision/events/socket_types.h" |
| 2 | #include "aos/vision/image/reader.h" |
| 3 | #include "aos/vision/image/image_stream.h" |
| 4 | #include "aos/common/logging/implementations.h" |
| 5 | #include "aos/common/logging/logging.h" |
| 6 | #include "aos/vision/blob/codec.h" |
| 7 | #include <fstream> |
| 8 | #include <sys/stat.h> |
| 9 | #include <deque> |
| 10 | #include <string> |
| 11 | |
| 12 | using aos::events::TCPServer; |
| 13 | using aos::events::DataSocket; |
| 14 | using aos::vision::Int32Codec; |
| 15 | using aos::vision::DataRef; |
| 16 | |
| 17 | aos::vision::DataRef mjpg_header = "HTTP/1.0 200 OK\r\n"\ |
| 18 | "Server: YourServerName\r\n"\ |
| 19 | "Connection: close\r\n"\ |
| 20 | "Max-Age: 0\r\n"\ |
| 21 | "Expires: 0\r\n"\ |
| 22 | "Cache-Control: no-cache, private\r\n"\ |
| 23 | "Pragma: no-cache\r\n"\ |
| 24 | "Content-Type: multipart/x-mixed-replace; "\ |
| 25 | "boundary=--boundary\r\n\r\n"; |
| 26 | |
| 27 | struct Frame { |
| 28 | std::string data; |
| 29 | }; |
| 30 | |
| 31 | inline bool FileExist(const std::string &name) { |
| 32 | struct stat buffer; |
| 33 | return (stat(name.c_str(), &buffer) == 0); |
| 34 | } |
| 35 | |
| 36 | class BlobLog { |
| 37 | public: |
| 38 | explicit BlobLog(const char *prefix, const char *extension) { |
| 39 | int index = 0; |
| 40 | while (true) { |
| 41 | std::string file = prefix + std::to_string(index) + extension; |
| 42 | if (FileExist(file)) { |
| 43 | index++; |
| 44 | } else { |
| 45 | printf("Logging to file (%s)\n", file.c_str()); |
| 46 | ofst_.open(file); |
| 47 | assert(ofst_.is_open()); |
| 48 | break; |
| 49 | } |
| 50 | } |
| 51 | } |
| 52 | |
| 53 | ~BlobLog() { ofst_.close(); } |
| 54 | |
| 55 | void WriteLogEntry(DataRef data) { ofst_.write(&data[0], data.size()); } |
| 56 | |
| 57 | private: |
| 58 | std::ofstream ofst_; |
| 59 | }; |
| 60 | |
| 61 | class MjpegDataSocket : public aos::events::SocketConnection { |
| 62 | public: |
| 63 | |
| 64 | MjpegDataSocket(aos::events::TCPServerBase *serv, int fd) |
| 65 | : aos::events::SocketConnection(serv, fd) { |
| 66 | SetEvents(EPOLLOUT | EPOLLET); |
| 67 | } |
| 68 | |
| 69 | ~MjpegDataSocket() { printf("Closed connection on descriptor %d\n", fd()); } |
| 70 | |
| 71 | void DirectEvent(uint32_t events) override { |
| 72 | if (events & EPOLLOUT) { |
| 73 | NewDataToSend(); |
| 74 | events &= ~EPOLLOUT; |
| 75 | } |
| 76 | if (events) { |
| 77 | aos::events::EpollEvent::DirectEvent(events); |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | void ReadEvent() override { |
| 82 | // Ignore reads, but don't leave them pending. |
| 83 | ssize_t count; |
| 84 | char buf[512]; |
| 85 | while (true) { |
| 86 | count = read(fd(), &buf, sizeof buf); |
| 87 | if (count <= 0) { |
| 88 | if (errno != EAGAIN) { |
| 89 | CloseConnection(); |
| 90 | return; |
| 91 | } |
| 92 | break; |
| 93 | } else if (!ready_to_recieve_) { |
| 94 | // This 4 should match "\r\n\r\n".length(); |
| 95 | if (match_i_ >= 4) { |
| 96 | printf("reading after last match\n"); |
| 97 | continue; |
| 98 | } |
| 99 | for (char c : aos::vision::DataRef(&buf[0], count)) { |
| 100 | if (c == "\r\n\r\n"[match_i_]) { |
| 101 | ++match_i_; |
| 102 | if (match_i_ >= 4) { |
| 103 | if (!ready_to_recieve_) { |
| 104 | ready_to_recieve_ = true; |
| 105 | RasterHeader(); |
| 106 | } |
| 107 | } |
| 108 | } else if (match_i_ != 0) { |
| 109 | if (c == '\r') match_i_ = 1; |
| 110 | } |
| 111 | } |
| 112 | } |
| 113 | } |
| 114 | } |
| 115 | |
| 116 | void RasterHeader() { |
| 117 | output_buffer_.push_back(mjpg_header); |
| 118 | NewDataToSend(); |
| 119 | } |
| 120 | |
| 121 | void RasterFrame(std::shared_ptr<Frame> frame) { |
| 122 | if (!output_buffer_.empty() || !ready_to_recieve_) return; |
| 123 | sending_frame_ = frame; |
| 124 | aos::vision::DataRef data = frame->data; |
| 125 | |
| 126 | size_t n_written = snprintf(data_header_tmp_, sizeof(data_header_tmp_), |
| 127 | "--boundary\r\n"\ |
| 128 | "Content-type: image/jpg\r\n"\ |
| 129 | "Content-Length: %zu\r\n\r\n", data.size()); |
| 130 | // This should never happen because the buffer should be properly sized. |
| 131 | if (n_written == sizeof(data_header_tmp_)) { |
| 132 | fprintf(stderr, "wrong sized buffer\n"); |
| 133 | exit(-1); |
| 134 | } |
| 135 | output_buffer_.push_back(aos::vision::DataRef(data_header_tmp_, n_written)); |
| 136 | output_buffer_.push_back(data); |
| 137 | output_buffer_.push_back("\r\n\r\n"); |
| 138 | NewDataToSend(); |
| 139 | } |
| 140 | |
| 141 | void NewFrame(std::shared_ptr<Frame> frame) { |
| 142 | RasterFrame(std::move(frame)); |
| 143 | } |
| 144 | |
| 145 | void NewDataToSend() { |
| 146 | while (!output_buffer_.empty()) { |
| 147 | auto& data = *output_buffer_.begin(); |
| 148 | |
| 149 | while (!data.empty()) { |
| 150 | int len = send(fd(), data.data(), data.size(), MSG_NOSIGNAL); |
| 151 | if (len == -1) { |
| 152 | if (errno == EAGAIN) { |
| 153 | // Next thinggy will pick this up. |
| 154 | return; |
| 155 | } else { |
| 156 | CloseConnection(); |
| 157 | return; |
| 158 | } |
| 159 | } else { |
| 160 | data.remove_prefix(len); |
| 161 | } |
| 162 | } |
| 163 | output_buffer_.pop_front(); |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | private: |
| 168 | char data_header_tmp_[512]; |
| 169 | std::shared_ptr<Frame> sending_frame_; |
| 170 | std::deque<aos::vision::DataRef> output_buffer_; |
| 171 | |
| 172 | bool ready_to_recieve_ = false; |
| 173 | void CloseConnection() { |
| 174 | loop()->Delete(this); |
| 175 | close(fd()); |
| 176 | delete this; |
| 177 | } |
| 178 | size_t match_i_ = 0; |
| 179 | }; |
| 180 | |
| 181 | using namespace aos::vision; |
| 182 | class CameraStream : public ImageStreamEvent { |
| 183 | public: |
| 184 | CameraStream(aos::vision::CameraParams params, |
| 185 | const std::string &fname, TCPServer<MjpegDataSocket>* tcp_serv) |
| 186 | : ImageStreamEvent(fname, params), tcp_serv_(tcp_serv), |
| 187 | log_("./logging/blob_record_", ".dat") {} |
| 188 | |
| 189 | void ProcessImage(DataRef data, aos::monotonic_clock::time_point tp) { |
| 190 | (void)tp; |
| 191 | ++sampling; |
| 192 | // 20 is the sampling rate. |
| 193 | if (sampling == 20) { |
| 194 | int tmp_size = data.size() + sizeof(int32_t); |
| 195 | char *buf; |
| 196 | std::string log_record; |
| 197 | log_record.resize(tmp_size, 0); |
| 198 | { |
| 199 | buf = Int32Codec::Write(&log_record[0], tmp_size); |
| 200 | data.copy(buf, data.size()); |
| 201 | } |
| 202 | log_.WriteLogEntry(log_record); |
| 203 | sampling = 0; |
| 204 | } |
| 205 | |
| 206 | auto frame = std::make_shared<Frame>(Frame{std::string(data)}); |
| 207 | tcp_serv_->Broadcast([frame](MjpegDataSocket* event) { |
| 208 | event->NewFrame(frame); |
| 209 | }); |
| 210 | } |
| 211 | private: |
| 212 | int sampling = 0; |
| 213 | TCPServer<MjpegDataSocket>* tcp_serv_; |
| 214 | BlobLog log_; |
| 215 | }; |
| 216 | |
| 217 | int main() { |
| 218 | ::aos::logging::Init(); |
| 219 | ::aos::logging::AddImplementation( |
| 220 | new ::aos::logging::StreamLogImplementation(stderr)); |
| 221 | |
| 222 | TCPServer<MjpegDataSocket> tcp_serv_(80); |
| 223 | aos::vision::CameraParams params; |
Michael Schuh | 8e69245 | 2018-04-07 17:35:04 -0700 | [diff] [blame] | 224 | params.set_exposure(100); |
Parker Schuh | c1975fc | 2018-04-07 15:27:07 -0700 | [diff] [blame] | 225 | params.set_width(320); |
| 226 | params.set_height(240); |
| 227 | CameraStream camera(params, "/dev/video0", &tcp_serv_); |
| 228 | |
| 229 | aos::events::EpollLoop loop; |
| 230 | loop.Add(&tcp_serv_); |
| 231 | loop.Add(&camera); |
| 232 | |
| 233 | printf("Running Camera\n"); |
| 234 | loop.Run(); |
| 235 | } |