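Improve plotting script

Warn and skip channels that are missing from the log instead of raising,
quote inf/-inf values before JSON parsing (printing the offending text
when decoding fails), add rolling mean/std signals for the IMU and a
rolling mean for the down estimator accelerations, and let each axes
config choose its x-label and whether it shares the x-axis.

Also, add a bunch of plotting configs for the new signals that I've
added.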
Change-Id: I7aa5221939351ede989b897db561b6a57f023910
diff --git a/frc971/analysis/plot.py b/frc971/analysis/plot.py
index 1eba716..ecb23b3 100644
--- a/frc971/analysis/plot.py
+++ b/frc971/analysis/plot.py
@@ -28,36 +28,124 @@
for channel in self.config.channel:
if channel.alias in aliases:
raise ValueError("Duplicate alias " + channel.alias)
- aliases.add(channel.alias)
if not self.reader.subscribe(channel.name, channel.type):
- raise ValueError("No such channel with name " + channel.name +
- " and type " + channel.type)
+ print("Warning: No such channel with name " + channel.name +
+ " and type " + channel.type)
+ continue
+ aliases.add(channel.alias)
self.reader.process()
for channel in self.config.channel:
self.data[channel.alias] = []
+ if channel.alias not in aliases:
+ print("Unable to plot channel alias " + channel.alias)
+ continue
for message in self.reader.get_data_for_channel(
channel.name, channel.type):
valid_json = message[2].replace('nan', '"nan"')
- parsed_json = json.loads(valid_json)
- self.data[channel.alias].append((message[0], message[1],
- parsed_json))
+ valid_json = valid_json.replace(' inf', ' "inf"')
+ valid_json = valid_json.replace('-inf', '"-inf"')
+ try:
+ parsed_json = json.loads(valid_json)
+ except json.decoder.JSONDecodeError as ex:
+ print("JSON Decode failed:")
+ print(valid_json)
+ raise ex
+ self.data[channel.alias].append(
+ (message[0], message[1], parsed_json))
self.calculate_signals()
+    def calculate_down_estimator_signals(self):
+        """Calculates a rolling mean of the down estimator accelerations.
+
+        Reads the messages stored under the 'DrivetrainStatus' alias and
+        stores the derived messages under 'CalcDrivetrainStatus'. Does
+        nothing if 'DrivetrainStatus' is not in self.data. Messages without
+        a 'down_estimator' field are skipped; messages whose down estimator
+        is missing any of accel_x/accel_y/accel_z still produce an entry,
+        but with an empty 'down_estimator' dict.
+
+        The averaging buffer starts out filled with zeros, so the means are
+        biased towards zero until buffer_len samples have been seen.
+
+        Raises:
+            RuntimeError: if 'CalcDrivetrainStatus' is already in self.data.
+        """
+        if 'DrivetrainStatus' not in self.data:
+            return
+        # Fail before doing any work if the output alias is already taken.
+        if 'CalcDrivetrainStatus' in self.data:
+            raise RuntimeError(
+                'CalcDrivetrainStatus is already a member of data.')
+
+        accel_fields = ('accel_x', 'accel_y', 'accel_z')
+        buffer_len = 100
+        # Circular buffer holding the most recent acceleration samples.
+        recent_accels = np.zeros((buffer_len, 3))
+        accels_next_row = 0
+        entries = []
+        for entry in self.data['DrivetrainStatus']:
+            msg = entry[2]
+            if 'down_estimator' not in msg:
+                continue
+            down_estimator = msg['down_estimator']
+            new_msg = {'down_estimator': {}}
+            # Every axis must be present; checking accel_x twice (and never
+            # accel_z) would let a message without accel_z through and
+            # raise a KeyError below.
+            if all(field in down_estimator for field in accel_fields):
+                recent_accels[accels_next_row, :] = [
+                    down_estimator[field] for field in accel_fields
+                ]
+                accels_next_row = (accels_next_row + 1) % buffer_len
+                mean_accels = np.mean(recent_accels, axis=0)
+                for field, mean in zip(accel_fields, mean_accels):
+                    new_msg['down_estimator'][field + '_rolling_mean'] = mean
+            entries.append((entry[0], entry[1], new_msg))
+        self.data['CalcDrivetrainStatus'] = entries
+
def calculate_imu_signals(self):
if 'IMU' in self.data:
entries = []
+ # Calculates a rolling mean of the raw output from the IMU.
+ buffer_len = 1000
+ last_1000_accels = np.zeros((buffer_len, 3))
+ accels_next_row = 0
+ last_1000_gyros = np.zeros((buffer_len, 3))
+ gyros_next_row = 0
for entry in self.data['IMU']:
accel_x = 'accelerometer_x'
accel_y = 'accelerometer_y'
accel_z = 'accelerometer_z'
+ gyro_x = 'gyro_x'
+ gyro_y = 'gyro_y'
+ gyro_z = 'gyro_z'
msg = entry[2]
new_msg = {}
if accel_x in msg and accel_y in msg and accel_x in msg:
- total_acceleration = np.sqrt(
- msg[accel_x]**2 + msg[accel_y]**2 + msg[accel_z]**2)
+ last_1000_accels[accels_next_row, :] = [
+ msg[accel_x], msg[accel_y], msg[accel_z]
+ ]
+ total_acceleration = np.linalg.norm(
+ last_1000_accels[accels_next_row, :])
new_msg['total_acceleration'] = total_acceleration
+
+ accels_next_row += 1
+ accels_next_row = accels_next_row % buffer_len
+ std_accels = np.std(last_1000_accels, axis=0)
+ new_msg['accel_x_rolling_std'] = std_accels[0]
+ new_msg['accel_y_rolling_std'] = std_accels[1]
+ new_msg['accel_z_rolling_std'] = std_accels[2]
+ mean_accels = np.mean(last_1000_accels, axis=0)
+ new_msg['accel_x_rolling_mean'] = mean_accels[0]
+ new_msg['accel_y_rolling_mean'] = mean_accels[1]
+ new_msg['accel_z_rolling_mean'] = mean_accels[2]
+ if gyro_x in msg and gyro_y in msg and gyro_z in msg:
+ last_1000_gyros[gyros_next_row, :] = [
+ msg[gyro_x], msg[gyro_y], msg[gyro_z]
+ ]
+ gyros_next_row += 1
+ gyros_next_row = gyros_next_row % buffer_len
+ std_gyros = np.std(last_1000_gyros, axis=0)
+ new_msg['gyro_x_rolling_std'] = std_gyros[0]
+ new_msg['gyro_y_rolling_std'] = std_gyros[1]
+ new_msg['gyro_z_rolling_std'] = std_gyros[2]
+ mean_gyros = np.mean(last_1000_gyros, axis=0)
+ new_msg['gyro_x_rolling_mean'] = mean_gyros[0]
+ new_msg['gyro_y_rolling_mean'] = mean_gyros[1]
+ new_msg['gyro_z_rolling_mean'] = mean_gyros[2]
timestamp = 'monotonic_timestamp_ns'
if timestamp in msg:
timestamp_sec = msg[timestamp] * 1e-9
@@ -80,6 +168,7 @@
an overall magnitude for the accelerometer readings, which is helpful
to understanding how some internal filters work."""
self.calculate_imu_signals()
+ self.calculate_down_estimator_signals()
def extract_field(self, message: dict, field: str):
"""Extracts a field with the given name from the message.
@@ -139,15 +228,20 @@
fig = plt.figure()
num_subplots = len(figure_config.axes)
for ii in range(num_subplots):
- axes = fig.add_subplot(
- num_subplots, 1, ii + 1, sharex=shared_axis)
- shared_axis = shared_axis or axes
axes_config = figure_config.axes[ii]
+ share_axis = axes_config.share_x_axis
+ axes = fig.add_subplot(
+ num_subplots,
+ 1,
+ ii + 1,
+ sharex=shared_axis if share_axis else None)
+ if share_axis and shared_axis is None:
+ shared_axis = axes
for line in axes_config.line:
self.plot_line(axes, line)
# Make the legend transparent so we can see behind it.
- legend = axes.legend(framealpha=0.5)
- axes.set_xlabel("Monotonic Time (sec)")
+ legend = axes.legend(framealpha=0.5, loc='upper right')
+ axes.set_xlabel(axes_config.xlabel)
axes.grid(True)
if axes_config.HasField("ylabel"):
axes.set_ylabel(axes_config.ylabel)
@@ -156,21 +250,18 @@
def main(argv):
parser = argparse.ArgumentParser(
description="Plot data from an aos logfile.")
- parser.add_argument(
- "--logfile",
- type=str,
- required=True,
- help="Path to the logfile to parse.")
- parser.add_argument(
- "--config",
- type=str,
- required=True,
- help="Name of the plot config to use.")
- parser.add_argument(
- "--config_dir",
- type=str,
- default="frc971/analysis/plot_configs",
- help="Directory to look for plot configs in.")
+ parser.add_argument("--logfile",
+ type=str,
+ required=True,
+ help="Path to the logfile to parse.")
+ parser.add_argument("--config",
+ type=str,
+ required=True,
+ help="Name of the plot config to use.")
+ parser.add_argument("--config_dir",
+ type=str,
+ default="frc971/analysis/plot_configs",
+ help="Directory to look for plot configs in.")
args = parser.parse_args(argv[1:])
if not os.path.isdir(args.config_dir):