#!/usr/bin/python3
import json
import unittest

from frc971.analysis.py_log_reader import LogReader
6
7
class LogReaderTest(unittest.TestCase):
    """Tests for the LogReader Python wrapper, run against a sample logfile.

    Each test gets a fresh LogReader (see setUp) and checks one aspect of the
    subscribe() / process() / get_data_for_channel() call sequence.
    """

    def setUp(self):
        """Creates the reader and the channel fixtures shared by the tests."""
        self.reader = LogReader("external/sample_logfile/file/log.fbs")
        # A list of all the channels in the logfile--this is used to confirm that
        # we did indeed read the config correctly.
        self.all_channels = [
            ("/aos", "aos.JoystickState"), ("/aos", "aos.RobotState"),
            ("/aos", "aos.timing.Report"), ("/aos", "frc971.PDPValues"),
            ("/aos", "frc971.wpilib.PneumaticsToLog"),
            ("/autonomous", "aos.common.actions.Status"),
            ("/autonomous", "frc971.autonomous.AutonomousMode"),
            ("/autonomous", "frc971.autonomous.Goal"),
            ("/camera", "y2019.CameraLog"),
            ("/camera", "y2019.control_loops.drivetrain.CameraFrame"),
            ("/drivetrain", "frc971.IMUValues"),
            ("/drivetrain", "frc971.control_loops.drivetrain.Goal"),
            ("/drivetrain",
             "frc971.control_loops.drivetrain.LocalizerControl"),
            ("/drivetrain", "frc971.control_loops.drivetrain.Output"),
            ("/drivetrain", "frc971.control_loops.drivetrain.Position"),
            ("/drivetrain", "frc971.control_loops.drivetrain.Status"),
            ("/drivetrain", "frc971.sensors.GyroReading"),
            ("/drivetrain",
             "y2019.control_loops.drivetrain.TargetSelectorHint"),
            ("/superstructure", "y2019.StatusLight"),
            ("/superstructure", "y2019.control_loops.superstructure.Goal"),
            ("/superstructure", "y2019.control_loops.superstructure.Output"),
            ("/superstructure", "y2019.control_loops.superstructure.Position"),
            ("/superstructure", "y2019.control_loops.superstructure.Status")
        ]
        # A channel that is known to have data on it which we will use for testing.
        self.test_channel = ("/aos", "aos.timing.Report")
        # A non-existent channel
        self.bad_channel = ("/aos", "aos.timing.FooBar")

    def test_do_nothing(self):
        """Tests that we sanely handle doing nothing.

        A previous iteration of the log reader seg faulted when doing this."""
        pass

    @unittest.skip("broken by flatbuffer upgrade")
    def test_read_config(self):
        """Tests that we can read the configuration from the logfile."""
        config_bytes = self.reader.configuration()
        # NOTE(review): `Configuration` is not among the imports visible in
        # this file, so this line would raise NameError if the test ran. It is
        # harmless only while the test stays skipped; restore the import
        # before re-enabling it.
        config = Configuration.GetRootAsConfiguration(config_bytes, 0)

        channel_set = set(self.all_channels)
        for ii in range(config.ChannelsLength()):
            channel = config.Channels(ii)
            # Will raise KeyError if the channel does not exist
            channel_set.remove((channel.Name().decode("utf-8"),
                                channel.Type().decode("utf-8")))

        # Every expected channel must have been seen in the config.
        self.assertEqual(0, len(channel_set))

    def test_empty_process(self):
        """Tests running process() without subscribing to anything succeeds."""
        self.reader.process()
        for channel in self.all_channels:
            # subTest names the offending channel on failure and lets the
            # remaining channels still be checked.
            with self.subTest(channel=channel):
                with self.assertRaises(ValueError):
                    self.reader.get_data_for_channel(channel[0], channel[1])

    def test_subscribe(self):
        """Tests that we can subscribe to a channel and get data out."""
        name = self.test_channel[0]
        message_type = self.test_channel[1]
        self.assertTrue(self.reader.subscribe(name, message_type))
        self.reader.process()
        data = self.reader.get_data_for_channel(name, message_type)
        self.assertLess(100, len(data))
        last_monotonic_time = 0
        for entry in data:
            monotonic_time = entry[0]
            realtime_time = entry[1]
            # json.loads() does not accept a bare lowercase `nan`, so turn any
            # occurrence into a quoted string before parsing.
            json_data = entry[2].replace('nan', '\"nan\"')
            # Monotonic timestamps must be strictly increasing.
            self.assertLess(last_monotonic_time, monotonic_time)
            # Sanity check that the realtime times are in the correct range.
            self.assertLess(1500000000e9, realtime_time)
            self.assertGreater(2000000000e9, realtime_time)
            parsed_json = json.loads(json_data)
            self.assertIn("name", parsed_json)

            last_monotonic_time = monotonic_time

    def test_bad_subscribe(self):
        """Tests that we return false when subscribing to a non-existent channel."""
        self.assertFalse(
            self.reader.subscribe(self.bad_channel[0], self.bad_channel[1]),
            self.bad_channel)

    def test_subscribe_after_process(self):
        """Tests that an exception is thrown if we subscribe after calling process()."""
        self.reader.process()
        for channel in self.all_channels:
            with self.subTest(channel=channel):
                with self.assertRaises(RuntimeError):
                    self.reader.subscribe(channel[0], channel[1])

    # NOTE(review): the trailing "j" in this method name looks like a typo; it
    # is kept so that any existing test filters keep matching.
    def test_get_data_before_processj(self):
        """Tests that an exception is thrown if we retrieve data before calling process()."""
        for channel in self.all_channels:
            with self.subTest(channel=channel):
                with self.assertRaises(RuntimeError):
                    self.reader.get_data_for_channel(channel[0], channel[1])
112
113
if __name__ == '__main__':
    # Run every test in this module when the file is executed as a script.
    unittest.main()