Brian Silverman | 4787a6e | 2018-10-06 16:00:54 -0700 | [diff] [blame] | 1 | #!/usr/bin/python3 |
| 2 | |
| 3 | # This is a program to parse output from the ITM and DWT. |
| 4 | # The "Debug ITM and DWT Packet Protocol" section of the ARMv7-M Architecture |
| 5 | # Reference Manual is a good reference. |
| 6 | # |
| 7 | # This seems like it might be a poster child for using coroutines, but those |
| 8 | # look scary so we're going to stick with generators. |
| 9 | |
| 10 | import io |
| 11 | import os |
| 12 | import sys |
| 13 | |
| 14 | def open_file_for_bytes(path): |
| 15 | '''Returns a file-like object which reads bytes without buffering.''' |
| 16 | # Not using `open` because it's unclear from the docs how (if it's possible at |
| 17 | # all) to get something that will only do one read call and return what that |
| 18 | # gets on a fifo. |
| 19 | try: |
| 20 | return io.FileIO(path, 'r') |
| 21 | except FileNotFoundError: |
| 22 | # If it wasn't found, try (once) to create it and then open again. |
| 23 | try: |
| 24 | os.mkfifo(path) |
| 25 | except FileExistsError: |
| 26 | pass |
| 27 | return io.FileIO(path, 'r') |
| 28 | |
| 29 | def read_bytes(path): |
| 30 | '''Reads bytes from a file. This is appropriate both for regular files and |
| 31 | fifos. |
| 32 | Args: |
| 33 | path: A path-like object to open. |
| 34 | Yields: |
| 35 | Individual bytes from the file, until hitting EOF. |
| 36 | ''' |
| 37 | with open_file_for_bytes(path) as f: |
| 38 | while True: |
| 39 | buf = f.read(1024) |
| 40 | if not buf: |
| 41 | return |
| 42 | for byte in buf: |
| 43 | yield byte |
| 44 | |
| 45 | def parse_packets(source): |
| 46 | '''Parses a stream of bytes into packets. |
| 47 | Args: |
| 48 | source: A generator of individual bytes. |
| 49 | Generates: |
| 50 | Packets as bytes objects. |
| 51 | ''' |
| 52 | try: |
| 53 | while True: |
| 54 | header = next(source) |
| 55 | if header == 0: |
| 56 | # Synchronization packets consist of a bunch of 0 bits (not necessarily |
| 57 | # a whole number of bytes), followed by a 128 byte. This is for hardware |
| 58 | # to synchronize on, but we're not in a position to do that, so |
| 59 | # presumably those should get filtered out before getting here? |
| 60 | raise 'Not sure how to handle synchronization packets' |
| 61 | packet = bytearray() |
| 62 | packet.append(header) |
| 63 | header_size = header & 3 |
| 64 | if header_size == 0: |
| 65 | while packet[-1] & 128 and len(packet) < 7: |
| 66 | packet.append(next(source)) |
| 67 | else: |
| 68 | if header_size == 3: |
| 69 | header_size = 4 |
| 70 | for _ in range(header_size): |
| 71 | packet.append(next(source)) |
| 72 | yield bytes(packet) |
| 73 | except StopIteration: |
| 74 | return |
| 75 | |
| 76 | class PacketParser(object): |
| 77 | def __init__(self): |
| 78 | self.stimulus_handlers = {} |
| 79 | |
| 80 | def register_stimulus_handler(self, port_number, handler): |
| 81 | '''Registers a function to call on packets to the specified port.''' |
| 82 | self.stimulus_handlers[port_number] = handler |
| 83 | |
| 84 | def process(self, path): |
| 85 | for packet in parse_packets(read_bytes(path)): |
| 86 | header = packet[0] |
| 87 | header_size = header & 3 |
| 88 | if header_size == 0: |
| 89 | # TODO(Brian): At least handle overflow packets here. |
| 90 | pass |
| 91 | else: |
| 92 | port_number = header >> 3 |
| 93 | if port_number in self.stimulus_handlers: |
| 94 | self.stimulus_handlers[port_number](packet[1:]) |
| 95 | else: |
| 96 | print('Warning: unhandled stimulus port %d' % port_number, |
| 97 | file=sys.stderr) |
| 98 | self.stimulus_handlers[port_number] = lambda _: None |
| 99 | |
| 100 | if __name__ == '__main__': |
| 101 | parser = PacketParser() |
| 102 | def print_byte(payload): |
| 103 | sys.stdout.write(payload.decode('ascii')) |
| 104 | parser.register_stimulus_handler(0, print_byte) |
| 105 | |
| 106 | for path in sys.argv[1:]: |
| 107 | parser.process(path) |