James Kuszmaul | 7daef36 | 2019-12-31 18:28:17 -0800 | [diff] [blame^] | 1 | #!/usr/bin/python3 |
| 2 | import json |
| 3 | import unittest |
| 4 | |
| 5 | from aos.Configuration import Configuration |
| 6 | from frc971.analysis.py_log_reader import LogReader |
| 7 | |
| 8 | |
class LogReaderTest(unittest.TestCase):
    """Tests for LogReader, run against a checked-in sample logfile."""

    def setUp(self):
        """Creates a fresh LogReader and the channel fixtures used by the tests."""
        self.reader = LogReader("external/sample_logfile/file/log.fbs")
        # A list of all the channels in the logfile--this is used to confirm that
        # we did indeed read the config correctly.
        self.all_channels = [
            ("/aos", "aos.JoystickState"),
            ("/aos", "aos.RobotState"),
            ("/aos", "aos.timing.Report"),
            ("/aos", "frc971.PDPValues"),
            ("/aos", "frc971.wpilib.PneumaticsToLog"),
            ("/autonomous", "aos.common.actions.Status"),
            ("/autonomous", "frc971.autonomous.AutonomousMode"),
            ("/autonomous", "frc971.autonomous.Goal"),
            ("/camera", "y2019.CameraLog"),
            ("/camera", "y2019.control_loops.drivetrain.CameraFrame"),
            ("/drivetrain", "frc971.IMUValues"),
            ("/drivetrain", "frc971.control_loops.drivetrain.Goal"),
            ("/drivetrain", "frc971.control_loops.drivetrain.LocalizerControl"),
            ("/drivetrain", "frc971.control_loops.drivetrain.Output"),
            ("/drivetrain", "frc971.control_loops.drivetrain.Position"),
            ("/drivetrain", "frc971.control_loops.drivetrain.Status"),
            ("/drivetrain", "frc971.sensors.GyroReading"),
            ("/drivetrain", "y2019.control_loops.drivetrain.TargetSelectorHint"),
            ("/superstructure", "y2019.StatusLight"),
            ("/superstructure", "y2019.control_loops.superstructure.Goal"),
            ("/superstructure", "y2019.control_loops.superstructure.Output"),
            ("/superstructure", "y2019.control_loops.superstructure.Position"),
            ("/superstructure", "y2019.control_loops.superstructure.Status"),
        ]
        # A channel that is known to have data on it which we will use for testing.
        self.test_channel = ("/aos", "aos.timing.Report")
        # A non-existent channel
        self.bad_channel = ("/aos", "aos.timing.FooBar")

    def test_do_nothing(self):
        """Tests that we sanely handle doing nothing.

        A previous iteration of the log reader seg faulted when doing this."""

    def test_read_config(self):
        """Tests that we can read the configuration from the logfile."""
        config_bytes = self.reader.configuration()
        config = Configuration.GetRootAsConfiguration(config_bytes, 0)

        channels_in_config = []
        for ii in range(config.ChannelsLength()):
            channel = config.Channels(ii)
            channels_in_config.append((channel.Name().decode("utf-8"),
                                       channel.Type().decode("utf-8")))

        # Order-insensitive comparison that still counts duplicates, so a
        # missing, unexpected, or repeated channel fails the test with a
        # readable diff of the two lists.
        self.assertCountEqual(self.all_channels, channels_in_config)

    def test_empty_process(self):
        """Tests running process() without subscribing to anything succeeds."""
        self.reader.process()
        for name, message_type in self.all_channels:
            # subTest reports which channel misbehaved and keeps checking
            # the remaining channels after a failure.
            with self.subTest(name=name, message_type=message_type):
                with self.assertRaises(ValueError):
                    self.reader.get_data_for_channel(name, message_type)

    def test_subscribe(self):
        """Tests that we can subscribe to a channel and get data out."""
        name, message_type = self.test_channel
        self.assertTrue(self.reader.subscribe(name, message_type))
        self.reader.process()
        data = self.reader.get_data_for_channel(name, message_type)
        self.assertLess(100, len(data))
        last_monotonic_time = 0
        for entry in data:
            monotonic_time = entry[0]
            realtime_time = entry[1]
            # Turn nan into the JSON string "nan" before parsing; presumably
            # the reader emits bare nan tokens that json.loads would reject.
            # NOTE(review): this is a plain substring replace, so it would
            # also rewrite "nan" occurring inside a string value.
            json_data = entry[2].replace('nan', '"nan"')
            # Monotonic timestamps must be strictly increasing.
            self.assertLess(last_monotonic_time, monotonic_time)
            # Sanity check that the realtime times are in the correct range.
            self.assertLess(1500000000e9, realtime_time)
            self.assertGreater(2000000000e9, realtime_time)
            parsed_json = json.loads(json_data)
            self.assertIn("name", parsed_json)

            last_monotonic_time = monotonic_time

    def test_bad_subscribe(self):
        """Tests that we return false when subscribing to a non-existent channel."""
        self.assertFalse(
            self.reader.subscribe(self.bad_channel[0], self.bad_channel[1]),
            msg=self.bad_channel)

    def test_subscribe_after_process(self):
        """Tests that an exception is thrown if we subscribe after calling process()."""
        self.reader.process()
        for name, message_type in self.all_channels:
            with self.subTest(name=name, message_type=message_type):
                with self.assertRaises(RuntimeError):
                    self.reader.subscribe(name, message_type)

    def test_get_data_before_process(self):
        """Tests that an exception is thrown if we retrieve data before calling process()."""
        for name, message_type in self.all_channels:
            with self.subTest(name=name, message_type=message_type):
                with self.assertRaises(RuntimeError):
                    self.reader.get_data_for_channel(name, message_type)
| 114 | |
| 115 | |
# Hand control to the unittest runner when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()