blob: b41aa3827be64e2c4a778ef98eeca81499d78bb7 [file] [log] [blame]
James Kuszmaul7daef362019-12-31 18:28:17 -08001#!/usr/bin/python3
2import json
3import unittest
4
James Kuszmaul7daef362019-12-31 18:28:17 -08005from frc971.analysis.py_log_reader import LogReader
6
7
8class LogReaderTest(unittest.TestCase):
9 def setUp(self):
10 self.reader = LogReader("external/sample_logfile/file/log.fbs")
11 # A list of all the channels in the logfile--this is used to confirm that
12 # we did indeed read the config correctly.
13 self.all_channels = [
14 ("/aos", "aos.JoystickState"), ("/aos", "aos.RobotState"),
15 ("/aos", "aos.timing.Report"), ("/aos", "frc971.PDPValues"),
16 ("/aos",
17 "frc971.wpilib.PneumaticsToLog"), ("/autonomous",
18 "aos.common.actions.Status"),
19 ("/autonomous", "frc971.autonomous.AutonomousMode"),
20 ("/autonomous", "frc971.autonomous.Goal"), ("/camera",
21 "y2019.CameraLog"),
22 ("/camera", "y2019.control_loops.drivetrain.CameraFrame"),
23 ("/drivetrain",
24 "frc971.IMUValues"), ("/drivetrain",
25 "frc971.control_loops.drivetrain.Goal"),
26 ("/drivetrain",
27 "frc971.control_loops.drivetrain.LocalizerControl"),
28 ("/drivetrain", "frc971.control_loops.drivetrain.Output"),
29 ("/drivetrain", "frc971.control_loops.drivetrain.Position"),
30 ("/drivetrain", "frc971.control_loops.drivetrain.Status"),
31 ("/drivetrain", "frc971.sensors.GyroReading"),
32 ("/drivetrain",
33 "y2019.control_loops.drivetrain.TargetSelectorHint"),
34 ("/superstructure",
35 "y2019.StatusLight"), ("/superstructure",
36 "y2019.control_loops.superstructure.Goal"),
37 ("/superstructure", "y2019.control_loops.superstructure.Output"),
38 ("/superstructure", "y2019.control_loops.superstructure.Position"),
39 ("/superstructure", "y2019.control_loops.superstructure.Status")
40 ]
41 # A channel that is known to have data on it which we will use for testing.
42 self.test_channel = ("/aos", "aos.timing.Report")
43 # A non-existent channel
44 self.bad_channel = ("/aos", "aos.timing.FooBar")
45
46 def test_do_nothing(self):
47 """Tests that we sanely handle doing nothing.
48
49 A previous iteration of the log reader seg faulted when doing this."""
50 pass
51
Austin Schuh7c75e582020-11-14 16:41:18 -080052 @unittest.skip("broken by flatbuffer upgrade")
James Kuszmaul7daef362019-12-31 18:28:17 -080053 def test_read_config(self):
54 """Tests that we can read the configuration from the logfile."""
55 config_bytes = self.reader.configuration()
56 config = Configuration.GetRootAsConfiguration(config_bytes, 0)
57
58 channel_set = set(self.all_channels)
59 for ii in range(config.ChannelsLength()):
60 channel = config.Channels(ii)
61 # Will raise KeyError if the channel does not exist
62 channel_set.remove((channel.Name().decode("utf-8"),
63 channel.Type().decode("utf-8")))
64
65 self.assertEqual(0, len(channel_set))
66
67 def test_empty_process(self):
68 """Tests running process() without subscribing to anything succeeds."""
69 self.reader.process()
70 for channel in self.all_channels:
71 with self.assertRaises(ValueError) as context:
72 self.reader.get_data_for_channel(channel[0], channel[1])
73
74 def test_subscribe(self):
75 """Tests that we can subscribe to a channel and get data out."""
76 name = self.test_channel[0]
77 message_type = self.test_channel[1]
78 self.assertTrue(self.reader.subscribe(name, message_type))
79 self.reader.process()
80 data = self.reader.get_data_for_channel(name, message_type)
81 self.assertLess(100, len(data))
82 last_monotonic_time = 0
83 for entry in data:
84 monotonic_time = entry[0]
85 realtime_time = entry[1]
86 json_data = entry[2].replace('nan', '\"nan\"')
87 self.assertLess(last_monotonic_time, monotonic_time)
88 # Sanity check that the realtime times are in the correct range.
89 self.assertLess(1500000000e9, realtime_time)
90 self.assertGreater(2000000000e9, realtime_time)
91 parsed_json = json.loads(json_data)
92 self.assertIn("name", parsed_json)
93
94 last_monotonic_time = monotonic_time
95
96 def test_bad_subscribe(self):
97 """Tests that we return false when subscribing to a non-existent channel."""
98 self.assertFalse(
99 self.reader.subscribe(self.bad_channel[0], self.bad_channel[1]),
100 self.bad_channel)
101
102 def test_subscribe_after_process(self):
103 """Tests that an exception is thrown if we subscribe after calling process()."""
104 self.reader.process()
105 for channel in self.all_channels:
106 with self.assertRaises(RuntimeError) as context:
107 self.reader.subscribe(channel[0], channel[1])
108
109 def test_get_data_before_processj(self):
110 """Tests that an exception is thrown if we retrieve data before calling process()."""
111 for channel in self.all_channels:
112 with self.assertRaises(RuntimeError) as context:
113 self.reader.get_data_for_channel(channel[0], channel[1])
114
115
116if __name__ == '__main__':
117 unittest.main()