blob: ce6f5e2f42ee586c9f1d0e206382cfee95beef6e [file] [log] [blame]
Parker Schuhc1975fc2018-04-07 15:27:07 -07001#include "aos/vision/events/socket_types.h"
2#include "aos/vision/image/reader.h"
3#include "aos/vision/image/image_stream.h"
4#include "aos/common/logging/implementations.h"
5#include "aos/common/logging/logging.h"
6#include "aos/vision/blob/codec.h"
7#include <fstream>
8#include <sys/stat.h>
9#include <deque>
10#include <string>
11
12using aos::events::TCPServer;
13using aos::events::DataSocket;
14using aos::vision::Int32Codec;
15using aos::vision::DataRef;
16
17aos::vision::DataRef mjpg_header = "HTTP/1.0 200 OK\r\n"\
18 "Server: YourServerName\r\n"\
19 "Connection: close\r\n"\
20 "Max-Age: 0\r\n"\
21 "Expires: 0\r\n"\
22 "Cache-Control: no-cache, private\r\n"\
23 "Pragma: no-cache\r\n"\
24 "Content-Type: multipart/x-mixed-replace; "\
25 "boundary=--boundary\r\n\r\n";
26
// One captured image, held as the raw bytes handed to
// CameraStream::ProcessImage(). Shared between connections through
// std::shared_ptr so the bytes stay alive while any socket is still sending
// them. Kept as a plain aggregate: it is brace-initialized from a std::string.
struct Frame {
  // Encoded image bytes, copied verbatim from the camera callback.
  std::string data;
};
30
// Returns true when stat() succeeds on `name`, i.e. something (file,
// directory, ...) exists at that path and is reachable by this process.
inline bool FileExist(const std::string &name) {
  struct stat info;
  const int rc = stat(name.c_str(), &info);
  return rc == 0;
}
35
36class BlobLog {
37 public:
38 explicit BlobLog(const char *prefix, const char *extension) {
39 int index = 0;
40 while (true) {
41 std::string file = prefix + std::to_string(index) + extension;
42 if (FileExist(file)) {
43 index++;
44 } else {
45 printf("Logging to file (%s)\n", file.c_str());
46 ofst_.open(file);
47 assert(ofst_.is_open());
48 break;
49 }
50 }
51 }
52
53 ~BlobLog() { ofst_.close(); }
54
55 void WriteLogEntry(DataRef data) { ofst_.write(&data[0], data.size()); }
56
57 private:
58 std::ofstream ofst_;
59};
60
61class MjpegDataSocket : public aos::events::SocketConnection {
62 public:
63
64 MjpegDataSocket(aos::events::TCPServerBase *serv, int fd)
65 : aos::events::SocketConnection(serv, fd) {
66 SetEvents(EPOLLOUT | EPOLLET);
67 }
68
69 ~MjpegDataSocket() { printf("Closed connection on descriptor %d\n", fd()); }
70
71 void DirectEvent(uint32_t events) override {
72 if (events & EPOLLOUT) {
73 NewDataToSend();
74 events &= ~EPOLLOUT;
75 }
76 if (events) {
77 aos::events::EpollEvent::DirectEvent(events);
78 }
79 }
80
81 void ReadEvent() override {
82 // Ignore reads, but don't leave them pending.
83 ssize_t count;
84 char buf[512];
85 while (true) {
86 count = read(fd(), &buf, sizeof buf);
87 if (count <= 0) {
88 if (errno != EAGAIN) {
89 CloseConnection();
90 return;
91 }
92 break;
93 } else if (!ready_to_recieve_) {
94 // This 4 should match "\r\n\r\n".length();
95 if (match_i_ >= 4) {
96 printf("reading after last match\n");
97 continue;
98 }
99 for (char c : aos::vision::DataRef(&buf[0], count)) {
100 if (c == "\r\n\r\n"[match_i_]) {
101 ++match_i_;
102 if (match_i_ >= 4) {
103 if (!ready_to_recieve_) {
104 ready_to_recieve_ = true;
105 RasterHeader();
106 }
107 }
108 } else if (match_i_ != 0) {
109 if (c == '\r') match_i_ = 1;
110 }
111 }
112 }
113 }
114 }
115
116 void RasterHeader() {
117 output_buffer_.push_back(mjpg_header);
118 NewDataToSend();
119 }
120
121 void RasterFrame(std::shared_ptr<Frame> frame) {
122 if (!output_buffer_.empty() || !ready_to_recieve_) return;
123 sending_frame_ = frame;
124 aos::vision::DataRef data = frame->data;
125
126 size_t n_written = snprintf(data_header_tmp_, sizeof(data_header_tmp_),
127 "--boundary\r\n"\
128 "Content-type: image/jpg\r\n"\
129 "Content-Length: %zu\r\n\r\n", data.size());
130 // This should never happen because the buffer should be properly sized.
131 if (n_written == sizeof(data_header_tmp_)) {
132 fprintf(stderr, "wrong sized buffer\n");
133 exit(-1);
134 }
135 output_buffer_.push_back(aos::vision::DataRef(data_header_tmp_, n_written));
136 output_buffer_.push_back(data);
137 output_buffer_.push_back("\r\n\r\n");
138 NewDataToSend();
139 }
140
141 void NewFrame(std::shared_ptr<Frame> frame) {
142 RasterFrame(std::move(frame));
143 }
144
145 void NewDataToSend() {
146 while (!output_buffer_.empty()) {
147 auto& data = *output_buffer_.begin();
148
149 while (!data.empty()) {
150 int len = send(fd(), data.data(), data.size(), MSG_NOSIGNAL);
151 if (len == -1) {
152 if (errno == EAGAIN) {
153 // Next thinggy will pick this up.
154 return;
155 } else {
156 CloseConnection();
157 return;
158 }
159 } else {
160 data.remove_prefix(len);
161 }
162 }
163 output_buffer_.pop_front();
164 }
165 }
166
167 private:
168 char data_header_tmp_[512];
169 std::shared_ptr<Frame> sending_frame_;
170 std::deque<aos::vision::DataRef> output_buffer_;
171
172 bool ready_to_recieve_ = false;
173 void CloseConnection() {
174 loop()->Delete(this);
175 close(fd());
176 delete this;
177 }
178 size_t match_i_ = 0;
179};
180
181using namespace aos::vision;
182class CameraStream : public ImageStreamEvent {
183 public:
184 CameraStream(aos::vision::CameraParams params,
185 const std::string &fname, TCPServer<MjpegDataSocket>* tcp_serv)
186 : ImageStreamEvent(fname, params), tcp_serv_(tcp_serv),
187 log_("./logging/blob_record_", ".dat") {}
188
189 void ProcessImage(DataRef data, aos::monotonic_clock::time_point tp) {
190 (void)tp;
191 ++sampling;
192 // 20 is the sampling rate.
193 if (sampling == 20) {
194 int tmp_size = data.size() + sizeof(int32_t);
195 char *buf;
196 std::string log_record;
197 log_record.resize(tmp_size, 0);
198 {
199 buf = Int32Codec::Write(&log_record[0], tmp_size);
200 data.copy(buf, data.size());
201 }
202 log_.WriteLogEntry(log_record);
203 sampling = 0;
204 }
205
206 auto frame = std::make_shared<Frame>(Frame{std::string(data)});
207 tcp_serv_->Broadcast([frame](MjpegDataSocket* event) {
208 event->NewFrame(frame);
209 });
210 }
211 private:
212 int sampling = 0;
213 TCPServer<MjpegDataSocket>* tcp_serv_;
214 BlobLog log_;
215};
216
217int main() {
218 ::aos::logging::Init();
219 ::aos::logging::AddImplementation(
220 new ::aos::logging::StreamLogImplementation(stderr));
221
222 TCPServer<MjpegDataSocket> tcp_serv_(80);
223 aos::vision::CameraParams params;
Michael Schuh8e692452018-04-07 17:35:04 -0700224 params.set_exposure(100);
Parker Schuhc1975fc2018-04-07 15:27:07 -0700225 params.set_width(320);
226 params.set_height(240);
227 CameraStream camera(params, "/dev/video0", &tcp_serv_);
228
229 aos::events::EpollLoop loop;
230 loop.Add(&tcp_serv_);
231 loop.Add(&camera);
232
233 printf("Running Camera\n");
234 loop.Run();
235}