| #! /usr/bin/env python3 |
| # Copyright (c) FIRST and other WPILib contributors. |
| # Open Source Software; you can modify and/or share it under the terms of |
| # the WPILib BSD license file in the root directory of this project. |
| |
| import array |
| import struct |
| from typing import List, SupportsBytes |
| |
| __all__ = ["StartRecordData", "MetadataRecordData", "DataLogRecord", "DataLogReader"] |
| |
# Pre-compiled struct formats for little-endian ("<") 32-bit float and 64-bit
# double payloads.
floatStruct = struct.Struct("<f")
doubleStruct = struct.Struct("<d")

# Control record type codes; these are compared against the first payload byte
# of a control record.
kControlStart = 0
kControlFinish = 1
kControlSetMetadata = 2
| |
| |
class StartRecordData:
    """Payload of a start control record.

    Start records are written by DataLog.start() and decoded by
    DataLogRecord.getStartData().

    entry: Entry ID that later records for this entry will carry.
    name: Name of the entry.
    type: Name of the data type stored under this entry, e.g. "double".
    metadata: Metadata the entry was started with.
    """

    def __init__(self, entry: int, name: str, type: str, metadata: str):
        # Plain value holder: keep every constructor argument as an attribute.
        self.entry, self.name = entry, name
        self.type, self.metadata = type, metadata
| |
| |
class MetadataRecordData:
    """Payload of a set metadata control record.

    Set metadata records are written by DataLog.setMetadata() and decoded by
    DataLogRecord.getSetMetadataData().

    entry: ID of the entry whose metadata is being replaced.
    metadata: The new metadata for that entry.
    """

    def __init__(self, entry: int, metadata: str):
        # Plain value holder: keep both constructor arguments as attributes.
        self.entry, self.metadata = entry, metadata
| |
| |
class DataLogRecord:
    """A record in the data log. May represent either a control record
    (entry == 0) or a data record.

    entry: Entry ID; 0 marks a control record.
    timestamp: Integer timestamp taken from the record header.
    data: Raw record payload. It is measured with len(), indexed and sliced,
        so it has to behave like a bytes sequence.

    The get*() accessors raise TypeError when the payload size does not fit
    the requested type. Decoding a string from malformed UTF-8 raises
    UnicodeDecodeError. All multi-byte values are decoded as little-endian.
    """

    def __init__(self, entry: int, timestamp: int, data: SupportsBytes):
        self.entry = entry
        self.timestamp = timestamp
        self.data = data

    def isControl(self) -> bool:
        """Returns True if this is a control record (entry ID 0)."""
        return self.entry == 0

    def _getControlType(self) -> int:
        """Returns the control type code, i.e. the first payload byte.

        The payload must be non-empty; the is*() checks guarantee that by
        testing the length first.
        """
        return self.data[0]

    def isStart(self) -> bool:
        """Returns True if this is a start control record."""
        # Minimum size: 1 type byte + 4 entry ID bytes + three 4-byte string
        # length prefixes (name, type, metadata).
        return (
            self.entry == 0
            and len(self.data) >= 17
            and self._getControlType() == kControlStart
        )

    def isFinish(self) -> bool:
        """Returns True if this is a finish control record."""
        # Exact size: 1 type byte + 4 entry ID bytes.
        return (
            self.entry == 0
            and len(self.data) == 5
            and self._getControlType() == kControlFinish
        )

    def isSetMetadata(self) -> bool:
        """Returns True if this is a set metadata control record."""
        # Minimum size: 1 type byte + 4 entry ID bytes + one 4-byte string
        # length prefix.
        return (
            self.entry == 0
            and len(self.data) >= 9
            and self._getControlType() == kControlSetMetadata
        )

    def getStartData(self) -> "StartRecordData":
        """Decodes a start control record.

        Returns the entry ID, name, type and metadata carried by the record.
        Raises TypeError if this is not a start record or a string length
        runs past the end of the payload.
        """
        if not self.isStart():
            raise TypeError("not a start record")
        entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False)
        # Three length-prefixed strings follow the entry ID back to back.
        name, pos = self._readInnerString(5)
        typeStr, pos = self._readInnerString(pos)
        metadata = self._readInnerString(pos)[0]
        return StartRecordData(entry, name, typeStr, metadata)

    def getFinishEntry(self) -> int:
        """Returns the entry ID carried by a finish control record.

        Raises TypeError if this is not a finish record.
        """
        if not self.isFinish():
            raise TypeError("not a finish record")
        return int.from_bytes(self.data[1:5], byteorder="little", signed=False)

    def getSetMetadataData(self) -> "MetadataRecordData":
        """Decodes a set metadata control record.

        Returns the entry ID and new metadata carried by the record.
        Raises TypeError if this is not a set metadata record or the string
        length runs past the end of the payload.
        """
        if not self.isSetMetadata():
            raise TypeError("not a set metadata record")
        entry = int.from_bytes(self.data[1:5], byteorder="little", signed=False)
        metadata = self._readInnerString(5)[0]
        return MetadataRecordData(entry, metadata)

    def getBoolean(self) -> bool:
        """Decodes the payload as a single boolean (one byte, non-zero is True).

        Raises TypeError if the payload is not exactly 1 byte.
        """
        if len(self.data) != 1:
            raise TypeError("not a boolean")
        return self.data[0] != 0

    def getInteger(self) -> int:
        """Decodes the payload as a signed 64-bit integer.

        Raises TypeError if the payload is not exactly 8 bytes.
        """
        if len(self.data) != 8:
            raise TypeError("not an integer")
        return int.from_bytes(self.data, byteorder="little", signed=True)

    def getFloat(self) -> float:
        """Decodes the payload as a 32-bit float.

        Raises TypeError if the payload is not exactly 4 bytes.
        """
        if len(self.data) != 4:
            raise TypeError("not a float")
        return floatStruct.unpack(self.data)[0]

    def getDouble(self) -> float:
        """Decodes the payload as a 64-bit double.

        Raises TypeError if the payload is not exactly 8 bytes.
        """
        if len(self.data) != 8:
            raise TypeError("not a double")
        return doubleStruct.unpack(self.data)[0]

    def getString(self) -> str:
        """Decodes the whole payload as a UTF-8 string."""
        return str(self.data, encoding="utf-8")

    def getBooleanArray(self) -> List[bool]:
        """Decodes the payload as booleans, one per byte (non-zero is True)."""
        return [x != 0 for x in self.data]

    def _unpackArray(self, typecode: str) -> array.array:
        """Decodes the whole payload as a packed little-endian array.

        typecode: array module type code of the elements. The caller must
            already have checked that the payload length is a multiple of
            the element size.
        """
        import sys  # only needed here, for the byte-order check

        arr = array.array(typecode)
        arr.frombytes(self.data)
        # frombytes() interprets the bytes in native order, but the payload is
        # little-endian like every other value decoded by this class.
        if sys.byteorder != "little":
            arr.byteswap()
        return arr

    def getIntegerArray(self) -> array.array:
        """Decodes the payload as an array of signed 64-bit integers.

        Raises TypeError if the payload length is not a multiple of 8.
        """
        if (len(self.data) % 8) != 0:
            raise TypeError("not an integer array")
        # "q" is always 8 bytes wide; "l" is only 4 bytes on some platforms.
        return self._unpackArray("q")

    def getFloatArray(self) -> array.array:
        """Decodes the payload as an array of 32-bit floats.

        Raises TypeError if the payload length is not a multiple of 4.
        """
        if (len(self.data) % 4) != 0:
            raise TypeError("not a float array")
        return self._unpackArray("f")

    def getDoubleArray(self) -> array.array:
        """Decodes the payload as an array of 64-bit doubles.

        Raises TypeError if the payload length is not a multiple of 8.
        """
        if (len(self.data) % 8) != 0:
            raise TypeError("not a double array")
        return self._unpackArray("d")

    def getStringArray(self) -> List[str]:
        """Decodes the payload as an array of strings.

        The payload starts with a 4-byte element count followed by that many
        length-prefixed UTF-8 strings.
        Raises TypeError if the count or any string length does not fit in
        the payload.
        """
        if len(self.data) < 4:
            raise TypeError("not a string array")
        size = int.from_bytes(self.data[:4], byteorder="little", signed=False)
        # Every element needs at least its own 4-byte length prefix, so a
        # larger count than that cannot be valid.
        if size * 4 > len(self.data) - 4:
            raise TypeError("not a string array")
        arr = []
        pos = 4
        for _ in range(size):
            val, pos = self._readInnerString(pos)
            arr.append(val)
        return arr

    def _readInnerString(self, pos: int) -> tuple[str, int]:
        """Reads one length-prefixed UTF-8 string starting at offset pos.

        Returns the decoded string and the offset just past it.
        Raises TypeError if the length prefix or the string itself would run
        past the end of the payload.
        """
        if pos + 4 > len(self.data):
            # Not even room for the 4-byte length prefix.
            raise TypeError("invalid string size")
        size = int.from_bytes(
            self.data[pos : pos + 4], byteorder="little", signed=False
        )
        end = pos + 4 + size
        if end > len(self.data):
            raise TypeError("invalid string size")
        return str(self.data[pos + 4 : end], encoding="utf-8"), end
| |
| |
class DataLogIterator:
    """Iterator over the records stored in a data log buffer.

    buf: Buffer holding the log; it is measured with len(), indexed and sliced.
    pos: Offset in buf of the next record header.
    """

    def __init__(self, buf: SupportsBytes, pos: int):
        self.buf = buf
        self.pos = pos

    def __iter__(self):
        return self

    def _readVarInt(self, pos: int, len: int) -> int:
        """Decodes len bytes starting at pos as an unsigned little-endian
        integer."""
        val = 0
        # Fold from the most significant (last) byte down to the first one.
        for offset in reversed(range(len)):
            val = (val << 8) | self.buf[pos + offset]
        return val

    def __next__(self) -> "DataLogRecord":
        """Returns the next record and advances past it.

        Iteration stops as soon as the remaining bytes cannot hold a complete
        header or the payload size the header announces.
        """
        buf = self.buf
        start = self.pos
        total = len(buf)

        # Smallest record: the length byte plus one byte each for entry ID,
        # payload size and timestamp.
        if start + 4 > total:
            raise StopIteration

        # The first byte packs the three field widths, each stored minus one:
        # bits 0-1 entry ID, bits 2-3 payload size, bits 4-6 timestamp.
        widths = buf[start]
        entryLen = (widths & 0x3) + 1
        sizeLen = ((widths >> 2) & 0x3) + 1
        timestampLen = ((widths >> 4) & 0x7) + 1
        headerLen = 1 + entryLen + sizeLen + timestampLen
        if start + headerLen > total:
            raise StopIteration

        cursor = start + 1
        entry = self._readVarInt(cursor, entryLen)
        cursor += entryLen
        size = self._readVarInt(cursor, sizeLen)
        cursor += sizeLen
        timestamp = self._readVarInt(cursor, timestampLen)

        payloadStart = start + headerLen
        payloadEnd = payloadStart + size
        if payloadEnd > total:
            raise StopIteration

        record = DataLogRecord(entry, timestamp, buf[payloadStart:payloadEnd])
        self.pos = payloadEnd
        return record
| |
| |
class DataLogReader:
    """Data log reader (reads logs written by the DataLog class).

    The buffer starts with a 12-byte fixed header: the 6-byte magic "WPILOG",
    a 2-byte little-endian version and a 4-byte little-endian extra header
    size. The extra header string follows, then the records.
    """

    def __init__(self, buf: SupportsBytes):
        self.buf = buf

    def __bool__(self):
        return self.isValid()

    def isValid(self) -> bool:
        """Returns true if the data log is valid (e.g. has a valid header)."""
        # Guard clauses: fixed header present, magic matches, version >= 1.0.
        if len(self.buf) < 12:
            return False
        if self.buf[:6] != b"WPILOG":
            return False
        return self.getVersion() >= 0x0100

    def getVersion(self) -> int:
        """Gets the data log version. Returns 0 if the buffer is too short to
        hold the fixed header.

        @return Version number; most significant byte is major, least significant is
                minor (so version 1.0 will be 0x0100)"""
        if len(self.buf) >= 12:
            versionField = self.buf[6:8]
            return int.from_bytes(versionField, byteorder="little", signed=False)
        return 0

    def getExtraHeader(self) -> str:
        """Gets the extra header data. Returns an empty string if the buffer
        is too short to hold the fixed header.

        @return Extra header data
        """
        if len(self.buf) >= 12:
            sizeField = self.buf[8:12]
            size = int.from_bytes(sizeField, byteorder="little", signed=False)
            return str(self.buf[12 : 12 + size], encoding="utf-8")
        return ""

    def __iter__(self) -> "DataLogIterator":
        """Returns an iterator positioned on the first record, i.e. just past
        the fixed header and the extra header."""
        sizeField = self.buf[8:12]
        firstRecord = 12 + int.from_bytes(sizeField, byteorder="little", signed=False)
        return DataLogIterator(self.buf, firstRecord)
| |
| |
if __name__ == "__main__":
    # Command-line dump tool: prints every record of the log file named on the
    # command line. Data records are decoded using the type announced by the
    # start record of their entry.
    from datetime import datetime
    import mmap
    import sys

    if len(sys.argv) != 2:
        print("Usage: datalog.py <file>", file=sys.stderr)
        sys.exit(1)

    # The record accessors report size problems as TypeError, while malformed
    # UTF-8 in a string surfaces as UnicodeDecodeError. Either way the record
    # cannot be decoded and is reported as invalid instead of aborting the dump.
    decodeErrors = (TypeError, UnicodeDecodeError)

    # The log is binary data, so open it in binary mode.
    with open(sys.argv[1], "rb") as f:
        try:
            mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        except ValueError:
            # mmap refuses to map an empty file; an empty file is no log either.
            print("not a log file", file=sys.stderr)
            sys.exit(1)
        reader = DataLogReader(mm)
        if not reader:
            print("not a log file", file=sys.stderr)
            sys.exit(1)

        # Currently started entries, keyed by entry ID.
        entries = {}
        for record in reader:
            timestamp = record.timestamp / 1000000
            if record.isStart():
                try:
                    data = record.getStartData()
                    print(
                        f"Start({data.entry}, name='{data.name}', type='{data.type}', metadata='{data.metadata}') [{timestamp}]"
                    )
                    if data.entry in entries:
                        print("...DUPLICATE entry ID, overriding")
                    entries[data.entry] = data
                except decodeErrors:
                    print("Start(INVALID)")
            elif record.isFinish():
                try:
                    entry = record.getFinishEntry()
                    print(f"Finish({entry}) [{timestamp}]")
                    if entry not in entries:
                        print("...ID not found")
                    else:
                        del entries[entry]
                except decodeErrors:
                    print("Finish(INVALID)")
            elif record.isSetMetadata():
                try:
                    data = record.getSetMetadataData()
                    print(f"SetMetadata({data.entry}, '{data.metadata}') [{timestamp}]")
                    if data.entry not in entries:
                        print("...ID not found")
                except decodeErrors:
                    print("SetMetadata(INVALID)")
            elif record.isControl():
                print("Unrecognized control record")
            else:
                print(f"Data({record.entry}, size={len(record.data)}) ", end="")
                entry = entries.get(record.entry)
                if entry is None:
                    print("<ID not found>")
                    continue
                print(f"<name='{entry.name}', type='{entry.type}'> [{timestamp}]")

                try:
                    # handle systemTime specially
                    if entry.name == "systemTime" and entry.type == "int64":
                        dt = datetime.fromtimestamp(record.getInteger() / 1000000)
                        print(" {:%Y-%m-%d %H:%M:%S.%f}".format(dt))
                        continue

                    if entry.type == "double":
                        print(f" {record.getDouble()}")
                    elif entry.type == "float":
                        print(f" {record.getFloat()}")
                    elif entry.type == "int64":
                        print(f" {record.getInteger()}")
                    elif entry.type in ("string", "json"):
                        print(f" '{record.getString()}'")
                    elif entry.type == "boolean":
                        print(f" {record.getBoolean()}")
                    elif entry.type == "boolean[]":
                        arr = record.getBooleanArray()
                        print(f" {arr}")
                    elif entry.type == "double[]":
                        arr = record.getDoubleArray()
                        print(f" {arr}")
                    elif entry.type == "float[]":
                        arr = record.getFloatArray()
                        print(f" {arr}")
                    elif entry.type == "int64[]":
                        arr = record.getIntegerArray()
                        print(f" {arr}")
                    elif entry.type == "string[]":
                        arr = record.getStringArray()
                        print(f" {arr}")
                    # Entries of any other type are listed without a value.
                except decodeErrors:
                    print(" invalid")