James Kuszmaul | cf32412 | 2023-01-14 14:07:17 -0800 | [diff] [blame^] | 1 | #! /usr/bin/env python3 |
| 2 | # Copyright (c) FIRST and other WPILib contributors. |
| 3 | # Open Source Software; you can modify and/or share it under the terms of |
| 4 | # the WPILib BSD license file in the root directory of this project. |
| 5 | |
| 6 | import array |
| 7 | import struct |
| 8 | from typing import List, SupportsBytes |
| 9 | |
| 10 | __all__ = ["StartRecordData", "MetadataRecordData", "DataLogRecord", "DataLogReader"] |
| 11 | |
| 12 | floatStruct = struct.Struct("<f") |
| 13 | doubleStruct = struct.Struct("<d") |
| 14 | |
| 15 | kControlStart = 0 |
| 16 | kControlFinish = 1 |
| 17 | kControlSetMetadata = 2 |
| 18 | |
| 19 | |
| 20 | class StartRecordData: |
| 21 | """Data contained in a start control record as created by DataLog.start() when |
| 22 | writing the log. This can be read by calling DataLogRecord.getStartData(). |
| 23 | |
| 24 | entry: Entry ID; this will be used for this entry in future records. |
| 25 | name: Entry name. |
| 26 | type: Type of the stored data for this entry, as a string, e.g. "double". |
| 27 | metadata: Initial metadata. |
| 28 | """ |
| 29 | |
| 30 | def __init__(self, entry: int, name: str, type: str, metadata: str): |
| 31 | self.entry = entry |
| 32 | self.name = name |
| 33 | self.type = type |
| 34 | self.metadata = metadata |
| 35 | |
| 36 | |
| 37 | class MetadataRecordData: |
| 38 | """Data contained in a set metadata control record as created by |
| 39 | DataLog.setMetadata(). This can be read by calling |
| 40 | DataLogRecord.getSetMetadataData(). |
| 41 | |
| 42 | entry: Entry ID. |
| 43 | metadata: New metadata for the entry. |
| 44 | """ |
| 45 | |
| 46 | def __init__(self, entry: int, metadata: str): |
| 47 | self.entry = entry |
| 48 | self.metadata = metadata |
| 49 | |
| 50 | |
| 51 | class DataLogRecord: |
| 52 | """A record in the data log. May represent either a control record |
| 53 | (entry == 0) or a data record.""" |
| 54 | |
| 55 | def __init__(self, entry: int, timestamp: int, data: SupportsBytes): |
| 56 | self.entry = entry |
| 57 | self.timestamp = timestamp |
| 58 | self.data = data |
| 59 | |
| 60 | def isControl(self) -> bool: |
| 61 | return self.entry == 0 |
| 62 | |
| 63 | def _getControlType(self) -> int: |
| 64 | return self.data[0] |
| 65 | |
| 66 | def isStart(self) -> bool: |
| 67 | return ( |
| 68 | self.entry == 0 |
| 69 | and len(self.data) >= 17 |
| 70 | and self._getControlType() == kControlStart |
| 71 | ) |
| 72 | |
| 73 | def isFinish(self) -> bool: |
| 74 | return ( |
| 75 | self.entry == 0 |
| 76 | and len(self.data) == 5 |
| 77 | and self._getControlType() == kControlFinish |
| 78 | ) |
| 79 | |
| 80 | def isSetMetadata(self) -> bool: |
| 81 | return ( |
| 82 | self.entry == 0 |
| 83 | and len(self.data) >= 9 |
| 84 | and self._getControlType() == kControlSetMetadata |
| 85 | ) |
| 86 | |
| 87 | def getStartData(self) -> StartRecordData: |
| 88 | if not self.isStart(): |
| 89 | raise TypeError("not a start record") |
| 90 | entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False) |
| 91 | name, pos = self._readInnerString(5) |
| 92 | type, pos = self._readInnerString(pos) |
| 93 | metadata = self._readInnerString(pos)[0] |
| 94 | return StartRecordData(entry, name, type, metadata) |
| 95 | |
| 96 | def getFinishEntry(self) -> int: |
| 97 | if not self.isFinish(): |
| 98 | raise TypeError("not a finish record") |
| 99 | return int.from_bytes(self.data[1:5], byteorder="little", signed=False) |
| 100 | |
| 101 | def getSetMetadataData(self) -> MetadataRecordData: |
| 102 | if not self.isSetMetadata(): |
| 103 | raise TypeError("not a finish record") |
| 104 | entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False) |
| 105 | metadata = self._readInnerString(5)[0] |
| 106 | return MetadataRecordData(entry, metadata) |
| 107 | |
| 108 | def getBoolean(self) -> bool: |
| 109 | if len(self.data) != 1: |
| 110 | raise TypeError("not a boolean") |
| 111 | return self.data[0] != 0 |
| 112 | |
| 113 | def getInteger(self) -> int: |
| 114 | if len(self.data) != 8: |
| 115 | raise TypeError("not an integer") |
| 116 | return int.from_bytes(self.data, byteorder="little", signed=True) |
| 117 | |
| 118 | def getFloat(self) -> float: |
| 119 | if len(self.data) != 4: |
| 120 | raise TypeError("not a float") |
| 121 | return floatStruct.unpack(self.data)[0] |
| 122 | |
| 123 | def getDouble(self) -> float: |
| 124 | if len(self.data) != 8: |
| 125 | raise TypeError("not a double") |
| 126 | return doubleStruct.unpack(self.data)[0] |
| 127 | |
| 128 | def getString(self) -> str: |
| 129 | return str(self.data, encoding="utf-8") |
| 130 | |
| 131 | def getBooleanArray(self) -> List[bool]: |
| 132 | return [x != 0 for x in self.data] |
| 133 | |
| 134 | def getIntegerArray(self) -> array.array: |
| 135 | if (len(self.data) % 8) != 0: |
| 136 | raise TypeError("not an integer array") |
| 137 | arr = array.array("l") |
| 138 | arr.frombytes(self.data) |
| 139 | return arr |
| 140 | |
| 141 | def getFloatArray(self) -> array.array: |
| 142 | if (len(self.data) % 4) != 0: |
| 143 | raise TypeError("not a float array") |
| 144 | arr = array.array("f") |
| 145 | arr.frombytes(self.data) |
| 146 | return arr |
| 147 | |
| 148 | def getDoubleArray(self) -> array.array: |
| 149 | if (len(self.data) % 8) != 0: |
| 150 | raise TypeError("not a double array") |
| 151 | arr = array.array("d") |
| 152 | arr.frombytes(self.data) |
| 153 | return arr |
| 154 | |
| 155 | def getStringArray(self) -> List[str]: |
| 156 | size = int.from_bytes(self.data[:4], byteorder="little", signed=False) |
| 157 | if size > ((len(self.data) - 4) / 4): |
| 158 | raise TypeError("not a string array") |
| 159 | arr = [] |
| 160 | pos = 4 |
| 161 | for _ in range(size): |
| 162 | val, pos = self._readInnerString(pos) |
| 163 | arr.append(val) |
| 164 | return arr |
| 165 | |
| 166 | def _readInnerString(self, pos: int) -> tuple[str, int]: |
| 167 | size = int.from_bytes( |
| 168 | self.data[pos : pos + 4], byteorder="little", signed=False |
| 169 | ) |
| 170 | end = pos + 4 + size |
| 171 | if end > len(self.data): |
| 172 | raise TypeError("invalid string size") |
| 173 | return str(self.data[pos + 4 : end], encoding="utf-8"), end |
| 174 | |
| 175 | |
| 176 | class DataLogIterator: |
| 177 | """DataLogReader iterator.""" |
| 178 | |
| 179 | def __init__(self, buf: SupportsBytes, pos: int): |
| 180 | self.buf = buf |
| 181 | self.pos = pos |
| 182 | |
| 183 | def __iter__(self): |
| 184 | return self |
| 185 | |
| 186 | def _readVarInt(self, pos: int, len: int) -> int: |
| 187 | val = 0 |
| 188 | for i in range(len): |
| 189 | val |= self.buf[pos + i] << (i * 8) |
| 190 | return val |
| 191 | |
| 192 | def __next__(self) -> DataLogRecord: |
| 193 | if len(self.buf) < (self.pos + 4): |
| 194 | raise StopIteration |
| 195 | entryLen = (self.buf[self.pos] & 0x3) + 1 |
| 196 | sizeLen = ((self.buf[self.pos] >> 2) & 0x3) + 1 |
| 197 | timestampLen = ((self.buf[self.pos] >> 4) & 0x7) + 1 |
| 198 | headerLen = 1 + entryLen + sizeLen + timestampLen |
| 199 | if len(self.buf) < (self.pos + headerLen): |
| 200 | raise StopIteration |
| 201 | entry = self._readVarInt(self.pos + 1, entryLen) |
| 202 | size = self._readVarInt(self.pos + 1 + entryLen, sizeLen) |
| 203 | timestamp = self._readVarInt(self.pos + 1 + entryLen + sizeLen, timestampLen) |
| 204 | if len(self.buf) < (self.pos + headerLen + size): |
| 205 | raise StopIteration |
| 206 | record = DataLogRecord( |
| 207 | entry, |
| 208 | timestamp, |
| 209 | self.buf[self.pos + headerLen : self.pos + headerLen + size], |
| 210 | ) |
| 211 | self.pos += headerLen + size |
| 212 | return record |
| 213 | |
| 214 | |
| 215 | class DataLogReader: |
| 216 | """Data log reader (reads logs written by the DataLog class).""" |
| 217 | |
| 218 | def __init__(self, buf: SupportsBytes): |
| 219 | self.buf = buf |
| 220 | |
| 221 | def __bool__(self): |
| 222 | return self.isValid() |
| 223 | |
| 224 | def isValid(self) -> bool: |
| 225 | """Returns true if the data log is valid (e.g. has a valid header).""" |
| 226 | return ( |
| 227 | len(self.buf) >= 12 |
| 228 | and self.buf[:6] == b"WPILOG" |
| 229 | and self.getVersion() >= 0x0100 |
| 230 | ) |
| 231 | |
| 232 | def getVersion(self) -> int: |
| 233 | """Gets the data log version. Returns 0 if data log is invalid. |
| 234 | |
| 235 | @return Version number; most significant byte is major, least significant is |
| 236 | minor (so version 1.0 will be 0x0100)""" |
| 237 | if len(self.buf) < 12: |
| 238 | return 0 |
| 239 | return int.from_bytes(self.buf[6:8], byteorder="little", signed=False) |
| 240 | |
| 241 | def getExtraHeader(self) -> str: |
| 242 | """Gets the extra header data. |
| 243 | |
| 244 | @return Extra header data |
| 245 | """ |
| 246 | if len(self.buf) < 12: |
| 247 | return "" |
| 248 | size = int.from_bytes(self.buf[8:12], byteorder="little", signed=False) |
| 249 | return str(self.buf[12 : 12 + size], encoding="utf-8") |
| 250 | |
| 251 | def __iter__(self) -> DataLogIterator: |
| 252 | extraHeaderSize = int.from_bytes( |
| 253 | self.buf[8:12], byteorder="little", signed=False |
| 254 | ) |
| 255 | return DataLogIterator(self.buf, 12 + extraHeaderSize) |
| 256 | |
| 257 | |
| 258 | if __name__ == "__main__": |
| 259 | from datetime import datetime |
| 260 | import mmap |
| 261 | import sys |
| 262 | |
| 263 | if len(sys.argv) != 2: |
| 264 | print("Usage: datalog.py <file>", file=sys.stderr) |
| 265 | sys.exit(1) |
| 266 | |
| 267 | with open(sys.argv[1], "r") as f: |
| 268 | mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) |
| 269 | reader = DataLogReader(mm) |
| 270 | if not reader: |
| 271 | print("not a log file", file=sys.stderr) |
| 272 | sys.exit(1) |
| 273 | |
| 274 | entries = {} |
| 275 | for record in reader: |
| 276 | timestamp = record.timestamp / 1000000 |
| 277 | if record.isStart(): |
| 278 | try: |
| 279 | data = record.getStartData() |
| 280 | print( |
| 281 | f"Start({data.entry}, name='{data.name}', type='{data.type}', metadata='{data.metadata}') [{timestamp}]" |
| 282 | ) |
| 283 | if data.entry in entries: |
| 284 | print("...DUPLICATE entry ID, overriding") |
| 285 | entries[data.entry] = data |
| 286 | except TypeError as e: |
| 287 | print("Start(INVALID)") |
| 288 | elif record.isFinish(): |
| 289 | try: |
| 290 | entry = record.getFinishEntry() |
| 291 | print(f"Finish({entry}) [{timestamp}]") |
| 292 | if entry not in entries: |
| 293 | print("...ID not found") |
| 294 | else: |
| 295 | del entries[entry] |
| 296 | except TypeError as e: |
| 297 | print("Finish(INVALID)") |
| 298 | elif record.isSetMetadata(): |
| 299 | try: |
| 300 | data = record.getSetMetadataData() |
| 301 | print(f"SetMetadata({data.entry}, '{data.metadata}') [{timestamp}]") |
| 302 | if data.entry not in entries: |
| 303 | print("...ID not found") |
| 304 | except TypeError as e: |
| 305 | print("SetMetadata(INVALID)") |
| 306 | elif record.isControl(): |
| 307 | print("Unrecognized control record") |
| 308 | else: |
| 309 | print(f"Data({record.entry}, size={len(record.data)}) ", end="") |
| 310 | entry = entries.get(record.entry) |
| 311 | if entry is None: |
| 312 | print("<ID not found>") |
| 313 | continue |
| 314 | print(f"<name='{entry.name}', type='{entry.type}'> [{timestamp}]") |
| 315 | |
| 316 | try: |
| 317 | # handle systemTime specially |
| 318 | if entry.name == "systemTime" and entry.type == "int64": |
| 319 | dt = datetime.fromtimestamp(record.getInteger() / 1000000) |
| 320 | print(" {:%Y-%m-%d %H:%M:%S.%f}".format(dt)) |
| 321 | continue |
| 322 | |
| 323 | if entry.type == "double": |
| 324 | print(f" {record.getDouble()}") |
| 325 | elif entry.type == "int64": |
| 326 | print(f" {record.getInteger()}") |
| 327 | elif entry.type in ("string", "json"): |
| 328 | print(f" '{record.getString()}'") |
| 329 | elif entry.type == "boolean": |
| 330 | print(f" {record.getBoolean()}") |
| 331 | elif entry.type == "boolean[]": |
| 332 | arr = record.getBooleanArray() |
| 333 | print(f" {arr}") |
| 334 | elif entry.type == "double[]": |
| 335 | arr = record.getDoubleArray() |
| 336 | print(f" {arr}") |
| 337 | elif entry.type == "float[]": |
| 338 | arr = record.getFloatArray() |
| 339 | print(f" {arr}") |
| 340 | elif entry.type == "int64[]": |
| 341 | arr = record.getIntegerArray() |
| 342 | print(f" {arr}") |
| 343 | elif entry.type == "string[]": |
| 344 | arr = record.getStringArray() |
| 345 | print(f" {arr}") |
| 346 | except TypeError as e: |
| 347 | print(" invalid") |