Make preliminary Seasocks server with data collector.

This is stage 2 of the http_status project!
Currently, this version can display various values from the queues in
a web browser, but only as plain text. It implements some of the basic
classes that will be reused in later stages. It also supports
concurrent connections.

Things not yet included are the trigger and bandwidth-adjustment
options.

Change-Id: I108d965a768c073a0f4b3757c0475fe7bfc34ccb
diff --git a/frc971/http_status/http_status.cc b/frc971/http_status/http_status.cc
index 9a6440a..6cdb303 100644
--- a/frc971/http_status/http_status.cc
+++ b/frc971/http_status/http_status.cc
@@ -1,114 +1,296 @@
-// Copyright (c) 2013, Matt Godbolt
-// All rights reserved.
-// 
-// Redistribution and use in source and binary forms, with or without 
-// modification, are permitted provided that the following conditions are met:
-// 
-// Redistributions of source code must retain the above copyright notice, this 
-// list of conditions and the following disclaimer.
-// 
-// Redistributions in binary form must reproduce the above copyright notice, 
-// this list of conditions and the following disclaimer in the documentation 
-// and/or other materials provided with the distribution.
-// 
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
-// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
-// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
-// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
-// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
-// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
-// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
-// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
-// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
-// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
-// POSSIBILITY OF SUCH DAMAGE.
-
-// An extraordinarily simple test which presents a web page with some buttons.
-// Clicking on the numbered button increments the number, which is visible to
-// other connected clients.  WebSockets are used to do this: by the rather
-// suspicious means of sending raw JavaScript commands to be executed on other
-// clients.
-
 #include "seasocks/PrintfLogger.h"
-#include "seasocks/Server.h"
-#include "seasocks/StringUtil.h"
 #include "seasocks/WebSocket.h"
-#include "seasocks/util/Json.h"
+#include "seasocks/Server.h"
 
-#include <cstring>
 #include <iostream>
-#include <memory>
-#include <set>
 #include <sstream>
 #include <string>
+#include <thread>
+#include <vector>
 
-#include "embedded.h"
+#include "aos/linux_code/init.h"
+#include "aos/common/logging/logging.h"
+#include "aos/common/time.h"
+#include "aos/common/util/phased_loop.h"
+#include "aos/common/mutex.h"
 
-using namespace seasocks;
-using namespace std;
+#include "frc971/control_loops/claw/claw.q.h"
+#include "frc971/control_loops/fridge/fridge.q.h"
 
-class MyHandler: public WebSocket::Handler {
-public:
-    MyHandler(Server* server) : _server(server), _currentValue(0) {
-        setValue(1);
+#include "frc971/http_status/http_status.h"
+
+namespace frc971 {
+namespace http_status {
+
+// TODO(comran): Make some of these separate libraries & document them better.
+
+HTTPStatusMessage::HTTPStatusMessage()
+    : sample_id_(0),
+      measure_index_(0),
+      overflow_id_(200),
+      num_samples_per_packet_(50) {}
+
+void HTTPStatusMessage::NextSample() {
+  // Take the lock before touching any state that Fetch() reads under it; it
+  // stays held until EndSample() so the measures form one consistent sample.
+  CHECK(!mutex_.Lock());
+  const ::aos::time::Time time_now = ::aos::time::Time::Now();
+  if (sample_id_ < overflow_id_) {
+    // Storage has not reached overflow_id_ entries yet: append a new slot.
+    sample_times_.emplace_back(time_now);
+    data_values_.emplace_back(::std::vector<double>());
+  } else {
+    // Storage is full: reuse the slot this sample id wraps around to.
+    sample_times_[GetIndex(sample_id_)] = time_now;
+  }
+
+  sample_id_++;
+  measure_index_ = 0;
+}
+
+void HTTPStatusMessage::EndSample() { mutex_.Unlock(); }
+
+int32_t HTTPStatusMessage::GetIndex(int32_t sample_id) {
+  return sample_id % overflow_id_;
+}
+
+void HTTPStatusMessage::AddMeasure(::std::string name, double value) {
+  // Mutex should be locked when this method is called to synchronize packets.
+  assert(mutex_.OwnedBySelf());
+
+  int32_t index = GetIndex(sample_id_ - 1);
+
+  if (measure_index_ >= static_cast<int32_t>(data_names_.size())) {
+    data_names_.emplace_back(name);
+  }
+
+  if (measure_index_ >= static_cast<int32_t>(data_values_.at(index).size())) {
+    data_values_.at(index).emplace_back(value);
+  } else {
+    data_values_.at(index).at(measure_index_) = value;
+  }
+  measure_index_++;
+}
+
+::std::string HTTPStatusMessage::Fetch(size_t from_sample) {
+  ::aos::MutexLocker locker(&mutex_);
+
+  ::std::stringstream message;
+  message.precision(10);  // Cap how precise the time/measurement data is.
+
+  // To save space, data is sent to the client using a custom text protocol.
+  // Initially, a message containing all the names of the measurements is sent.
+  // It is preceded by a star, and the names are split with commas.
+  // A from_sample of -1 (checked after narrowing the size_t to int32_t)
+  // selects this names message.
+
+  // Example: *test,test2
+  if (static_cast<int32_t>(from_sample) == -1) {
+    message << "*";
+    for (int32_t cur_data_name = 0;
+         cur_data_name < static_cast<int32_t>(data_names_.size());
+         cur_data_name++) {
+      if (cur_data_name > 0) {
+        message << ",";
+      }
+      message << data_names_.at(cur_data_name);
+    }
+    return message.str();
+  }
+
+  // TODO(comran): Use from_sample to determine the speed packets should be sent
+  // out to avoid skipping packets. Until then, always send the newest samples.
+  // Clamp at 0: with fewer than num_samples_per_packet_ samples recorded, the
+  // subtraction goes below zero and the at() lookups below would throw.
+  from_sample = (sample_id_ > num_samples_per_packet_)
+                    ? sample_id_ - num_samples_per_packet_ : 0;
+
+  // Data packets hold the raw values in the same order as the measurement
+  // names sent in the initial packet. Samples are split with dollar signs,
+  // info with percent signs, and measurements with commas. This format is
+  // helpful for debugging and for looping through the data on the client side.
+  // Example of two samples that correspond with the names example above:
+  // 289%2803.135127%10,67$290%2803.140109%12,68
+  for (int32_t cur_sample = from_sample;
+       cur_sample <
+               static_cast<int32_t>(from_sample + num_samples_per_packet_) &&
+           GetIndex(cur_sample) < static_cast<int32_t>(data_values_.size());
+       cur_sample++) {
+    if (cur_sample != static_cast<int32_t>(from_sample)) {
+      message << "$";
     }
 
-    virtual void onConnect(WebSocket* connection) {
-        _connections.insert(connection);
-        connection->send(_currentSetValue.c_str());
-        cout << "Connected: " << connection->getRequestUri()
-                << " : " << formatAddress(connection->getRemoteAddress())
-                << endl;
-        cout << "Credentials: " << *(connection->credentials()) << endl;
+    int32_t adjusted_index = GetIndex(cur_sample);
+
+    message << cur_sample << "%" << sample_times_.at(adjusted_index).ToSeconds()
+            << "%";
+    for (int32_t cur_measure = 0;
+         cur_measure < static_cast<int32_t>(data_names_.size());
+         cur_measure++) {
+      if (cur_measure > 0) {
+        message << ",";
+      }
+      message << data_values_.at(adjusted_index).at(cur_measure);
     }
+  }
+  return message.str();
+}
 
-    virtual void onData(WebSocket* connection, const char* data) {
-        if (0 == strcmp("die", data)) {
-            _server->terminate();
-            return;
-        }
-        if (0 == strcmp("close", data)) {
-            cout << "Closing.." << endl;
-            connection->close();
-            cout << "Closed." << endl;
-            return;
-        }
+DataCollector::DataCollector() : cur_raw_data_("no data") {}
 
-        int value = atoi(data) + 1;
-        if (value > _currentValue) {
-            setValue(value);
-            for (auto connection : _connections) {
-                connection->send(_currentSetValue.c_str());
-            }
-        }
-    }
+void DataCollector::RunIteration() {
+  auto& fridge_queue = control_loops::fridge_queue;
+  auto& claw_queue = control_loops::claw_queue;
 
-    virtual void onDisconnect(WebSocket* connection) {
-        _connections.erase(connection);
-        cout << "Disconnected: " << connection->getRequestUri()
-                << " : " << formatAddress(connection->getRemoteAddress())
-                << endl;
-    }
+  fridge_queue.status.FetchAnother();
+  claw_queue.status.FetchAnother();
 
-private:
-    set<WebSocket*> _connections;
-    Server* _server;
-    int _currentValue;
-    string _currentSetValue;
+  message_.NextSample();
+  // Add recorded data here. /////
+  // NOTE: Try to use fewer than 30 measures, or the whole thing will lag.
+  // Abbreviate names if long, otherwise just use the command to get the value
+  // from the queue.
 
-    void setValue(int value) {
-        _currentValue = value;
-        _currentSetValue = makeExecString("set", _currentValue);
-    }
-};
+  // TODO(comran): Make it so that the name doesn't have to be copied as a
+  // string.
 
-int main(int argc, const char* argv[]) {
-    shared_ptr<Logger> logger(new PrintfLogger(Logger::DEBUG));
+  // //// Fridge
+  // Positions
+  message_.AddMeasure("(fridge position left arm encoder)",
+                      fridge_queue.position->arm.left.encoder);
+  message_.AddMeasure("(fridge position right arm encoder)",
+                      fridge_queue.position->arm.right.encoder);
+  message_.AddMeasure("(fridge position left elev encoder)",
+                      fridge_queue.position->elevator.left.encoder);
+  message_.AddMeasure("(fridge position right elev encoder)",
+                      fridge_queue.position->elevator.right.encoder);
+  // Goals
+  message_.AddMeasure("fridge_queue.goal->profiling_type",
+                      fridge_queue.goal->profiling_type);
+  message_.AddMeasure("fridge_queue.goal->angle", fridge_queue.goal->angle);
+  message_.AddMeasure("fridge_queue.goal->angular_velocity",
+                      fridge_queue.goal->angular_velocity);
+  message_.AddMeasure("fridge_queue.goal->height", fridge_queue.goal->height);
+  message_.AddMeasure("fridge_queue.goal->velocity",
+                      fridge_queue.goal->velocity);
+  message_.AddMeasure("fridge_queue.x", fridge_queue.goal->x);
+  message_.AddMeasure("fridge_queue.x_velocity", fridge_queue.goal->x_velocity);
+  message_.AddMeasure("fridge_queue.y", fridge_queue.goal->y);
+  message_.AddMeasure("fridge_queue.y_velocity", fridge_queue.goal->y_velocity);
+  // Statuses
+  message_.AddMeasure("fridge_queue.status->height",
+                      fridge_queue.status->height);
+  message_.AddMeasure("fridge_queue.status->velocity",
+                      fridge_queue.status->velocity);
+  message_.AddMeasure("fridge_queue.status->angle", fridge_queue.status->angle);
+  message_.AddMeasure("fridge_queue.status->angular_velocity",
+                      fridge_queue.status->angular_velocity);
+  message_.AddMeasure("fridge_queue.status->x", fridge_queue.status->x);
+  message_.AddMeasure("fridge_queue.status->x_velocity",
+                      fridge_queue.status->x_velocity);
+  message_.AddMeasure("fridge_queue.status->y", fridge_queue.status->y);
+  message_.AddMeasure("fridge_queue.status->y_velocity",
+                      fridge_queue.status->y_velocity);
+  message_.AddMeasure("fridge_queue.status->state", fridge_queue.status->state);
+  message_.AddMeasure("fridge_queue.status->zeroed",
+                      fridge_queue.status->zeroed);
+  message_.AddMeasure("fridge_queue.status->estopped",
+                      fridge_queue.status->estopped);
+  // Outputs
+  message_.AddMeasure("fridge_queue.output->left_arm",
+                      fridge_queue.output->left_arm);
+  message_.AddMeasure("fridge_queue.output->right_arm",
+                      fridge_queue.output->right_arm);
+  message_.AddMeasure("fridge_queue.output->left_elevator",
+                      fridge_queue.output->left_elevator);
+  message_.AddMeasure("fridge_queue.output->right_elevator",
+                      fridge_queue.output->right_elevator);
+  // End recorded data. /////
+  message_.EndSample();
+}
 
-    Server server(logger);
+::std::string DataCollector::GetData(int32_t from_sample) {
+  return message_.Fetch(from_sample);
+}
 
-    shared_ptr<MyHandler> handler(new MyHandler(&server));
-    server.addWebSocketHandler("/ws", handler);
-    server.serve("web_test", 8080);
-    return 0;
+void DataCollector::operator()() {
+  ::aos::SetCurrentThreadName("HTTPStatusData");
+
+  while (run_) {
+    ::aos::time::PhasedLoopXMS(5, 0);
+    RunIteration();
+  }
+}
+
+SocketHandler::SocketHandler()
+    : data_collector_thread_(::std::ref(data_collector_)) {}
+
+void SocketHandler::onConnect(seasocks::WebSocket* connection) {
+  connections_.insert(connection);
+  LOG(INFO, "Connected: %s : %s\n", connection->getRequestUri().c_str(),
+      seasocks::formatAddress(connection->getRemoteAddress()).c_str());
+}
+
+void SocketHandler::onData(seasocks::WebSocket* connection, const char* data) {
+  int32_t from_sample = atoi(data);
+
+  ::std::string send_data = data_collector_.GetData(from_sample);
+  connection->send(send_data.c_str());
+}
+
+void SocketHandler::onDisconnect(seasocks::WebSocket* connection) {
+  connections_.erase(connection);
+  LOG(INFO, "Disconnected: %s : %s\n", connection->getRequestUri().c_str(),
+      seasocks::formatAddress(connection->getRemoteAddress()).c_str());
+}
+
+void SocketHandler::Quit() {
+  data_collector_.Quit();
+  data_collector_thread_.join();
+}
+
+SeasocksLogger::SeasocksLogger(Level min_level_to_log)
+    : PrintfLogger(min_level_to_log) {}
+
+void SeasocksLogger::log(Level level, const char* message) {
+  log_level aos_level;
+  switch (level) {
+    case seasocks::Logger::INFO:
+      aos_level = INFO;
+      break;
+    case seasocks::Logger::WARNING:
+      aos_level = WARNING;
+      break;
+    case seasocks::Logger::ERROR:
+    case seasocks::Logger::SEVERE:
+      aos_level = ERROR;
+      break;
+    case seasocks::Logger::DEBUG:
+    case seasocks::Logger::ACCESS:
+    default:
+      aos_level = DEBUG;
+      break;
+  }
+  LOG(aos_level, "Seasocks: %s\n", message);
+}
+
+}  // namespace http_status
+}  // namespace frc971
+
+int main(int, char* []) {
+  ::aos::InitNRT();
+
+  seasocks::Server server(::std::shared_ptr<seasocks::Logger>(
+      new frc971::http_status::SeasocksLogger(seasocks::Logger::INFO)));
+  // The server co-owns the handler through a shared_ptr, so it must live on
+  // the heap; wrapping a stack object would end in delete on stack memory.
+  auto socket_handler =
+      ::std::make_shared<frc971::http_status::SocketHandler>();
+  server.addWebSocketHandler("/ws", socket_handler);
+  server.serve("www", 8080);
+
+  socket_handler->Quit();
+
+  ::aos::Cleanup();
+  return 0;
 }