Add queue buffers & simulation support to web proxy
This makes a couple of major changes:
-Directly uses EPoll class for managing Seasocks events.
-Adds buffers of queues to web proxy Subscribers so that we can
transfering data losslessly in log replay.
-Modifies the flatbuffer used for the RTC communications so that
the webpage can specify whether it wants every message or subsampled
messages.
-Adds an option to LogReader to let us run past the end of the logfile.
Note that these changes do mean that, for log replay, the web proxy will
load the *entire* logfile into memory. Future changes can optimize this
to, e.g., only load the required channels into memory.
Change-Id: I74e7608c30baa8b36e05c4ab50e12a54bf75aa4c
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 507f73f..7c34b22 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -4,6 +4,7 @@
#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/network/web_proxy_utils.h"
+#include "aos/seasocks/seasocks_logger.h"
#include "api/create_peerconnection_factory.h"
#include "glog/logging.h"
@@ -27,24 +28,20 @@
} // namespace
WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
- aos::EventLoop *event_loop)
+ aos::EventLoop *event_loop, int buffer_size)
: server_(server),
- config_(aos::CopyFlatBuffer(event_loop->configuration())) {
- const bool is_multi_node =
- aos::configuration::MultiNode(event_loop->configuration());
- const aos::Node *self =
- is_multi_node ? aos::configuration::GetMyNode(event_loop->configuration())
- : nullptr;
+ config_(aos::CopyFlatBuffer(event_loop->configuration())),
+ event_loop_(event_loop) {
+ const aos::Node *self = event_loop->node();
for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
auto channel = event_loop->configuration()->channels()->Get(i);
if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
auto fetcher = event_loop->MakeRawFetcher(channel);
- subscribers_.emplace_back(
- std::make_unique<aos::web_proxy::Subscriber>(std::move(fetcher), i));
+ subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
+ std::move(fetcher), i, buffer_size));
}
}
-
TimerHandler *const timer = event_loop->AddTimer([this]() {
for (auto &subscriber : subscribers_) {
subscriber->RunIteration();
@@ -57,8 +54,8 @@
}
void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
- std::unique_ptr<Connection> conn =
- std::make_unique<Connection>(sock, server_, subscribers_, config_);
+ std::unique_ptr<Connection> conn = std::make_unique<Connection>(
+ sock, server_, subscribers_, config_, event_loop_);
connections_.insert({sock, std::move(conn)});
}
@@ -71,31 +68,103 @@
connections_.erase(sock);
}
+WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
+ : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
+
+WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
+ : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
+
+WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
+ int buffer_size)
+ : epoll_(epoll),
+ server_(std::make_shared<aos::seasocks::SeasocksLogger>(
+ ::seasocks::Logger::Level::Info)),
+ websocket_handler_(
+ new WebsocketHandler(&server_, event_loop, buffer_size)) {
+ server_.addWebSocketHandler("/ws", websocket_handler_);
+ CHECK(server_.startListening(8080));
+
+ epoll->OnReadable(server_.fd(), [this]() {
+ CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
+ });
+
+ if (&internal_epoll_ == epoll) {
+ TimerHandler *const timer = event_loop->AddTimer([this]() {
+ // Run the epoll poller until there are no more events (if we are being
+ // backed by a shm event loop, there won't be anything registered to
+ // internal_epoll_ and this will just return false).
+ // We just deal with clearing all the epoll events using a simulated
+ // timer. This does mean that we will spin rather than actually sleeping
+ // in any coherent manner, which will be particularly noticeable when past
+ // the end of processing other events.
+ while (internal_epoll_.Poll(false)) {
+ continue;
+ }
+ });
+
+ event_loop->OnRun([timer, event_loop]() {
+ timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
+ });
+ }
+}
+
+WebProxy::~WebProxy() {
+ epoll_->DeleteFd(server_.fd());
+ server_.terminate();
+ CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
+}
+
void Subscriber::RunIteration() {
- if (channels_.empty()) {
+ // TODO(james): The channels_ struct gets accessed here, but modified by
+ // Add/RemoveListener, which are called from separate threads.
+ if (channels_.empty() && buffer_size_ == 0) {
return;
}
- fetcher_->Fetch();
- VLOG(2) << "Sending a message with " << GetPacketCount(fetcher_->context())
- << "packets";
- for (int packet_index = 0; packet_index < GetPacketCount(fetcher_->context());
- ++packet_index) {
- flatbuffers::Offset<MessageHeader> message =
- PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
- fbb_.Finish(message);
+ while (fetcher_->FetchNext()) {
+ // If we aren't building up a buffer, short-circuit the FetchNext().
+ if (buffer_size_ == 0) {
+ fetcher_->Fetch();
+ }
+ Message message;
+ message.index = fetcher_->context().queue_index;
+ VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
+ << "packets";
+ for (int packet_index = 0;
+ packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
+ flatbuffers::Offset<MessageHeader> message_offset =
+ PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
+ fbb_.Finish(message_offset);
- const flatbuffers::DetachedBuffer buffer = fbb_.Release();
+ const flatbuffers::DetachedBuffer buffer = fbb_.Release();
- webrtc::DataBuffer data_buffer(
- rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
- true /* binary array */);
- for (auto conn : channels_) {
- if (conn->buffered_amount() > 14000000) {
+ message.data.emplace_back(
+ rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+ true /* binary array */);
+ }
+ message_buffer_.push_back(std::move(message));
+ }
+ for (auto &conn : channels_) {
+ rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first;
+ ChannelInformation *channel_data = &conn.second;
+ if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
+ SkipToLastMessage(channel_data);
+ }
+ const webrtc::DataBuffer *buffer = NextBuffer(channel_data);
+ while (buffer != nullptr) {
+ if (rtc_channel->buffered_amount() > 14000000) {
VLOG(1) << "skipping a send because buffered amount is too high";
- continue;
+ break;
}
- conn->Send(data_buffer);
+ // TODO(james): This Send() should be called from the signalling_thread
+ // created by the Connection.
+ rtc_channel->Send(*buffer);
+ buffer = NextBuffer(channel_data);
+ }
+ }
+ if (buffer_size_ >= 0) {
+ while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
+ message_buffer_.pop_front();
}
}
}
@@ -107,17 +176,80 @@
fetcher_->channel()->type()->string_view();
}
+void Subscriber::AddListener(
+ rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
+ TransferMethod transfer_method) {
+ ChannelInformation info;
+ info.transfer_method = transfer_method;
+ channels_.emplace(channel, info);
+}
+
+const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
+ CHECK_NOTNULL(channel);
+ if (message_buffer_.empty()) {
+ return nullptr;
+ }
+ const uint32_t earliest_index = message_buffer_.front().index;
+ const uint32_t latest_index = message_buffer_.back().index;
+ const bool fell_behind = channel->current_queue_index < earliest_index;
+ if (fell_behind) {
+ channel->current_queue_index = earliest_index;
+ channel->next_packet_number = 0;
+ return &message_buffer_.front().data.at(0);
+ }
+ if (channel->current_queue_index > latest_index) {
+ // We are still waiting on the next message to appear; return.
+ return nullptr;
+ }
+ CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
+ << "Inconsistent queue indices.";
+ const size_t packets_in_message =
+ message_buffer_[channel->current_queue_index - earliest_index]
+ .data.size();
+ CHECK_LT(0u, packets_in_message);
+ CHECK_LT(channel->next_packet_number, packets_in_message);
+
+ const webrtc::DataBuffer *data =
+ &message_buffer_[channel->current_queue_index - earliest_index].data.at(
+ channel->next_packet_number);
+
+ ++channel->next_packet_number;
+ if (channel->next_packet_number == packets_in_message) {
+ ++channel->current_queue_index;
+ channel->next_packet_number = 0;
+ }
+
+ return data;
+}
+
+void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
+ CHECK_NOTNULL(channel);
+ if (message_buffer_.empty() ||
+ channel->current_queue_index == message_buffer_.back().index) {
+ return;
+ }
+ channel->current_queue_index = message_buffer_.back().index;
+ channel->next_packet_number = 0;
+}
+
+void Subscriber::RemoveListener(
+ rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+ channels_.erase(channel);
+}
+
Connection::Connection(
::seasocks::WebSocket *sock, ::seasocks::Server *server,
const std::vector<std::unique_ptr<Subscriber>> &subscribers,
- const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
+ const EventLoop *event_loop)
: sock_(sock),
server_(server),
subscribers_(subscribers),
- config_headers_(PackBuffer(config.span())) {}
+ config_headers_(PackBuffer(config.span())),
+ event_loop_(event_loop) {}
-// Function called for web socket data. Parses the flatbuffer and handles it
-// appropriately.
+// Function called for web socket data. Parses the flatbuffer and
+// handles it appropriately.
void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
const WebSocketMessage *message =
flatbuffers::GetRoot<WebSocketMessage>(data);
@@ -139,8 +271,9 @@
break;
}
- // We can only start creating the PeerConnection once we have something to
- // give it, so we wait until we get an offer before starting.
+ // We can only start creating the PeerConnection once we have
+ // something to give it, so we wait until we get an offer before
+ // starting.
webrtc::PeerConnectionInterface::RTCConfiguration config;
config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
config.enable_dtls_srtp = true;
@@ -180,7 +313,9 @@
peer_connection_->AddIceCandidate(ice_candidate);
break;
}
- default: { break; }
+ default: {
+ break;
+ }
}
}
@@ -235,7 +370,7 @@
void Connection::OnStateChange() {
if (peer_connection_.get() != nullptr &&
data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
- for (const auto &header: config_headers_) {
+ for (const auto &header : config_headers_) {
Send(header.buffer());
}
}
@@ -244,13 +379,39 @@
// Handle DataChannel messages. Subscribe to each listener that matches the
// subscribe message
void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
- const message_bridge::Connect *message =
- flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data());
- VLOG(2) << "Got a connect message " << aos::FlatbufferToJson(message);
+ FlatbufferSpan<SubscriberRequest> message(
+ {buffer.data.data(), buffer.data.size()});
+ if (!message.Verify()) {
+ LOG(ERROR) << "Invalid flatbuffer received from browser client.";
+ return;
+ }
+ VLOG(2) << "Got a subscription message "
+ << aos::FlatbufferToJson(&message.message());
+ if (!message.message().has_channels_to_transfer()) {
+ LOG(ERROR) << "No channels requested for transfer.";
+ return;
+ }
for (auto &subscriber : subscribers_) {
bool found_match = false;
- for (auto channel : *message->channels_to_transfer()) {
- if (subscriber->Compare(channel)) {
+ for (auto channel_request : *message.message().channels_to_transfer()) {
+ const Channel *channel = channel_request->channel();
+ if (channel == nullptr) {
+ LOG(ERROR) << "Got unpopulated channel.";
+ continue;
+ }
+ const TransferMethod transfer_method = channel_request->method();
+ // Call GetChannel() before comparing the channel name/type to each
+ // subscriber. This allows us to resolve any node or application specific
+ // mappings.
+ const Channel *comparison_channel =
+ configuration::GetChannel(event_loop_->configuration(), channel,
+ event_loop_->name(), event_loop_->node());
+ if (comparison_channel == nullptr) {
+ LOG(ERROR) << "Channel not available: "
+ << configuration::StrippedChannelToString(channel);
+ continue;
+ }
+ if (subscriber->Compare(comparison_channel)) {
int index = subscriber->index();
auto it = channels_.find(index);
if (it == channels_.end()) {
@@ -260,7 +421,7 @@
nullptr)});
it = pair.first;
}
- subscriber->AddListener(it->second);
+ subscriber->AddListener(it->second, transfer_method);
VLOG(1) << "Subscribe to: " << channel->type()->str();
found_match = true;