blob: 81f2c6c29f6a0fb52c4d756ea236fa8b94a4ab03 [file] [log] [blame]
Austin Schuh8d5fff42018-05-30 20:44:12 -07001#include <sys/stat.h>
Philipp Schrader790cb542023-07-05 21:06:52 -07002
Austin Schuh8d5fff42018-05-30 20:44:12 -07003#include <deque>
4#include <fstream>
5#include <string>
6
Austin Schuh99f7c6a2024-06-25 22:07:44 -07007#include "absl/flags/flag.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07008
Austin Schuh99f7c6a2024-06-25 22:07:44 -07009#include "aos/init.h"
John Park33858a32018-09-28 23:05:48 -070010#include "aos/logging/implementations.h"
11#include "aos/logging/logging.h"
Parker Schuhc1975fc2018-04-07 15:27:07 -070012#include "aos/vision/blob/codec.h"
Austin Schuh8d5fff42018-05-30 20:44:12 -070013#include "aos/vision/events/socket_types.h"
14#include "aos/vision/events/udp.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070015#include "aos/vision/image/image_stream.h"
Austin Schuh8d5fff42018-05-30 20:44:12 -070016#include "aos/vision/image/reader.h"
Austin Schuh8d5fff42018-05-30 20:44:12 -070017#include "y2018/vision.pb.h"
Parker Schuhc1975fc2018-04-07 15:27:07 -070018
Philipp Schrader790cb542023-07-05 21:06:52 -070019using ::aos::monotonic_clock;
Austin Schuh8d5fff42018-05-30 20:44:12 -070020using ::aos::events::DataSocket;
21using ::aos::events::RXUdpSocket;
22using ::aos::events::TCPServer;
23using ::aos::vision::DataRef;
24using ::aos::vision::Int32Codec;
Austin Schuh8d5fff42018-05-30 20:44:12 -070025using ::y2018::VisionControl;
Parker Schuhc1975fc2018-04-07 15:27:07 -070026
Austin Schuh99f7c6a2024-06-25 22:07:44 -070027ABSL_FLAG(bool, single_camera, true, "If true, only use video0");
28ABSL_FLAG(int32_t, camera0_exposure, 300, "Exposure for video0");
29ABSL_FLAG(int32_t, camera1_exposure, 300, "Exposure for video1");
Austin Schuh8d5fff42018-05-30 20:44:12 -070030
31aos::vision::DataRef mjpg_header =
32 "HTTP/1.0 200 OK\r\n"
33 "Server: YourServerName\r\n"
34 "Connection: close\r\n"
35 "Max-Age: 0\r\n"
36 "Expires: 0\r\n"
37 "Cache-Control: no-cache, private\r\n"
38 "Pragma: no-cache\r\n"
39 "Content-Type: multipart/x-mixed-replace; "
40 "boundary=--boundary\r\n\r\n";
Parker Schuhc1975fc2018-04-07 15:27:07 -070041
// One camera image, owned as a string so that a single copy can be shared
// (through std::shared_ptr) between all client connections sending it.
struct Frame {
  // Image bytes exactly as they were handed to CameraStream::ProcessImage().
  std::string data;
};
45
// Returns true when stat() succeeds on `name`. Any stat() failure (missing
// path, permission problem, ...) is reported as false.
inline bool FileExist(const std::string &name) {
  struct stat info;
  const int status = stat(name.c_str(), &info);
  return status == 0;
}
50
51class BlobLog {
52 public:
53 explicit BlobLog(const char *prefix, const char *extension) {
54 int index = 0;
55 while (true) {
56 std::string file = prefix + std::to_string(index) + extension;
57 if (FileExist(file)) {
58 index++;
59 } else {
60 printf("Logging to file (%s)\n", file.c_str());
61 ofst_.open(file);
62 assert(ofst_.is_open());
63 break;
64 }
65 }
66 }
67
68 ~BlobLog() { ofst_.close(); }
69
70 void WriteLogEntry(DataRef data) { ofst_.write(&data[0], data.size()); }
71
72 private:
73 std::ofstream ofst_;
74};
75
Austin Schuh8d5fff42018-05-30 20:44:12 -070076class UdpClient : public ::aos::events::EpollEvent {
77 public:
78 UdpClient(int port, ::std::function<void(void *, size_t)> callback)
79 : ::aos::events::EpollEvent(RXUdpSocket::SocketBindListenOnPort(port)),
80 callback_(callback) {}
81
82 private:
83 ::std::function<void(void *, size_t)> callback_;
84
85 void ReadEvent() override {
86 char data[1024];
87 size_t received_data_size = Recv(data, sizeof(data));
88 callback_(data, received_data_size);
89 }
90
91 size_t Recv(void *data, int size) {
Austin Schuhf257f3c2019-10-27 21:00:43 -070092 return AOS_PCHECK(recv(fd(), static_cast<char *>(data), size, 0));
Austin Schuh8d5fff42018-05-30 20:44:12 -070093 }
94};
95
96template <typename PB>
97class ProtoUdpClient : public UdpClient {
98 public:
99 ProtoUdpClient(int port, ::std::function<void(const PB &)> proto_callback)
100 : UdpClient(port, ::std::bind(&ProtoUdpClient::ReadData, this,
101 ::std::placeholders::_1,
102 ::std::placeholders::_2)),
103 proto_callback_(proto_callback) {}
104
105 private:
106 ::std::function<void(const PB &)> proto_callback_;
107
108 void ReadData(void *data, size_t size) {
109 PB pb;
110 pb.ParseFromArray(data, size);
111 proto_callback_(pb);
112 }
113};
114
Parker Schuhc1975fc2018-04-07 15:27:07 -0700115class MjpegDataSocket : public aos::events::SocketConnection {
116 public:
Austin Schuh8d5fff42018-05-30 20:44:12 -0700117 MjpegDataSocket(aos::events::TCPServerBase *server, int fd)
118 : aos::events::SocketConnection(server, fd) {
Parker Schuhc1975fc2018-04-07 15:27:07 -0700119 SetEvents(EPOLLOUT | EPOLLET);
120 }
121
122 ~MjpegDataSocket() { printf("Closed connection on descriptor %d\n", fd()); }
123
124 void DirectEvent(uint32_t events) override {
125 if (events & EPOLLOUT) {
126 NewDataToSend();
127 events &= ~EPOLLOUT;
128 }
Austin Schuh8d5fff42018-05-30 20:44:12 -0700129 // Other end hung up. Ditch the connection.
130 if (events & EPOLLHUP) {
131 CloseConnection();
132 events &= ~EPOLLHUP;
133 return;
134 }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700135 if (events) {
136 aos::events::EpollEvent::DirectEvent(events);
137 }
138 }
139
140 void ReadEvent() override {
141 // Ignore reads, but don't leave them pending.
142 ssize_t count;
143 char buf[512];
144 while (true) {
145 count = read(fd(), &buf, sizeof buf);
146 if (count <= 0) {
147 if (errno != EAGAIN) {
148 CloseConnection();
149 return;
150 }
151 break;
152 } else if (!ready_to_recieve_) {
153 // This 4 should match "\r\n\r\n".length();
154 if (match_i_ >= 4) {
155 printf("reading after last match\n");
156 continue;
157 }
158 for (char c : aos::vision::DataRef(&buf[0], count)) {
159 if (c == "\r\n\r\n"[match_i_]) {
160 ++match_i_;
161 if (match_i_ >= 4) {
162 if (!ready_to_recieve_) {
163 ready_to_recieve_ = true;
164 RasterHeader();
165 }
166 }
167 } else if (match_i_ != 0) {
168 if (c == '\r') match_i_ = 1;
169 }
170 }
171 }
172 }
173 }
174
175 void RasterHeader() {
176 output_buffer_.push_back(mjpg_header);
177 NewDataToSend();
178 }
179
180 void RasterFrame(std::shared_ptr<Frame> frame) {
181 if (!output_buffer_.empty() || !ready_to_recieve_) return;
182 sending_frame_ = frame;
183 aos::vision::DataRef data = frame->data;
184
185 size_t n_written = snprintf(data_header_tmp_, sizeof(data_header_tmp_),
Austin Schuh8d5fff42018-05-30 20:44:12 -0700186 "--boundary\r\n"
187 "Content-type: image/jpg\r\n"
188 "Content-Length: %zu\r\n\r\n",
189 data.size());
Parker Schuhc1975fc2018-04-07 15:27:07 -0700190 // This should never happen because the buffer should be properly sized.
191 if (n_written == sizeof(data_header_tmp_)) {
192 fprintf(stderr, "wrong sized buffer\n");
193 exit(-1);
194 }
195 output_buffer_.push_back(aos::vision::DataRef(data_header_tmp_, n_written));
196 output_buffer_.push_back(data);
197 output_buffer_.push_back("\r\n\r\n");
198 NewDataToSend();
199 }
200
Austin Schuh8d5fff42018-05-30 20:44:12 -0700201 void NewFrame(std::shared_ptr<Frame> frame) { RasterFrame(std::move(frame)); }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700202
203 void NewDataToSend() {
204 while (!output_buffer_.empty()) {
Austin Schuh8d5fff42018-05-30 20:44:12 -0700205 auto &data = *output_buffer_.begin();
Parker Schuhc1975fc2018-04-07 15:27:07 -0700206
207 while (!data.empty()) {
208 int len = send(fd(), data.data(), data.size(), MSG_NOSIGNAL);
209 if (len == -1) {
210 if (errno == EAGAIN) {
211 // Next thinggy will pick this up.
212 return;
213 } else {
214 CloseConnection();
215 return;
216 }
217 } else {
218 data.remove_prefix(len);
219 }
220 }
221 output_buffer_.pop_front();
222 }
223 }
224
225 private:
226 char data_header_tmp_[512];
227 std::shared_ptr<Frame> sending_frame_;
228 std::deque<aos::vision::DataRef> output_buffer_;
229
230 bool ready_to_recieve_ = false;
231 void CloseConnection() {
232 loop()->Delete(this);
233 close(fd());
234 delete this;
235 }
236 size_t match_i_ = 0;
237};
238
Austin Schuh8d5fff42018-05-30 20:44:12 -0700239class CameraStream : public ::aos::vision::ImageStreamEvent {
Parker Schuhc1975fc2018-04-07 15:27:07 -0700240 public:
Austin Schuh8d5fff42018-05-30 20:44:12 -0700241 CameraStream(::aos::vision::CameraParams params, const ::std::string &fname,
242 TCPServer<MjpegDataSocket> *tcp_server, bool log,
243 ::std::function<void()> frame_callback)
244 : ImageStreamEvent(fname, params),
245 tcp_server_(tcp_server),
246 frame_callback_(frame_callback) {
247 if (log) {
248 log_.reset(new BlobLog("./logging/blob_record_", ".dat"));
249 }
250 }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700251
Austin Schuh8d5fff42018-05-30 20:44:12 -0700252 void set_active(bool active) { active_ = active; }
253
254 bool active() const { return active_; }
255
256 void ProcessImage(DataRef data,
257 monotonic_clock::time_point /*monotonic_now*/) {
Parker Schuhc1975fc2018-04-07 15:27:07 -0700258 ++sampling;
259 // 20 is the sampling rate.
260 if (sampling == 20) {
261 int tmp_size = data.size() + sizeof(int32_t);
262 char *buf;
263 std::string log_record;
264 log_record.resize(tmp_size, 0);
265 {
266 buf = Int32Codec::Write(&log_record[0], tmp_size);
267 data.copy(buf, data.size());
268 }
Austin Schuh8d5fff42018-05-30 20:44:12 -0700269 if (log_) {
270 log_->WriteLogEntry(log_record);
271 }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700272 sampling = 0;
273 }
274
Austin Schuh8d5fff42018-05-30 20:44:12 -0700275 if (active_) {
276 auto frame = std::make_shared<Frame>(Frame{std::string(data)});
277 tcp_server_->Broadcast(
278 [frame](MjpegDataSocket *event) { event->NewFrame(frame); });
279 }
280 frame_callback_();
Parker Schuhc1975fc2018-04-07 15:27:07 -0700281 }
Austin Schuh8d5fff42018-05-30 20:44:12 -0700282
Parker Schuhc1975fc2018-04-07 15:27:07 -0700283 private:
284 int sampling = 0;
Austin Schuh8d5fff42018-05-30 20:44:12 -0700285 TCPServer<MjpegDataSocket> *tcp_server_;
286 ::std::unique_ptr<BlobLog> log_;
287 ::std::function<void()> frame_callback_;
288 bool active_ = false;
Parker Schuhc1975fc2018-04-07 15:27:07 -0700289};
290
Philipp Schrader790cb542023-07-05 21:06:52 -0700291int main(int argc, char **argv) {
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700292 aos::InitGoogle(&argc, &argv);
Parker Schuhc1975fc2018-04-07 15:27:07 -0700293
Austin Schuh8d5fff42018-05-30 20:44:12 -0700294 TCPServer<MjpegDataSocket> tcp_server_(80);
295 aos::vision::CameraParams params0;
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700296 params0.set_exposure(absl::GetFlag(FLAGS_camera0_exposure));
Austin Schuh8d5fff42018-05-30 20:44:12 -0700297 params0.set_brightness(-40);
298 params0.set_width(320);
Philipp Schrader790cb542023-07-05 21:06:52 -0700299 // params0.set_fps(10);
Austin Schuh8d5fff42018-05-30 20:44:12 -0700300 params0.set_height(240);
301
302 aos::vision::CameraParams params1 = params0;
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700303 params1.set_exposure(absl::GetFlag(FLAGS_camera1_exposure));
Austin Schuh8d5fff42018-05-30 20:44:12 -0700304
305 ::y2018::VisionStatus vision_status;
306 ::aos::events::ProtoTXUdpSocket<::y2018::VisionStatus> status_socket(
307 "10.9.71.2", 5001);
308
309 ::std::unique_ptr<CameraStream> camera1;
310 ::std::unique_ptr<CameraStream> camera0(new CameraStream(
311 params0, "/dev/video0", &tcp_server_, true,
James Kuszmaul3ae42262019-11-08 12:33:41 -0800312 [&camera0, &status_socket, &vision_status]() {
Austin Schuh8d5fff42018-05-30 20:44:12 -0700313 vision_status.set_low_frame_count(vision_status.low_frame_count() + 1);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700314 AOS_LOG(INFO, "Got a frame cam0\n");
Austin Schuh8d5fff42018-05-30 20:44:12 -0700315 if (camera0->active()) {
316 status_socket.Send(vision_status);
317 }
318 }));
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700319 if (!absl::GetFlag(FLAGS_single_camera)) {
Austin Schuh8d5fff42018-05-30 20:44:12 -0700320 camera1.reset(new CameraStream(
321 // params,
322 // "/dev/v4l/by-path/platform-tegra-xhci-usb-0:3.1:1.0-video-index0",
323 params1, "/dev/video1", &tcp_server_, false,
James Kuszmaul3ae42262019-11-08 12:33:41 -0800324 [&camera1, &status_socket, &vision_status]() {
Austin Schuh8d5fff42018-05-30 20:44:12 -0700325 vision_status.set_high_frame_count(vision_status.high_frame_count() +
326 1);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700327 AOS_LOG(INFO, "Got a frame cam1\n");
Austin Schuh8d5fff42018-05-30 20:44:12 -0700328 if (camera1->active()) {
329 status_socket.Send(vision_status);
330 }
331 }));
332 }
333
334 ProtoUdpClient<VisionControl> udp_client(
335 5000, [&camera0, &camera1](const VisionControl &vision_control) {
Austin Schuha8de4a62018-09-03 18:04:28 -0700336 bool cam0_active = false;
Austin Schuh8d5fff42018-05-30 20:44:12 -0700337 if (camera1) {
Austin Schuha8de4a62018-09-03 18:04:28 -0700338 cam0_active = !vision_control.high_video();
Austin Schuh8d5fff42018-05-30 20:44:12 -0700339 camera0->set_active(!vision_control.high_video());
340 camera1->set_active(vision_control.high_video());
341 } else {
Austin Schuha8de4a62018-09-03 18:04:28 -0700342 cam0_active = true;
Austin Schuh8d5fff42018-05-30 20:44:12 -0700343 camera0->set_active(true);
344 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700345 AOS_LOG(INFO, "Got control packet, cam%d active\n",
346 cam0_active ? 0 : 1);
Austin Schuh8d5fff42018-05-30 20:44:12 -0700347 });
348
349 // Default to camera0
350 camera0->set_active(true);
351 if (camera1) {
352 camera1->set_active(false);
353 }
Parker Schuhc1975fc2018-04-07 15:27:07 -0700354
355 aos::events::EpollLoop loop;
Austin Schuh8d5fff42018-05-30 20:44:12 -0700356 loop.Add(&tcp_server_);
357 loop.Add(camera0.get());
358 if (camera1) {
359 loop.Add(camera1.get());
360 }
361 loop.Add(&udp_client);
Parker Schuhc1975fc2018-04-07 15:27:07 -0700362
363 printf("Running Camera\n");
364 loop.Run();
365}