blob: dea6a77a6f555b5da9b8f48befcff08c5432fdeb [file] [log] [blame]
Parker Schuhc1975fc2018-04-07 15:27:07 -07001#include "aos/vision/image/image_stream.h"
Austin Schuh8d5fff42018-05-30 20:44:12 -07002
3#include <sys/stat.h>
4#include <deque>
5#include <fstream>
6#include <string>
7
John Park33858a32018-09-28 23:05:48 -07008#include "aos/logging/implementations.h"
9#include "aos/logging/logging.h"
Parker Schuhc1975fc2018-04-07 15:27:07 -070010#include "aos/vision/blob/codec.h"
Austin Schuh8d5fff42018-05-30 20:44:12 -070011#include "aos/vision/events/socket_types.h"
12#include "aos/vision/events/udp.h"
13#include "aos/vision/image/reader.h"
14#include "third_party/gflags/include/gflags/gflags.h"
15#include "y2018/vision.pb.h"
Parker Schuhc1975fc2018-04-07 15:27:07 -070016
Austin Schuh8d5fff42018-05-30 20:44:12 -070017using ::aos::events::DataSocket;
18using ::aos::events::RXUdpSocket;
19using ::aos::events::TCPServer;
20using ::aos::vision::DataRef;
21using ::aos::vision::Int32Codec;
22using ::aos::monotonic_clock;
23using ::y2018::VisionControl;
Parker Schuhc1975fc2018-04-07 15:27:07 -070024
Austin Schuh8d5fff42018-05-30 20:44:12 -070025DEFINE_bool(single_camera, true, "If true, only use video0");
26DEFINE_int32(camera0_exposure, 300, "Exposure for video0");
27DEFINE_int32(camera1_exposure, 300, "Exposure for video1");
28
29aos::vision::DataRef mjpg_header =
30 "HTTP/1.0 200 OK\r\n"
31 "Server: YourServerName\r\n"
32 "Connection: close\r\n"
33 "Max-Age: 0\r\n"
34 "Expires: 0\r\n"
35 "Cache-Control: no-cache, private\r\n"
36 "Pragma: no-cache\r\n"
37 "Content-Type: multipart/x-mixed-replace; "
38 "boundary=--boundary\r\n\r\n";
Parker Schuhc1975fc2018-04-07 15:27:07 -070039
40struct Frame {
41 std::string data;
42};
43
44inline bool FileExist(const std::string &name) {
45 struct stat buffer;
46 return (stat(name.c_str(), &buffer) == 0);
47}
48
49class BlobLog {
50 public:
51 explicit BlobLog(const char *prefix, const char *extension) {
52 int index = 0;
53 while (true) {
54 std::string file = prefix + std::to_string(index) + extension;
55 if (FileExist(file)) {
56 index++;
57 } else {
58 printf("Logging to file (%s)\n", file.c_str());
59 ofst_.open(file);
60 assert(ofst_.is_open());
61 break;
62 }
63 }
64 }
65
66 ~BlobLog() { ofst_.close(); }
67
68 void WriteLogEntry(DataRef data) { ofst_.write(&data[0], data.size()); }
69
70 private:
71 std::ofstream ofst_;
72};
73
Austin Schuh8d5fff42018-05-30 20:44:12 -070074class UdpClient : public ::aos::events::EpollEvent {
75 public:
76 UdpClient(int port, ::std::function<void(void *, size_t)> callback)
77 : ::aos::events::EpollEvent(RXUdpSocket::SocketBindListenOnPort(port)),
78 callback_(callback) {}
79
80 private:
81 ::std::function<void(void *, size_t)> callback_;
82
83 void ReadEvent() override {
84 char data[1024];
85 size_t received_data_size = Recv(data, sizeof(data));
86 callback_(data, received_data_size);
87 }
88
89 size_t Recv(void *data, int size) {
90 return PCHECK(recv(fd(), static_cast<char *>(data), size, 0));
91 }
92};
93
94template <typename PB>
95class ProtoUdpClient : public UdpClient {
96 public:
97 ProtoUdpClient(int port, ::std::function<void(const PB &)> proto_callback)
98 : UdpClient(port, ::std::bind(&ProtoUdpClient::ReadData, this,
99 ::std::placeholders::_1,
100 ::std::placeholders::_2)),
101 proto_callback_(proto_callback) {}
102
103 private:
104 ::std::function<void(const PB &)> proto_callback_;
105
106 void ReadData(void *data, size_t size) {
107 PB pb;
108 pb.ParseFromArray(data, size);
109 proto_callback_(pb);
110 }
111};
112
Parker Schuhc1975fc2018-04-07 15:27:07 -0700113class MjpegDataSocket : public aos::events::SocketConnection {
114 public:
Austin Schuh8d5fff42018-05-30 20:44:12 -0700115 MjpegDataSocket(aos::events::TCPServerBase *server, int fd)
116 : aos::events::SocketConnection(server, fd) {
Parker Schuhc1975fc2018-04-07 15:27:07 -0700117 SetEvents(EPOLLOUT | EPOLLET);
118 }
119
120 ~MjpegDataSocket() { printf("Closed connection on descriptor %d\n", fd()); }
121
122 void DirectEvent(uint32_t events) override {
123 if (events & EPOLLOUT) {
124 NewDataToSend();
125 events &= ~EPOLLOUT;
126 }
Austin Schuh8d5fff42018-05-30 20:44:12 -0700127 // Other end hung up. Ditch the connection.
128 if (events & EPOLLHUP) {
129 CloseConnection();
130 events &= ~EPOLLHUP;
131 return;
132 }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700133 if (events) {
134 aos::events::EpollEvent::DirectEvent(events);
135 }
136 }
137
138 void ReadEvent() override {
139 // Ignore reads, but don't leave them pending.
140 ssize_t count;
141 char buf[512];
142 while (true) {
143 count = read(fd(), &buf, sizeof buf);
144 if (count <= 0) {
145 if (errno != EAGAIN) {
146 CloseConnection();
147 return;
148 }
149 break;
150 } else if (!ready_to_recieve_) {
151 // This 4 should match "\r\n\r\n".length();
152 if (match_i_ >= 4) {
153 printf("reading after last match\n");
154 continue;
155 }
156 for (char c : aos::vision::DataRef(&buf[0], count)) {
157 if (c == "\r\n\r\n"[match_i_]) {
158 ++match_i_;
159 if (match_i_ >= 4) {
160 if (!ready_to_recieve_) {
161 ready_to_recieve_ = true;
162 RasterHeader();
163 }
164 }
165 } else if (match_i_ != 0) {
166 if (c == '\r') match_i_ = 1;
167 }
168 }
169 }
170 }
171 }
172
173 void RasterHeader() {
174 output_buffer_.push_back(mjpg_header);
175 NewDataToSend();
176 }
177
178 void RasterFrame(std::shared_ptr<Frame> frame) {
179 if (!output_buffer_.empty() || !ready_to_recieve_) return;
180 sending_frame_ = frame;
181 aos::vision::DataRef data = frame->data;
182
183 size_t n_written = snprintf(data_header_tmp_, sizeof(data_header_tmp_),
Austin Schuh8d5fff42018-05-30 20:44:12 -0700184 "--boundary\r\n"
185 "Content-type: image/jpg\r\n"
186 "Content-Length: %zu\r\n\r\n",
187 data.size());
Parker Schuhc1975fc2018-04-07 15:27:07 -0700188 // This should never happen because the buffer should be properly sized.
189 if (n_written == sizeof(data_header_tmp_)) {
190 fprintf(stderr, "wrong sized buffer\n");
191 exit(-1);
192 }
193 output_buffer_.push_back(aos::vision::DataRef(data_header_tmp_, n_written));
194 output_buffer_.push_back(data);
195 output_buffer_.push_back("\r\n\r\n");
196 NewDataToSend();
197 }
198
Austin Schuh8d5fff42018-05-30 20:44:12 -0700199 void NewFrame(std::shared_ptr<Frame> frame) { RasterFrame(std::move(frame)); }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700200
201 void NewDataToSend() {
202 while (!output_buffer_.empty()) {
Austin Schuh8d5fff42018-05-30 20:44:12 -0700203 auto &data = *output_buffer_.begin();
Parker Schuhc1975fc2018-04-07 15:27:07 -0700204
205 while (!data.empty()) {
206 int len = send(fd(), data.data(), data.size(), MSG_NOSIGNAL);
207 if (len == -1) {
208 if (errno == EAGAIN) {
209 // Next thinggy will pick this up.
210 return;
211 } else {
212 CloseConnection();
213 return;
214 }
215 } else {
216 data.remove_prefix(len);
217 }
218 }
219 output_buffer_.pop_front();
220 }
221 }
222
223 private:
224 char data_header_tmp_[512];
225 std::shared_ptr<Frame> sending_frame_;
226 std::deque<aos::vision::DataRef> output_buffer_;
227
228 bool ready_to_recieve_ = false;
229 void CloseConnection() {
230 loop()->Delete(this);
231 close(fd());
232 delete this;
233 }
234 size_t match_i_ = 0;
235};
236
Austin Schuh8d5fff42018-05-30 20:44:12 -0700237class CameraStream : public ::aos::vision::ImageStreamEvent {
Parker Schuhc1975fc2018-04-07 15:27:07 -0700238 public:
Austin Schuh8d5fff42018-05-30 20:44:12 -0700239 CameraStream(::aos::vision::CameraParams params, const ::std::string &fname,
240 TCPServer<MjpegDataSocket> *tcp_server, bool log,
241 ::std::function<void()> frame_callback)
242 : ImageStreamEvent(fname, params),
243 tcp_server_(tcp_server),
244 frame_callback_(frame_callback) {
245 if (log) {
246 log_.reset(new BlobLog("./logging/blob_record_", ".dat"));
247 }
248 }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700249
Austin Schuh8d5fff42018-05-30 20:44:12 -0700250 void set_active(bool active) { active_ = active; }
251
252 bool active() const { return active_; }
253
254 void ProcessImage(DataRef data,
255 monotonic_clock::time_point /*monotonic_now*/) {
Parker Schuhc1975fc2018-04-07 15:27:07 -0700256 ++sampling;
257 // 20 is the sampling rate.
258 if (sampling == 20) {
259 int tmp_size = data.size() + sizeof(int32_t);
260 char *buf;
261 std::string log_record;
262 log_record.resize(tmp_size, 0);
263 {
264 buf = Int32Codec::Write(&log_record[0], tmp_size);
265 data.copy(buf, data.size());
266 }
Austin Schuh8d5fff42018-05-30 20:44:12 -0700267 if (log_) {
268 log_->WriteLogEntry(log_record);
269 }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700270 sampling = 0;
271 }
272
Austin Schuh8d5fff42018-05-30 20:44:12 -0700273 if (active_) {
274 auto frame = std::make_shared<Frame>(Frame{std::string(data)});
275 tcp_server_->Broadcast(
276 [frame](MjpegDataSocket *event) { event->NewFrame(frame); });
277 }
278 frame_callback_();
Parker Schuhc1975fc2018-04-07 15:27:07 -0700279 }
Austin Schuh8d5fff42018-05-30 20:44:12 -0700280
Parker Schuhc1975fc2018-04-07 15:27:07 -0700281 private:
282 int sampling = 0;
Austin Schuh8d5fff42018-05-30 20:44:12 -0700283 TCPServer<MjpegDataSocket> *tcp_server_;
284 ::std::unique_ptr<BlobLog> log_;
285 ::std::function<void()> frame_callback_;
286 bool active_ = false;
Parker Schuhc1975fc2018-04-07 15:27:07 -0700287};
288
Austin Schuh8d5fff42018-05-30 20:44:12 -0700289int main(int argc, char ** argv) {
290 gflags::ParseCommandLineFlags(&argc, &argv, false);
Parker Schuhc1975fc2018-04-07 15:27:07 -0700291 ::aos::logging::Init();
292 ::aos::logging::AddImplementation(
293 new ::aos::logging::StreamLogImplementation(stderr));
294
Austin Schuh8d5fff42018-05-30 20:44:12 -0700295 TCPServer<MjpegDataSocket> tcp_server_(80);
296 aos::vision::CameraParams params0;
297 params0.set_exposure(FLAGS_camera0_exposure);
298 params0.set_brightness(-40);
299 params0.set_width(320);
300 //params0.set_fps(10);
301 params0.set_height(240);
302
303 aos::vision::CameraParams params1 = params0;
304 params1.set_exposure(FLAGS_camera1_exposure);
305
306 ::y2018::VisionStatus vision_status;
307 ::aos::events::ProtoTXUdpSocket<::y2018::VisionStatus> status_socket(
308 "10.9.71.2", 5001);
309
310 ::std::unique_ptr<CameraStream> camera1;
311 ::std::unique_ptr<CameraStream> camera0(new CameraStream(
312 params0, "/dev/video0", &tcp_server_, true,
313 [&camera0, &camera1, &status_socket, &vision_status]() {
314 vision_status.set_low_frame_count(vision_status.low_frame_count() + 1);
315 LOG(INFO, "Got a frame cam0\n");
316 if (camera0->active()) {
317 status_socket.Send(vision_status);
318 }
319 }));
320 if (!FLAGS_single_camera) {
321 camera1.reset(new CameraStream(
322 // params,
323 // "/dev/v4l/by-path/platform-tegra-xhci-usb-0:3.1:1.0-video-index0",
324 params1, "/dev/video1", &tcp_server_, false,
325 [&camera0, &camera1, &status_socket, &vision_status]() {
326 vision_status.set_high_frame_count(vision_status.high_frame_count() +
327 1);
328 LOG(INFO, "Got a frame cam1\n");
329 if (camera1->active()) {
330 status_socket.Send(vision_status);
331 }
332 }));
333 }
334
335 ProtoUdpClient<VisionControl> udp_client(
336 5000, [&camera0, &camera1](const VisionControl &vision_control) {
Austin Schuha8de4a62018-09-03 18:04:28 -0700337 bool cam0_active = false;
Austin Schuh8d5fff42018-05-30 20:44:12 -0700338 if (camera1) {
Austin Schuha8de4a62018-09-03 18:04:28 -0700339 cam0_active = !vision_control.high_video();
Austin Schuh8d5fff42018-05-30 20:44:12 -0700340 camera0->set_active(!vision_control.high_video());
341 camera1->set_active(vision_control.high_video());
342 } else {
Austin Schuha8de4a62018-09-03 18:04:28 -0700343 cam0_active = true;
Austin Schuh8d5fff42018-05-30 20:44:12 -0700344 camera0->set_active(true);
345 }
Austin Schuha8de4a62018-09-03 18:04:28 -0700346 LOG(INFO, "Got control packet, cam%d active\n", cam0_active ? 0 : 1);
Austin Schuh8d5fff42018-05-30 20:44:12 -0700347 });
348
349 // Default to camera0
350 camera0->set_active(true);
351 if (camera1) {
352 camera1->set_active(false);
353 }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700354
355 aos::events::EpollLoop loop;
Austin Schuh8d5fff42018-05-30 20:44:12 -0700356 loop.Add(&tcp_server_);
357 loop.Add(camera0.get());
358 if (camera1) {
359 loop.Add(camera1.get());
360 }
361 loop.Add(&udp_client);
Parker Schuhc1975fc2018-04-07 15:27:07 -0700362
363 printf("Running Camera\n");
364 loop.Run();
365}