Create C++ -> Python interface for calling LogReader

This provides an interface that allows the user to extract json data
from the logfiles directly into Python.

Change-Id: Iafd1e4ac2301befe1f8939b396d6e40a3133b2a2
diff --git a/WORKSPACE b/WORKSPACE
index cbc390c..d43693f 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -549,3 +549,10 @@
     name = "com_github_google_flatbuffers",
     path = "third_party/flatbuffers",
 )
+
+http_file(
+    name = "sample_logfile",
+    downloaded_file_path = "log.fbs",
+    sha256 = "91c98edee0c90a19992792c711dde4a6743af2d6d7e45b5079ec228fdf51ff11",
+    urls = ["http://www.frc971.org/Build-Dependencies/small_sample_logfile.fbs"],
+)
diff --git a/aos/BUILD b/aos/BUILD
index 6b3c5bc..e964066 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -1,5 +1,5 @@
 load("//tools:environments.bzl", "mcu_cpus")
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_python_library")
 
 filegroup(
     name = "prime_binaries",
@@ -295,6 +295,19 @@
     visibility = ["//visibility:public"],
 )
 
+flatbuffer_python_library(
+    name = "configuration_fbs_python",
+    srcs = ["configuration.fbs"],
+    namespace = "aos",
+    tables = [
+        "Configuration",
+        "Channel",
+        "Map",
+        "Node",
+    ],
+    visibility = ["//visibility:public"],
+)
+
 cc_library(
     name = "configuration",
     srcs = [
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e35fe5b..1c2815d 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -269,6 +269,10 @@
   QueueMessages();
 }
 
+LogReader::~LogReader() {
+  CHECK(!event_loop_unique_ptr_) << "Did you remember to call Deregister?";
+}
+
 bool LogReader::ReadBlock() {
   if (end_of_file_) {
     return false;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 497dabb..b1f6718 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -93,6 +93,7 @@
 class LogReader {
  public:
   LogReader(absl::string_view filename);
+  ~LogReader();
 
   // Registers the timer and senders used to resend the messages from the log
   // file.
diff --git a/debian/python.BUILD b/debian/python.BUILD
index 666f2d8..5e5d810 100644
--- a/debian/python.BUILD
+++ b/debian/python.BUILD
@@ -1,16 +1,20 @@
 package(default_visibility = ["@//debian:__pkg__"])
 
 cc_library(
-    name = "python3.4_lib",
-    hdrs = glob(["usr/include/python3.4m/**/*.h"]),
+    name = "python3.5_lib",
+    srcs = [
+        "usr/lib/x86_64-linux-gnu/libpython3.5m.so",
+    ],
+    hdrs = glob(["usr/include/**/*.h"]),
     includes = [
-        "usr/include/python3.4m/",
+        "usr/include/",
+        "usr/include/python3.5m/",
     ],
     visibility = ["//visibility:public"],
 )
 
 cc_library(
-    name = "python3.4_f2py",
+    name = "python3.5_f2py",
     srcs = [
         "usr/lib/python3/dist-packages/numpy/f2py/src/fortranobject.c",
     ],
@@ -26,7 +30,7 @@
     ],
     visibility = ["//visibility:public"],
     deps = [
-        ":python3.4_lib",
+        ":python3.5_lib",
     ],
 )
 
diff --git a/frc971/analysis/BUILD b/frc971/analysis/BUILD
index b45031f..b526712 100644
--- a/frc971/analysis/BUILD
+++ b/frc971/analysis/BUILD
@@ -21,3 +21,30 @@
     srcs = ["__init__.py"],
     deps = ["//frc971:python_init"],
 )
+
+cc_binary(
+    name = "py_log_reader.so",
+    srcs = ["py_log_reader.cc"],
+    linkshared = True,
+    restricted_to = ["//tools:k8"],
+    deps = [
+        "//aos:configuration",
+        "//aos:json_to_flatbuffer",
+        "//aos/events:shm_event_loop",
+        "//aos/events:simulated_event_loop",
+        "//aos/events/logging:logger",
+        "@com_github_google_glog//:glog",
+        "@python_repo//:python3.5_lib",
+    ],
+)
+
+py_test(
+    name = "log_reader_test",
+    srcs = ["log_reader_test.py"],
+    data = [
+        ":py_log_reader.so",
+        "@sample_logfile//file",
+    ],
+    restricted_to = ["//tools:k8"],
+    deps = ["//aos:configuration_fbs_python"],
+)
diff --git a/frc971/analysis/log_reader_test.py b/frc971/analysis/log_reader_test.py
new file mode 100644
index 0000000..3389d00
--- /dev/null
+++ b/frc971/analysis/log_reader_test.py
@@ -0,0 +1,117 @@
+#!/usr/bin/python3
+import json
+import unittest
+
+from aos.Configuration import Configuration
+from frc971.analysis.py_log_reader import LogReader
+
+
+class LogReaderTest(unittest.TestCase):
+    def setUp(self):
+        self.reader = LogReader("external/sample_logfile/file/log.fbs")
+        # A list of all the channels in the logfile--this is used to confirm that
+        # we did indeed read the config correctly.
+        self.all_channels = [
+            ("/aos", "aos.JoystickState"), ("/aos", "aos.RobotState"),
+            ("/aos", "aos.timing.Report"), ("/aos", "frc971.PDPValues"),
+            ("/aos",
+             "frc971.wpilib.PneumaticsToLog"), ("/autonomous",
+                                                "aos.common.actions.Status"),
+            ("/autonomous", "frc971.autonomous.AutonomousMode"),
+            ("/autonomous", "frc971.autonomous.Goal"), ("/camera",
+                                                        "y2019.CameraLog"),
+            ("/camera", "y2019.control_loops.drivetrain.CameraFrame"),
+            ("/drivetrain",
+             "frc971.IMUValues"), ("/drivetrain",
+                                   "frc971.control_loops.drivetrain.Goal"),
+            ("/drivetrain",
+             "frc971.control_loops.drivetrain.LocalizerControl"),
+            ("/drivetrain", "frc971.control_loops.drivetrain.Output"),
+            ("/drivetrain", "frc971.control_loops.drivetrain.Position"),
+            ("/drivetrain", "frc971.control_loops.drivetrain.Status"),
+            ("/drivetrain", "frc971.sensors.GyroReading"),
+            ("/drivetrain",
+             "y2019.control_loops.drivetrain.TargetSelectorHint"),
+            ("/superstructure",
+             "y2019.StatusLight"), ("/superstructure",
+                                    "y2019.control_loops.superstructure.Goal"),
+            ("/superstructure", "y2019.control_loops.superstructure.Output"),
+            ("/superstructure", "y2019.control_loops.superstructure.Position"),
+            ("/superstructure", "y2019.control_loops.superstructure.Status")
+        ]
+        # A channel that is known to have data on it which we will use for testing.
+        self.test_channel = ("/aos", "aos.timing.Report")
+        # A non-existent channel
+        self.bad_channel = ("/aos", "aos.timing.FooBar")
+
+    def test_do_nothing(self):
+        """Tests that we sanely handle doing nothing.
+
+        A previous iteration of the log reader seg faulted when doing this."""
+        pass
+
+    def test_read_config(self):
+        """Tests that we can read the configuration from the logfile."""
+        config_bytes = self.reader.configuration()
+        config = Configuration.GetRootAsConfiguration(config_bytes, 0)
+
+        channel_set = set(self.all_channels)
+        for ii in range(config.ChannelsLength()):
+            channel = config.Channels(ii)
+            # Will raise KeyError if the channel does not exist
+            channel_set.remove((channel.Name().decode("utf-8"),
+                                channel.Type().decode("utf-8")))
+
+        self.assertEqual(0, len(channel_set))
+
+    def test_empty_process(self):
+        """Tests running process() without subscribing to anything succeeds."""
+        self.reader.process()
+        for channel in self.all_channels:
+            with self.assertRaises(ValueError) as context:
+                self.reader.get_data_for_channel(channel[0], channel[1])
+
+    def test_subscribe(self):
+        """Tests that we can subscribe to a channel and get data out."""
+        name = self.test_channel[0]
+        message_type = self.test_channel[1]
+        self.assertTrue(self.reader.subscribe(name, message_type))
+        self.reader.process()
+        data = self.reader.get_data_for_channel(name, message_type)
+        self.assertLess(100, len(data))
+        last_monotonic_time = 0
+        for entry in data:
+            monotonic_time = entry[0]
+            realtime_time = entry[1]
+            json_data = entry[2].replace('nan', '\"nan\"')
+            self.assertLess(last_monotonic_time, monotonic_time)
+            # Sanity check that the realtime times are in the correct range.
+            self.assertLess(1500000000e9, realtime_time)
+            self.assertGreater(2000000000e9, realtime_time)
+            parsed_json = json.loads(json_data)
+            self.assertIn("name", parsed_json)
+
+            last_monotonic_time = monotonic_time
+
+    def test_bad_subscribe(self):
+        """Tests that we return false when subscribing to a non-existent channel."""
+        self.assertFalse(
+            self.reader.subscribe(self.bad_channel[0], self.bad_channel[1]),
+            self.bad_channel)
+
+    def test_subscribe_after_process(self):
+        """Tests that an exception is thrown if we subscribe after calling process()."""
+        self.reader.process()
+        for channel in self.all_channels:
+            with self.assertRaises(RuntimeError) as context:
+                self.reader.subscribe(channel[0], channel[1])
+
+    def test_get_data_before_process(self):
+        """Tests that an exception is thrown if we retrieve data before calling process()."""
+        for channel in self.all_channels:
+            with self.assertRaises(RuntimeError) as context:
+                self.reader.get_data_for_channel(channel[0], channel[1])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/frc971/analysis/py_log_reader.cc b/frc971/analysis/py_log_reader.cc
new file mode 100644
index 0000000..5a6fce4
--- /dev/null
+++ b/frc971/analysis/py_log_reader.cc
@@ -0,0 +1,283 @@
+// This file provides a Python module for reading logfiles. See
+// log_reader_test.py for usage.
+//
+// This reader works by having the user specify exactly what channels they want
+// data for. We then process the logfile and store all the data on that channel
+// into a list of timestamps + JSON message data. The user can then use an
+// accessor method (get_data_for_channel) to retrieve the cached data.
+
+// Defining PY_SSIZE_T_CLEAN seems to be suggested by most of the Python
+// documentation.
+#define PY_SSIZE_T_CLEAN
+// Note that Python.h needs to be included before anything else.
+#include <Python.h>
+
+#include <memory>
+
+#include "aos/configuration.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
+
+namespace frc971 {
+namespace analysis {
+namespace {
+
+// All the data corresponding to a single message.
+struct MessageData {
+  aos::monotonic_clock::time_point monotonic_sent_time;
+  aos::realtime_clock::time_point realtime_sent_time;
+  // JSON representation of the message.
+  std::string json_data;
+};
+
+// Data corresponding to an entire channel.
+struct ChannelData {
+  std::string name;
+  std::string type;
+  // Each message published on the channel, in order by monotonic time.
+  std::vector<MessageData> messages;
+};
+
+// All the objects that we need for managing reading a logfile.
+struct LogReaderTools {
+  std::unique_ptr<aos::logger::LogReader> reader;
+  std::unique_ptr<aos::SimulatedEventLoopFactory> event_loop_factory;
+  // Event loop to use for subscribing to buses.
+  std::unique_ptr<aos::EventLoop> event_loop;
+  std::vector<ChannelData> channel_data;
+  // Whether we have called process() on the reader yet.
+  bool processed = false;
+};
+
+struct LogReaderType {
+  PyObject_HEAD;
+  LogReaderTools *tools = nullptr;
+};
+
+void LogReader_dealloc(LogReaderType *self) {
+  LogReaderTools *tools = self->tools;
+  if (tools->reader && !tools->processed) {
+    tools->reader->Deregister();
+  }
+  delete tools;
+  Py_TYPE(self)->tp_free((PyObject *)self);
+}
+
+PyObject *LogReader_new(PyTypeObject *type, PyObject * /*args*/,
+                            PyObject * /*kwds*/) {
+  LogReaderType *self;
+  self = (LogReaderType *)type->tp_alloc(type, 0);
+  if (self != nullptr) {
+    self->tools = new LogReaderTools();
+    if (self->tools == nullptr) {
+      return nullptr;
+    }
+  }
+  return (PyObject *)self;
+}
+
+int LogReader_init(LogReaderType *self, PyObject *args, PyObject *kwds) {
+  const char *kwlist[] = {"log_file_name", nullptr};
+
+  const char *log_file_name;
+  if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", const_cast<char **>(kwlist),
+                                   &log_file_name)) {
+    return -1;
+  }
+
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+  tools->reader = std::make_unique<aos::logger::LogReader>(log_file_name);
+  tools->event_loop_factory = std::make_unique<aos::SimulatedEventLoopFactory>(
+      tools->reader->configuration());
+  tools->reader->Register(tools->event_loop_factory.get());
+
+  tools->event_loop = tools->event_loop_factory->MakeEventLoop("data_fetcher");
+  tools->event_loop->SkipTimingReport();
+
+  return 0;
+}
+
+PyObject *LogReader_get_data_for_channel(LogReaderType *self,
+                                                PyObject *args,
+                                                PyObject *kwds) {
+  const char *kwlist[] = {"name", "type", nullptr};
+
+  const char *name;
+  const char *type;
+  if (!PyArg_ParseTupleAndKeywords(args, kwds, "ss",
+                                   const_cast<char **>(kwlist), &name, &type)) {
+    return nullptr;
+  }
+
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+
+  if (!tools->processed) {
+    PyErr_SetString(PyExc_RuntimeError,
+                    "Called get_data_for_channel before calling process().");
+    return nullptr;
+  }
+
+  for (const auto &channel : tools->channel_data) {
+    if (channel.name == name && channel.type == type) {
+      PyObject *list = PyList_New(channel.messages.size());
+      for (size_t ii = 0; ii < channel.messages.size(); ++ii)
+      {
+        const auto &message = channel.messages[ii];
+        PyObject *monotonic_time = PyLong_FromLongLong(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                message.monotonic_sent_time.time_since_epoch())
+                .count());
+        PyObject *realtime_time = PyLong_FromLongLong(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                message.realtime_sent_time.time_since_epoch())
+                .count());
+        PyObject *json_data = PyUnicode_FromStringAndSize(
+            message.json_data.data(), message.json_data.size());
+        PyObject *entry =
+            PyTuple_Pack(3, monotonic_time, realtime_time, json_data);
+        if (PyList_SetItem(list, ii, entry) != 0) {
+          return nullptr;
+        }
+      }
+      return list;
+    }
+  }
+  PyErr_SetString(PyExc_ValueError,
+                  "The provided channel was never subscribed to.");
+  return nullptr;
+}
+
+PyObject *LogReader_subscribe(LogReaderType *self, PyObject *args,
+                                     PyObject *kwds) {
+  const char *kwlist[] = {"name", "type", nullptr};
+
+  const char *name;
+  const char *type;
+  if (!PyArg_ParseTupleAndKeywords(args, kwds, "ss",
+                                   const_cast<char **>(kwlist), &name, &type)) {
+    return nullptr;
+  }
+
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+
+  if (tools->processed) {
+    PyErr_SetString(PyExc_RuntimeError,
+                    "Called subscribe after calling process().");
+    return nullptr;
+  }
+
+  const aos::Channel *const channel = aos::configuration::GetChannel(
+      tools->reader->configuration(), name, type, "", nullptr);
+  if (channel == nullptr) {
+    Py_RETURN_FALSE;
+  }
+  const int index = tools->channel_data.size();
+  tools->channel_data.push_back(
+      {.name = name, .type = type, .messages = {}});
+  tools->event_loop->MakeRawWatcher(
+      channel, [channel, index, tools](const aos::Context &context,
+                                       const void *message) {
+        tools->channel_data[index].messages.push_back(
+            {.monotonic_sent_time = context.monotonic_event_time,
+             .realtime_sent_time = context.realtime_event_time,
+             .json_data = aos::FlatbufferToJson(
+                 channel->schema(), static_cast<const uint8_t *>(message))});
+      });
+  Py_RETURN_TRUE;
+}
+
+static PyObject *LogReader_process(LogReaderType *self,
+                                   PyObject *Py_UNUSED(ignored)) {
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+
+  if (tools->processed) {
+    PyErr_SetString(PyExc_RuntimeError, "process() may only be called once.");
+    return nullptr;
+  }
+
+  tools->processed = true;
+
+  tools->event_loop_factory->Run();
+  tools->reader->Deregister();
+
+  Py_RETURN_NONE;
+}
+
+static PyObject *LogReader_configuration(LogReaderType *self,
+                                         PyObject *Py_UNUSED(ignored)) {
+  LogReaderTools *tools = CHECK_NOTNULL(self->tools);
+
+  // I have no clue if the Configuration that we get from the log reader is in a
+  // contiguous chunk of memory, and I'm too lazy to either figure it out or
+  // figure out how to extract the actual data buffer + offset.
+  // Instead, copy the flatbuffer and return a copy of the new buffer.
+  aos::FlatbufferDetachedBuffer<aos::Configuration> buffer =
+      aos::CopyFlatBuffer(tools->reader->configuration());
+
+  return PyBytes_FromStringAndSize(
+      reinterpret_cast<const char *>(buffer.data()), buffer.size());
+}
+
+static PyMethodDef LogReader_methods[] = {
+    {"configuration", (PyCFunction)LogReader_configuration, METH_NOARGS,
+     "Return a bytes buffer for the Configuration of the logfile."},
+    {"process", (PyCFunction)LogReader_process, METH_NOARGS,
+     "Processes the logfile and all the subscribed to channels."},
+    {"subscribe", (PyCFunction)LogReader_subscribe,
+     METH_VARARGS | METH_KEYWORDS,
+     "Attempts to subscribe to the provided channel name + type. Returns True "
+     "if successful."},
+    {"get_data_for_channel", (PyCFunction)LogReader_get_data_for_channel,
+     METH_VARARGS | METH_KEYWORDS,
+     "Returns the logged data for a given channel. Raises an exception if you "
+     "did not subscribe to the provided channel. Returned data is a list of "
+     "tuples where each tuple is of the form (monotonic_nsec, realtime_nsec, "
+     "json_message_data)."},
+    {nullptr, nullptr, 0, nullptr} /* Sentinel */
+};
+
+static PyTypeObject LogReaderType = {
+    PyVarObject_HEAD_INIT(NULL, 0).tp_name = "py_log_reader.LogReader",
+    .tp_doc = "LogReader objects",
+    .tp_basicsize = sizeof(LogReaderType),
+    .tp_itemsize = 0,
+    .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
+    .tp_new = LogReader_new,
+    .tp_init = (initproc)LogReader_init,
+    .tp_dealloc = (destructor)LogReader_dealloc,
+    .tp_methods = LogReader_methods,
+};
+
+static PyModuleDef log_reader_module = {
+    PyModuleDef_HEAD_INIT,
+    .m_name = "py_log_reader",
+    .m_doc = "Module for extracting data from AOS logfiles.",
+    .m_size = -1,
+};
+
+PyObject *InitModule() {
+  PyObject *m;
+  if (PyType_Ready(&LogReaderType) < 0) return nullptr;
+
+  m = PyModule_Create(&log_reader_module);
+  if (m == nullptr) return nullptr;
+
+  Py_INCREF(&LogReaderType);
+  if (PyModule_AddObject(m, "LogReader", (PyObject *)&LogReaderType) < 0) {
+    Py_DECREF(&LogReaderType);
+    Py_DECREF(m);
+    return nullptr;
+  }
+
+  return m;
+}
+
+}  // namespace
+}  // namespace analysis
+}  // namespace frc971
+
+PyMODINIT_FUNC PyInit_py_log_reader(void) {
+  return frc971::analysis::InitModule();
+}