blob: d616da418adec3dc98e98635df91d92d0e1a5077 [file] [log] [blame]
Brian Silverman4787a6e2018-10-06 16:00:54 -07001#!/usr/bin/python3
2
3# This is a program to parse output from the ITM and DWT.
4# The "Debug ITM and DWT Packet Protocol" section of the ARMv7-M Architecture
5# Reference Manual is a good reference.
6#
7# This seems like it might be a poster child for using coroutines, but those
8# look scary so we're going to stick with generators.
9
10import io
11import os
12import sys
13
14def open_file_for_bytes(path):
15 '''Returns a file-like object which reads bytes without buffering.'''
16 # Not using `open` because it's unclear from the docs how (if it's possible at
17 # all) to get something that will only do one read call and return what that
18 # gets on a fifo.
19 try:
20 return io.FileIO(path, 'r')
21 except FileNotFoundError:
22 # If it wasn't found, try (once) to create it and then open again.
23 try:
24 os.mkfifo(path)
25 except FileExistsError:
26 pass
27 return io.FileIO(path, 'r')
28
29def read_bytes(path):
30 '''Reads bytes from a file. This is appropriate both for regular files and
31 fifos.
32 Args:
33 path: A path-like object to open.
34 Yields:
35 Individual bytes from the file, until hitting EOF.
36 '''
37 with open_file_for_bytes(path) as f:
38 while True:
39 buf = f.read(1024)
40 if not buf:
41 return
42 for byte in buf:
43 yield byte
44
45def parse_packets(source):
46 '''Parses a stream of bytes into packets.
47 Args:
48 source: A generator of individual bytes.
49 Generates:
50 Packets as bytes objects.
51 '''
52 try:
53 while True:
54 header = next(source)
55 if header == 0:
56 # Synchronization packets consist of a bunch of 0 bits (not necessarily
57 # a whole number of bytes), followed by a 128 byte. This is for hardware
58 # to synchronize on, but we're not in a position to do that, so
59 # presumably those should get filtered out before getting here?
60 raise 'Not sure how to handle synchronization packets'
61 packet = bytearray()
62 packet.append(header)
63 header_size = header & 3
64 if header_size == 0:
65 while packet[-1] & 128 and len(packet) < 7:
66 packet.append(next(source))
67 else:
68 if header_size == 3:
69 header_size = 4
70 for _ in range(header_size):
71 packet.append(next(source))
72 yield bytes(packet)
73 except StopIteration:
74 return
75
76class PacketParser(object):
77 def __init__(self):
78 self.stimulus_handlers = {}
79
80 def register_stimulus_handler(self, port_number, handler):
81 '''Registers a function to call on packets to the specified port.'''
82 self.stimulus_handlers[port_number] = handler
83
84 def process(self, path):
85 for packet in parse_packets(read_bytes(path)):
86 header = packet[0]
87 header_size = header & 3
88 if header_size == 0:
89 # TODO(Brian): At least handle overflow packets here.
90 pass
91 else:
92 port_number = header >> 3
93 if port_number in self.stimulus_handlers:
94 self.stimulus_handlers[port_number](packet[1:])
95 else:
96 print('Warning: unhandled stimulus port %d' % port_number,
97 file=sys.stderr)
98 self.stimulus_handlers[port_number] = lambda _: None
99
100if __name__ == '__main__':
101 parser = PacketParser()
102 def print_byte(payload):
103 sys.stdout.write(payload.decode('ascii'))
104 parser.register_stimulus_handler(0, print_byte)
105
106 for path in sys.argv[1:]:
107 parser.process(path)