blob: ce6f5e2f42ee586c9f1d0e206382cfee95beef6e [file] [log] [blame]
#include "aos/vision/events/socket_types.h"
#include "aos/vision/image/reader.h"
#include "aos/vision/image/image_stream.h"
#include "aos/common/logging/implementations.h"
#include "aos/common/logging/logging.h"
#include "aos/vision/blob/codec.h"
#include <fstream>
#include <sys/stat.h>
#include <deque>
#include <string>
using aos::events::TCPServer;
using aos::events::DataSocket;
using aos::vision::Int32Codec;
using aos::vision::DataRef;
aos::vision::DataRef mjpg_header = "HTTP/1.0 200 OK\r\n"\
"Server: YourServerName\r\n"\
"Connection: close\r\n"\
"Max-Age: 0\r\n"\
"Expires: 0\r\n"\
"Cache-Control: no-cache, private\r\n"\
"Pragma: no-cache\r\n"\
"Content-Type: multipart/x-mixed-replace; "\
"boundary=--boundary\r\n\r\n";
struct Frame {
std::string data;
};
inline bool FileExist(const std::string &name) {
struct stat buffer;
return (stat(name.c_str(), &buffer) == 0);
}
class BlobLog {
public:
explicit BlobLog(const char *prefix, const char *extension) {
int index = 0;
while (true) {
std::string file = prefix + std::to_string(index) + extension;
if (FileExist(file)) {
index++;
} else {
printf("Logging to file (%s)\n", file.c_str());
ofst_.open(file);
assert(ofst_.is_open());
break;
}
}
}
~BlobLog() { ofst_.close(); }
void WriteLogEntry(DataRef data) { ofst_.write(&data[0], data.size()); }
private:
std::ofstream ofst_;
};
class MjpegDataSocket : public aos::events::SocketConnection {
public:
MjpegDataSocket(aos::events::TCPServerBase *serv, int fd)
: aos::events::SocketConnection(serv, fd) {
SetEvents(EPOLLOUT | EPOLLET);
}
~MjpegDataSocket() { printf("Closed connection on descriptor %d\n", fd()); }
void DirectEvent(uint32_t events) override {
if (events & EPOLLOUT) {
NewDataToSend();
events &= ~EPOLLOUT;
}
if (events) {
aos::events::EpollEvent::DirectEvent(events);
}
}
void ReadEvent() override {
// Ignore reads, but don't leave them pending.
ssize_t count;
char buf[512];
while (true) {
count = read(fd(), &buf, sizeof buf);
if (count <= 0) {
if (errno != EAGAIN) {
CloseConnection();
return;
}
break;
} else if (!ready_to_recieve_) {
// This 4 should match "\r\n\r\n".length();
if (match_i_ >= 4) {
printf("reading after last match\n");
continue;
}
for (char c : aos::vision::DataRef(&buf[0], count)) {
if (c == "\r\n\r\n"[match_i_]) {
++match_i_;
if (match_i_ >= 4) {
if (!ready_to_recieve_) {
ready_to_recieve_ = true;
RasterHeader();
}
}
} else if (match_i_ != 0) {
if (c == '\r') match_i_ = 1;
}
}
}
}
}
void RasterHeader() {
output_buffer_.push_back(mjpg_header);
NewDataToSend();
}
void RasterFrame(std::shared_ptr<Frame> frame) {
if (!output_buffer_.empty() || !ready_to_recieve_) return;
sending_frame_ = frame;
aos::vision::DataRef data = frame->data;
size_t n_written = snprintf(data_header_tmp_, sizeof(data_header_tmp_),
"--boundary\r\n"\
"Content-type: image/jpg\r\n"\
"Content-Length: %zu\r\n\r\n", data.size());
// This should never happen because the buffer should be properly sized.
if (n_written == sizeof(data_header_tmp_)) {
fprintf(stderr, "wrong sized buffer\n");
exit(-1);
}
output_buffer_.push_back(aos::vision::DataRef(data_header_tmp_, n_written));
output_buffer_.push_back(data);
output_buffer_.push_back("\r\n\r\n");
NewDataToSend();
}
void NewFrame(std::shared_ptr<Frame> frame) {
RasterFrame(std::move(frame));
}
void NewDataToSend() {
while (!output_buffer_.empty()) {
auto& data = *output_buffer_.begin();
while (!data.empty()) {
int len = send(fd(), data.data(), data.size(), MSG_NOSIGNAL);
if (len == -1) {
if (errno == EAGAIN) {
// Next thinggy will pick this up.
return;
} else {
CloseConnection();
return;
}
} else {
data.remove_prefix(len);
}
}
output_buffer_.pop_front();
}
}
private:
char data_header_tmp_[512];
std::shared_ptr<Frame> sending_frame_;
std::deque<aos::vision::DataRef> output_buffer_;
bool ready_to_recieve_ = false;
void CloseConnection() {
loop()->Delete(this);
close(fd());
delete this;
}
size_t match_i_ = 0;
};
using namespace aos::vision;
class CameraStream : public ImageStreamEvent {
public:
CameraStream(aos::vision::CameraParams params,
const std::string &fname, TCPServer<MjpegDataSocket>* tcp_serv)
: ImageStreamEvent(fname, params), tcp_serv_(tcp_serv),
log_("./logging/blob_record_", ".dat") {}
void ProcessImage(DataRef data, aos::monotonic_clock::time_point tp) {
(void)tp;
++sampling;
// 20 is the sampling rate.
if (sampling == 20) {
int tmp_size = data.size() + sizeof(int32_t);
char *buf;
std::string log_record;
log_record.resize(tmp_size, 0);
{
buf = Int32Codec::Write(&log_record[0], tmp_size);
data.copy(buf, data.size());
}
log_.WriteLogEntry(log_record);
sampling = 0;
}
auto frame = std::make_shared<Frame>(Frame{std::string(data)});
tcp_serv_->Broadcast([frame](MjpegDataSocket* event) {
event->NewFrame(frame);
});
}
private:
int sampling = 0;
TCPServer<MjpegDataSocket>* tcp_serv_;
BlobLog log_;
};
int main() {
::aos::logging::Init();
::aos::logging::AddImplementation(
new ::aos::logging::StreamLogImplementation(stderr));
TCPServer<MjpegDataSocket> tcp_serv_(80);
aos::vision::CameraParams params;
params.set_exposure(100);
params.set_width(320);
params.set_height(240);
CameraStream camera(params, "/dev/video0", &tcp_serv_);
aos::events::EpollLoop loop;
loop.Add(&tcp_serv_);
loop.Add(&camera);
printf("Running Camera\n");
loop.Run();
}