blob: ebcd0b2d62bc6a76b78165bb35970bc8966158d0 [file] [log] [blame]
Michael Schuh5a1a7582019-03-01 13:03:47 -08001#include "aos/vision/image/image_stream.h"
2
3#include <sys/stat.h>
4#include <deque>
5#include <fstream>
6#include <string>
7
8#include "aos/logging/implementations.h"
9#include "aos/logging/logging.h"
10#include "aos/vision/blob/codec.h"
11#include "aos/vision/events/socket_types.h"
12#include "aos/vision/events/udp.h"
13#include "aos/vision/image/reader.h"
14#include "gflags/gflags.h"
Tyler Chatowe0241452019-03-08 21:07:50 -080015#include "y2019/image_streamer/flip_image.h"
Michael Schuh5a1a7582019-03-01 13:03:47 -080016#include "y2019/vision.pb.h"
17
18using ::aos::events::DataSocket;
19using ::aos::events::RXUdpSocket;
20using ::aos::events::TCPServer;
21using ::aos::vision::DataRef;
22using ::aos::vision::Int32Codec;
23using ::aos::monotonic_clock;
24using ::y2019::VisionControl;
25
26DEFINE_string(roborio_ip, "10.9.71.2", "RoboRIO IP Address");
27DEFINE_string(log, "",
28 "If non-empty, log images to the specified prefix with the image "
29 "index appended to the filename");
30DEFINE_bool(single_camera, true, "If true, only use video0");
31DEFINE_int32(camera0_exposure, 600, "Exposure for video0");
32DEFINE_int32(camera1_exposure, 600, "Exposure for video1");
33
34aos::vision::DataRef mjpg_header =
35 "HTTP/1.0 200 OK\r\n"
36 "Server: YourServerName\r\n"
37 "Connection: close\r\n"
38 "Max-Age: 0\r\n"
39 "Expires: 0\r\n"
40 "Cache-Control: no-cache, private\r\n"
41 "Pragma: no-cache\r\n"
42 "Content-Type: multipart/x-mixed-replace; "
43 "boundary=--boundary\r\n\r\n";
44
// One image ready to be streamed. Instances are handed around in
// std::shared_ptr so every client connection can reference the same bytes.
struct Frame {
  // Bytes sent verbatim as the body of one multipart section.
  std::string data;
};
48
// Returns true when stat() succeeds for `name`, false for any stat() failure.
inline bool FileExist(const std::string &name) {
  struct stat file_info;
  const int status = stat(name.c_str(), &file_info);
  return status == 0;
}
53
54class BlobLog {
55 public:
56 explicit BlobLog(const char *prefix, const char *extension) {
57 int index = 0;
58 while (true) {
59 std::string file = prefix + std::to_string(index) + extension;
60 if (FileExist(file)) {
61 index++;
62 } else {
63 printf("Logging to file (%s)\n", file.c_str());
64 ofst_.open(file);
65 assert(ofst_.is_open());
66 break;
67 }
68 }
69 }
70
71 ~BlobLog() { ofst_.close(); }
72
73 void WriteLogEntry(DataRef data) { ofst_.write(&data[0], data.size()); }
74
75 private:
76 std::ofstream ofst_;
77};
78
79class UdpClient : public ::aos::events::EpollEvent {
80 public:
81 UdpClient(int port, ::std::function<void(void *, size_t)> callback)
82 : ::aos::events::EpollEvent(RXUdpSocket::SocketBindListenOnPort(port)),
83 callback_(callback) {}
84
85 private:
86 ::std::function<void(void *, size_t)> callback_;
87
88 void ReadEvent() override {
89 char data[1024];
90 size_t received_data_size = Recv(data, sizeof(data));
91 callback_(data, received_data_size);
92 }
93
94 size_t Recv(void *data, int size) {
95 return PCHECK(recv(fd(), static_cast<char *>(data), size, 0));
96 }
97};
98
99// TODO(aschuh & michael) Pull this out.
100template <typename PB>
101class ProtoUdpClient : public UdpClient {
102 public:
103 ProtoUdpClient(int port, ::std::function<void(const PB &)> proto_callback)
104 : UdpClient(port, ::std::bind(&ProtoUdpClient::ReadData, this,
105 ::std::placeholders::_1,
106 ::std::placeholders::_2)),
107 proto_callback_(proto_callback) {}
108
109 private:
110 ::std::function<void(const PB &)> proto_callback_;
111
112 void ReadData(void *data, size_t size) {
113 PB pb;
Brian Silverman05271722019-03-09 16:09:31 -0800114 // TODO(Brian): Do something useful if parsing fails.
Michael Schuh5a1a7582019-03-01 13:03:47 -0800115 pb.ParseFromArray(data, size);
116 proto_callback_(pb);
117 }
118};
119
120class MjpegDataSocket : public aos::events::SocketConnection {
121 public:
122 MjpegDataSocket(aos::events::TCPServerBase *server, int fd)
123 : aos::events::SocketConnection(server, fd) {
124 SetEvents(EPOLLOUT | EPOLLET);
125 }
126
127 ~MjpegDataSocket() { printf("Closed connection on descriptor %d\n", fd()); }
128
129 void DirectEvent(uint32_t events) override {
130 if (events & EPOLLOUT) {
131 NewDataToSend();
132 events &= ~EPOLLOUT;
133 }
134 // Other end hung up. Ditch the connection.
135 if (events & EPOLLHUP) {
136 CloseConnection();
137 events &= ~EPOLLHUP;
138 return;
139 }
140 if (events) {
141 aos::events::EpollEvent::DirectEvent(events);
142 }
143 }
144
145 void ReadEvent() override {
Michael Schuh5a1a7582019-03-01 13:03:47 -0800146 ssize_t count;
147 char buf[512];
148 while (true) {
Brian Silverman05271722019-03-09 16:09:31 -0800149 // Always read everything so epoll won't return immediately.
Michael Schuh5a1a7582019-03-01 13:03:47 -0800150 count = read(fd(), &buf, sizeof buf);
151 if (count <= 0) {
152 if (errno != EAGAIN) {
153 CloseConnection();
154 return;
155 }
156 break;
157 } else if (!ready_to_receive_) {
158 // This 4 should match "\r\n\r\n".length();
159 if (match_i_ >= 4) {
160 printf("reading after last match\n");
161 continue;
162 }
163 for (char c : aos::vision::DataRef(&buf[0], count)) {
164 if (c == "\r\n\r\n"[match_i_]) {
165 ++match_i_;
166 if (match_i_ >= 4) {
167 if (!ready_to_receive_) {
168 ready_to_receive_ = true;
169 RasterHeader();
170 }
171 }
172 } else if (match_i_ != 0) {
173 if (c == '\r') match_i_ = 1;
174 }
175 }
176 }
177 }
178 }
179
180 void RasterHeader() {
181 output_buffer_.push_back(mjpg_header);
182 NewDataToSend();
183 }
184
185 void RasterFrame(std::shared_ptr<Frame> frame) {
186 if (!output_buffer_.empty() || !ready_to_receive_) return;
187 sending_frame_ = frame;
188 aos::vision::DataRef data = frame->data;
189
190 size_t n_written = snprintf(data_header_tmp_, sizeof(data_header_tmp_),
191 "--boundary\r\n"
192 "Content-type: image/jpg\r\n"
193 "Content-Length: %zu\r\n\r\n",
194 data.size());
195 // This should never happen because the buffer should be properly sized.
196 if (n_written == sizeof(data_header_tmp_)) {
197 fprintf(stderr, "wrong sized buffer\n");
198 exit(-1);
199 }
Tyler Chatowe0241452019-03-08 21:07:50 -0800200 LOG(INFO, "Frame size in bytes: data.size() = %zu\n", data.size());
Michael Schuh5a1a7582019-03-01 13:03:47 -0800201 output_buffer_.push_back(aos::vision::DataRef(data_header_tmp_, n_written));
202 output_buffer_.push_back(data);
203 output_buffer_.push_back("\r\n\r\n");
204 NewDataToSend();
205 }
206
207 void NewFrame(std::shared_ptr<Frame> frame) { RasterFrame(std::move(frame)); }
208
209 void NewDataToSend() {
210 while (!output_buffer_.empty()) {
211 auto &data = *output_buffer_.begin();
212
213 while (!data.empty()) {
214 int len = send(fd(), data.data(), data.size(), MSG_NOSIGNAL);
215 if (len == -1) {
216 if (errno == EAGAIN) {
217 // Next thinggy will pick this up.
218 return;
219 } else {
220 CloseConnection();
221 return;
222 }
223 } else {
224 data.remove_prefix(len);
225 }
226 }
227 output_buffer_.pop_front();
228 }
229 }
230
231 private:
232 char data_header_tmp_[512];
233 std::shared_ptr<Frame> sending_frame_;
234 std::deque<aos::vision::DataRef> output_buffer_;
235
236 bool ready_to_receive_ = false;
237 void CloseConnection() {
238 loop()->Delete(this);
239 close(fd());
240 delete this;
241 }
242 size_t match_i_ = 0;
243};
244
245class CameraStream : public ::aos::vision::ImageStreamEvent {
246 public:
247 CameraStream(::aos::vision::CameraParams params, const ::std::string &fname,
248 TCPServer<MjpegDataSocket> *tcp_server, bool log,
249 ::std::function<void()> frame_callback)
250 : ImageStreamEvent(fname, params),
251 tcp_server_(tcp_server),
252 frame_callback_(frame_callback) {
253 if (log) {
254 log_.reset(new BlobLog(FLAGS_log.c_str(), ".dat"));
255 }
256 }
257
258 void set_active(bool active) { active_ = active; }
259
Tyler Chatowe0241452019-03-08 21:07:50 -0800260 void set_flip(bool flip) { flip_ = flip; }
261
Michael Schuh5a1a7582019-03-01 13:03:47 -0800262 bool active() const { return active_; }
263
264 void ProcessImage(DataRef data,
265 monotonic_clock::time_point /*monotonic_now*/) {
266 ++sampling;
267 // 20 is the sampling rate.
268 if (sampling == 20) {
269 int tmp_size = data.size() + sizeof(int32_t);
270 char *buf;
271 std::string log_record;
272 log_record.resize(tmp_size, 0);
273 {
274 buf = Int32Codec::Write(&log_record[0], tmp_size);
275 data.copy(buf, data.size());
276 }
277 if (log_) {
278 log_->WriteLogEntry(log_record);
279 }
280 sampling = 0;
281 }
282
Austin Schuh9c03a532019-03-17 18:14:45 -0700283 ::std::string image_out;
Tyler Chatowe0241452019-03-08 21:07:50 -0800284
Austin Schuh9c03a532019-03-17 18:14:45 -0700285 unsigned int out_size = image_buffer_out_.size();
286 flip_image(data.data(), data.size(), &image_buffer_out_[0], &out_size,
287 flip_);
288 image_out.assign(&image_buffer_out_[0], &image_buffer_out_[out_size]);
Tyler Chatowe0241452019-03-08 21:07:50 -0800289
Michael Schuh5a1a7582019-03-01 13:03:47 -0800290 if (active_) {
Tyler Chatowe0241452019-03-08 21:07:50 -0800291 auto frame = std::make_shared<Frame>(Frame{image_out});
Michael Schuh5a1a7582019-03-01 13:03:47 -0800292 tcp_server_->Broadcast(
293 [frame](MjpegDataSocket *event) { event->NewFrame(frame); });
294 }
295 frame_callback_();
296 }
297
298 private:
299 int sampling = 0;
300 TCPServer<MjpegDataSocket> *tcp_server_;
301 ::std::unique_ptr<BlobLog> log_;
302 ::std::function<void()> frame_callback_;
303 bool active_ = false;
Tyler Chatowe0241452019-03-08 21:07:50 -0800304 bool flip_ = false;
305 std::array<JOCTET, 100000> image_buffer_out_;
Michael Schuh5a1a7582019-03-01 13:03:47 -0800306};
307
308int main(int argc, char **argv) {
309 gflags::ParseCommandLineFlags(&argc, &argv, false);
310 ::aos::logging::Init();
311 ::aos::logging::AddImplementation(
312 new ::aos::logging::StreamLogImplementation(stderr));
313 TCPServer<MjpegDataSocket> tcp_server_(80);
314 aos::vision::CameraParams params0;
315 params0.set_exposure(FLAGS_camera0_exposure);
316 params0.set_brightness(-40);
317 params0.set_width(320);
318 // params0.set_fps(10);
319 params0.set_height(240);
320
321 aos::vision::CameraParams params1 = params0;
322 params1.set_exposure(FLAGS_camera1_exposure);
323
324 ::y2019::VisionStatus vision_status;
325 LOG(INFO,
326 "The UDP socket should be on port 5001 to 10.9.71.2 for "
327 "the competition robot.\n");
328 LOG(INFO, "Starting UDP socket on port 5001 to %s\n",
329 FLAGS_roborio_ip.c_str());
330 ::aos::events::ProtoTXUdpSocket<::y2019::VisionStatus> status_socket(
331 FLAGS_roborio_ip.c_str(), 5001);
332
333 ::std::unique_ptr<CameraStream> camera1;
334 ::std::unique_ptr<CameraStream> camera0(new CameraStream(
335 params0, "/dev/video0", &tcp_server_, !FLAGS_log.empty(),
Brian Silverman05271722019-03-09 16:09:31 -0800336 [&camera0, &status_socket, &vision_status]() {
Michael Schuh5a1a7582019-03-01 13:03:47 -0800337 vision_status.set_low_frame_count(vision_status.low_frame_count() + 1);
338 LOG(INFO, "Got a frame cam0\n");
339 if (camera0->active()) {
340 status_socket.Send(vision_status);
341 }
342 }));
343 if (!FLAGS_single_camera) {
344 camera1.reset(new CameraStream(
345 params1, "/dev/video1", &tcp_server_, false,
Brian Silverman05271722019-03-09 16:09:31 -0800346 [&camera1, &status_socket, &vision_status]() {
Michael Schuh5a1a7582019-03-01 13:03:47 -0800347 vision_status.set_high_frame_count(vision_status.high_frame_count() +
348 1);
349 LOG(INFO, "Got a frame cam1\n");
350 if (camera1->active()) {
351 status_socket.Send(vision_status);
352 }
353 }));
354 }
355
356 ProtoUdpClient<VisionControl> udp_client(
357 5000, [&camera0, &camera1](const VisionControl &vision_control) {
358 bool cam0_active = false;
Tyler Chatowe0241452019-03-08 21:07:50 -0800359 camera0->set_flip(vision_control.flip_image());
Michael Schuh5a1a7582019-03-01 13:03:47 -0800360 if (camera1) {
Tyler Chatowe0241452019-03-08 21:07:50 -0800361 camera1->set_flip(vision_control.flip_image());
Michael Schuh5a1a7582019-03-01 13:03:47 -0800362 cam0_active = !vision_control.high_video();
363 camera0->set_active(!vision_control.high_video());
364 camera1->set_active(vision_control.high_video());
365 } else {
366 cam0_active = true;
367 camera0->set_active(true);
368 }
369 LOG(INFO, "Got control packet, cam%d active\n", cam0_active ? 0 : 1);
370 });
371
372 // Default to camera0
373 camera0->set_active(true);
374 if (camera1) {
375 camera1->set_active(false);
376 }
377
378 aos::events::EpollLoop loop;
379 loop.Add(&tcp_server_);
380 loop.Add(camera0.get());
381 if (camera1) {
382 loop.Add(camera1.get());
383 }
384 loop.Add(&udp_client);
385
386 printf("Running Camera\n");
387 loop.Run();
388}