blob: 3389d00240437695a7181a5c88bbf6874b96d8b0 [file] [log] [blame]
James Kuszmaul7daef362019-12-31 18:28:17 -08001#!/usr/bin/python3
2import json
3import unittest
4
5from aos.Configuration import Configuration
6from frc971.analysis.py_log_reader import LogReader
7
8
9class LogReaderTest(unittest.TestCase):
10 def setUp(self):
11 self.reader = LogReader("external/sample_logfile/file/log.fbs")
12 # A list of all the channels in the logfile--this is used to confirm that
13 # we did indeed read the config correctly.
14 self.all_channels = [
15 ("/aos", "aos.JoystickState"), ("/aos", "aos.RobotState"),
16 ("/aos", "aos.timing.Report"), ("/aos", "frc971.PDPValues"),
17 ("/aos",
18 "frc971.wpilib.PneumaticsToLog"), ("/autonomous",
19 "aos.common.actions.Status"),
20 ("/autonomous", "frc971.autonomous.AutonomousMode"),
21 ("/autonomous", "frc971.autonomous.Goal"), ("/camera",
22 "y2019.CameraLog"),
23 ("/camera", "y2019.control_loops.drivetrain.CameraFrame"),
24 ("/drivetrain",
25 "frc971.IMUValues"), ("/drivetrain",
26 "frc971.control_loops.drivetrain.Goal"),
27 ("/drivetrain",
28 "frc971.control_loops.drivetrain.LocalizerControl"),
29 ("/drivetrain", "frc971.control_loops.drivetrain.Output"),
30 ("/drivetrain", "frc971.control_loops.drivetrain.Position"),
31 ("/drivetrain", "frc971.control_loops.drivetrain.Status"),
32 ("/drivetrain", "frc971.sensors.GyroReading"),
33 ("/drivetrain",
34 "y2019.control_loops.drivetrain.TargetSelectorHint"),
35 ("/superstructure",
36 "y2019.StatusLight"), ("/superstructure",
37 "y2019.control_loops.superstructure.Goal"),
38 ("/superstructure", "y2019.control_loops.superstructure.Output"),
39 ("/superstructure", "y2019.control_loops.superstructure.Position"),
40 ("/superstructure", "y2019.control_loops.superstructure.Status")
41 ]
42 # A channel that is known to have data on it which we will use for testing.
43 self.test_channel = ("/aos", "aos.timing.Report")
44 # A non-existent channel
45 self.bad_channel = ("/aos", "aos.timing.FooBar")
46
47 def test_do_nothing(self):
48 """Tests that we sanely handle doing nothing.
49
50 A previous iteration of the log reader seg faulted when doing this."""
51 pass
52
53 def test_read_config(self):
54 """Tests that we can read the configuration from the logfile."""
55 config_bytes = self.reader.configuration()
56 config = Configuration.GetRootAsConfiguration(config_bytes, 0)
57
58 channel_set = set(self.all_channels)
59 for ii in range(config.ChannelsLength()):
60 channel = config.Channels(ii)
61 # Will raise KeyError if the channel does not exist
62 channel_set.remove((channel.Name().decode("utf-8"),
63 channel.Type().decode("utf-8")))
64
65 self.assertEqual(0, len(channel_set))
66
67 def test_empty_process(self):
68 """Tests running process() without subscribing to anything succeeds."""
69 self.reader.process()
70 for channel in self.all_channels:
71 with self.assertRaises(ValueError) as context:
72 self.reader.get_data_for_channel(channel[0], channel[1])
73
74 def test_subscribe(self):
75 """Tests that we can subscribe to a channel and get data out."""
76 name = self.test_channel[0]
77 message_type = self.test_channel[1]
78 self.assertTrue(self.reader.subscribe(name, message_type))
79 self.reader.process()
80 data = self.reader.get_data_for_channel(name, message_type)
81 self.assertLess(100, len(data))
82 last_monotonic_time = 0
83 for entry in data:
84 monotonic_time = entry[0]
85 realtime_time = entry[1]
86 json_data = entry[2].replace('nan', '\"nan\"')
87 self.assertLess(last_monotonic_time, monotonic_time)
88 # Sanity check that the realtime times are in the correct range.
89 self.assertLess(1500000000e9, realtime_time)
90 self.assertGreater(2000000000e9, realtime_time)
91 parsed_json = json.loads(json_data)
92 self.assertIn("name", parsed_json)
93
94 last_monotonic_time = monotonic_time
95
96 def test_bad_subscribe(self):
97 """Tests that we return false when subscribing to a non-existent channel."""
98 self.assertFalse(
99 self.reader.subscribe(self.bad_channel[0], self.bad_channel[1]),
100 self.bad_channel)
101
102 def test_subscribe_after_process(self):
103 """Tests that an exception is thrown if we subscribe after calling process()."""
104 self.reader.process()
105 for channel in self.all_channels:
106 with self.assertRaises(RuntimeError) as context:
107 self.reader.subscribe(channel[0], channel[1])
108
109 def test_get_data_before_processj(self):
110 """Tests that an exception is thrown if we retrieve data before calling process()."""
111 for channel in self.all_channels:
112 with self.assertRaises(RuntimeError) as context:
113 self.reader.get_data_for_channel(channel[0], channel[1])
114
115
116if __name__ == '__main__':
117 unittest.main()