blob: d325bd7b556111bda5b758fa0a4d46244726adf9 [file] [log] [blame]
James Kuszmaul61a971f2020-01-01 15:06:18 -08001#!/usr/bin/python3
2# Sample usage:
3# bazel run -c opt //frc971/analysis:plot -- --logfile /tmp/log.fbs --config gyro.pb
4import argparse
5import json
6import os.path
7from pathlib import Path
8import sys
9
10from frc971.analysis.py_log_reader import LogReader
11from frc971.analysis.plot_config_pb2 import PlotConfig, Signal
12from google.protobuf import text_format
13
James Kuszmaul7a7fe472020-01-12 22:07:10 -080014import numpy as np
James Kuszmaul61a971f2020-01-01 15:06:18 -080015import matplotlib
16from matplotlib import pyplot as plt
17
18
19class Plotter:
20 def __init__(self, plot_config: PlotConfig, reader: LogReader):
21 self.config = plot_config
22 self.reader = reader
23 # Data streams, indexed by alias.
24 self.data = {}
25
26 def process_logfile(self):
27 aliases = set()
28 for channel in self.config.channel:
29 if channel.alias in aliases:
30 raise ValueError("Duplicate alias " + channel.alias)
31 aliases.add(channel.alias)
32 if not self.reader.subscribe(channel.name, channel.type):
33 raise ValueError("No such channel with name " + channel.name +
34 " and type " + channel.type)
35
36 self.reader.process()
37
38 for channel in self.config.channel:
39 self.data[channel.alias] = []
40 for message in self.reader.get_data_for_channel(
41 channel.name, channel.type):
42 valid_json = message[2].replace('nan', '"nan"')
43 parsed_json = json.loads(valid_json)
44 self.data[channel.alias].append((message[0], message[1],
45 parsed_json))
James Kuszmaul7a7fe472020-01-12 22:07:10 -080046 self.calculate_signals()
47
48 def calculate_imu_signals(self):
49 if 'IMU' in self.data:
50 entries = []
51 for entry in self.data['IMU']:
52 accel_x = 'accelerometer_x'
53 accel_y = 'accelerometer_y'
54 accel_z = 'accelerometer_z'
55 msg = entry[2]
56 new_msg = {}
57 if accel_x in msg and accel_y in msg and accel_x in msg:
58 total_acceleration = np.sqrt(
59 msg[accel_x]**2 + msg[accel_y]**2 + msg[accel_z]**2)
60 new_msg['total_acceleration'] = total_acceleration
61 entries.append((entry[0], entry[1], new_msg))
62 if 'CalcIMU' in self.data:
63 raise RuntimeError('CalcIMU is already a member of data.')
64 self.data['CalcIMU'] = entries
65
66 def calculate_signals(self):
67 """Calculate any derived signals for plotting.
68
69 See calculate_imu_signals for an example, but the basic idea is that
70 for any data that is read in from the logfile, we may want to calculate
71 some derived signals--possibly as simple as doing unit conversions,
72 or more complicated version where we do some filtering or the such.
73 The way this will work is that the calculate_* functions will, if the
74 raw data is available, calculate the derived signals and place them into
75 fake "messages" with an alias of "Calc*". E.g., we currently calculate
76 an overall magnitude for the accelerometer readings, which is helpful
77 to understanding how some internal filters work."""
78 self.calculate_imu_signals()
James Kuszmaul61a971f2020-01-01 15:06:18 -080079
80 def plot_signal(self, axes: matplotlib.axes.Axes, signal: Signal):
81 if not signal.channel in self.data:
82 raise ValueError("No channel alias " + signal.channel)
83 field_path = signal.field.split('.')
84 monotonic_time = []
85 signal_data = []
86 for entry in self.data[signal.channel]:
87 monotonic_time.append(entry[0] * 1e-9)
88 value = entry[2]
89 for name in field_path:
James Kuszmaul9560e902020-01-11 14:13:08 -080090 # If the value wasn't populated in a given message, fill in
91 # NaN rather than crashing.
92 if name in value:
James Kuszmaul7a7fe472020-01-12 22:07:10 -080093 value = value[name]
James Kuszmaul9560e902020-01-11 14:13:08 -080094 else:
James Kuszmaul7a7fe472020-01-12 22:07:10 -080095 value = float("nan")
96 break
James Kuszmaul61a971f2020-01-01 15:06:18 -080097 # Catch NaNs and convert them to floats.
98 value = float(value)
99 signal_data.append(value)
100 label_name = signal.channel + "." + signal.field
James Kuszmaul440ee252020-01-03 20:03:52 -0800101 axes.plot(monotonic_time, signal_data, marker='o', label=label_name)
James Kuszmaul61a971f2020-01-01 15:06:18 -0800102
103 def plot(self):
James Kuszmaul440ee252020-01-03 20:03:52 -0800104 shared_axis = None
James Kuszmaul61a971f2020-01-01 15:06:18 -0800105 for figure_config in self.config.figure:
106 fig = plt.figure()
107 num_subplots = len(figure_config.axes)
108 for ii in range(num_subplots):
James Kuszmaul7a7fe472020-01-12 22:07:10 -0800109 axes = fig.add_subplot(
110 num_subplots, 1, ii + 1, sharex=shared_axis)
James Kuszmaul440ee252020-01-03 20:03:52 -0800111 shared_axis = shared_axis or axes
James Kuszmaul61a971f2020-01-01 15:06:18 -0800112 axes_config = figure_config.axes[ii]
113 for signal in axes_config.signal:
114 self.plot_signal(axes, signal)
James Kuszmaul440ee252020-01-03 20:03:52 -0800115 # Make the legend transparent so we can see behind it.
116 legend = axes.legend(framealpha=0.5)
James Kuszmaul61a971f2020-01-01 15:06:18 -0800117 axes.set_xlabel("Monotonic Time (sec)")
James Kuszmaul440ee252020-01-03 20:03:52 -0800118 axes.grid(True)
James Kuszmaul61a971f2020-01-01 15:06:18 -0800119 if axes_config.HasField("ylabel"):
120 axes.set_ylabel(axes_config.ylabel)
121
122
123def main(argv):
124 parser = argparse.ArgumentParser(
125 description="Plot data from an aos logfile.")
126 parser.add_argument(
127 "--logfile",
128 type=str,
129 required=True,
130 help="Path to the logfile to parse.")
131 parser.add_argument(
132 "--config",
133 type=str,
134 required=True,
135 help="Name of the plot config to use.")
136 parser.add_argument(
137 "--config_dir",
138 type=str,
139 default="frc971/analysis/plot_configs",
140 help="Directory to look for plot configs in.")
141 args = parser.parse_args(argv[1:])
142
143 if not os.path.isdir(args.config_dir):
144 print(args.config_dir + " is not a directory.")
145 return 1
146 config_path = os.path.join(args.config_dir, args.config)
147 if not os.path.isfile(config_path):
148 print(config_path +
149 " does not exist or is not a file--available configs are")
150 for file_name in os.listdir(args.config_dir):
151 print(os.path.basename(file_name))
152 return 1
153
154 config = PlotConfig()
155 with open(config_path) as config_file:
156 text_format.Merge(config_file.read(), config)
157
158 if not os.path.isfile(args.logfile):
159 print(args.logfile + " is not a file.")
160 return 1
161
162 reader = LogReader(args.logfile)
163
164 plotter = Plotter(config, reader)
165 plotter.process_logfile()
166 plotter.plot()
167 plt.show()
168
169 return 0
170
171
172if __name__ == '__main__':
173 sys.exit(main(sys.argv))