James Kuszmaul | 7daef36 | 2019-12-31 18:28:17 -0800 | [diff] [blame] | 1 | #!/usr/bin/python3 |
| 2 | import json |
| 3 | import unittest |
| 4 | |
Stephan Pleines | 85b295c | 2024-02-04 17:50:26 -0800 | [diff] [blame^] | 5 | from aos.analysis.py_log_reader import LogReader |
James Kuszmaul | 7daef36 | 2019-12-31 18:28:17 -0800 | [diff] [blame] | 6 | |
| 7 | |
| 8 | class LogReaderTest(unittest.TestCase): |
Ravago Jones | 5127ccc | 2022-07-31 16:32:45 -0700 | [diff] [blame] | 9 | |
James Kuszmaul | 7daef36 | 2019-12-31 18:28:17 -0800 | [diff] [blame] | 10 | def setUp(self): |
| 11 | self.reader = LogReader("external/sample_logfile/file/log.fbs") |
| 12 | # A list of all the channels in the logfile--this is used to confirm that |
| 13 | # we did indeed read the config correctly. |
| 14 | self.all_channels = [ |
| 15 | ("/aos", "aos.JoystickState"), ("/aos", "aos.RobotState"), |
| 16 | ("/aos", "aos.timing.Report"), ("/aos", "frc971.PDPValues"), |
Ravago Jones | 5127ccc | 2022-07-31 16:32:45 -0700 | [diff] [blame] | 17 | ("/aos", "frc971.wpilib.PneumaticsToLog"), |
| 18 | ("/autonomous", "aos.common.actions.Status"), |
James Kuszmaul | 7daef36 | 2019-12-31 18:28:17 -0800 | [diff] [blame] | 19 | ("/autonomous", "frc971.autonomous.AutonomousMode"), |
Ravago Jones | 5127ccc | 2022-07-31 16:32:45 -0700 | [diff] [blame] | 20 | ("/autonomous", "frc971.autonomous.Goal"), |
| 21 | ("/camera", "y2019.CameraLog"), |
James Kuszmaul | 7daef36 | 2019-12-31 18:28:17 -0800 | [diff] [blame] | 22 | ("/camera", "y2019.control_loops.drivetrain.CameraFrame"), |
Ravago Jones | 5127ccc | 2022-07-31 16:32:45 -0700 | [diff] [blame] | 23 | ("/drivetrain", "frc971.IMUValues"), |
| 24 | ("/drivetrain", "frc971.control_loops.drivetrain.Goal"), |
James Kuszmaul | 7daef36 | 2019-12-31 18:28:17 -0800 | [diff] [blame] | 25 | ("/drivetrain", |
| 26 | "frc971.control_loops.drivetrain.LocalizerControl"), |
| 27 | ("/drivetrain", "frc971.control_loops.drivetrain.Output"), |
| 28 | ("/drivetrain", "frc971.control_loops.drivetrain.Position"), |
| 29 | ("/drivetrain", "frc971.control_loops.drivetrain.Status"), |
| 30 | ("/drivetrain", "frc971.sensors.GyroReading"), |
| 31 | ("/drivetrain", |
| 32 | "y2019.control_loops.drivetrain.TargetSelectorHint"), |
Ravago Jones | 5127ccc | 2022-07-31 16:32:45 -0700 | [diff] [blame] | 33 | ("/superstructure", "y2019.StatusLight"), |
| 34 | ("/superstructure", "y2019.control_loops.superstructure.Goal"), |
James Kuszmaul | 7daef36 | 2019-12-31 18:28:17 -0800 | [diff] [blame] | 35 | ("/superstructure", "y2019.control_loops.superstructure.Output"), |
| 36 | ("/superstructure", "y2019.control_loops.superstructure.Position"), |
| 37 | ("/superstructure", "y2019.control_loops.superstructure.Status") |
| 38 | ] |
| 39 | # A channel that is known to have data on it which we will use for testing. |
| 40 | self.test_channel = ("/aos", "aos.timing.Report") |
| 41 | # A non-existent channel |
| 42 | self.bad_channel = ("/aos", "aos.timing.FooBar") |
| 43 | |
| 44 | def test_do_nothing(self): |
| 45 | """Tests that we sanely handle doing nothing. |
| 46 | |
| 47 | A previous iteration of the log reader seg faulted when doing this.""" |
| 48 | pass |
| 49 | |
Austin Schuh | 7c75e58 | 2020-11-14 16:41:18 -0800 | [diff] [blame] | 50 | @unittest.skip("broken by flatbuffer upgrade") |
James Kuszmaul | 7daef36 | 2019-12-31 18:28:17 -0800 | [diff] [blame] | 51 | def test_read_config(self): |
| 52 | """Tests that we can read the configuration from the logfile.""" |
| 53 | config_bytes = self.reader.configuration() |
| 54 | config = Configuration.GetRootAsConfiguration(config_bytes, 0) |
| 55 | |
| 56 | channel_set = set(self.all_channels) |
| 57 | for ii in range(config.ChannelsLength()): |
| 58 | channel = config.Channels(ii) |
| 59 | # Will raise KeyError if the channel does not exist |
| 60 | channel_set.remove((channel.Name().decode("utf-8"), |
| 61 | channel.Type().decode("utf-8"))) |
| 62 | |
| 63 | self.assertEqual(0, len(channel_set)) |
| 64 | |
| 65 | def test_empty_process(self): |
| 66 | """Tests running process() without subscribing to anything succeeds.""" |
| 67 | self.reader.process() |
| 68 | for channel in self.all_channels: |
| 69 | with self.assertRaises(ValueError) as context: |
| 70 | self.reader.get_data_for_channel(channel[0], channel[1]) |
| 71 | |
| 72 | def test_subscribe(self): |
| 73 | """Tests that we can subscribe to a channel and get data out.""" |
| 74 | name = self.test_channel[0] |
| 75 | message_type = self.test_channel[1] |
| 76 | self.assertTrue(self.reader.subscribe(name, message_type)) |
| 77 | self.reader.process() |
| 78 | data = self.reader.get_data_for_channel(name, message_type) |
| 79 | self.assertLess(100, len(data)) |
| 80 | last_monotonic_time = 0 |
| 81 | for entry in data: |
| 82 | monotonic_time = entry[0] |
| 83 | realtime_time = entry[1] |
| 84 | json_data = entry[2].replace('nan', '\"nan\"') |
| 85 | self.assertLess(last_monotonic_time, monotonic_time) |
| 86 | # Sanity check that the realtime times are in the correct range. |
| 87 | self.assertLess(1500000000e9, realtime_time) |
| 88 | self.assertGreater(2000000000e9, realtime_time) |
| 89 | parsed_json = json.loads(json_data) |
| 90 | self.assertIn("name", parsed_json) |
| 91 | |
| 92 | last_monotonic_time = monotonic_time |
| 93 | |
| 94 | def test_bad_subscribe(self): |
| 95 | """Tests that we return false when subscribing to a non-existent channel.""" |
| 96 | self.assertFalse( |
| 97 | self.reader.subscribe(self.bad_channel[0], self.bad_channel[1]), |
| 98 | self.bad_channel) |
| 99 | |
| 100 | def test_subscribe_after_process(self): |
| 101 | """Tests that an exception is thrown if we subscribe after calling process().""" |
| 102 | self.reader.process() |
| 103 | for channel in self.all_channels: |
| 104 | with self.assertRaises(RuntimeError) as context: |
| 105 | self.reader.subscribe(channel[0], channel[1]) |
| 106 | |
| 107 | def test_get_data_before_processj(self): |
| 108 | """Tests that an exception is thrown if we retrieve data before calling process().""" |
| 109 | for channel in self.all_channels: |
| 110 | with self.assertRaises(RuntimeError) as context: |
| 111 | self.reader.get_data_for_channel(channel[0], channel[1]) |
| 112 | |
| 113 | |
| 114 | if __name__ == '__main__': |
| 115 | unittest.main() |