Provide lossless version of live plotter
Reuse the existing buffers that exist for offline log replay to maintain
only the buffers we need during realtime replay.
Change-Id: Ic2b43bb2c761ebf564d77ee3ce00cd84804a7a1a
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 5c099a2..945d034 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -42,7 +42,8 @@
event_loop->SkipTimingReport();
- aos::web_proxy::WebProxy web_proxy(event_loop.get(), FLAGS_buffer_size);
+ aos::web_proxy::WebProxy web_proxy(
+ event_loop.get(), aos::web_proxy::StoreHistory::kYes, FLAGS_buffer_size);
web_proxy.SetDataPath(FLAGS_data_dir.c_str());
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index fdd8f1e..6d4f23f 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -28,7 +28,9 @@
namespace aos {
namespace web_proxy {
WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
- aos::EventLoop *event_loop, int buffer_size)
+ aos::EventLoop *event_loop,
+ StoreHistory store_history,
+ int per_channel_buffer_size_bytes)
: server_(server),
config_(aos::CopyFlatBuffer(event_loop->configuration())),
event_loop_(event_loop) {
@@ -47,7 +49,10 @@
if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
auto fetcher = event_loop_->MakeRawFetcher(channel);
subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
- std::move(fetcher), i, buffer_size));
+ std::move(fetcher), i, store_history,
+ per_channel_buffer_size_bytes < 0
+ ? -1
+ : per_channel_buffer_size_bytes / channel->max_size()));
} else {
subscribers_.emplace_back(nullptr);
}
@@ -126,19 +131,24 @@
global_epoll->DeleteFd(fd);
}
-WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
- : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
+WebProxy::WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
+ int per_channel_buffer_size_bytes)
+ : WebProxy(event_loop, &internal_epoll_, store_history,
+ per_channel_buffer_size_bytes) {}
-WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
- : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
+WebProxy::WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
+ int per_channel_buffer_size_bytes)
+ : WebProxy(event_loop, event_loop->epoll(), store_history,
+ per_channel_buffer_size_bytes) {}
WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
- int buffer_size)
+ StoreHistory store_history,
+ int per_channel_buffer_size_bytes)
: epoll_(epoll),
server_(std::make_shared<aos::seasocks::SeasocksLogger>(
::seasocks::Logger::Level::Info)),
- websocket_handler_(
- new WebsocketHandler(&server_, event_loop, buffer_size)) {
+ websocket_handler_(new WebsocketHandler(
+ &server_, event_loop, store_history, per_channel_buffer_size_bytes)) {
CHECK(!global_epoll);
global_epoll = epoll;
@@ -192,10 +202,13 @@
}
void Subscriber::RunIteration() {
- if (channels_.empty() && buffer_size_ == 0) {
+ if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
+ fetcher_->Fetch();
+ message_buffer_.clear();
return;
}
+
while (fetcher_->FetchNext()) {
// If we aren't building up a buffer, short-circuit the FetchNext().
if (buffer_size_ == 0) {
@@ -271,12 +284,6 @@
ChannelInformation info;
info.transfer_method = transfer_method;
- // If we aren't keeping a buffer and there are no existing listeners, call
- // Fetch() to avoid falling behind on future calls to FetchNext().
- if (channels_.empty() && buffer_size_ == 0) {
- fetcher_->Fetch();
- }
-
channels_.emplace_back(std::make_pair(data_channel, info));
data_channel->set_on_message(
diff --git a/aos/network/web_proxy.fbs b/aos/network/web_proxy.fbs
index 6c85acb..f1d6645 100644
--- a/aos/network/web_proxy.fbs
+++ b/aos/network/web_proxy.fbs
@@ -64,7 +64,7 @@
enum TransferMethod : byte {
SUBSAMPLE,
- EVERYTHING_WITH_HISTORY,
+ LOSSLESS,
}
table ChannelRequest {
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 0815ebf..baca26e 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -24,13 +24,19 @@
class Subscriber;
class ApplicationConnection;
+enum class StoreHistory {
+ kNo,
+ kYes,
+};
+
// Basic class that handles receiving new websocket connections. Creates a new
// Connection to manage the rest of the negotiation and data passing. When the
// websocket closes, it deletes the Connection.
class WebsocketHandler : public ::seasocks::WebSocket::Handler {
public:
WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
- int buffer_size);
+ StoreHistory store_history,
+ int per_channel_buffer_size_bytes);
void onConnect(::seasocks::WebSocket *sock) override;
void onData(::seasocks::WebSocket *sock, const uint8_t *data,
size_t size) override;
@@ -50,15 +56,28 @@
// Wrapper class that manages the seasocks server and WebsocketHandler.
class WebProxy {
public:
- WebProxy(aos::EventLoop *event_loop, int buffer_size);
- WebProxy(aos::ShmEventLoop *event_loop, int buffer_size);
+ // Constructs a WebProxy object for interacting with a webpage. store_history
+ // and per_channel_buffer_size_bytes specify how we manage delivering LOSSLESS
+ // messages to the client:
+ // * store_history specifies whether we should always buffer up data for all
+ // channels--even for messages that are played prior to the client
+ // connecting. This is mostly useful for log replay where the client
+ // will typically connect after the logfile has been fully loaded/replayed.
+ // * per_channel_buffer_size_bytes is the maximum amount of data to buffer
+ // up per channel (-1 will indicate infinite data, which is used during log
+ // replay). This is divided by the max_size per channel to determine
+ // how many messages to queue up.
+ WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
+ int per_channel_buffer_size_bytes);
+ WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
+ int per_channel_buffer_size_bytes);
~WebProxy();
void SetDataPath(const char *path) { server_.setStaticPath(path); }
private:
WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
- int buffer_size);
+ StoreHistory store_history, int per_channel_buffer_size_bytes);
aos::internal::EPoll internal_epoll_;
aos::internal::EPoll *const epoll_;
@@ -96,9 +115,10 @@
class Subscriber {
public:
Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
- int buffer_size)
+ StoreHistory store_history, int buffer_size)
: fetcher_(std::move(fetcher)),
channel_index_(channel_index),
+ store_history_(store_history == StoreHistory::kYes),
buffer_size_(buffer_size) {}
void RunIteration();
@@ -133,6 +153,10 @@
std::unique_ptr<RawFetcher> fetcher_;
int channel_index_;
+ // If set, will always build up a buffer of the most recent buffer_size_
+ // messages. If store_history_ is *not* set we will only buffer up messages
+ // while there is an active listener.
+ bool store_history_;
int buffer_size_;
std::deque<Message> message_buffer_;
// The ScopedDataChannel that we use for actually sending data over WebRTC
diff --git a/aos/network/web_proxy_main.cc b/aos/network/web_proxy_main.cc
index 06fe942..f3ad926 100644
--- a/aos/network/web_proxy_main.cc
+++ b/aos/network/web_proxy_main.cc
@@ -6,7 +6,9 @@
DEFINE_string(config, "./config.json", "File path of aos configuration");
DEFINE_string(data_dir, "www", "Directory to serve data files from");
-DEFINE_int32(buffer_size, 0, "-1 if infinite, in # of messages / channel.");
+DEFINE_int32(buffer_size, 1000000,
+ "-1 if infinite, in bytes / channel. If there are no active "
+ "connections, will not store anything.");
int main(int argc, char **argv) {
aos::InitGoogle(&argc, &argv);
@@ -16,7 +18,8 @@
aos::ShmEventLoop event_loop(&config.message());
- aos::web_proxy::WebProxy web_proxy(&event_loop, FLAGS_buffer_size);
+ aos::web_proxy::WebProxy web_proxy(
+ &event_loop, aos::web_proxy::StoreHistory::kNo, FLAGS_buffer_size);
web_proxy.SetDataPath(FLAGS_data_dir.c_str());
event_loop.Run();
diff --git a/aos/network/www/proxy.ts b/aos/network/www/proxy.ts
index 5461899..5415400 100644
--- a/aos/network/www/proxy.ts
+++ b/aos/network/www/proxy.ts
@@ -117,7 +117,7 @@
name: string, type: string,
handler: (data: Uint8Array, sentTime: number) => void): void {
this.addHandlerImpl(
- name, type, TransferMethod.EVERYTHING_WITH_HISTORY, handler);
+ name, type, TransferMethod.LOSSLESS, handler);
}
/**
@@ -137,7 +137,7 @@
if (!this.handlerFuncs.has(channel.key())) {
this.handlerFuncs.set(channel.key(), []);
} else {
- if (method == TransferMethod.EVERYTHING_WITH_HISTORY) {
+ if (method == TransferMethod.LOSSLESS) {
console.warn(
'Behavior of multiple reliable handlers is currently poorly ' +
'defined and may not actually deliver all of the messages.');
diff --git a/frc971/analysis/in_process_plotter.cc b/frc971/analysis/in_process_plotter.cc
index 0eaf719..cd52b1c 100644
--- a/frc971/analysis/in_process_plotter.cc
+++ b/frc971/analysis/in_process_plotter.cc
@@ -15,7 +15,7 @@
event_loop_factory_(&config_.message()),
event_loop_(event_loop_factory_.MakeEventLoop("plotter")),
plot_sender_(event_loop_->MakeSender<Plot>("/analysis")),
- web_proxy_(event_loop_.get(), -1),
+ web_proxy_(event_loop_.get(), aos::web_proxy::StoreHistory::kYes, -1),
builder_(plot_sender_.MakeBuilder()) {
web_proxy_.SetDataPath(kDataPath);
event_loop_->SkipTimingReport();
diff --git a/frc971/analysis/live_web_plotter_demo.sh b/frc971/analysis/live_web_plotter_demo.sh
index 4c4c9c4..45f3b92 100755
--- a/frc971/analysis/live_web_plotter_demo.sh
+++ b/frc971/analysis/live_web_plotter_demo.sh
@@ -1 +1 @@
-./aos/network/web_proxy_main --config=aos/network/www/test_config.json --data_dir=frc971/analysis
+./aos/network/web_proxy_main --config=aos/network/www/test_config.json --data_dir=frc971/analysis "$@"