blob: b1035dd12b8048f6cea819f172f6b44aa89c7398 [file] [log] [blame]
James Kuszmaulcf324122023-01-14 14:07:17 -08001#! /usr/bin/env python3
2# Copyright (c) FIRST and other WPILib contributors.
3# Open Source Software; you can modify and/or share it under the terms of
4# the WPILib BSD license file in the root directory of this project.
5
6import array
7import struct
8from typing import List, SupportsBytes
9
# Public API of this module. DataLogIterator is intentionally not listed here;
# it is obtained by iterating a DataLogReader.
__all__ = ["StartRecordData", "MetadataRecordData", "DataLogRecord", "DataLogReader"]

# Pre-compiled little-endian decoders for 32-bit and 64-bit floating point
# values, used by DataLogRecord.getFloat() and DataLogRecord.getDouble().
floatStruct = struct.Struct("<f")
doubleStruct = struct.Struct("<d")

# Control record type codes. These are compared against the first payload
# byte of a control record (a record whose entry ID is 0).
kControlStart = 0
kControlFinish = 1
kControlSetMetadata = 2
18
19
class StartRecordData:
    """Payload of a start control record, as written by DataLog.start().

    Instances are returned by DataLogRecord.getStartData().

    entry: Entry ID; later records for this entry carry the same ID.
    name: Entry name.
    type: Type of the data stored for this entry, as a string, e.g. "double".
    metadata: Initial metadata.
    """

    def __init__(self, entry: int, name: str, type: str, metadata: str):
        # Plain value holder: every argument is stored unchanged.
        self.entry, self.name = entry, name
        self.type, self.metadata = type, metadata
35
36
class MetadataRecordData:
    """Payload of a set metadata control record, as written by
    DataLog.setMetadata().

    Instances are returned by DataLogRecord.getSetMetadataData().

    entry: Entry ID.
    metadata: New metadata for the entry.
    """

    def __init__(self, entry: int, metadata: str):
        # Plain value holder: both arguments are stored unchanged.
        self.entry, self.metadata = entry, metadata
49
50
class DataLogRecord:
    """A record in the data log. May represent either a control record
    (entry == 0) or a data record.

    entry: Entry ID; 0 identifies a control record.
    timestamp: Integer timestamp read from the record header.
    data: Raw payload bytes of the record.

    Scalar values and length prefixes are decoded as little-endian. The
    size-checked accessors raise TypeError when the payload length does not
    match the requested type.
    """

    def __init__(self, entry: int, timestamp: int, data: SupportsBytes):
        self.entry = entry
        self.timestamp = timestamp
        self.data = data

    def isControl(self) -> bool:
        """Returns true if this is a control record (entry ID 0)."""
        return self.entry == 0

    def _getControlType(self) -> int:
        """Returns the control type code, i.e. the first payload byte."""
        return self.data[0]

    def isStart(self) -> bool:
        """Returns true if this is a start control record.

        The minimum payload is 17 bytes: 1 type byte, a 4-byte entry ID and
        three 4-byte string length prefixes (name, type, metadata).
        """
        return (
            self.entry == 0
            and len(self.data) >= 17
            and self._getControlType() == kControlStart
        )

    def isFinish(self) -> bool:
        """Returns true if this is a finish control record.

        The payload is exactly 5 bytes: 1 type byte and a 4-byte entry ID.
        """
        return (
            self.entry == 0
            and len(self.data) == 5
            and self._getControlType() == kControlFinish
        )

    def isSetMetadata(self) -> bool:
        """Returns true if this is a set metadata control record.

        The minimum payload is 9 bytes: 1 type byte, a 4-byte entry ID and
        one 4-byte string length prefix.
        """
        return (
            self.entry == 0
            and len(self.data) >= 9
            and self._getControlType() == kControlSetMetadata
        )

    def getStartData(self) -> StartRecordData:
        """Decodes a start control record.

        Raises TypeError if this is not a start record or if an embedded
        string runs past the end of the payload.
        """
        if not self.isStart():
            raise TypeError("not a start record")
        entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False)
        name, pos = self._readInnerString(5)
        typeStr, pos = self._readInnerString(pos)
        metadata, _ = self._readInnerString(pos)
        return StartRecordData(entry, name, typeStr, metadata)

    def getFinishEntry(self) -> int:
        """Returns the entry ID of a finish control record.

        Raises TypeError if this is not a finish record.
        """
        if not self.isFinish():
            raise TypeError("not a finish record")
        return int.from_bytes(self.data[1:5], byteorder="little", signed=False)

    def getSetMetadataData(self) -> MetadataRecordData:
        """Decodes a set metadata control record.

        Raises TypeError if this is not a set metadata record or if the
        embedded string runs past the end of the payload.
        """
        if not self.isSetMetadata():
            raise TypeError("not a set metadata record")
        entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False)
        metadata, _ = self._readInnerString(5)
        return MetadataRecordData(entry, metadata)

    def getBoolean(self) -> bool:
        """Decodes the payload as a boolean (exactly 1 byte, nonzero is true)."""
        if len(self.data) != 1:
            raise TypeError("not a boolean")
        return self.data[0] != 0

    def getInteger(self) -> int:
        """Decodes the payload as a signed 64-bit integer (exactly 8 bytes)."""
        if len(self.data) != 8:
            raise TypeError("not an integer")
        return int.from_bytes(self.data, byteorder="little", signed=True)

    def getFloat(self) -> float:
        """Decodes the payload as a 32-bit float (exactly 4 bytes)."""
        if len(self.data) != 4:
            raise TypeError("not a float")
        return floatStruct.unpack(self.data)[0]

    def getDouble(self) -> float:
        """Decodes the payload as a 64-bit float (exactly 8 bytes)."""
        if len(self.data) != 8:
            raise TypeError("not a double")
        return doubleStruct.unpack(self.data)[0]

    def getString(self) -> str:
        """Decodes the whole payload as a UTF-8 string."""
        return str(self.data, encoding="utf-8")

    def getBooleanArray(self) -> List[bool]:
        """Decodes the payload as one boolean per byte (nonzero is true)."""
        return [x != 0 for x in self.data]

    def getIntegerArray(self) -> array.array:
        """Decodes the payload as an array of signed 64-bit integers.

        Raises TypeError if the payload length is not a multiple of 8.
        """
        if (len(self.data) % 8) != 0:
            raise TypeError("not an integer array")
        # "q" is always 8 bytes; "l" is only 4 bytes on some platforms
        # (e.g. Windows), which would split every value in two.
        # NOTE(review): frombytes() uses the host's native byte order, so
        # this (like the float/double arrays) assumes a little-endian host.
        arr = array.array("q")
        arr.frombytes(self.data)
        return arr

    def getFloatArray(self) -> array.array:
        """Decodes the payload as an array of 32-bit floats.

        Raises TypeError if the payload length is not a multiple of 4.
        """
        if (len(self.data) % 4) != 0:
            raise TypeError("not a float array")
        arr = array.array("f")
        arr.frombytes(self.data)
        return arr

    def getDoubleArray(self) -> array.array:
        """Decodes the payload as an array of 64-bit floats.

        Raises TypeError if the payload length is not a multiple of 8.
        """
        if (len(self.data) % 8) != 0:
            raise TypeError("not a double array")
        arr = array.array("d")
        arr.frombytes(self.data)
        return arr

    def getStringArray(self) -> List[str]:
        """Decodes the payload as a 4-byte count followed by that many
        length-prefixed UTF-8 strings.

        Raises TypeError if the count cannot fit in the payload or if a
        string runs past the end of the payload.
        """
        size = int.from_bytes(self.data[:4], byteorder="little", signed=False)
        # Every string needs at least its own 4-byte length prefix, so a
        # count larger than the remaining bytes / 4 can never be valid. This
        # also rejects payloads shorter than the 4-byte count itself.
        if size * 4 > len(self.data) - 4:
            raise TypeError("not a string array")
        arr = []
        pos = 4
        for _ in range(size):
            val, pos = self._readInnerString(pos)
            arr.append(val)
        return arr

    def _readInnerString(self, pos: int) -> tuple[str, int]:
        """Reads a length-prefixed UTF-8 string starting at offset pos.

        Returns the decoded string and the offset just past it. Raises
        TypeError if the string would extend past the end of the payload.
        """
        size = int.from_bytes(
            self.data[pos : pos + 4], byteorder="little", signed=False
        )
        end = pos + 4 + size
        if end > len(self.data):
            raise TypeError("invalid string size")
        return str(self.data[pos + 4 : end], encoding="utf-8"), end
174
175
class DataLogIterator:
    """Iterator over the records stored in a data log buffer.

    buf: Buffer holding the log contents.
    pos: Offset in buf of the next record to decode.
    """

    def __init__(self, buf: SupportsBytes, pos: int):
        self.buf = buf
        self.pos = pos

    def __iter__(self):
        return self

    def _readVarInt(self, pos: int, len: int) -> int:
        """Decodes a little-endian unsigned integer of len bytes at pos."""
        # Walk from the most significant byte down, shifting as we go.
        value = 0
        for offset in reversed(range(len)):
            value = (value << 8) | self.buf[pos + offset]
        return value

    def __next__(self) -> DataLogRecord:
        """Decodes the record at the current position and advances past it.

        Iteration stops as soon as the remaining bytes cannot hold a
        complete record (header or payload).
        """
        start = self.pos
        available = len(self.buf)

        # The smallest possible header is 4 bytes: the length bitfield plus
        # one byte each for entry ID, payload size and timestamp.
        if start + 4 > available:
            raise StopIteration

        # The first byte packs the three field widths (each stored minus
        # one): bits 0-1 entry ID, bits 2-3 payload size, bits 4-6 timestamp.
        bitfield = self.buf[start]
        entryLen = (bitfield & 0x3) + 1
        sizeLen = ((bitfield >> 2) & 0x3) + 1
        timestampLen = ((bitfield >> 4) & 0x7) + 1

        entryPos = start + 1
        sizePos = entryPos + entryLen
        timestampPos = sizePos + sizeLen
        payloadPos = timestampPos + timestampLen
        if payloadPos > available:
            raise StopIteration

        entry = self._readVarInt(entryPos, entryLen)
        size = self._readVarInt(sizePos, sizeLen)
        timestamp = self._readVarInt(timestampPos, timestampLen)

        payloadEnd = payloadPos + size
        if payloadEnd > available:
            raise StopIteration

        record = DataLogRecord(entry, timestamp, self.buf[payloadPos:payloadEnd])
        self.pos = payloadEnd
        return record
213
214
class DataLogReader:
    """Reader for data logs written by the DataLog class.

    The file header is 12 bytes: the 6-byte magic "WPILOG", a 2-byte
    little-endian version and a 4-byte little-endian extra header length,
    followed by the extra header itself and then the records.
    """

    def __init__(self, buf: SupportsBytes):
        self.buf = buf

    def __bool__(self):
        return self.isValid()

    def isValid(self) -> bool:
        """Returns true if the data log is valid (e.g. has a valid header)."""
        if len(self.buf) < 12:
            return False
        if self.buf[:6] != b"WPILOG":
            return False
        return self.getVersion() >= 0x0100

    def getVersion(self) -> int:
        """Gets the data log version. Returns 0 if data log is invalid.

        @return Version number; most significant byte is major, least significant is
                minor (so version 1.0 will be 0x0100)"""
        if len(self.buf) >= 12:
            versionBytes = self.buf[6:8]
            return int.from_bytes(versionBytes, byteorder="little", signed=False)
        return 0

    def getExtraHeader(self) -> str:
        """Gets the extra header data.

        @return Extra header data (empty string if the buffer is too short
                to contain a header)
        """
        if len(self.buf) >= 12:
            length = int.from_bytes(self.buf[8:12], byteorder="little", signed=False)
            return str(self.buf[12 : 12 + length], encoding="utf-8")
        return ""

    def __iter__(self) -> DataLogIterator:
        """Returns an iterator positioned at the first record, i.e. just
        past the 12-byte header and the extra header."""
        sizeField = self.buf[8:12]
        firstRecordPos = 12 + int.from_bytes(
            sizeField, byteorder="little", signed=False
        )
        return DataLogIterator(self.buf, firstRecordPos)
256
257
if __name__ == "__main__":
    # Command-line dump tool: prints every record of the log file given as
    # the single argument, decoding values for the entry types it knows.
    from datetime import datetime
    import mmap
    import sys

    if len(sys.argv) != 2:
        print("Usage: datalog.py <file>", file=sys.stderr)
        sys.exit(1)

    # The log is binary data, so open it in binary mode. Both the file and
    # the memory mapping are released when the with block exits, including
    # on sys.exit().
    with open(sys.argv[1], "rb") as f, mmap.mmap(
        f.fileno(), 0, access=mmap.ACCESS_READ
    ) as mm:
        reader = DataLogReader(mm)
        if not reader:
            print("not a log file", file=sys.stderr)
            sys.exit(1)

        # Currently open entries, keyed by entry ID (StartRecordData values).
        entries = {}
        for record in reader:
            # Shown in seconds; the raw value is divided by one million.
            timestamp = record.timestamp / 1000000
            if record.isStart():
                try:
                    data = record.getStartData()
                    print(
                        f"Start({data.entry}, name='{data.name}', type='{data.type}', metadata='{data.metadata}') [{timestamp}]"
                    )
                    if data.entry in entries:
                        print("...DUPLICATE entry ID, overriding")
                    entries[data.entry] = data
                except (TypeError, UnicodeDecodeError):
                    # Malformed payload or invalid UTF-8 in a string field.
                    print("Start(INVALID)")
            elif record.isFinish():
                try:
                    entry = record.getFinishEntry()
                    print(f"Finish({entry}) [{timestamp}]")
                    if entry not in entries:
                        print("...ID not found")
                    else:
                        del entries[entry]
                except TypeError:
                    print("Finish(INVALID)")
            elif record.isSetMetadata():
                try:
                    data = record.getSetMetadataData()
                    print(f"SetMetadata({data.entry}, '{data.metadata}') [{timestamp}]")
                    if data.entry not in entries:
                        print("...ID not found")
                except (TypeError, UnicodeDecodeError):
                    print("SetMetadata(INVALID)")
            elif record.isControl():
                print("Unrecognized control record")
            else:
                print(f"Data({record.entry}, size={len(record.data)}) ", end="")
                entry = entries.get(record.entry)
                if entry is None:
                    print("<ID not found>")
                    continue
                print(f"<name='{entry.name}', type='{entry.type}'> [{timestamp}]")

                try:
                    # handle systemTime specially
                    if entry.name == "systemTime" and entry.type == "int64":
                        dt = datetime.fromtimestamp(record.getInteger() / 1000000)
                        print(" {:%Y-%m-%d %H:%M:%S.%f}".format(dt))
                        continue

                    if entry.type == "double":
                        print(f" {record.getDouble()}")
                    elif entry.type == "float":
                        print(f" {record.getFloat()}")
                    elif entry.type == "int64":
                        print(f" {record.getInteger()}")
                    elif entry.type in ("string", "json"):
                        print(f" '{record.getString()}'")
                    elif entry.type == "boolean":
                        print(f" {record.getBoolean()}")
                    elif entry.type == "boolean[]":
                        print(f" {record.getBooleanArray()}")
                    elif entry.type == "double[]":
                        print(f" {record.getDoubleArray()}")
                    elif entry.type == "float[]":
                        print(f" {record.getFloatArray()}")
                    elif entry.type == "int64[]":
                        print(f" {record.getIntegerArray()}")
                    elif entry.type == "string[]":
                        print(f" {record.getStringArray()}")
                    # Any other entry type is listed without a decoded value.
                except (TypeError, UnicodeDecodeError):
                    # Payload size does not match the declared type, or a
                    # string payload is not valid UTF-8.
                    print(" invalid")