Allow constructing control loops from flatbuffers
The core changes here are to:
* Allow constructing StateFeedbackLoop's from flatbuffers using the
code in *state_feedback_loop_converters.*
* Add constructors to the single-dof subsystem class to make use of
this.
* Add code to control_loops.py to generate JSON files with the requisite
constants (these end up containing identical information to the
generated .cc files).
* Add interfaces to actually support the new JSON codegen to single-dof
subsystem classes.
* Convert all of the drivetrains over to generating these. This I mostly
do so that I can write a test where Iconfirm that the .cc files and
the JSON files generate exactly the same content.
Change-Id: Iceac48f25ecac96200b7bf992c8f34a15fe6800c
Signed-off-by: James Kuszmaul <jabukuszmaul+collab@gmail.com>
diff --git a/frc971/control_loops/python/control_loop.py b/frc971/control_loops/python/control_loop.py
index e179193..864a4e4 100644
--- a/frc971/control_loops/python/control_loop.py
+++ b/frc971/control_loops/python/control_loop.py
@@ -1,6 +1,7 @@
import frc971.control_loops.python.controls as controls
import numpy
import os
+import json
class Constant(object):
@@ -25,6 +26,16 @@
(self.comment, typestring, self.name, self.value)
+def MatrixToJson(matrix):
+ """Returns JSON representation of a numpy matrix."""
+ return {
+ "rows": matrix.shape[0],
+ "cols": matrix.shape[1],
+ "storage_order": "ColMajor",
+ "data": numpy.array(matrix).flatten(order='F').tolist()
+ }
+
+
class ControlLoopWriter(object):
def __init__(self,
@@ -83,10 +94,12 @@
os.path.basename(header_file).upper().replace(
'.', '_').replace('/', '_') + '_')
- def Write(self, header_file, cc_file):
+ def Write(self, header_file, cc_file, json_file=None):
"""Writes the loops to the specified files."""
self.WriteHeader(header_file)
self.WriteCC(os.path.basename(header_file), cc_file)
+ if json_file is not None:
+ self.WriteJson(json_file)
def _GenericType(self, typename, extra_args=None):
"""Returns a loop template using typename for the type."""
@@ -262,6 +275,19 @@
fd.write(self._namespace_end)
fd.write('\n')
+ def WriteJson(self, json_file):
+ """Writes a JSON file of the loop constants to the specified json_file."""
+ loops = []
+ for loop in self._loops:
+ loop_json = {}
+ loop_json["plant"] = loop.DumpPlantJson(self._PlantCoeffType())
+ loop_json["controller"] = loop.DumpControllerJson()
+ loop_json["observer"] = loop.DumbObserverJson(
+ self._ObserverCoeffType())
+ loops.append(loop_json)
+ with open(json_file, 'w') as f:
+ f.write(json.dumps(loops))
+
class ControlLoop(object):
@@ -389,6 +415,27 @@
return '%s Make%sPlantCoefficients();\n' % (plant_coefficient_type,
self._name)
+ def DumpPlantJson(self, plant_coefficient_type):
+ result = {
+ "c": MatrixToJson(self.C),
+ "d": MatrixToJson(self.D),
+ "u_min": MatrixToJson(self.U_min),
+ "u_max": MatrixToJson(self.U_max),
+ "u_limit_coefficient": MatrixToJson(self.U_limit_coefficient),
+ "u_limit_constant": MatrixToJson(self.U_limit_constant),
+ "delayed_u": self.delayed_u
+ }
+ if plant_coefficient_type.startswith('StateFeedbackPlant'):
+ result["a"] = MatrixToJson(self.A)
+ result["b"] = MatrixToJson(self.B)
+ result["dt"] = int(self.dt * 1e9)
+ elif plant_coefficient_type.startswith('StateFeedbackHybridPlant'):
+ result["a_continuous"] = MatrixToJson(self.A_continuous)
+ result["b_continuous"] = MatrixToJson(self.B_continuous)
+ else:
+ glog.fatal('Unsupported plant type %s', plant_coefficient_type)
+ return result
+
def DumpPlant(self, plant_coefficient_type, scalar_type):
"""Writes out a c++ function which will create a PlantCoefficients object.
@@ -475,6 +522,10 @@
num_states, num_inputs, num_outputs, scalar_type,
self.ControllerFunction())
+ def DumpControllerJson(self):
+ result = {"k": MatrixToJson(self.K), "kff": MatrixToJson(self.Kff)}
+ return result
+
def DumpController(self, scalar_type):
"""Returns a c++ function which will create a Controller object.
@@ -511,6 +562,32 @@
return '%s %s;\n' % (observer_coefficient_type,
self.ObserverFunction())
+ def GetObserverCoefficients(self):
+ if hasattr(self, 'KalmanGain'):
+ KalmanGain = self.KalmanGain
+ Q = self.Q
+ R = self.R
+ else:
+ KalmanGain = numpy.linalg.inv(self.A) * self.L
+ Q = numpy.zeros(self.A.shape)
+ R = numpy.zeros((self.C.shape[0], self.C.shape[0]))
+ return (KalmanGain, Q, R)
+
+ def DumbObserverJson(self, observer_coefficient_type):
+ result = {"delayed_u": self.delayed_u}
+ if observer_coefficient_type.startswith('StateFeedbackObserver'):
+ KalmanGain, Q, R = self.GetObserverCoefficients()
+ result["kalman_gain"] = MatrixToJson(KalmanGain)
+ result["q"] = MatrixToJson(Q)
+ result["r"] = MatrixToJson(R)
+ elif observer_coefficient_type.startswith('HybridKalman'):
+ result["q_continuous"] = MatrixToJson(self.Q_continuous)
+ result["r_continuous"] = MatrixToJson(self.R_continuous)
+ result["p_steady_state"] = MatrixToJson(self.P_steady_state)
+ else:
+ glog.fatal('Unsupported plant type %s', observer_coefficient_type)
+ return result
+
def DumpObserver(self, observer_coefficient_type, scalar_type):
"""Returns a c++ function which will create a Observer object.
@@ -523,14 +600,7 @@
delayed_u_string = str(self.delayed_u)
if observer_coefficient_type.startswith('StateFeedbackObserver'):
- if hasattr(self, 'KalmanGain'):
- KalmanGain = self.KalmanGain
- Q = self.Q
- R = self.R
- else:
- KalmanGain = numpy.linalg.inv(self.A) * self.L
- Q = numpy.zeros(self.A.shape)
- R = numpy.zeros((self.C.shape[0], self.C.shape[0]))
+ KalmanGain, Q, R = self.GetObserverCoefficients()
ans.append(self._DumpMatrix('KalmanGain', KalmanGain, scalar_type))
ans.append(self._DumpMatrix('Q', Q, scalar_type))
ans.append(self._DumpMatrix('R', R, scalar_type))