// Copyright (c) FIRST and other WPILib contributors.
// Open Source Software; you can modify and/or share it under the terms of
// the WPILib BSD license file in the root directory of this project.
4
5#ifdef __APPLE__
6#include <util.h>
7#elif !defined(_WIN32)
8#include <pty.h>
9#endif
10
11#include <cstdio>
12
13#include <fmt/format.h>
14#include <wpi/MathExtras.h>
15#include <wpi/SmallVector.h>
16#include <wpi/StringExtras.h>
17#include <wpi/timestamp.h>
18
19#include "wpinet/raw_uv_ostream.h"
20#include "wpinet/uv/Loop.h"
21#include "wpinet/uv/Pipe.h"
22#include "wpinet/uv/Process.h"
23#include "wpinet/uv/Signal.h"
24#include "wpinet/uv/Tcp.h"
25#include "wpinet/uv/Tty.h"
26#include "wpinet/uv/Udp.h"
27#include "wpinet/uv/util.h"
28
29namespace uv = wpi::uv;
30
31static uint64_t startTime = wpi::Now();
32
33static bool NewlineBuffer(std::string& rem, uv::Buffer& buf, size_t len,
34 wpi::SmallVectorImpl<uv::Buffer>& bufs, bool tcp,
35 uint16_t tcpSeq) {
36 // scan for last newline
37 std::string_view str(buf.base, len);
38 size_t idx = str.rfind('\n');
39 if (idx == std::string_view::npos) {
40 // no newline yet, just keep appending to remainder
41 rem += str;
42 return false;
43 }
44
45 // build output
46 wpi::raw_uv_ostream out(bufs, 4096);
47 std::string_view toCopy = wpi::slice(str, 0, idx + 1);
48 if (tcp) {
49 // Header is 2 byte len, 1 byte type, 4 byte timestamp, 2 byte sequence num
50 uint32_t ts = wpi::FloatToBits((wpi::Now() - startTime) * 1.0e-6);
51 uint16_t len = rem.size() + toCopy.size() + 1 + 4 + 2;
52 const uint8_t header[] = {static_cast<uint8_t>((len >> 8) & 0xff),
53 static_cast<uint8_t>(len & 0xff),
54 12,
55 static_cast<uint8_t>((ts >> 24) & 0xff),
56 static_cast<uint8_t>((ts >> 16) & 0xff),
57 static_cast<uint8_t>((ts >> 8) & 0xff),
58 static_cast<uint8_t>(ts & 0xff),
59 static_cast<uint8_t>((tcpSeq >> 8) & 0xff),
60 static_cast<uint8_t>(tcpSeq & 0xff)};
61 out << std::span<const uint8_t>(header);
62 }
63 out << rem << toCopy;
64
65 // reset remainder
66 rem = wpi::slice(str, idx + 1, std::string_view::npos);
67 return true;
68}
69
70static void CopyUdp(uv::Stream& in, std::shared_ptr<uv::Udp> out,
71 bool broadcast) {
72 sockaddr_in addr;
73 if (broadcast) {
74 out->SetBroadcast(true);
75 uv::NameToAddr("0.0.0.0", 6666, &addr);
76 } else {
77 uv::NameToAddr("127.0.0.1", 6666, &addr);
78 }
79
80 in.data.connect(
81 [rem = std::make_shared<std::string>(), outPtr = out.get(), addr](
82 uv::Buffer& buf, size_t len) {
83 // build buffers
84 wpi::SmallVector<uv::Buffer, 4> bufs;
85 if (!NewlineBuffer(*rem, buf, len, bufs, false, 0)) {
86 return;
87 }
88
89 // send output
90 outPtr->Send(addr, bufs, [](auto bufs2, uv::Error) {
91 for (auto buf : bufs2) {
92 buf.Deallocate();
93 }
94 });
95 },
96 out);
97}
98
99static void CopyTcp(uv::Stream& in, std::shared_ptr<uv::Stream> out) {
100 struct StreamData {
101 std::string rem;
102 uint16_t seq = 0;
103 };
104 in.data.connect(
105 [data = std::make_shared<StreamData>(), outPtr = out.get()](
106 uv::Buffer& buf, size_t len) {
107 // build buffers
108 wpi::SmallVector<uv::Buffer, 4> bufs;
109 if (!NewlineBuffer(data->rem, buf, len, bufs, true, data->seq++)) {
110 return;
111 }
112
113 // send output
114 outPtr->Write(bufs, [](auto bufs2, uv::Error) {
115 for (auto buf : bufs2) {
116 buf.Deallocate();
117 }
118 });
119 },
120 out);
121}
122
123static void CopyStream(uv::Stream& in, std::shared_ptr<uv::Stream> out) {
124 in.data.connect([out](uv::Buffer& buf, size_t len) {
125 uv::Buffer buf2 = buf.Dup();
126 buf2.len = len;
127 out->Write({buf2}, [](auto bufs, uv::Error) {
128 for (auto buf : bufs) {
129 buf.Deallocate();
130 }
131 });
132 });
133}
134
135int main(int argc, char* argv[]) {
136 // parse arguments
137 int programArgc = 1;
138 bool useUdp = false;
139 bool broadcastUdp = false;
140 bool err = false;
141
142 while (programArgc < argc && argv[programArgc][0] == '-') {
143 if (std::string_view(argv[programArgc]) == "-u") {
144 useUdp = true;
145 } else if (std::string_view(argv[programArgc]) == "-b") {
146 useUdp = true;
147 broadcastUdp = true;
148 } else {
149 fmt::print(stderr, "unrecognized command line option {}\n",
150 argv[programArgc]);
151 err = true;
152 }
153 ++programArgc;
154 }
155
156 if (err || (argc - programArgc) < 1) {
157 std::fputs(argv[0], stderr);
158 std::fputs(
159 " [-ub] program [arguments ...]\n"
160 " -u send udp to localhost port 6666 instead of using tcp\n"
161 " -b broadcast udp to port 6666 instead of using tcp\n",
162 stderr);
163 return EXIT_FAILURE;
164 }
165
166 uv::Process::DisableStdioInheritance();
167
168 auto loop = uv::Loop::Create();
169 loop->error.connect(
170 [](uv::Error err) { fmt::print(stderr, "uv ERROR: {}\n", err.str()); });
171
172 // create pipes to communicate with child
173 auto stdinPipe = uv::Pipe::Create(loop);
174 auto stdoutPipe = uv::Pipe::Create(loop);
175 auto stderrPipe = uv::Pipe::Create(loop);
176
177 // create tty to pass from our console to child's
178 auto stdinTty = uv::Tty::Create(loop, 0, true);
179 auto stdoutTty = uv::Tty::Create(loop, 1, false);
180 auto stderrTty = uv::Tty::Create(loop, 2, false);
181
182 // pass through our console to child's (bidirectional)
183 if (stdinTty) {
184 CopyStream(*stdinTty, stdinPipe);
185 }
186 if (stdoutTty) {
187 CopyStream(*stdoutPipe, stdoutTty);
188 }
189 if (stderrTty) {
190 CopyStream(*stderrPipe, stderrTty);
191 }
192
193 // when our stdin closes, also close child stdin
194 if (stdinTty) {
195 stdinTty->end.connect([stdinPipe] { stdinPipe->Close(); });
196 }
197
198 if (useUdp) {
199 auto udp = uv::Udp::Create(loop);
200 // tee stdout and stderr
201 CopyUdp(*stdoutPipe, udp, broadcastUdp);
202 CopyUdp(*stderrPipe, udp, broadcastUdp);
203 } else {
204 auto tcp = uv::Tcp::Create(loop);
205
206 // bind to listen address and port
207 tcp->Bind("", 1740);
208
209 // when we get a connection, accept it
210 tcp->connection.connect([srv = tcp.get(), stdoutPipe, stderrPipe] {
211 auto tcp = srv->Accept();
212 if (!tcp) {
213 return;
214 }
215
216 // close on error
217 tcp->error.connect([s = tcp.get()](wpi::uv::Error err) { s->Close(); });
218
219 // tee stdout and stderr
220 CopyTcp(*stdoutPipe, tcp);
221 CopyTcp(*stderrPipe, tcp);
222 });
223
224 // start listening for incoming connections
225 tcp->Listen();
226 }
227
228 // build process options
229 wpi::SmallVector<uv::Process::Option, 8> options;
230
231 // hook up pipes to child
232 options.emplace_back(
233 uv::Process::StdioCreatePipe(0, *stdinPipe, UV_READABLE_PIPE));
234#ifndef _WIN32
235 // create a PTY so the child does unbuffered output
236 int parentfd, childfd;
237 if (openpty(&parentfd, &childfd, nullptr, nullptr, nullptr) == 0) {
238 stdoutPipe->Open(parentfd);
239 options.emplace_back(uv::Process::StdioInherit(1, childfd));
240 } else {
241 options.emplace_back(
242 uv::Process::StdioCreatePipe(1, *stdoutPipe, UV_WRITABLE_PIPE));
243 }
244#else
245 options.emplace_back(
246 uv::Process::StdioCreatePipe(1, *stdoutPipe, UV_WRITABLE_PIPE));
247#endif
248 options.emplace_back(
249 uv::Process::StdioCreatePipe(2, *stderrPipe, UV_WRITABLE_PIPE));
250
251 // pass our args as the child args (argv[1] becomes child argv[0], etc)
252 for (int i = programArgc; i < argc; ++i) {
253 options.emplace_back(argv[i]);
254 }
255
256 auto proc = uv::Process::SpawnArray(loop, argv[programArgc], options);
257 if (!proc) {
258 std::fputs("could not start subprocess\n", stderr);
259 return EXIT_FAILURE;
260 }
261 proc->exited.connect([](int64_t status, int) { std::exit(status); });
262
263 // start reading
264 if (stdinTty) {
265 stdinTty->StartRead();
266 }
267 stdoutPipe->StartRead();
268 stderrPipe->StartRead();
269
270 // pass various signals to child
271 auto sigHandler = [proc](int signum) { proc->Kill(signum); };
272 for (int signum : {SIGINT, SIGHUP, SIGTERM}) {
273 auto sig = uv::Signal::Create(loop);
274 sig->Start(signum);
275 sig->signal.connect(sigHandler);
276 }
277
278 // run the loop!
279 loop->Run();
280}