Make preliminary Seasocks server with data collector.
This is stage 2 of the http_status project!
Currently, this version can display various values from the queues in
a web browser, but only in plain text. It implements some of the basic
classes that will be used in later stages. It also supports
concurrent connections.
Not yet included are the trigger and bandwidth adjustment
options.
Change-Id: I108d965a768c073a0f4b3757c0475fe7bfc34ccb
diff --git a/frc971/http_status/http_status.cc b/frc971/http_status/http_status.cc
index 9a6440a..6cdb303 100644
--- a/frc971/http_status/http_status.cc
+++ b/frc971/http_status/http_status.cc
@@ -1,114 +1,296 @@
-// Copyright (c) 2013, Matt Godbolt
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are met:
-//
-// Redistributions of source code must retain the above copyright notice, this
-// list of conditions and the following disclaimer.
-//
-// Redistributions in binary form must reproduce the above copyright notice,
-// this list of conditions and the following disclaimer in the documentation
-// and/or other materials provided with the distribution.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-// POSSIBILITY OF SUCH DAMAGE.
-
-// An extraordinarily simple test which presents a web page with some buttons.
-// Clicking on the numbered button increments the number, which is visible to
-// other connected clients. WebSockets are used to do this: by the rather
-// suspicious means of sending raw JavaScript commands to be executed on other
-// clients.
-
#include "seasocks/PrintfLogger.h"
-#include "seasocks/Server.h"
-#include "seasocks/StringUtil.h"
#include "seasocks/WebSocket.h"
-#include "seasocks/util/Json.h"
+#include "seasocks/Server.h"
-#include <cstring>
#include <iostream>
-#include <memory>
-#include <set>
#include <sstream>
#include <string>
+#include <thread>
+#include <vector>
-#include "embedded.h"
+#include "aos/linux_code/init.h"
+#include "aos/common/logging/logging.h"
+#include "aos/common/time.h"
+#include "aos/common/util/phased_loop.h"
+#include "aos/common/mutex.h"
-using namespace seasocks;
-using namespace std;
+#include "frc971/control_loops/claw/claw.q.h"
+#include "frc971/control_loops/fridge/fridge.q.h"
-class MyHandler: public WebSocket::Handler {
-public:
- MyHandler(Server* server) : _server(server), _currentValue(0) {
- setValue(1);
+#include "frc971/http_status/http_status.h"
+
+namespace frc971 {
+namespace http_status {
+
+// TODO(comran): Make some of these separate libraries & document them better.
+
+HTTPStatusMessage::HTTPStatusMessage()
+ : sample_id_(0),
+ measure_index_(0),
+ overflow_id_(200),  // Sample ids wrap modulo this value (see GetIndex).
+ num_samples_per_packet_(50) {}  // Fetch() sends up to this many of the newest samples.
+
+// Starts a new sample; mutex_ stays locked until EndSample() so measures can be added.
+void HTTPStatusMessage::NextSample() {
+ ::aos::time::Time time_now = ::aos::time::Time::Now();
+
+ // Take the lock before touching the buffers: Fetch() reads them under mutex_.
+ CHECK(!mutex_.Lock());
+ int32_t adjusted_index = GetIndex(sample_id_);
+ if (sample_id_ < overflow_id_) {
+ sample_times_.emplace_back(time_now);
+ data_values_.emplace_back(::std::vector<double>());
+ } else {
+ sample_times_[adjusted_index] = time_now;
+ }
+
+ sample_id_++;
+ measure_index_ = 0;
+}
+
+void HTTPStatusMessage::EndSample() { mutex_.Unlock(); }  // Releases the lock taken in NextSample().
+
+int32_t HTTPStatusMessage::GetIndex(int32_t sample_id) {
+ return sample_id % overflow_id_;  // Slot in the sample buffers; negative if sample_id is negative.
+}
+
+void HTTPStatusMessage::AddMeasure(::std::string name, double value) {
+ // Mutex should be locked when this method is called to synchronize packets.
+ assert(mutex_.OwnedBySelf());
+
+ int32_t index = GetIndex(sample_id_ - 1);  // Slot of the sample NextSample() just started.
+
+ if (measure_index_ >= static_cast<int32_t>(data_names_.size())) {  // Measures are identified by call order.
+ data_names_.emplace_back(name);  // First time this position is used: remember its name.
+ }
+
+ if (measure_index_ >= static_cast<int32_t>(data_values_.at(index).size())) {
+ data_values_.at(index).emplace_back(value);
+ } else {
+ data_values_.at(index).at(measure_index_) = value;  // Slot already holds a value: overwrite it.
+ }
+ measure_index_++;
+}
+
+// Serializes the names (from_sample == -1) or the most recent samples into the
+// plain-text packet sent to a client. mutex_ is held through locker.
+::std::string HTTPStatusMessage::Fetch(size_t from_sample) {
+ ::aos::MutexLocker locker(&mutex_);
+
+ ::std::stringstream message;
+ message.precision(10); // Cap how precise the time/measurement data is.
+
+ // To save space, data is sent to the client with a custom protocol.
+ // Initially, a message containing all the names of the measurements is sent.
+ // It is preceded by a star and the names are split with commas.
+ // Example: *test,test2
+ if (static_cast<int32_t>(from_sample) == -1) {
+ message << "*";
+ for (int32_t cur_data_name = 0;
+ cur_data_name < static_cast<int32_t>(data_names_.size());
+ cur_data_name++) {
+ if (cur_data_name > 0) {
+ message << ",";
+ }
+ message << data_names_.at(cur_data_name);
+ }
+ return message.str();
+ }
+
+ // TODO(comran): Use from_sample to determine the speed packets should be sent
+ // out to avoid skipping packets.
+ // Until then the newest samples are always sent. The start is clamped to 0 so
+ // that a request made before a full packet exists never indexes out of range.
+ from_sample = sample_id_ > num_samples_per_packet_
+ ? sample_id_ - num_samples_per_packet_ : 0;
+
+ // Data packets place each raw value at the same index as the corresponding
+ // measurement name sent in the initial packet. Samples are split with dollar
+ // signs, info with percent signs, and measurements with commas. This format
+ // is helpful for debugging issues and looping through the data on the client
+ // side.
+ // Example of two samples that correspond with the initialized example:
+ // 289%2803.135127%10,67$290%2803.140109%12,68
+ for (int32_t cur_sample = from_sample;
+ cur_sample <
+ static_cast<int32_t>(from_sample + num_samples_per_packet_) &&
+ GetIndex(cur_sample) < static_cast<int32_t>(data_values_.size());
+ cur_sample++) {
+ if (cur_sample != static_cast<int32_t>(from_sample)) {
+ message << "$";
 }
- virtual void onConnect(WebSocket* connection) {
- _connections.insert(connection);
- connection->send(_currentSetValue.c_str());
- cout << "Connected: " << connection->getRequestUri()
- << " : " << formatAddress(connection->getRemoteAddress())
- << endl;
- cout << "Credentials: " << *(connection->credentials()) << endl;
+ int32_t adjusted_index = GetIndex(cur_sample);
+
+ message << cur_sample << "%" << sample_times_.at(adjusted_index).ToSeconds()
+ << "%";
+ for (int32_t cur_measure = 0;
+ cur_measure < static_cast<int32_t>(data_names_.size());
+ cur_measure++) {
+ if (cur_measure > 0) {
+ message << ",";
+ }
+ message << data_values_.at(adjusted_index).at(cur_measure);
 }
+ }
+ return message.str();
+}
- virtual void onData(WebSocket* connection, const char* data) {
- if (0 == strcmp("die", data)) {
- _server->terminate();
- return;
- }
- if (0 == strcmp("close", data)) {
- cout << "Closing.." << endl;
- connection->close();
- cout << "Closed." << endl;
- return;
- }
+DataCollector::DataCollector() : cur_raw_data_("no data") {}  // NOTE(review): run_ is not set here; presumably initialized in the header — confirm.
- int value = atoi(data) + 1;
- if (value > _currentValue) {
- setValue(value);
- for (auto connection : _connections) {
- connection->send(_currentSetValue.c_str());
- }
- }
- }
+void DataCollector::RunIteration() {
+ auto& fridge_queue = control_loops::fridge_queue;
+ auto& claw_queue = control_loops::claw_queue;
- virtual void onDisconnect(WebSocket* connection) {
- _connections.erase(connection);
- cout << "Disconnected: " << connection->getRequestUri()
- << " : " << formatAddress(connection->getRemoteAddress())
- << endl;
- }
+ fridge_queue.status.FetchAnother();  // NOTE(review): position, goal and output are read below without a fetch in this function — confirm they are valid.
+ claw_queue.status.FetchAnother();  // NOTE(review): fetched, but no claw measures are recorded below.
-private:
- set<WebSocket*> _connections;
- Server* _server;
- int _currentValue;
- string _currentSetValue;
+ message_.NextSample();  // Starts a sample and locks the message until EndSample().
+ // Add recorded data here. /////
+ // NOTE: Try to use fewer than 30 measures, or the whole thing will lag.
+ // Abbreviate names if long, otherwise just use the command to get the value
+ // from the queue.
- void setValue(int value) {
- _currentValue = value;
- _currentSetValue = makeExecString("set", _currentValue);
- }
-};
+ // TODO(comran): Make it so that the name doesn't have to be copied as a
+ // string.
-int main(int argc, const char* argv[]) {
- shared_ptr<Logger> logger(new PrintfLogger(Logger::DEBUG));
+ // //// Fridge
+ // Positions
+ message_.AddMeasure("(fridge position left arm encoder)",
+ fridge_queue.position->arm.left.encoder);
+ message_.AddMeasure("(fridge position right arm encoder)",
+ fridge_queue.position->arm.right.encoder);
+ message_.AddMeasure("(fridge position left elev encoder)",
+ fridge_queue.position->elevator.left.encoder);
+ message_.AddMeasure("(fridge position right elev encoder)",
+ fridge_queue.position->elevator.right.encoder);
+ // Goals
+ message_.AddMeasure("fridge_queue.goal->profiling_type",
+ fridge_queue.goal->profiling_type);
+ message_.AddMeasure("fridge_queue.goal->angle", fridge_queue.goal->angle);
+ message_.AddMeasure("fridge_queue.goal->angular_velocity",
+ fridge_queue.goal->angular_velocity);
+ message_.AddMeasure("fridge_queue.goal->height", fridge_queue.goal->height);
+ message_.AddMeasure("fridge_queue.goal->velocity",
+ fridge_queue.goal->velocity);
+ message_.AddMeasure("fridge_queue.x", fridge_queue.goal->x);
+ message_.AddMeasure("fridge_queue.x_velocity", fridge_queue.goal->x_velocity);
+ message_.AddMeasure("fridge_queue.y", fridge_queue.goal->y);
+ message_.AddMeasure("fridge_queue.y_velocity", fridge_queue.goal->y_velocity);
+ // Statuses
+ message_.AddMeasure("fridge_queue.status->height",
+ fridge_queue.status->height);
+ message_.AddMeasure("fridge_queue.status->velocity",
+ fridge_queue.status->velocity);
+ message_.AddMeasure("fridge_queue.status->angle", fridge_queue.status->angle);
+ message_.AddMeasure("fridge_queue.status->angular_velocity",
+ fridge_queue.status->angular_velocity);
+ message_.AddMeasure("fridge_queue.status->x", fridge_queue.status->x);
+ message_.AddMeasure("fridge_queue.status->x_velocity",
+ fridge_queue.status->x_velocity);
+ message_.AddMeasure("fridge_queue.status->y", fridge_queue.status->y);
+ message_.AddMeasure("fridge_queue.status->y_velocity",
+ fridge_queue.status->y_velocity);
+ message_.AddMeasure("fridge_queue.status->state", fridge_queue.status->state);
+ message_.AddMeasure("fridge_queue.status->zeroed",
+ fridge_queue.status->zeroed);
+ message_.AddMeasure("fridge_queue.status->estopped",
+ fridge_queue.status->estopped);
+ // Outputs
+ message_.AddMeasure("fridge_queue.output->left_arm",
+ fridge_queue.output->left_arm);
+ message_.AddMeasure("fridge_queue.output->right_arm",
+ fridge_queue.output->right_arm);
+ message_.AddMeasure("fridge_queue.output->left_elevator",
+ fridge_queue.output->left_elevator);
+ message_.AddMeasure("fridge_queue.output->right_elevator",
+ fridge_queue.output->right_elevator);
+ // End recorded data. /////
+ message_.EndSample();  // Unlocks the message so Fetch() can read the finished sample.
+}
- Server server(logger);
+::std::string DataCollector::GetData(int32_t from_sample) {
+ return message_.Fetch(from_sample);  // -1 requests the measurement names; see HTTPStatusMessage::Fetch().
+}
- shared_ptr<MyHandler> handler(new MyHandler(&server));
- server.addWebSocketHandler("/ws", handler);
- server.serve("web_test", 8080);
- return 0;
+void DataCollector::operator()() {
+ ::aos::SetCurrentThreadName("HTTPStatusData");
+
+ while (run_) {  // NOTE(review): run_ is presumably cleared by Quit() from another thread — confirm it is synchronized.
+ ::aos::time::PhasedLoopXMS(5, 0);  // Presumably paces the loop at a 5 ms period — confirm.
+ RunIteration();
+ }
+}
+
+SocketHandler::SocketHandler()
+ : data_collector_thread_(::std::ref(data_collector_)) {}  // Presumably starts a thread running data_collector_ by reference — confirm member order in the header.
+
+void SocketHandler::onConnect(seasocks::WebSocket* connection) {
+ connections_.insert(connection);  // Track the connection until onDisconnect() removes it.
+ LOG(INFO, "Connected: %s : %s\n", connection->getRequestUri().c_str(),
+ seasocks::formatAddress(connection->getRemoteAddress()).c_str());
+}
+
+void SocketHandler::onData(seasocks::WebSocket* connection, const char* data) {
+ int32_t from_sample = atoi(data);  // Text with no leading number parses as 0.
+
+ ::std::string send_data = data_collector_.GetData(from_sample);
+ connection->send(send_data.c_str());  // Reply goes only to the requesting connection.
+}
+
+void SocketHandler::onDisconnect(seasocks::WebSocket* connection) {
+ connections_.erase(connection);  // Stop tracking the connection added in onConnect().
+ LOG(INFO, "Disconnected: %s : %s\n", connection->getRequestUri().c_str(),
+ seasocks::formatAddress(connection->getRemoteAddress()).c_str());
+}
+
+void SocketHandler::Quit() {
+ data_collector_.Quit();  // Presumably tells the collector loop to stop — confirm in the header.
+ data_collector_thread_.join();  // Wait for the collector thread to finish.
+}
+
+SeasocksLogger::SeasocksLogger(Level min_level_to_log)
+ : PrintfLogger(min_level_to_log) {}  // Forwards the minimum level to the PrintfLogger base.
+
+void SeasocksLogger::log(Level level, const char* message) {
+ log_level aos_level;
+ switch (level) {  // Translate the seasocks level into the level passed to LOG().
+ case seasocks::Logger::INFO:
+ aos_level = INFO;
+ break;
+ case seasocks::Logger::WARNING:
+ aos_level = WARNING;
+ break;
+ case seasocks::Logger::ERROR:
+ case seasocks::Logger::SEVERE:  // ERROR and SEVERE share one level here.
+ aos_level = ERROR;
+ break;
+ case seasocks::Logger::DEBUG:
+ case seasocks::Logger::ACCESS:
+ default:  // Anything else, including unknown levels, is logged as DEBUG.
+ aos_level = DEBUG;
+ break;
+ }
+ LOG(aos_level, "Seasocks: %s\n", message);
+}
+
+} // namespace http_status
+} // namespace frc971
+
+int main(int, char* []) {
+ ::aos::InitNRT();
+
+ seasocks::Server server(::std::shared_ptr<seasocks::Logger>(
+ new frc971::http_status::SeasocksLogger(seasocks::Logger::INFO)));
+ // Heap-allocate the handler: the server's shared_ptr deletes it on release,
+ // which is undefined behavior for an object with automatic storage.
+ ::std::shared_ptr<frc971::http_status::SocketHandler> socket_handler(
+ new frc971::http_status::SocketHandler());
+
+ server.addWebSocketHandler("/ws", socket_handler);
+ server.serve("www", 8080);
+
+ socket_handler->Quit();
+ ::aos::Cleanup();
+ return 0;
 }