Add queue buffers & simulation support to web proxy
This makes a couple of major changes:
-Directly uses EPoll class for managing Seasocks events.
-Adds buffers of queues to web proxy Subscribers so that we can
transfering data losslessly in log replay.
-Modifies the flatbuffer used for the RTC communications so that
the webpage can specify whether it wants every message or subsampled
messages.
-Adds an option to LogReader to let us run past the end of the logfile.
Note that these changes do mean that, for log replay, the web proxy will
load the *entire* logfile into memory. Future changes can optimize this
to, e.g., only load the required channels into memory.
Change-Id: I74e7608c30baa8b36e05c4ab50e12a54bf75aa4c
diff --git a/aos/configuration.cc b/aos/configuration.cc
index ad2e295..6fce2e0 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -631,7 +631,8 @@
<< type << "\" }";
}
- // Then look for the channel.
+ // Then look for the channel (note that this relies on the channels being
+ // sorted in the config).
auto channel_iterator =
std::lower_bound(config->channels()->cbegin(), config->channels()->cend(),
std::make_pair(name, type), CompareChannels);
diff --git a/aos/configuration.h b/aos/configuration.h
index d10d16d..2a37ba8 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -45,6 +45,9 @@
//
// If the application name is empty, it is ignored. Maps are processed in
// reverse order, and application specific first.
+//
+// The config should already be fully merged and sorted (as produced by
+// MergeConfiguration() or any of the associated functions).
const Channel *GetChannel(const Configuration *config,
const std::string_view name,
const std::string_view type,
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 0338e62..ef612c9 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1517,7 +1517,7 @@
if (state->OldestMessageTime() == monotonic_clock::max_time) {
--live_nodes_;
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
- if (live_nodes_ == 0) {
+ if (exit_on_finish_ && live_nodes_ == 0) {
event_loop_factory_->Exit();
}
return;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index d042f90..b72a764 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -464,6 +464,12 @@
return log_file_header()->name()->string_view();
}
+ // Set whether to exit the SimulatedEventLoopFactory when we finish reading
+ // the logfile.
+ void set_exit_on_finish(bool exit_on_finish) {
+ exit_on_finish_ = exit_on_finish;
+ }
+
private:
const Channel *RemapChannel(const EventLoop *event_loop,
const Channel *channel);
@@ -819,6 +825,9 @@
// during startup when we are bootstrapping everything and trying to get to
// the start of all the log files.
bool ignore_missing_data_ = false;
+
+ // Whether to exit the SimulatedEventLoop when we finish reading the logs.
+ bool exit_on_finish_ = true;
};
} // namespace logger
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 13ba42e..c68f3cc 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -437,6 +437,34 @@
],
)
+cc_binary(
+ name = "log_web_proxy_main",
+ srcs = ["log_web_proxy_main.cc"],
+ args = [
+ "--config=aos/network/www/test_config.json",
+ "--data_dir=aos/network/www",
+ ],
+ copts = [
+ "-DWEBRTC_POSIX",
+ "-Wno-unused-parameter",
+ ],
+ data = [
+ "//aos/network/www:plotting_sample",
+ "//y2020:config",
+ "@com_github_google_flatbuffers//:flatjs",
+ ],
+ deps = [
+ ":gen_embedded",
+ ":web_proxy",
+ "//aos:init",
+ "//aos/events:simulated_event_loop",
+ "//aos/events/logging:logger",
+ "//aos/seasocks:seasocks_logger",
+ "//third_party/seasocks",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ ],
+)
+
cc_library(
name = "timestamp_filter",
srcs = ["timestamp_filter.cc"],
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
new file mode 100644
index 0000000..525eb29
--- /dev/null
+++ b/aos/network/log_web_proxy_main.cc
@@ -0,0 +1,61 @@
+// Sample binary for running the web server code against a logfile.
+// This can be run by running
+// bazel run -c opt //aos/network:log_web_proxy_main -- --node node_to_replay /path/to/logfile
+// And then opening the plotting webpage at http://localhost:8080/graph.html
+
+#include "aos/configuration.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/init.h"
+#include "aos/network/web_proxy.h"
+#include "aos/seasocks/seasocks_logger.h"
+#include "gflags/gflags.h"
+
+#include "internal/Embedded.h"
+#include "seasocks/Server.h"
+#include "seasocks/WebSocket.h"
+
+DEFINE_string(config, "./config.json", "File path of aos configuration");
+DEFINE_string(data_dir, "www", "Directory to serve data files from");
+DEFINE_string(node, "", "Directory to serve data files from");
+DEFINE_int32(buffer_size, -1, "-1 if infinite, in # of messages / channel.");
+
+int main(int argc, char **argv) {
+ aos::InitGoogle(&argc, &argv);
+ // Make sure to reference this to force the linker to include it.
+ findEmbeddedContent("");
+
+ const std::vector<std::string> unsorted_logfiles =
+ aos::logger::FindLogs(argc, argv);
+
+ const std::vector<aos::logger::LogFile> logfiles =
+ aos::logger::SortParts(unsorted_logfiles);
+
+ aos::logger::LogReader reader(logfiles);
+
+ reader.Register();
+
+ std::unique_ptr<aos::EventLoop> event_loop;
+
+ if (FLAGS_node.empty()) {
+ CHECK(!aos::configuration::MultiNode(reader.configuration()))
+ << "If using a multi-node logfile, please specify --node.";
+ event_loop = reader.event_loop_factory()->MakeEventLoop("web_proxy");
+ } else {
+ event_loop = reader.event_loop_factory()->MakeEventLoop(
+ "web_proxy",
+ aos::configuration::GetNode(reader.configuration(), FLAGS_node));
+ }
+
+ event_loop->SkipTimingReport();
+
+ aos::web_proxy::WebProxy web_proxy(event_loop.get(), FLAGS_buffer_size);
+
+ web_proxy.SetDataPath(FLAGS_data_dir.c_str());
+
+ // Keep the web proxy alive past when we finish reading the logfile.
+ reader.set_exit_on_finish(false);
+
+ reader.event_loop_factory()->Run();
+}
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 507f73f..7c34b22 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -4,6 +4,7 @@
#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/network/web_proxy_utils.h"
+#include "aos/seasocks/seasocks_logger.h"
#include "api/create_peerconnection_factory.h"
#include "glog/logging.h"
@@ -27,24 +28,20 @@
} // namespace
WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
- aos::EventLoop *event_loop)
+ aos::EventLoop *event_loop, int buffer_size)
: server_(server),
- config_(aos::CopyFlatBuffer(event_loop->configuration())) {
- const bool is_multi_node =
- aos::configuration::MultiNode(event_loop->configuration());
- const aos::Node *self =
- is_multi_node ? aos::configuration::GetMyNode(event_loop->configuration())
- : nullptr;
+ config_(aos::CopyFlatBuffer(event_loop->configuration())),
+ event_loop_(event_loop) {
+ const aos::Node *self = event_loop->node();
for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
auto channel = event_loop->configuration()->channels()->Get(i);
if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
auto fetcher = event_loop->MakeRawFetcher(channel);
- subscribers_.emplace_back(
- std::make_unique<aos::web_proxy::Subscriber>(std::move(fetcher), i));
+ subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
+ std::move(fetcher), i, buffer_size));
}
}
-
TimerHandler *const timer = event_loop->AddTimer([this]() {
for (auto &subscriber : subscribers_) {
subscriber->RunIteration();
@@ -57,8 +54,8 @@
}
void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
- std::unique_ptr<Connection> conn =
- std::make_unique<Connection>(sock, server_, subscribers_, config_);
+ std::unique_ptr<Connection> conn = std::make_unique<Connection>(
+ sock, server_, subscribers_, config_, event_loop_);
connections_.insert({sock, std::move(conn)});
}
@@ -71,31 +68,103 @@
connections_.erase(sock);
}
+WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
+ : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
+
+WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
+ : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
+
+WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
+ int buffer_size)
+ : epoll_(epoll),
+ server_(std::make_shared<aos::seasocks::SeasocksLogger>(
+ ::seasocks::Logger::Level::Info)),
+ websocket_handler_(
+ new WebsocketHandler(&server_, event_loop, buffer_size)) {
+ server_.addWebSocketHandler("/ws", websocket_handler_);
+ CHECK(server_.startListening(8080));
+
+ epoll->OnReadable(server_.fd(), [this]() {
+ CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
+ });
+
+ if (&internal_epoll_ == epoll) {
+ TimerHandler *const timer = event_loop->AddTimer([this]() {
+ // Run the epoll poller until there are no more events (if we are being
+ // backed by a shm event loop, there won't be anything registered to
+ // internal_epoll_ and this will just return false).
+ // We just deal with clearing all the epoll events using a simulated
+ // timer. This does mean that we will spin rather than actually sleeping
+ // in any coherent manner, which will be particularly noticeable when past
+ // the end of processing other events.
+ while (internal_epoll_.Poll(false)) {
+ continue;
+ }
+ });
+
+ event_loop->OnRun([timer, event_loop]() {
+ timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
+ });
+ }
+}
+
+WebProxy::~WebProxy() {
+ epoll_->DeleteFd(server_.fd());
+ server_.terminate();
+ CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
+}
+
void Subscriber::RunIteration() {
- if (channels_.empty()) {
+ // TODO(james): The channels_ struct gets accessed here, but modified by
+ // Add/RemoveListener, which are called from separate threads.
+ if (channels_.empty() && buffer_size_ == 0) {
return;
}
- fetcher_->Fetch();
- VLOG(2) << "Sending a message with " << GetPacketCount(fetcher_->context())
- << "packets";
- for (int packet_index = 0; packet_index < GetPacketCount(fetcher_->context());
- ++packet_index) {
- flatbuffers::Offset<MessageHeader> message =
- PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
- fbb_.Finish(message);
+ while (fetcher_->FetchNext()) {
+ // If we aren't building up a buffer, short-circuit the FetchNext().
+ if (buffer_size_ == 0) {
+ fetcher_->Fetch();
+ }
+ Message message;
+ message.index = fetcher_->context().queue_index;
+ VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
+ << "packets";
+ for (int packet_index = 0;
+ packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
+ flatbuffers::Offset<MessageHeader> message_offset =
+ PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
+ fbb_.Finish(message_offset);
- const flatbuffers::DetachedBuffer buffer = fbb_.Release();
+ const flatbuffers::DetachedBuffer buffer = fbb_.Release();
- webrtc::DataBuffer data_buffer(
- rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
- true /* binary array */);
- for (auto conn : channels_) {
- if (conn->buffered_amount() > 14000000) {
+ message.data.emplace_back(
+ rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+ true /* binary array */);
+ }
+ message_buffer_.push_back(std::move(message));
+ }
+ for (auto &conn : channels_) {
+ rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first;
+ ChannelInformation *channel_data = &conn.second;
+ if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
+ SkipToLastMessage(channel_data);
+ }
+ const webrtc::DataBuffer *buffer = NextBuffer(channel_data);
+ while (buffer != nullptr) {
+ if (rtc_channel->buffered_amount() > 14000000) {
VLOG(1) << "skipping a send because buffered amount is too high";
- continue;
+ break;
}
- conn->Send(data_buffer);
+ // TODO(james): This Send() should be called from the signalling_thread
+ // created by the Connection.
+ rtc_channel->Send(*buffer);
+ buffer = NextBuffer(channel_data);
+ }
+ }
+ if (buffer_size_ >= 0) {
+ while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
+ message_buffer_.pop_front();
}
}
}
@@ -107,17 +176,80 @@
fetcher_->channel()->type()->string_view();
}
+void Subscriber::AddListener(
+ rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
+ TransferMethod transfer_method) {
+ ChannelInformation info;
+ info.transfer_method = transfer_method;
+ channels_.emplace(channel, info);
+}
+
+const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
+ CHECK_NOTNULL(channel);
+ if (message_buffer_.empty()) {
+ return nullptr;
+ }
+ const uint32_t earliest_index = message_buffer_.front().index;
+ const uint32_t latest_index = message_buffer_.back().index;
+ const bool fell_behind = channel->current_queue_index < earliest_index;
+ if (fell_behind) {
+ channel->current_queue_index = earliest_index;
+ channel->next_packet_number = 0;
+ return &message_buffer_.front().data.at(0);
+ }
+ if (channel->current_queue_index > latest_index) {
+ // We are still waiting on the next message to appear; return.
+ return nullptr;
+ }
+ CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
+ << "Inconsistent queue indices.";
+ const size_t packets_in_message =
+ message_buffer_[channel->current_queue_index - earliest_index]
+ .data.size();
+ CHECK_LT(0u, packets_in_message);
+ CHECK_LT(channel->next_packet_number, packets_in_message);
+
+ const webrtc::DataBuffer *data =
+ &message_buffer_[channel->current_queue_index - earliest_index].data.at(
+ channel->next_packet_number);
+
+ ++channel->next_packet_number;
+ if (channel->next_packet_number == packets_in_message) {
+ ++channel->current_queue_index;
+ channel->next_packet_number = 0;
+ }
+
+ return data;
+}
+
+void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
+ CHECK_NOTNULL(channel);
+ if (message_buffer_.empty() ||
+ channel->current_queue_index == message_buffer_.back().index) {
+ return;
+ }
+ channel->current_queue_index = message_buffer_.back().index;
+ channel->next_packet_number = 0;
+}
+
+void Subscriber::RemoveListener(
+ rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+ channels_.erase(channel);
+}
+
Connection::Connection(
::seasocks::WebSocket *sock, ::seasocks::Server *server,
const std::vector<std::unique_ptr<Subscriber>> &subscribers,
- const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
+ const EventLoop *event_loop)
: sock_(sock),
server_(server),
subscribers_(subscribers),
- config_headers_(PackBuffer(config.span())) {}
+ config_headers_(PackBuffer(config.span())),
+ event_loop_(event_loop) {}
-// Function called for web socket data. Parses the flatbuffer and handles it
-// appropriately.
+// Function called for web socket data. Parses the flatbuffer and
+// handles it appropriately.
void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
const WebSocketMessage *message =
flatbuffers::GetRoot<WebSocketMessage>(data);
@@ -139,8 +271,9 @@
break;
}
- // We can only start creating the PeerConnection once we have something to
- // give it, so we wait until we get an offer before starting.
+ // We can only start creating the PeerConnection once we have
+ // something to give it, so we wait until we get an offer before
+ // starting.
webrtc::PeerConnectionInterface::RTCConfiguration config;
config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
config.enable_dtls_srtp = true;
@@ -180,7 +313,9 @@
peer_connection_->AddIceCandidate(ice_candidate);
break;
}
- default: { break; }
+ default: {
+ break;
+ }
}
}
@@ -235,7 +370,7 @@
void Connection::OnStateChange() {
if (peer_connection_.get() != nullptr &&
data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
- for (const auto &header: config_headers_) {
+ for (const auto &header : config_headers_) {
Send(header.buffer());
}
}
@@ -244,13 +379,39 @@
// Handle DataChannel messages. Subscribe to each listener that matches the
// subscribe message
void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
- const message_bridge::Connect *message =
- flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data());
- VLOG(2) << "Got a connect message " << aos::FlatbufferToJson(message);
+ FlatbufferSpan<SubscriberRequest> message(
+ {buffer.data.data(), buffer.data.size()});
+ if (!message.Verify()) {
+ LOG(ERROR) << "Invalid flatbuffer received from browser client.";
+ return;
+ }
+ VLOG(2) << "Got a subscription message "
+ << aos::FlatbufferToJson(&message.message());
+ if (!message.message().has_channels_to_transfer()) {
+ LOG(ERROR) << "No channels requested for transfer.";
+ return;
+ }
for (auto &subscriber : subscribers_) {
bool found_match = false;
- for (auto channel : *message->channels_to_transfer()) {
- if (subscriber->Compare(channel)) {
+ for (auto channel_request : *message.message().channels_to_transfer()) {
+ const Channel *channel = channel_request->channel();
+ if (channel == nullptr) {
+ LOG(ERROR) << "Got unpopulated channel.";
+ continue;
+ }
+ const TransferMethod transfer_method = channel_request->method();
+ // Call GetChannel() before comparing the channel name/type to each
+ // subscriber. This allows us to resolve any node or application specific
+ // mappings.
+ const Channel *comparison_channel =
+ configuration::GetChannel(event_loop_->configuration(), channel,
+ event_loop_->name(), event_loop_->node());
+ if (comparison_channel == nullptr) {
+ LOG(ERROR) << "Channel not available: "
+ << configuration::StrippedChannelToString(channel);
+ continue;
+ }
+ if (subscriber->Compare(comparison_channel)) {
int index = subscriber->index();
auto it = channels_.find(index);
if (it == channels_.end()) {
@@ -260,7 +421,7 @@
nullptr)});
it = pair.first;
}
- subscriber->AddListener(it->second);
+ subscriber->AddListener(it->second, transfer_method);
VLOG(1) << "Subscribe to: " << channel->type()->str();
found_match = true;
diff --git a/aos/network/web_proxy.fbs b/aos/network/web_proxy.fbs
index e11ff26..ed72e97 100644
--- a/aos/network/web_proxy.fbs
+++ b/aos/network/web_proxy.fbs
@@ -61,3 +61,18 @@
// Time at which the message was sent, in nanoseconds.
monotonic_sent_time:long (id: 6);
}
+
+enum TransferMethod : byte {
+ SUBSAMPLE,
+ EVERYTHING_WITH_HISTORY,
+}
+
+table ChannelRequest {
+ channel:Channel (id: 0);
+ method:TransferMethod (id: 1);
+}
+
+table SubscriberRequest {
+ // The channels that we want transfered to this client.
+ channels_to_transfer:[ChannelRequest] (id: 0);
+}
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 749e8fc..85cfd6f 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -3,6 +3,7 @@
#include <map>
#include <set>
#include "aos/events/event_loop.h"
+#include "aos/events/shm_event_loop.h"
#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/seasocks/seasocks_logger.h"
@@ -24,7 +25,8 @@
// websocket closes, it deletes the Connection.
class WebsocketHandler : public ::seasocks::WebSocket::Handler {
public:
- WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop);
+ WebsocketHandler(::seasocks::Server *server, aos::EventLoop *event_loop,
+ int buffer_size);
void onConnect(::seasocks::WebSocket *sock) override;
void onData(::seasocks::WebSocket *sock, const uint8_t *data,
size_t size) override;
@@ -35,6 +37,27 @@
::seasocks::Server *server_;
std::vector<std::unique_ptr<Subscriber>> subscribers_;
const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+
+ const EventLoop *const event_loop_;
+};
+
+// Wrapper class that manages the seasocks server and WebsocketHandler.
+class WebProxy {
+ public:
+ WebProxy(aos::EventLoop *event_loop, int buffer_size);
+ WebProxy(aos::ShmEventLoop *event_loop, int buffer_size);
+ ~WebProxy();
+
+ void SetDataPath(const char *path) { server_.setStaticPath(path); }
+
+ private:
+ WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
+ int buffer_size);
+
+ aos::internal::EPoll internal_epoll_;
+ aos::internal::EPoll *const epoll_;
+ ::seasocks::Server server_;
+ std::shared_ptr<WebsocketHandler> websocket_handler_;
};
// Seasocks requires that sends happen on the correct thread. This class takes a
@@ -59,25 +82,27 @@
// Represents a fetcher and all the Connections that care about it.
// Handles building the message and telling each connection to send it.
// indexed by location of the channel it handles in the config.
-// TODO(james): Make it so that Subscriber can optionally maintain an infinite
-// backlog of messages.
+// Subscriber also uses an internal buffer to store past messages. This is
+// primarily meant for use in offline log replay/simulation where we want to be
+// able to store infinite buffers. In the future, we will probably want to be
+// able to specify *which* channels to store buffers for so that we aren't just
+// loading the entire logfile into memory.
class Subscriber {
public:
- Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index)
+ Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
+ int buffer_size)
: fbb_(1024),
fetcher_(std::move(fetcher)),
- channel_index_(channel_index) {}
+ channel_index_(channel_index),
+ buffer_size_(buffer_size) {}
void RunIteration();
- void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- channels_.insert(channel);
- }
+ void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
+ TransferMethod transfer_method);
void RemoveListener(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- channels_.erase(channel);
- }
+ rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
// Check if the Channel passed matches the channel this fetchs.
bool Compare(const Channel *channel) const;
@@ -85,10 +110,26 @@
int index() const { return channel_index_; }
private:
+ struct ChannelInformation {
+ TransferMethod transfer_method;
+ uint32_t current_queue_index = 0;
+ size_t next_packet_number = 0;
+ };
+ struct Message {
+ uint32_t index = 0xffffffff;
+ std::vector<webrtc::DataBuffer> data;
+ };
+
+ const webrtc::DataBuffer *NextBuffer(ChannelInformation *channel);
+ void SkipToLastMessage(ChannelInformation *channel);
+
flatbuffers::FlatBufferBuilder fbb_;
std::unique_ptr<RawFetcher> fetcher_;
int channel_index_;
- std::set<rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
+ int buffer_size_;
+ std::deque<Message> message_buffer_;
+ std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
+ channels_;
};
// Represents a single connection to a browser for the entire lifetime of the
@@ -99,7 +140,8 @@
public:
Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
const std::vector<std::unique_ptr<Subscriber>> &subscribers,
- const aos::FlatbufferDetachedBuffer<aos::Configuration> &config);
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
+ const EventLoop *event_loop);
~Connection() {
// DataChannel may call OnStateChange after this is destroyed, so make sure
@@ -154,6 +196,8 @@
rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
+
+ const EventLoop *const event_loop_;
};
} // namespace web_proxy
diff --git a/aos/network/web_proxy_main.cc b/aos/network/web_proxy_main.cc
index 78528ea..ddab5dc 100644
--- a/aos/network/web_proxy_main.cc
+++ b/aos/network/web_proxy_main.cc
@@ -11,6 +11,7 @@
DEFINE_string(config, "./config.json", "File path of aos configuration");
DEFINE_string(data_dir, "www", "Directory to serve data files from");
+DEFINE_int32(buffer_size, 0, "-1 if infinite, in # of messages / channel.");
int main(int argc, char **argv) {
aos::InitGoogle(&argc, &argv);
@@ -22,14 +23,8 @@
aos::ShmEventLoop event_loop(&config.message());
- seasocks::Server server(std::shared_ptr<seasocks::Logger>(
- new aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));
+ aos::web_proxy::WebProxy web_proxy(&event_loop, FLAGS_buffer_size);
+ web_proxy.SetDataPath(FLAGS_data_dir.c_str());
- auto websocket_handler =
- std::make_shared<aos::web_proxy::WebsocketHandler>(&server, &event_loop);
- server.addWebSocketHandler("/ws", websocket_handler);
-
- std::thread data_thread{[&event_loop]() { event_loop.Run(); }};
-
- server.serve(FLAGS_data_dir.c_str(), 8080);
+ event_loop.Run();
}
diff --git a/aos/network/www/BUILD b/aos/network/www/BUILD
index 8142e24..03e269d 100644
--- a/aos/network/www/BUILD
+++ b/aos/network/www/BUILD
@@ -112,6 +112,7 @@
":reflection_ts",
"//aos:configuration_ts_fbs",
"//aos/network:connect_ts_fbs",
+ "//aos/network:web_proxy_ts_fbs",
"@com_github_google_flatbuffers//ts:flatbuffers_ts",
],
)
@@ -163,3 +164,14 @@
],
target_compatible_with = ["@platforms//os:linux"],
)
+
+filegroup(
+ name = "plotting_sample",
+ srcs = [
+ "graph.html",
+ "graph_main_bundle.min.js",
+ "styles.css",
+ "test_config",
+ ],
+ visibility = ["//visibility:public"],
+)
diff --git a/aos/network/www/config_handler.ts b/aos/network/www/config_handler.ts
index a65d2fa..81f61b4 100644
--- a/aos/network/www/config_handler.ts
+++ b/aos/network/www/config_handler.ts
@@ -1,12 +1,14 @@
import * as configuration from 'org_frc971/aos/configuration_generated';
-import * as connect from 'org_frc971/aos/network/connect_generated';
+import {Connection} from 'org_frc971/aos/network/www/proxy';
import * as flatbuffers_builder from 'org_frc971/external/com_github_google_flatbuffers/ts/builder';
+import * as web_proxy from 'org_frc971/aos/network/web_proxy_generated';
-import {Connection} from './proxy';
import Configuration = configuration.aos.Configuration;
import Channel = configuration.aos.Channel;
-import Connect = connect.aos.message_bridge.Connect;
+import SubscriberRequest = web_proxy.aos.web_proxy.SubscriberRequest;
+import ChannelRequest = web_proxy.aos.web_proxy.ChannelRequest;
+import TransferMethod = web_proxy.aos.web_proxy.TransferMethod;
export class ConfigHandler {
private readonly root_div = document.createElement('div');
@@ -76,15 +78,18 @@
Channel.addName(builder, namefb);
Channel.addType(builder, typefb);
const channelfb = Channel.endChannel(builder);
- channels.push(channelfb);
+ ChannelRequest.startChannelRequest(builder);
+ ChannelRequest.addChannel(builder, channelfb);
+ ChannelRequest.addMethod(builder, TransferMethod.SUBSAMPLE);
+ channels.push(ChannelRequest.endChannelRequest(builder));
}
const channelsfb =
- Connect.createChannelsToTransferVector(builder, channels);
- Connect.startConnect(builder);
- Connect.addChannelsToTransfer(builder, channelsfb);
- const connect = Connect.endConnect(builder);
- builder.finish(connect);
+ SubscriberRequest.createChannelsToTransferVector(builder, channels);
+ SubscriberRequest.startSubscriberRequest(builder);
+ SubscriberRequest.addChannelsToTransfer(builder, channelsfb);
+ const request = SubscriberRequest.endSubscriberRequest(builder);
+ builder.finish(request);
this.connection.sendConnectMessage(builder);
}
diff --git a/aos/network/www/graph_main.ts b/aos/network/www/graph_main.ts
index b2286d1..9135cf4 100644
--- a/aos/network/www/graph_main.ts
+++ b/aos/network/www/graph_main.ts
@@ -9,9 +9,9 @@
// it is already being published by the web proxy process, so the demo requires
// very little setup).
import * as configuration from 'org_frc971/aos/configuration_generated';
-import * as connect from 'org_frc971/aos/network/connect_generated';
import {Line, Plot} from 'org_frc971/aos/network/www/plotter';
import * as proxy from 'org_frc971/aos/network/www/proxy';
+import * as web_proxy from 'org_frc971/aos/network/web_proxy_generated';
import * as reflection from 'org_frc971/aos/network/www/reflection'
import * as flatbuffers_builder from 'org_frc971/external/com_github_google_flatbuffers/ts/builder';
import {ByteBuffer} from 'org_frc971/external/com_github_google_flatbuffers/ts/byte-buffer';
@@ -21,7 +21,9 @@
import Configuration = configuration.aos.Configuration;
import Parser = reflection.Parser;
import Table = reflection.Table;
-import Connect = connect.aos.message_bridge.Connect;
+import SubscriberRequest = web_proxy.aos.web_proxy.SubscriberRequest;
+import ChannelRequest = web_proxy.aos.web_proxy.ChannelRequest;
+import TransferMethod = web_proxy.aos.web_proxy.TransferMethod;
const width = 900;
const height = 400;
@@ -88,6 +90,7 @@
this.averagePoints.push(parser.readScalar(statsTable, "average") * 1000);
+ // TODO: These memory allocations absolutely kill performance.
this.max.setPoints(new Float32Array(this.maxPoints));
this.min.setPoints(new Float32Array(this.minPoints));
this.average.setPoints(new Float32Array(this.averagePoints));
@@ -143,13 +146,16 @@
Channel.addName(builder, nameFb);
Channel.addType(builder, typeFb);
const channelFb = Channel.endChannel(builder);
- channels.push(channelFb);
+ ChannelRequest.startChannelRequest(builder);
+ ChannelRequest.addChannel(builder, channelFb);
+ ChannelRequest.addMethod(builder, TransferMethod.EVERYTHING_WITH_HISTORY);
+ channels.push(ChannelRequest.endChannelRequest(builder));
}
- const channelsFb = Connect.createChannelsToTransferVector(builder, channels);
- Connect.startConnect(builder);
- Connect.addChannelsToTransfer(builder, channelsFb);
- const connect = Connect.endConnect(builder);
+ const channelsFb = SubscriberRequest.createChannelsToTransferVector(builder, channels);
+ SubscriberRequest.startSubscriberRequest(builder);
+ SubscriberRequest.addChannelsToTransfer(builder, channelsFb);
+ const connect = SubscriberRequest.endSubscriberRequest(builder);
builder.finish(connect);
conn.sendConnectMessage(builder);
});
diff --git a/aos/network/www/log_web_proxy_demo.sh b/aos/network/www/log_web_proxy_demo.sh
new file mode 100755
index 0000000..e4e97bb
--- /dev/null
+++ b/aos/network/www/log_web_proxy_demo.sh
@@ -0,0 +1 @@
+./aos/network/log_web_proxy_main --config=aos/network/www/test_config.json --data_dir=aos/network/www $@
diff --git a/aos/network/www/plotter.ts b/aos/network/www/plotter.ts
index c10ff80..d33953c 100644
--- a/aos/network/www/plotter.ts
+++ b/aos/network/www/plotter.ts
@@ -35,8 +35,8 @@
private _drawLine: boolean = true;
private _pointSize: number = 3.0;
private _hasUpdate: boolean = false;
- private _minValues: number[] = [0.0, 0.0];
- private _maxValues: number[] = [0.0, 0.0];
+ private _minValues: number[] = [Infinity, Infinity];
+ private _maxValues: number[] = [-Infinity, -Infinity];
private _color: number[] = [1.0, 0.0, 0.0];
private pointAttribLocation: number;
private colorLocation: WebGLUniformLocation | null;
@@ -121,6 +121,10 @@
const x = this.points[ii];
const y = this.points[ii + 1];
+ if (isNaN(x) || isNaN(y)) {
+ continue;
+ }
+
this._minValues = cwiseOp(this._minValues, [x, y], Math.min);
this._maxValues = cwiseOp(this._maxValues, [x, y], Math.max);
}
@@ -497,6 +501,9 @@
for (let line of this.lines) {
minValues = cwiseOp(minValues, line.minValues(), Math.min);
}
+ if (!isFinite(minValues[0]) || !isFinite(minValues[1])) {
+ return [0, 0];
+ }
return minValues;
}
@@ -505,6 +512,9 @@
for (let line of this.lines) {
maxValues = cwiseOp(maxValues, line.maxValues(), Math.max);
}
+ if (!isFinite(maxValues[0]) || !isFinite(maxValues[1])) {
+ return [0, 0];
+ }
return maxValues;
}
@@ -729,10 +739,12 @@
private lastMousePosition: number[] = [0.0, 0.0];
private autoFollow: boolean = true;
private linkedXAxes: Plot[] = [];
+ private lastTimeMs: number = 0;
constructor(wrapperDiv: HTMLDivElement, width: number, height: number) {
wrapperDiv.appendChild(this.canvas);
wrapperDiv.appendChild(this.textCanvas);
+ this.lastTimeMs = (new Date()).getTime();
this.canvas.width =
width - this.axisLabelBuffer.left - this.axisLabelBuffer.right;
@@ -838,6 +850,9 @@
}
setZoom(scale: number[], offset: number[]) {
+ if (!isFinite(scale[0]) || !isFinite(scale[1])) {
+ throw new Error("Doesn't support non-finite scales due to singularities.");
+ }
const x_pressed = Plot.keysPressed["x"];
const y_pressed = Plot.keysPressed["y"];
const zoom = this.drawer.getZoom();
@@ -875,7 +890,17 @@
}
resetZoom() {
- this.setZoomCorners(this.drawer.minValues(), this.drawer.maxValues());
+ const minValues = this.drawer.minValues();
+ const maxValues = this.drawer.maxValues();
+ if (minValues[0] == maxValues[0]) {
+ minValues[0] -= 1;
+ maxValues[0] += 1;
+ }
+ if (minValues[1] == maxValues[1]) {
+ minValues[1] -= 1;
+ maxValues[1] += 1;
+ }
+ this.setZoomCorners(minValues, maxValues);
this.autoFollow = true;
for (let plot of this.linkedXAxes) {
plot.autoFollow = true;
@@ -892,6 +917,9 @@
draw() {
window.requestAnimationFrame(() => this.draw());
+ const curTime = (new Date()).getTime();
+ const frameRate = 1000.0 / (curTime - this.lastTimeMs);
+ this.lastTimeMs = curTime;
// Clear the overlay.
const textCtx = this.textCanvas.getContext("2d");
diff --git a/aos/network/www/proxy.ts b/aos/network/www/proxy.ts
index 4952a11..5660cef 100644
--- a/aos/network/www/proxy.ts
+++ b/aos/network/www/proxy.ts
@@ -1,4 +1,3 @@
-import {ConfigHandler} from './config_handler';
import * as configuration from 'org_frc971/aos/configuration_generated';
import * as web_proxy from 'org_frc971/aos/network/web_proxy_generated';
import {Builder} from 'org_frc971/external/com_github_google_flatbuffers/ts/builder';
diff --git a/y2020/www/BUILD b/y2020/www/BUILD
index bbad63b..2f26122 100644
--- a/y2020/www/BUILD
+++ b/y2020/www/BUILD
@@ -13,6 +13,7 @@
deps = [
"//aos:configuration_ts_fbs",
"//aos/network:connect_ts_fbs",
+ "//aos/network:web_proxy_ts_fbs",
"//aos/network/www:proxy",
"//y2020/vision:vision_ts_fbs",
"//y2020/vision/sift:sift_ts_fbs",
@@ -31,6 +32,7 @@
deps = [
"//aos:configuration_ts_fbs",
"//aos/network:connect_ts_fbs",
+ "//aos/network:web_proxy_ts_fbs",
"//aos/network/www:proxy",
"//frc971/control_loops/drivetrain:drivetrain_status_ts_fbs",
"//y2020/vision/sift:sift_ts_fbs",
diff --git a/y2020/www/camera_main.ts b/y2020/www/camera_main.ts
index 8667a8e..8c7ae55 100644
--- a/y2020/www/camera_main.ts
+++ b/y2020/www/camera_main.ts
@@ -1,7 +1,6 @@
import {Connection} from 'org_frc971/aos/network/www/proxy';
import {ImageHandler} from './image_handler';
-import {ConfigHandler} from 'org_frc971/aos/network/www/config_handler';
const conn = new Connection();
diff --git a/y2020/www/field_handler.ts b/y2020/www/field_handler.ts
index c0b6c50..d3bafa2 100644
--- a/y2020/www/field_handler.ts
+++ b/y2020/www/field_handler.ts
@@ -1,15 +1,17 @@
import * as configuration from 'org_frc971/aos/configuration_generated';
-import * as connect from 'org_frc971/aos/network/connect_generated';
import {Connection} from 'org_frc971/aos/network/www/proxy';
import * as flatbuffers_builder from 'org_frc971/external/com_github_google_flatbuffers/ts/builder';
import {ByteBuffer} from 'org_frc971/external/com_github_google_flatbuffers/ts/byte-buffer';
import * as drivetrain from 'org_frc971/frc971/control_loops/drivetrain/drivetrain_status_generated';
import * as sift from 'org_frc971/y2020/vision/sift/sift_generated';
+import * as web_proxy from 'org_frc971/aos/network/web_proxy_generated';
import DrivetrainStatus = drivetrain.frc971.control_loops.drivetrain.Status;
import ImageMatchResult = sift.frc971.vision.sift.ImageMatchResult;
-import Connect = connect.aos.message_bridge.Connect;
import Channel = configuration.aos.Channel;
+import SubscriberRequest = web_proxy.aos.web_proxy.SubscriberRequest;
+import ChannelRequest = web_proxy.aos.web_proxy.ChannelRequest;
+import TransferMethod = web_proxy.aos.web_proxy.TransferMethod;
import {FIELD_LENGTH, FIELD_WIDTH, FT_TO_M, IN_TO_M} from './constants';
@@ -132,14 +134,17 @@
Channel.addName(builder, nameFb);
Channel.addType(builder, typeFb);
const channelFb = Channel.endChannel(builder);
- channels.push(channelFb);
+ ChannelRequest.startChannelRequest(builder);
+ ChannelRequest.addChannel(builder, channelFb);
+ ChannelRequest.addMethod(builder, TransferMethod.SUBSAMPLE);
+ channels.push(ChannelRequest.endChannelRequest(builder));
}
const channelsFb =
- Connect.createChannelsToTransferVector(builder, channels);
- Connect.startConnect(builder);
- Connect.addChannelsToTransfer(builder, channelsFb);
- const connect = Connect.endConnect(builder);
+ SubscriberRequest.createChannelsToTransferVector(builder, channels);
+ SubscriberRequest.startSubscriberRequest(builder);
+ SubscriberRequest.addChannelsToTransfer(builder, channelsFb);
+ const connect = SubscriberRequest.endSubscriberRequest(builder);
builder.finish(connect);
this.connection.sendConnectMessage(builder);
}
diff --git a/y2020/www/image_handler.ts b/y2020/www/image_handler.ts
index 121ab75..79ce74e 100644
--- a/y2020/www/image_handler.ts
+++ b/y2020/www/image_handler.ts
@@ -1,18 +1,20 @@
import * as configuration from 'org_frc971/aos/configuration_generated';
-import * as connect from 'org_frc971/aos/network/connect_generated';
import {Connection} from 'org_frc971/aos/network/www/proxy';
import * as flatbuffers_builder from 'org_frc971/external/com_github_google_flatbuffers/ts/builder';
import {ByteBuffer} from 'org_frc971/external/com_github_google_flatbuffers/ts/byte-buffer';
import {Long} from 'org_frc971/external/com_github_google_flatbuffers/ts/long';
import * as sift from 'org_frc971/y2020/vision/sift/sift_generated'
import * as vision from 'org_frc971/y2020/vision/vision_generated';
+import * as web_proxy from 'org_frc971/aos/network/web_proxy_generated';
import Channel = configuration.aos.Channel;
import Configuration = configuration.aos.Configuration;
-import Connect = connect.aos.message_bridge.Connect;
import CameraImage = vision.frc971.vision.CameraImage;
import ImageMatchResult = sift.frc971.vision.sift.ImageMatchResult;
import Feature = sift.frc971.vision.sift.Feature;
+import SubscriberRequest = web_proxy.aos.web_proxy.SubscriberRequest;
+import ChannelRequest = web_proxy.aos.web_proxy.ChannelRequest;
+import TransferMethod = web_proxy.aos.web_proxy.TransferMethod;
/*
* All the messages that are required to show an image with metadata.
@@ -106,14 +108,17 @@
Channel.addName(builder, nameFb);
Channel.addType(builder, typeFb);
const channelFb = Channel.endChannel(builder);
- channels.push(channelFb);
+ ChannelRequest.startChannelRequest(builder);
+ ChannelRequest.addChannel(builder, channelFb);
+ ChannelRequest.addMethod(builder, TransferMethod.SUBSAMPLE);
+ channels.push(ChannelRequest.endChannelRequest(builder));
}
const channelsFb =
- Connect.createChannelsToTransferVector(builder, channels);
- Connect.startConnect(builder);
- Connect.addChannelsToTransfer(builder, channelsFb);
- const connect = Connect.endConnect(builder);
+ SubscriberRequest.createChannelsToTransferVector(builder, channels);
+ SubscriberRequest.startSubscriberRequest(builder);
+ SubscriberRequest.addChannelsToTransfer(builder, channelsFb);
+ const connect = SubscriberRequest.endSubscriberRequest(builder);
builder.finish(connect);
this.connection.sendConnectMessage(builder);
}