blob: cd83a4e44161035750793356342dc35b315e4c2b [file] [log] [blame]
Michael Schuh5a1a7582019-03-01 13:03:47 -08001#include "aos/vision/image/image_stream.h"
2
3#include <sys/stat.h>
4#include <deque>
5#include <fstream>
6#include <string>
7
8#include "aos/logging/implementations.h"
9#include "aos/logging/logging.h"
10#include "aos/vision/blob/codec.h"
11#include "aos/vision/events/socket_types.h"
12#include "aos/vision/events/udp.h"
13#include "aos/vision/image/reader.h"
14#include "gflags/gflags.h"
15#include "y2019/vision.pb.h"
16
17using ::aos::events::DataSocket;
18using ::aos::events::RXUdpSocket;
19using ::aos::events::TCPServer;
20using ::aos::vision::DataRef;
21using ::aos::vision::Int32Codec;
22using ::aos::monotonic_clock;
23using ::y2019::VisionControl;
24
25DEFINE_string(roborio_ip, "10.9.71.2", "RoboRIO IP Address");
26DEFINE_string(log, "",
27 "If non-empty, log images to the specified prefix with the image "
28 "index appended to the filename");
29DEFINE_bool(single_camera, true, "If true, only use video0");
30DEFINE_int32(camera0_exposure, 600, "Exposure for video0");
31DEFINE_int32(camera1_exposure, 600, "Exposure for video1");
32
33aos::vision::DataRef mjpg_header =
34 "HTTP/1.0 200 OK\r\n"
35 "Server: YourServerName\r\n"
36 "Connection: close\r\n"
37 "Max-Age: 0\r\n"
38 "Expires: 0\r\n"
39 "Cache-Control: no-cache, private\r\n"
40 "Pragma: no-cache\r\n"
41 "Content-Type: multipart/x-mixed-replace; "
42 "boundary=--boundary\r\n\r\n";
43
// One encoded camera image. Instances are shared between client connections
// through std::shared_ptr so the bytes stay alive while they are being sent.
struct Frame {
  // Raw image bytes exactly as delivered by the camera.
  std::string data;
};
47
// Reports whether stat(2) succeeds for |name|. Any stat failure, not just a
// missing file, yields false.
inline bool FileExist(const std::string &name) {
  struct stat info;
  const int status = stat(name.c_str(), &info);
  return status == 0;
}
52
53class BlobLog {
54 public:
55 explicit BlobLog(const char *prefix, const char *extension) {
56 int index = 0;
57 while (true) {
58 std::string file = prefix + std::to_string(index) + extension;
59 if (FileExist(file)) {
60 index++;
61 } else {
62 printf("Logging to file (%s)\n", file.c_str());
63 ofst_.open(file);
64 assert(ofst_.is_open());
65 break;
66 }
67 }
68 }
69
70 ~BlobLog() { ofst_.close(); }
71
72 void WriteLogEntry(DataRef data) { ofst_.write(&data[0], data.size()); }
73
74 private:
75 std::ofstream ofst_;
76};
77
78class UdpClient : public ::aos::events::EpollEvent {
79 public:
80 UdpClient(int port, ::std::function<void(void *, size_t)> callback)
81 : ::aos::events::EpollEvent(RXUdpSocket::SocketBindListenOnPort(port)),
82 callback_(callback) {}
83
84 private:
85 ::std::function<void(void *, size_t)> callback_;
86
87 void ReadEvent() override {
88 char data[1024];
89 size_t received_data_size = Recv(data, sizeof(data));
90 callback_(data, received_data_size);
91 }
92
93 size_t Recv(void *data, int size) {
94 return PCHECK(recv(fd(), static_cast<char *>(data), size, 0));
95 }
96};
97
98// TODO(aschuh & michael) Pull this out.
99template <typename PB>
100class ProtoUdpClient : public UdpClient {
101 public:
102 ProtoUdpClient(int port, ::std::function<void(const PB &)> proto_callback)
103 : UdpClient(port, ::std::bind(&ProtoUdpClient::ReadData, this,
104 ::std::placeholders::_1,
105 ::std::placeholders::_2)),
106 proto_callback_(proto_callback) {}
107
108 private:
109 ::std::function<void(const PB &)> proto_callback_;
110
111 void ReadData(void *data, size_t size) {
112 PB pb;
113 pb.ParseFromArray(data, size);
114 proto_callback_(pb);
115 }
116};
117
118class MjpegDataSocket : public aos::events::SocketConnection {
119 public:
120 MjpegDataSocket(aos::events::TCPServerBase *server, int fd)
121 : aos::events::SocketConnection(server, fd) {
122 SetEvents(EPOLLOUT | EPOLLET);
123 }
124
125 ~MjpegDataSocket() { printf("Closed connection on descriptor %d\n", fd()); }
126
127 void DirectEvent(uint32_t events) override {
128 if (events & EPOLLOUT) {
129 NewDataToSend();
130 events &= ~EPOLLOUT;
131 }
132 // Other end hung up. Ditch the connection.
133 if (events & EPOLLHUP) {
134 CloseConnection();
135 events &= ~EPOLLHUP;
136 return;
137 }
138 if (events) {
139 aos::events::EpollEvent::DirectEvent(events);
140 }
141 }
142
143 void ReadEvent() override {
144 // Ignore reads, but don't leave them pending.
145 ssize_t count;
146 char buf[512];
147 while (true) {
148 count = read(fd(), &buf, sizeof buf);
149 if (count <= 0) {
150 if (errno != EAGAIN) {
151 CloseConnection();
152 return;
153 }
154 break;
155 } else if (!ready_to_receive_) {
156 // This 4 should match "\r\n\r\n".length();
157 if (match_i_ >= 4) {
158 printf("reading after last match\n");
159 continue;
160 }
161 for (char c : aos::vision::DataRef(&buf[0], count)) {
162 if (c == "\r\n\r\n"[match_i_]) {
163 ++match_i_;
164 if (match_i_ >= 4) {
165 if (!ready_to_receive_) {
166 ready_to_receive_ = true;
167 RasterHeader();
168 }
169 }
170 } else if (match_i_ != 0) {
171 if (c == '\r') match_i_ = 1;
172 }
173 }
174 }
175 }
176 }
177
178 void RasterHeader() {
179 output_buffer_.push_back(mjpg_header);
180 NewDataToSend();
181 }
182
183 void RasterFrame(std::shared_ptr<Frame> frame) {
184 if (!output_buffer_.empty() || !ready_to_receive_) return;
185 sending_frame_ = frame;
186 aos::vision::DataRef data = frame->data;
187
188 size_t n_written = snprintf(data_header_tmp_, sizeof(data_header_tmp_),
189 "--boundary\r\n"
190 "Content-type: image/jpg\r\n"
191 "Content-Length: %zu\r\n\r\n",
192 data.size());
193 // This should never happen because the buffer should be properly sized.
194 if (n_written == sizeof(data_header_tmp_)) {
195 fprintf(stderr, "wrong sized buffer\n");
196 exit(-1);
197 }
198 LOG(INFO, "Frame size in bytes: data.size() = %zu\n",data.size());
199 output_buffer_.push_back(aos::vision::DataRef(data_header_tmp_, n_written));
200 output_buffer_.push_back(data);
201 output_buffer_.push_back("\r\n\r\n");
202 NewDataToSend();
203 }
204
205 void NewFrame(std::shared_ptr<Frame> frame) { RasterFrame(std::move(frame)); }
206
207 void NewDataToSend() {
208 while (!output_buffer_.empty()) {
209 auto &data = *output_buffer_.begin();
210
211 while (!data.empty()) {
212 int len = send(fd(), data.data(), data.size(), MSG_NOSIGNAL);
213 if (len == -1) {
214 if (errno == EAGAIN) {
215 // Next thinggy will pick this up.
216 return;
217 } else {
218 CloseConnection();
219 return;
220 }
221 } else {
222 data.remove_prefix(len);
223 }
224 }
225 output_buffer_.pop_front();
226 }
227 }
228
229 private:
230 char data_header_tmp_[512];
231 std::shared_ptr<Frame> sending_frame_;
232 std::deque<aos::vision::DataRef> output_buffer_;
233
234 bool ready_to_receive_ = false;
235 void CloseConnection() {
236 loop()->Delete(this);
237 close(fd());
238 delete this;
239 }
240 size_t match_i_ = 0;
241};
242
243class CameraStream : public ::aos::vision::ImageStreamEvent {
244 public:
245 CameraStream(::aos::vision::CameraParams params, const ::std::string &fname,
246 TCPServer<MjpegDataSocket> *tcp_server, bool log,
247 ::std::function<void()> frame_callback)
248 : ImageStreamEvent(fname, params),
249 tcp_server_(tcp_server),
250 frame_callback_(frame_callback) {
251 if (log) {
252 log_.reset(new BlobLog(FLAGS_log.c_str(), ".dat"));
253 }
254 }
255
256 void set_active(bool active) { active_ = active; }
257
258 bool active() const { return active_; }
259
260 void ProcessImage(DataRef data,
261 monotonic_clock::time_point /*monotonic_now*/) {
262 ++sampling;
263 // 20 is the sampling rate.
264 if (sampling == 20) {
265 int tmp_size = data.size() + sizeof(int32_t);
266 char *buf;
267 std::string log_record;
268 log_record.resize(tmp_size, 0);
269 {
270 buf = Int32Codec::Write(&log_record[0], tmp_size);
271 data.copy(buf, data.size());
272 }
273 if (log_) {
274 log_->WriteLogEntry(log_record);
275 }
276 sampling = 0;
277 }
278
279 if (active_) {
280 auto frame = std::make_shared<Frame>(Frame{std::string(data)});
281 tcp_server_->Broadcast(
282 [frame](MjpegDataSocket *event) { event->NewFrame(frame); });
283 }
284 frame_callback_();
285 }
286
287 private:
288 int sampling = 0;
289 TCPServer<MjpegDataSocket> *tcp_server_;
290 ::std::unique_ptr<BlobLog> log_;
291 ::std::function<void()> frame_callback_;
292 bool active_ = false;
293};
294
295int main(int argc, char **argv) {
296 gflags::ParseCommandLineFlags(&argc, &argv, false);
297 ::aos::logging::Init();
298 ::aos::logging::AddImplementation(
299 new ::aos::logging::StreamLogImplementation(stderr));
300 TCPServer<MjpegDataSocket> tcp_server_(80);
301 aos::vision::CameraParams params0;
302 params0.set_exposure(FLAGS_camera0_exposure);
303 params0.set_brightness(-40);
304 params0.set_width(320);
305 // params0.set_fps(10);
306 params0.set_height(240);
307
308 aos::vision::CameraParams params1 = params0;
309 params1.set_exposure(FLAGS_camera1_exposure);
310
311 ::y2019::VisionStatus vision_status;
312 LOG(INFO,
313 "The UDP socket should be on port 5001 to 10.9.71.2 for "
314 "the competition robot.\n");
315 LOG(INFO, "Starting UDP socket on port 5001 to %s\n",
316 FLAGS_roborio_ip.c_str());
317 ::aos::events::ProtoTXUdpSocket<::y2019::VisionStatus> status_socket(
318 FLAGS_roborio_ip.c_str(), 5001);
319
320 ::std::unique_ptr<CameraStream> camera1;
321 ::std::unique_ptr<CameraStream> camera0(new CameraStream(
322 params0, "/dev/video0", &tcp_server_, !FLAGS_log.empty(),
323 [&camera0, &camera1, &status_socket, &vision_status]() {
324 vision_status.set_low_frame_count(vision_status.low_frame_count() + 1);
325 LOG(INFO, "Got a frame cam0\n");
326 if (camera0->active()) {
327 status_socket.Send(vision_status);
328 }
329 }));
330 if (!FLAGS_single_camera) {
331 camera1.reset(new CameraStream(
332 params1, "/dev/video1", &tcp_server_, false,
333 [&camera0, &camera1, &status_socket, &vision_status]() {
334 vision_status.set_high_frame_count(vision_status.high_frame_count() +
335 1);
336 LOG(INFO, "Got a frame cam1\n");
337 if (camera1->active()) {
338 status_socket.Send(vision_status);
339 }
340 }));
341 }
342
343 ProtoUdpClient<VisionControl> udp_client(
344 5000, [&camera0, &camera1](const VisionControl &vision_control) {
345 bool cam0_active = false;
346 if (camera1) {
347 cam0_active = !vision_control.high_video();
348 camera0->set_active(!vision_control.high_video());
349 camera1->set_active(vision_control.high_video());
350 } else {
351 cam0_active = true;
352 camera0->set_active(true);
353 }
354 LOG(INFO, "Got control packet, cam%d active\n", cam0_active ? 0 : 1);
355 });
356
357 // Default to camera0
358 camera0->set_active(true);
359 if (camera1) {
360 camera1->set_active(false);
361 }
362
363 aos::events::EpollLoop loop;
364 loop.Add(&tcp_server_);
365 loop.Add(camera0.get());
366 if (camera1) {
367 loop.Add(camera1.get());
368 }
369 loop.Add(&udp_client);
370
371 printf("Running Camera\n");
372 loop.Run();
373}