blob: 7749f5c7927352eaaebe4ee6f5b9fe3487828d35 [file] [log] [blame]
Parker Schuh90641112017-02-25 12:18:36 -08001#include <gdk/gdk.h>
2#include <gtk/gtk.h>
3#include <sys/stat.h>
4#include <unistd.h>
Austin Schuh60e77942022-05-16 17:48:24 -07005
Parker Schuh90641112017-02-25 12:18:36 -08006#include <cstdlib>
7#include <fstream>
8#include <functional>
9#include <string>
10
11#include "aos/vision/blob/codec.h"
Austin Schuh60e77942022-05-16 17:48:24 -070012#include "aos/vision/debug/debug_framework.h"
Parker Schuh90641112017-02-25 12:18:36 -080013#include "aos/vision/events/tcp_client.h"
14
Stephan Pleinesf63bde82024-01-13 15:59:33 -080015namespace aos::vision {
Parker Schuh90641112017-02-25 12:18:36 -080016
Parker Schuhcd258b82017-04-09 16:28:29 -070017// Reads packets in the form:
18// uint32 length
19// string text (of size length)
Parker Schuh90641112017-02-25 12:18:36 -080020class BufferedLengthDelimReader {
21 public:
22 union data_len {
23 uint32_t len;
24 char buf[4];
25 };
26 BufferedLengthDelimReader() {
27 num_read_ = 0;
28 img_read_ = -1;
29 }
30 template <typename Lamb>
31 void up(int fd, Lamb lam) {
Parker Schuhcd258b82017-04-09 16:28:29 -070032 // Drops older messages with this while loop until we've seeked to
33 // be current.
34 while (true) {
35 ssize_t count;
36 if (img_read_ < 0) {
37 count = read(fd, &len_.buf[num_read_], sizeof(len_.buf) - num_read_);
38 if (count < 0) break;
39 num_read_ += count;
40 if (num_read_ < 4) break;
41 num_read_ = 0;
42 img_read_ = 0;
43 data_.clear();
44 if (len_.len > 200000) {
45 printf("bad size: %d\n", len_.len);
46 exit(-1);
47 }
48 data_.resize(len_.len);
49 } else {
50 count = read(fd, &data_[img_read_], len_.len - img_read_);
51 if (count < 0) break;
52 img_read_ += count;
53 if (img_read_ < (int)len_.len) return;
54 std::swap(prev_data_, data_);
55 has_prev_ = true;
56 img_read_ = -1;
Parker Schuh90641112017-02-25 12:18:36 -080057 }
Parker Schuhcd258b82017-04-09 16:28:29 -070058 }
59 if (has_prev_) {
60 lam(DataRef{&prev_data_[0], len_.len});
61 has_prev_ = false;
Parker Schuh90641112017-02-25 12:18:36 -080062 }
63 }
64
65 private:
66 data_len len_;
67 int num_read_;
Parker Schuhcd258b82017-04-09 16:28:29 -070068 bool has_prev_ = false;
69 std::vector<char> prev_data_;
Parker Schuh90641112017-02-25 12:18:36 -080070 std::vector<char> data_;
71 int img_read_;
72};
73
74bool ParsePort(const std::string &port, int *portno) {
75 if (port.empty()) return false;
76 int value = 0;
77 if (port[0] == '0') return false;
78 for (char item : port) {
79 if (item < '0' || item > '9') return false;
80 value = value * 10 + (item - '0');
81 }
82 *portno = value;
83 return true;
84}
85
86class TCPImageSource : public ImageSource {
87 public:
88 class Impl : public aos::events::TcpClient {
89 public:
90 Impl(const std::string &hostname, int portno,
91 DebugFrameworkInterface *interface)
Parker Schuhef47dbf2017-03-04 16:59:30 -080092 : aos::events::TcpClient(hostname.c_str(), portno),
93 interface_(interface) {}
Parker Schuh90641112017-02-25 12:18:36 -080094
95 void ReadEvent() override {
96 read_.up(fd(), [&](DataRef data) {
97 BlobList blobl;
98 const char *buf = data.data();
99 buf += sizeof(uint32_t);
100
101 ImageFormat fmt;
102 Int64Codec::Read(&buf);
103 fmt.w = Int32Codec::Read(&buf);
104 fmt.h = Int32Codec::Read(&buf);
105 buf = ParseBlobList(&blobl, buf);
106 interface_->NewBlobList(blobl, fmt);
107 });
Parker Schuhcd258b82017-04-09 16:28:29 -0700108 if (errno != EAGAIN) {
109 fprintf(stderr, "disconnected\n");
110 loop()->Delete(this);
111 }
Parker Schuh90641112017-02-25 12:18:36 -0800112 }
113
114 BufferedLengthDelimReader read_;
115 DebugFrameworkInterface *interface_ = nullptr;
116 };
117
118 void Init(const std::string &addr_and_port,
119 DebugFrameworkInterface *interface) override {
120 auto it = addr_and_port.rfind(':');
121 if (it == std::string::npos) {
122 fprintf(stderr, "usage is: tcp:hostname:port\n");
123 exit(-1);
124 }
125 auto hostname = addr_and_port.substr(0, it);
126 auto port = addr_and_port.substr(it + 1);
127 int portno = 0;
128 if (!ParsePort(port, &portno)) {
129 fprintf(stderr, "usage is: tcp:hostname:port\n");
130 exit(-1);
131 }
132
133 impl_.reset(new Impl(hostname, portno, interface));
134
135 interface->Loop()->Add(impl_.get());
136
137 interface->InstallKeyPress([this](uint32_t /*keyval*/) {});
138 }
139
140 const char *GetHelpMessage() override {
141 return &R"(
142 format_spec is in ipaddr:port format.
143 This viewer soure connects to a target_sender binary and views the live
144 blob-stream.
145)"[1];
146 }
147
148 private:
149 std::unique_ptr<Impl> impl_;
150};
151
152REGISTER_IMAGE_SOURCE("tcp", TCPImageSource);
153
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800154} // namespace aos::vision