| # Lint as: python3 |
| # Copyright 2020 Google Inc. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Implementation of FlexBuffers binary format. |
| |
| For more info check https://google.github.io/flatbuffers/flexbuffers.html and |
| corresponding C++ implementation at |
| https://github.com/google/flatbuffers/blob/master/include/flatbuffers/flexbuffers.h |
| """ |
| |
| # pylint: disable=invalid-name |
| # TODO(dkovalev): Add type hints everywhere, so tools like pytypes could work. |
| |
| import array |
| import contextlib |
| import enum |
| import struct |
| |
| __all__ = ('Type', 'Builder', 'GetRoot', 'Dumps', 'Loads') |
| |
| |
| class BitWidth(enum.IntEnum): |
| """Supported bit widths of value types. |
| |
| These are used in the lower 2 bits of a type field to determine the size of |
| the elements (and or size field) of the item pointed to (e.g. vector). |
| """ |
| W8 = 0 # 2^0 = 1 byte |
| W16 = 1 # 2^1 = 2 bytes |
| W32 = 2 # 2^2 = 4 bytes |
| W64 = 3 # 2^3 = 8 bytes |
| |
| @staticmethod |
| def U(value): |
| """Returns the minimum `BitWidth` to encode unsigned integer value.""" |
| assert value >= 0 |
| |
| if value < (1 << 8): |
| return BitWidth.W8 |
| elif value < (1 << 16): |
| return BitWidth.W16 |
| elif value < (1 << 32): |
| return BitWidth.W32 |
| elif value < (1 << 64): |
| return BitWidth.W64 |
| else: |
| raise ValueError('value is too big to encode: %s' % value) |
| |
| @staticmethod |
| def I(value): |
| """Returns the minimum `BitWidth` to encode signed integer value.""" |
| # -2^(n-1) <= value < 2^(n-1) |
| # -2^n <= 2 * value < 2^n |
| # 2 * value < 2^n, when value >= 0 or 2 * (-value) <= 2^n, when value < 0 |
| # 2 * value < 2^n, when value >= 0 or 2 * (-value) - 1 < 2^n, when value < 0 |
| # |
| # if value >= 0: |
| # return BitWidth.U(2 * value) |
| # else: |
| # return BitWidth.U(2 * (-value) - 1) # ~x = -x - 1 |
| value *= 2 |
| return BitWidth.U(value if value >= 0 else ~value) |
| |
| @staticmethod |
| def F(value): |
| """Returns the `BitWidth` to encode floating point value.""" |
| if struct.unpack('f', struct.pack('f', value))[0] == value: |
| return BitWidth.W32 |
| return BitWidth.W64 |
| |
| @staticmethod |
| def B(byte_width): |
| return { |
| 1: BitWidth.W8, |
| 2: BitWidth.W16, |
| 4: BitWidth.W32, |
| 8: BitWidth.W64 |
| }[byte_width] |
| |
| |
| I = {1: 'b', 2: 'h', 4: 'i', 8: 'q'} # Integer formats |
| U = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'} # Unsigned integer formats |
| F = {4: 'f', 8: 'd'} # Floating point formats |
| |
| |
| def _Unpack(fmt, buf): |
| return struct.unpack(fmt[len(buf)], buf)[0] |
| |
| |
| def _UnpackVector(fmt, buf, length): |
| byte_width = len(buf) // length |
| return struct.unpack('%d%s' % (length, fmt[byte_width]), buf) |
| |
| |
| def _Pack(fmt, value, byte_width): |
| return struct.pack(fmt[byte_width], value) |
| |
| |
| def _PackVector(fmt, values, byte_width): |
| return struct.pack('%d%s' % (len(values), fmt[byte_width]), *values) |
| |
| |
| def _Mutate(fmt, buf, value, byte_width, value_bit_width): |
| if (1 << value_bit_width) <= byte_width: |
| buf[:byte_width] = _Pack(fmt, value, byte_width) |
| return True |
| return False |
| |
| |
| # Computes how many bytes you'd have to pad to be able to write an |
| # "scalar_size" scalar if the buffer had grown to "buf_size", |
| # "scalar_size" is a power of two. |
| def _PaddingBytes(buf_size, scalar_size): |
| # ((buf_size + (scalar_size - 1)) // scalar_size) * scalar_size - buf_size |
| return -buf_size & (scalar_size - 1) |
| |
| |
| def _ShiftSlice(s, offset, length): |
| start = offset + (0 if s.start is None else s.start) |
| stop = offset + (length if s.stop is None else s.stop) |
| return slice(start, stop, s.step) |
| |
| |
| # https://en.cppreference.com/w/cpp/algorithm/lower_bound |
| def _LowerBound(values, value, pred): |
| """Implementation of C++ std::lower_bound() algorithm.""" |
| first, last = 0, len(values) |
| count = last - first |
| while count > 0: |
| i = first |
| step = count // 2 |
| i += step |
| if pred(values[i], value): |
| i += 1 |
| first = i |
| count -= step + 1 |
| else: |
| count = step |
| return first |
| |
| |
| # https://en.cppreference.com/w/cpp/algorithm/binary_search |
| def _BinarySearch(values, value, pred=lambda x, y: x < y): |
| """Implementation of C++ std::binary_search() algorithm.""" |
| index = _LowerBound(values, value, pred) |
| if index != len(values) and not pred(value, values[index]): |
| return index |
| return -1 |
| |
| |
| class Type(enum.IntEnum): |
| """Supported types of encoded data. |
| |
| These are used as the upper 6 bits of a type field to indicate the actual |
| type. |
| """ |
| NULL = 0 |
| INT = 1 |
| UINT = 2 |
| FLOAT = 3 |
| # Types above stored inline, types below store an offset. |
| KEY = 4 |
| STRING = 5 |
| INDIRECT_INT = 6 |
| INDIRECT_UINT = 7 |
| INDIRECT_FLOAT = 8 |
| MAP = 9 |
| VECTOR = 10 # Untyped. |
| |
| VECTOR_INT = 11 # Typed any size (stores no type table). |
| VECTOR_UINT = 12 |
| VECTOR_FLOAT = 13 |
| VECTOR_KEY = 14 |
| # DEPRECATED, use VECTOR or VECTOR_KEY instead. |
| # Read test.cpp/FlexBuffersDeprecatedTest() for details on why. |
| VECTOR_STRING_DEPRECATED = 15 |
| |
| VECTOR_INT2 = 16 # Typed tuple (no type table, no size field). |
| VECTOR_UINT2 = 17 |
| VECTOR_FLOAT2 = 18 |
| VECTOR_INT3 = 19 # Typed triple (no type table, no size field). |
| VECTOR_UINT3 = 20 |
| VECTOR_FLOAT3 = 21 |
| VECTOR_INT4 = 22 # Typed quad (no type table, no size field). |
| VECTOR_UINT4 = 23 |
| VECTOR_FLOAT4 = 24 |
| |
| BLOB = 25 |
| BOOL = 26 |
| VECTOR_BOOL = 36 # To do the same type of conversion of type to vector type |
| |
| @staticmethod |
| def Pack(type_, bit_width): |
| return (int(type_) << 2) | bit_width |
| |
| @staticmethod |
| def Unpack(packed_type): |
| return 1 << (packed_type & 0b11), Type(packed_type >> 2) |
| |
| @staticmethod |
| def IsInline(type_): |
| return type_ <= Type.FLOAT or type_ == Type.BOOL |
| |
| @staticmethod |
| def IsTypedVector(type_): |
| return Type.VECTOR_INT <= type_ <= Type.VECTOR_STRING_DEPRECATED or \ |
| type_ == Type.VECTOR_BOOL |
| |
| @staticmethod |
| def IsTypedVectorElementType(type_): |
| return Type.INT <= type_ <= Type.STRING or type_ == Type.BOOL |
| |
| @staticmethod |
| def ToTypedVectorElementType(type_): |
| if not Type.IsTypedVector(type_): |
| raise ValueError('must be typed vector type') |
| |
| return Type(type_ - Type.VECTOR_INT + Type.INT) |
| |
| @staticmethod |
| def IsFixedTypedVector(type_): |
| return Type.VECTOR_INT2 <= type_ <= Type.VECTOR_FLOAT4 |
| |
| @staticmethod |
| def IsFixedTypedVectorElementType(type_): |
| return Type.INT <= type_ <= Type.FLOAT |
| |
| @staticmethod |
| def ToFixedTypedVectorElementType(type_): |
| if not Type.IsFixedTypedVector(type_): |
| raise ValueError('must be fixed typed vector type') |
| |
| # 3 types each, starting from length 2. |
| fixed_type = type_ - Type.VECTOR_INT2 |
| return Type(fixed_type % 3 + Type.INT), fixed_type // 3 + 2 |
| |
| @staticmethod |
| def ToTypedVector(element_type, fixed_len=0): |
| """Converts element type to corresponding vector type. |
| |
| Args: |
| element_type: vector element type |
| fixed_len: number of elements: 0 for typed vector; 2, 3, or 4 for fixed |
| typed vector. |
| |
| Returns: |
| Typed vector type or fixed typed vector type. |
| """ |
| if fixed_len == 0: |
| if not Type.IsTypedVectorElementType(element_type): |
| raise ValueError('must be typed vector element type') |
| else: |
| if not Type.IsFixedTypedVectorElementType(element_type): |
| raise ValueError('must be fixed typed vector element type') |
| |
| offset = element_type - Type.INT |
| if fixed_len == 0: |
| return Type(offset + Type.VECTOR_INT) # TypedVector |
| elif fixed_len == 2: |
| return Type(offset + Type.VECTOR_INT2) # FixedTypedVector |
| elif fixed_len == 3: |
| return Type(offset + Type.VECTOR_INT3) # FixedTypedVector |
| elif fixed_len == 4: |
| return Type(offset + Type.VECTOR_INT4) # FixedTypedVector |
| else: |
| raise ValueError('unsupported fixed_len: %s' % fixed_len) |
| |
| |
| class Buf: |
| """Class to access underlying buffer object starting from the given offset.""" |
| |
| def __init__(self, buf, offset): |
| self._buf = buf |
| self._offset = offset if offset >= 0 else len(buf) + offset |
| self._length = len(buf) - self._offset |
| |
| def __getitem__(self, key): |
| if isinstance(key, slice): |
| return self._buf[_ShiftSlice(key, self._offset, self._length)] |
| elif isinstance(key, int): |
| return self._buf[self._offset + key] |
| else: |
| raise TypeError('invalid key type') |
| |
| def __setitem__(self, key, value): |
| if isinstance(key, slice): |
| self._buf[_ShiftSlice(key, self._offset, self._length)] = value |
| elif isinstance(key, int): |
| self._buf[self._offset + key] = key |
| else: |
| raise TypeError('invalid key type') |
| |
| def __repr__(self): |
| return 'buf[%d:]' % self._offset |
| |
| def Find(self, sub): |
| """Returns the lowest index where the sub subsequence is found.""" |
| return self._buf[self._offset:].find(sub) |
| |
| def Slice(self, offset): |
| """Returns new `Buf` which starts from the given offset.""" |
| return Buf(self._buf, self._offset + offset) |
| |
| def Indirect(self, offset, byte_width): |
| """Return new `Buf` based on the encoded offset (indirect encoding).""" |
| return self.Slice(offset - _Unpack(U, self[offset:offset + byte_width])) |
| |
| |
| class Object: |
| """Base class for all non-trivial data accessors.""" |
| __slots__ = '_buf', '_byte_width' |
| |
| def __init__(self, buf, byte_width): |
| self._buf = buf |
| self._byte_width = byte_width |
| |
| @property |
| def ByteWidth(self): |
| return self._byte_width |
| |
| |
| class Sized(Object): |
| """Base class for all data accessors which need to read encoded size.""" |
| __slots__ = '_size', |
| |
| def __init__(self, buf, byte_width, size=0): |
| super().__init__(buf, byte_width) |
| if size == 0: |
| self._size = _Unpack(U, self.SizeBytes) |
| else: |
| self._size = size |
| |
| @property |
| def SizeBytes(self): |
| return self._buf[-self._byte_width:0] |
| |
| def __len__(self): |
| return self._size |
| |
| |
| class Blob(Sized): |
| """Data accessor for the encoded blob bytes.""" |
| __slots__ = () |
| |
| @property |
| def Bytes(self): |
| return self._buf[0:len(self)] |
| |
| def __repr__(self): |
| return 'Blob(%s, size=%d)' % (self._buf, len(self)) |
| |
| |
| class String(Sized): |
| """Data accessor for the encoded string bytes.""" |
| __slots__ = () |
| |
| @property |
| def Bytes(self): |
| return self._buf[0:len(self)] |
| |
| def Mutate(self, value): |
| """Mutates underlying string bytes in place. |
| |
| Args: |
| value: New string to replace the existing one. New string must have less |
| or equal UTF-8-encoded bytes than the existing one to successfully |
| mutate underlying byte buffer. |
| |
| Returns: |
| Whether the value was mutated or not. |
| """ |
| encoded = value.encode('utf-8') |
| n = len(encoded) |
| if n <= len(self): |
| self._buf[-self._byte_width:0] = _Pack(U, n, self._byte_width) |
| self._buf[0:n] = encoded |
| self._buf[n:len(self)] = bytearray(len(self) - n) |
| return True |
| return False |
| |
| def __str__(self): |
| return self.Bytes.decode('utf-8') |
| |
| def __repr__(self): |
| return 'String(%s, size=%d)' % (self._buf, len(self)) |
| |
| |
| class Key(Object): |
| """Data accessor for the encoded key bytes.""" |
| __slots__ = () |
| |
| def __init__(self, buf, byte_width): |
| assert byte_width == 1 |
| super().__init__(buf, byte_width) |
| |
| @property |
| def Bytes(self): |
| return self._buf[0:len(self)] |
| |
| def __len__(self): |
| return self._buf.Find(0) |
| |
| def __str__(self): |
| return self.Bytes.decode('ascii') |
| |
| def __repr__(self): |
| return 'Key(%s, size=%d)' % (self._buf, len(self)) |
| |
| |
| class Vector(Sized): |
| """Data accessor for the encoded vector bytes.""" |
| __slots__ = () |
| |
| def __getitem__(self, index): |
| if index < 0 or index >= len(self): |
| raise IndexError('vector index %s is out of [0, %d) range' % \ |
| (index, len(self))) |
| |
| packed_type = self._buf[len(self) * self._byte_width + index] |
| buf = self._buf.Slice(index * self._byte_width) |
| return Ref.PackedType(buf, self._byte_width, packed_type) |
| |
| @property |
| def Value(self): |
| """Returns the underlying encoded data as a list object.""" |
| return [e.Value for e in self] |
| |
| def __repr__(self): |
| return 'Vector(%s, byte_width=%d, size=%d)' % \ |
| (self._buf, self._byte_width, self._size) |
| |
| |
| class TypedVector(Sized): |
| """Data accessor for the encoded typed vector or fixed typed vector bytes.""" |
| __slots__ = '_element_type', '_size' |
| |
| def __init__(self, buf, byte_width, element_type, size=0): |
| super().__init__(buf, byte_width, size) |
| |
| if element_type == Type.STRING: |
| # These can't be accessed as strings, since we don't know the bit-width |
| # of the size field, see the declaration of |
| # FBT_VECTOR_STRING_DEPRECATED above for details. |
| # We change the type here to be keys, which are a subtype of strings, |
| # and will ignore the size field. This will truncate strings with |
| # embedded nulls. |
| element_type = Type.KEY |
| |
| self._element_type = element_type |
| |
| @property |
| def Bytes(self): |
| return self._buf[:self._byte_width * len(self)] |
| |
| @property |
| def ElementType(self): |
| return self._element_type |
| |
| def __getitem__(self, index): |
| if index < 0 or index >= len(self): |
| raise IndexError('vector index %s is out of [0, %d) range' % \ |
| (index, len(self))) |
| |
| buf = self._buf.Slice(index * self._byte_width) |
| return Ref(buf, self._byte_width, 1, self._element_type) |
| |
| @property |
| def Value(self): |
| """Returns underlying data as list object.""" |
| if not self: |
| return [] |
| |
| if self._element_type is Type.BOOL: |
| return [bool(e) for e in _UnpackVector(U, self.Bytes, len(self))] |
| elif self._element_type is Type.INT: |
| return list(_UnpackVector(I, self.Bytes, len(self))) |
| elif self._element_type is Type.UINT: |
| return list(_UnpackVector(U, self.Bytes, len(self))) |
| elif self._element_type is Type.FLOAT: |
| return list(_UnpackVector(F, self.Bytes, len(self))) |
| elif self._element_type is Type.KEY: |
| return [e.AsKey for e in self] |
| elif self._element_type is Type.STRING: |
| return [e.AsString for e in self] |
| else: |
| raise TypeError('unsupported element_type: %s' % self._element_type) |
| |
| def __repr__(self): |
| return 'TypedVector(%s, byte_width=%d, element_type=%s, size=%d)' % \ |
| (self._buf, self._byte_width, self._element_type, self._size) |
| |
| |
| class Map(Vector): |
| """Data accessor for the encoded map bytes.""" |
| |
| @staticmethod |
| def CompareKeys(a, b): |
| if isinstance(a, Ref): |
| a = a.AsKeyBytes |
| if isinstance(b, Ref): |
| b = b.AsKeyBytes |
| return a < b |
| |
| def __getitem__(self, key): |
| if isinstance(key, int): |
| return super().__getitem__(key) |
| |
| index = _BinarySearch(self.Keys, key.encode('ascii'), self.CompareKeys) |
| if index != -1: |
| return super().__getitem__(index) |
| |
| raise KeyError(key) |
| |
| @property |
| def Keys(self): |
| byte_width = _Unpack(U, self._buf[-2 * self._byte_width:-self._byte_width]) |
| buf = self._buf.Indirect(-3 * self._byte_width, self._byte_width) |
| return TypedVector(buf, byte_width, Type.KEY) |
| |
| @property |
| def Values(self): |
| return Vector(self._buf, self._byte_width) |
| |
| @property |
| def Value(self): |
| return {k.Value: v.Value for k, v in zip(self.Keys, self.Values)} |
| |
| def __repr__(self): |
| return 'Map(%s, size=%d)' % (self._buf, len(self)) |
| |
| |
| class Ref: |
| """Data accessor for the encoded data bytes.""" |
| __slots__ = '_buf', '_parent_width', '_byte_width', '_type' |
| |
| @staticmethod |
| def PackedType(buf, parent_width, packed_type): |
| byte_width, type_ = Type.Unpack(packed_type) |
| return Ref(buf, parent_width, byte_width, type_) |
| |
| def __init__(self, buf, parent_width, byte_width, type_): |
| self._buf = buf |
| self._parent_width = parent_width |
| self._byte_width = byte_width |
| self._type = type_ |
| |
| def __repr__(self): |
| return 'Ref(%s, parent_width=%d, byte_width=%d, type_=%s)' % \ |
| (self._buf, self._parent_width, self._byte_width, self._type) |
| |
| @property |
| def _Bytes(self): |
| return self._buf[:self._parent_width] |
| |
| def _ConvertError(self, target_type): |
| raise TypeError('cannot convert %s to %s' % (self._type, target_type)) |
| |
| def _Indirect(self): |
| return self._buf.Indirect(0, self._parent_width) |
| |
| @property |
| def IsNull(self): |
| return self._type is Type.NULL |
| |
| @property |
| def IsBool(self): |
| return self._type is Type.BOOL |
| |
| @property |
| def AsBool(self): |
| if self._type is Type.BOOL: |
| return bool(_Unpack(U, self._Bytes)) |
| else: |
| return self.AsInt != 0 |
| |
| def MutateBool(self, value): |
| """Mutates underlying boolean value bytes in place. |
| |
| Args: |
| value: New boolean value. |
| |
| Returns: |
| Whether the value was mutated or not. |
| """ |
| return self.IsBool and \ |
| _Mutate(U, self._buf, value, self._parent_width, BitWidth.W8) |
| |
| @property |
| def IsNumeric(self): |
| return self.IsInt or self.IsFloat |
| |
| @property |
| def IsInt(self): |
| return self._type in (Type.INT, Type.INDIRECT_INT, Type.UINT, |
| Type.INDIRECT_UINT) |
| |
| @property |
| def AsInt(self): |
| """Returns current reference as integer value.""" |
| if self.IsNull: |
| return 0 |
| elif self.IsBool: |
| return int(self.AsBool) |
| elif self._type is Type.INT: |
| return _Unpack(I, self._Bytes) |
| elif self._type is Type.INDIRECT_INT: |
| return _Unpack(I, self._Indirect()[:self._byte_width]) |
| if self._type is Type.UINT: |
| return _Unpack(U, self._Bytes) |
| elif self._type is Type.INDIRECT_UINT: |
| return _Unpack(U, self._Indirect()[:self._byte_width]) |
| elif self.IsString: |
| return len(self.AsString) |
| elif self.IsKey: |
| return len(self.AsKey) |
| elif self.IsBlob: |
| return len(self.AsBlob) |
| elif self.IsVector: |
| return len(self.AsVector) |
| elif self.IsTypedVector: |
| return len(self.AsTypedVector) |
| elif self.IsFixedTypedVector: |
| return len(self.AsFixedTypedVector) |
| else: |
| raise self._ConvertError(Type.INT) |
| |
| def MutateInt(self, value): |
| """Mutates underlying integer value bytes in place. |
| |
| Args: |
| value: New integer value. It must fit to the byte size of the existing |
| encoded value. |
| |
| Returns: |
| Whether the value was mutated or not. |
| """ |
| if self._type is Type.INT: |
| return _Mutate(I, self._buf, value, self._parent_width, BitWidth.I(value)) |
| elif self._type is Type.INDIRECT_INT: |
| return _Mutate(I, self._Indirect(), value, self._byte_width, |
| BitWidth.I(value)) |
| elif self._type is Type.UINT: |
| return _Mutate(U, self._buf, value, self._parent_width, BitWidth.U(value)) |
| elif self._type is Type.INDIRECT_UINT: |
| return _Mutate(U, self._Indirect(), value, self._byte_width, |
| BitWidth.U(value)) |
| else: |
| return False |
| |
| @property |
| def IsFloat(self): |
| return self._type in (Type.FLOAT, Type.INDIRECT_FLOAT) |
| |
| @property |
| def AsFloat(self): |
| """Returns current reference as floating point value.""" |
| if self.IsNull: |
| return 0.0 |
| elif self.IsBool: |
| return float(self.AsBool) |
| elif self.IsInt: |
| return float(self.AsInt) |
| elif self._type is Type.FLOAT: |
| return _Unpack(F, self._Bytes) |
| elif self._type is Type.INDIRECT_FLOAT: |
| return _Unpack(F, self._Indirect()[:self._byte_width]) |
| elif self.IsString: |
| return float(self.AsString) |
| elif self.IsVector: |
| return float(len(self.AsVector)) |
| elif self.IsTypedVector(): |
| return float(len(self.AsTypedVector)) |
| elif self.IsFixedTypedVector(): |
| return float(len(self.FixedTypedVector)) |
| else: |
| raise self._ConvertError(Type.FLOAT) |
| |
| def MutateFloat(self, value): |
| """Mutates underlying floating point value bytes in place. |
| |
| Args: |
| value: New float value. It must fit to the byte size of the existing |
| encoded value. |
| |
| Returns: |
| Whether the value was mutated or not. |
| """ |
| if self._type is Type.FLOAT: |
| return _Mutate(F, self._buf, value, self._parent_width, |
| BitWidth.B(self._parent_width)) |
| elif self._type is Type.INDIRECT_FLOAT: |
| return _Mutate(F, self._Indirect(), value, self._byte_width, |
| BitWidth.B(self._byte_width)) |
| else: |
| return False |
| |
| @property |
| def IsKey(self): |
| return self._type is Type.KEY |
| |
| @property |
| def AsKeyBytes(self): |
| if self.IsKey: |
| return Key(self._Indirect(), self._byte_width).Bytes |
| else: |
| raise self._ConvertError(Type.KEY) |
| |
| @property |
| def AsKey(self): |
| if self.IsKey: |
| return str(Key(self._Indirect(), self._byte_width)) |
| else: |
| raise self._ConvertError(Type.KEY) |
| |
| @property |
| def IsString(self): |
| return self._type is Type.STRING |
| |
| @property |
| def AsString(self): |
| if self.IsString: |
| return str(String(self._Indirect(), self._byte_width)) |
| elif self.IsKey: |
| return self.AsKey |
| else: |
| raise self._ConvertError(Type.STRING) |
| |
| def MutateString(self, value): |
| return String(self._Indirect(), self._byte_width).Mutate(value) |
| |
| @property |
| def IsBlob(self): |
| return self._type is Type.BLOB |
| |
| @property |
| def AsBlob(self): |
| if self.IsBlob: |
| return Blob(self._Indirect(), self._byte_width).Bytes |
| else: |
| raise self._ConvertError(Type.BLOB) |
| |
| @property |
| def IsAnyVector(self): |
| return self.IsVector or self.IsTypedVector or self.IsFixedTypedVector() |
| |
| @property |
| def IsVector(self): |
| return self._type in (Type.VECTOR, Type.MAP) |
| |
| @property |
| def AsVector(self): |
| if self.IsVector: |
| return Vector(self._Indirect(), self._byte_width) |
| else: |
| raise self._ConvertError(Type.VECTOR) |
| |
| @property |
| def IsTypedVector(self): |
| return Type.IsTypedVector(self._type) |
| |
| @property |
| def AsTypedVector(self): |
| if self.IsTypedVector: |
| return TypedVector(self._Indirect(), self._byte_width, |
| Type.ToTypedVectorElementType(self._type)) |
| else: |
| raise self._ConvertError('TYPED_VECTOR') |
| |
| @property |
| def IsFixedTypedVector(self): |
| return Type.IsFixedTypedVector(self._type) |
| |
| @property |
| def AsFixedTypedVector(self): |
| if self.IsFixedTypedVector: |
| element_type, size = Type.ToFixedTypedVectorElementType(self._type) |
| return TypedVector(self._Indirect(), self._byte_width, element_type, size) |
| else: |
| raise self._ConvertError('FIXED_TYPED_VECTOR') |
| |
| @property |
| def IsMap(self): |
| return self._type is Type.MAP |
| |
| @property |
| def AsMap(self): |
| if self.IsMap: |
| return Map(self._Indirect(), self._byte_width) |
| else: |
| raise self._ConvertError(Type.MAP) |
| |
| @property |
| def Value(self): |
| """Converts current reference to value of corresponding type. |
| |
| This is equivalent to calling `AsInt` for integer values, `AsFloat` for |
| floating point values, etc. |
| |
| Returns: |
| Value of corresponding type. |
| """ |
| if self.IsNull: |
| return None |
| elif self.IsBool: |
| return self.AsBool |
| elif self.IsInt: |
| return self.AsInt |
| elif self.IsFloat: |
| return self.AsFloat |
| elif self.IsString: |
| return self.AsString |
| elif self.IsKey: |
| return self.AsKey |
| elif self.IsBlob: |
| return self.AsBlob |
| elif self.IsMap: |
| return self.AsMap.Value |
| elif self.IsVector: |
| return self.AsVector.Value |
| elif self.IsTypedVector: |
| return self.AsTypedVector.Value |
| elif self.IsFixedTypedVector: |
| return self.AsFixedTypedVector.Value |
| else: |
| raise TypeError('cannot convert %r to value' % self) |
| |
| |
| def _IsIterable(obj): |
| try: |
| iter(obj) |
| return True |
| except TypeError: |
| return False |
| |
| |
| class Value: |
| """Class to represent given value during the encoding process.""" |
| |
| @staticmethod |
| def Null(): |
| return Value(0, Type.NULL, BitWidth.W8) |
| |
| @staticmethod |
| def Bool(value): |
| return Value(value, Type.BOOL, BitWidth.W8) |
| |
| @staticmethod |
| def Int(value, bit_width): |
| return Value(value, Type.INT, bit_width) |
| |
| @staticmethod |
| def UInt(value, bit_width): |
| return Value(value, Type.UINT, bit_width) |
| |
| @staticmethod |
| def Float(value, bit_width): |
| return Value(value, Type.FLOAT, bit_width) |
| |
| @staticmethod |
| def Key(offset): |
| return Value(offset, Type.KEY, BitWidth.W8) |
| |
| def __init__(self, value, type_, min_bit_width): |
| self._value = value |
| self._type = type_ |
| |
| # For scalars: of itself, for vector: of its elements, for string: length. |
| self._min_bit_width = min_bit_width |
| |
| @property |
| def Value(self): |
| return self._value |
| |
| @property |
| def Type(self): |
| return self._type |
| |
| @property |
| def MinBitWidth(self): |
| return self._min_bit_width |
| |
| def StoredPackedType(self, parent_bit_width=BitWidth.W8): |
| return Type.Pack(self._type, self.StoredWidth(parent_bit_width)) |
| |
| # We have an absolute offset, but want to store a relative offset |
| # elem_index elements beyond the current buffer end. Since whether |
| # the relative offset fits in a certain byte_width depends on |
| # the size of the elements before it (and their alignment), we have |
| # to test for each size in turn. |
| def ElemWidth(self, buf_size, elem_index=0): |
| if Type.IsInline(self._type): |
| return self._min_bit_width |
| for byte_width in 1, 2, 4, 8: |
| offset_loc = buf_size + _PaddingBytes(buf_size, byte_width) + \ |
| elem_index * byte_width |
| bit_width = BitWidth.U(offset_loc - self._value) |
| if byte_width == (1 << bit_width): |
| return bit_width |
| raise ValueError('relative offset is too big') |
| |
| def StoredWidth(self, parent_bit_width=BitWidth.W8): |
| if Type.IsInline(self._type): |
| return max(self._min_bit_width, parent_bit_width) |
| return self._min_bit_width |
| |
| def __repr__(self): |
| return 'Value(%s, %s, %s)' % (self._value, self._type, self._min_bit_width) |
| |
| def __str__(self): |
| return str(self._value) |
| |
| |
| def InMap(func): |
| def wrapper(self, *args, **kwargs): |
| if isinstance(args[0], str): |
| self.Key(args[0]) |
| func(self, *args[1:], **kwargs) |
| else: |
| func(self, *args, **kwargs) |
| return wrapper |
| |
| |
| def InMapForString(func): |
| def wrapper(self, *args): |
| if len(args) == 1: |
| func(self, args[0]) |
| elif len(args) == 2: |
| self.Key(args[0]) |
| func(self, args[1]) |
| else: |
| raise ValueError('invalid number of arguments') |
| return wrapper |
| |
| |
| class Pool: |
| """Collection of (data, offset) pairs sorted by data for quick access.""" |
| |
| def __init__(self): |
| self._pool = [] # sorted list of (data, offset) tuples |
| |
| def FindOrInsert(self, data, offset): |
| do = data, offset |
| index = _BinarySearch(self._pool, do, lambda a, b: a[0] < b[0]) |
| if index != -1: |
| _, offset = self._pool[index] |
| return offset |
| self._pool.insert(index, do) |
| return None |
| |
| def Clear(self): |
| self._pool = [] |
| |
| @property |
| def Elements(self): |
| return [data for data, _ in self._pool] |
| |
| |
| class Builder: |
| """Helper class to encode structural data into flexbuffers format.""" |
| |
| def __init__(self, |
| share_strings=False, |
| share_keys=True, |
| force_min_bit_width=BitWidth.W8): |
| self._share_strings = share_strings |
| self._share_keys = share_keys |
| self._force_min_bit_width = force_min_bit_width |
| |
| self._string_pool = Pool() |
| self._key_pool = Pool() |
| |
| self._finished = False |
| self._buf = bytearray() |
| self._stack = [] |
| |
| def __len__(self): |
| return len(self._buf) |
| |
| @property |
| def StringPool(self): |
| return self._string_pool |
| |
| @property |
| def KeyPool(self): |
| return self._key_pool |
| |
| def Clear(self): |
| self._string_pool.Clear() |
| self._key_pool.Clear() |
| self._finished = False |
| self._buf = bytearray() |
| self._stack = [] |
| |
| def Finish(self): |
| """Finishes encoding process and returns underlying buffer.""" |
| if self._finished: |
| raise RuntimeError('builder has been already finished') |
| |
| # If you hit this exception, you likely have objects that were never |
| # included in a parent. You need to have exactly one root to finish a |
| # buffer. Check your Start/End calls are matched, and all objects are inside |
| # some other object. |
| if len(self._stack) != 1: |
| raise RuntimeError('internal stack size must be one') |
| |
| value = self._stack[0] |
| byte_width = self._Align(value.ElemWidth(len(self._buf))) |
| self._WriteAny(value, byte_width=byte_width) # Root value |
| self._Write(U, value.StoredPackedType(), byte_width=1) # Root type |
| self._Write(U, byte_width, byte_width=1) # Root size |
| |
| self.finished = True |
| return self._buf |
| |
| def _ReadKey(self, offset): |
| key = self._buf[offset:] |
| return key[:key.find(0)] |
| |
| def _Align(self, alignment): |
| byte_width = 1 << alignment |
| self._buf.extend(b'\x00' * _PaddingBytes(len(self._buf), byte_width)) |
| return byte_width |
| |
| def _Write(self, fmt, value, byte_width): |
| self._buf.extend(_Pack(fmt, value, byte_width)) |
| |
| def _WriteVector(self, fmt, values, byte_width): |
| self._buf.extend(_PackVector(fmt, values, byte_width)) |
| |
| def _WriteOffset(self, offset, byte_width): |
| relative_offset = len(self._buf) - offset |
| assert byte_width == 8 or relative_offset < (1 << (8 * byte_width)) |
| self._Write(U, relative_offset, byte_width) |
| |
| def _WriteAny(self, value, byte_width): |
| fmt = { |
| Type.NULL: U, Type.BOOL: U, Type.INT: I, Type.UINT: U, Type.FLOAT: F |
| }.get(value.Type) |
| if fmt: |
| self._Write(fmt, value.Value, byte_width) |
| else: |
| self._WriteOffset(value.Value, byte_width) |
| |
| def _WriteBlob(self, data, append_zero, type_): |
| bit_width = BitWidth.U(len(data)) |
| byte_width = self._Align(bit_width) |
| self._Write(U, len(data), byte_width) |
| loc = len(self._buf) |
| self._buf.extend(data) |
| if append_zero: |
| self._buf.append(0) |
| self._stack.append(Value(loc, type_, bit_width)) |
| return loc |
| |
| def _WriteScalarVector(self, element_type, byte_width, elements, fixed): |
| """Writes scalar vector elements to the underlying buffer.""" |
| bit_width = BitWidth.B(byte_width) |
| # If you get this exception, you're trying to write a vector with a size |
| # field that is bigger than the scalars you're trying to write (e.g. a |
| # byte vector > 255 elements). For such types, write a "blob" instead. |
| if BitWidth.U(len(elements)) > bit_width: |
| raise ValueError('too many elements for the given byte_width') |
| |
| self._Align(bit_width) |
| if not fixed: |
| self._Write(U, len(elements), byte_width) |
| |
| loc = len(self._buf) |
| |
| fmt = {Type.INT: I, Type.UINT: U, Type.FLOAT: F}.get(element_type) |
| if not fmt: |
| raise TypeError('unsupported element_type') |
| self._WriteVector(fmt, elements, byte_width) |
| |
| type_ = Type.ToTypedVector(element_type, len(elements) if fixed else 0) |
| self._stack.append(Value(loc, type_, bit_width)) |
| return loc |
| |
| def _CreateVector(self, elements, typed, fixed, keys=None): |
| """Writes vector elements to the underlying buffer.""" |
| length = len(elements) |
| |
| if fixed and not typed: |
| raise ValueError('fixed vector must be typed') |
| |
| # Figure out smallest bit width we can store this vector with. |
| bit_width = max(self._force_min_bit_width, BitWidth.U(length)) |
| prefix_elems = 1 # Vector size |
| if keys: |
| bit_width = max(bit_width, keys.ElemWidth(len(self._buf))) |
| prefix_elems += 2 # Offset to the keys vector and its byte width. |
| |
| vector_type = Type.KEY |
| # Check bit widths and types for all elements. |
| for i, e in enumerate(elements): |
| bit_width = max(bit_width, e.ElemWidth(len(self._buf), prefix_elems + i)) |
| |
| if typed: |
| if i == 0: |
| vector_type = e.Type |
| else: |
| if vector_type != e.Type: |
| raise RuntimeError('typed vector elements must be of the same type') |
| |
| if fixed and not Type.IsFixedTypedVectorElementType(vector_type): |
| raise RuntimeError('must be fixed typed vector element type') |
| |
| byte_width = self._Align(bit_width) |
| # Write vector. First the keys width/offset if available, and size. |
| if keys: |
| self._WriteOffset(keys.Value, byte_width) |
| self._Write(U, 1 << keys.MinBitWidth, byte_width) |
| |
| if not fixed: |
| self._Write(U, length, byte_width) |
| |
| # Then the actual data. |
| loc = len(self._buf) |
| for e in elements: |
| self._WriteAny(e, byte_width) |
| |
| # Then the types. |
| if not typed: |
| for e in elements: |
| self._buf.append(e.StoredPackedType(bit_width)) |
| |
| if keys: |
| type_ = Type.MAP |
| else: |
| if typed: |
| type_ = Type.ToTypedVector(vector_type, length if fixed else 0) |
| else: |
| type_ = Type.VECTOR |
| |
| return Value(loc, type_, bit_width) |
| |
| def _PushIndirect(self, value, type_, bit_width): |
| byte_width = self._Align(bit_width) |
| loc = len(self._buf) |
| fmt = { |
| Type.INDIRECT_INT: I, |
| Type.INDIRECT_UINT: U, |
| Type.INDIRECT_FLOAT: F |
| }[type_] |
| self._Write(fmt, value, byte_width) |
| self._stack.append(Value(loc, type_, bit_width)) |
| |
| @InMapForString |
| def String(self, value): |
| """Encodes string value.""" |
| reset_to = len(self._buf) |
| encoded = value.encode('utf-8') |
| loc = self._WriteBlob(encoded, append_zero=True, type_=Type.STRING) |
| if self._share_strings: |
| prev_loc = self._string_pool.FindOrInsert(encoded, loc) |
| if prev_loc is not None: |
| del self._buf[reset_to:] |
| self._stack[-1]._value = loc = prev_loc # pylint: disable=protected-access |
| |
| return loc |
| |
| @InMap |
| def Blob(self, value): |
| """Encodes binary blob value. |
| |
| Args: |
| value: A byte/bytearray value to encode |
| |
| Returns: |
| Offset of the encoded value in underlying the byte buffer. |
| """ |
| return self._WriteBlob(value, append_zero=False, type_=Type.BLOB) |
| |
| def Key(self, value): |
| """Encodes key value. |
| |
| Args: |
| value: A byte/bytearray/str value to encode. Byte object must not contain |
| zero bytes. String object must be convertible to ASCII. |
| |
| Returns: |
| Offset of the encoded value in the underlying byte buffer. |
| """ |
| if isinstance(value, (bytes, bytearray)): |
| encoded = value |
| else: |
| encoded = value.encode('ascii') |
| |
| if 0 in encoded: |
| raise ValueError('key contains zero byte') |
| |
| loc = len(self._buf) |
| self._buf.extend(encoded) |
| self._buf.append(0) |
| if self._share_keys: |
| prev_loc = self._key_pool.FindOrInsert(encoded, loc) |
| if prev_loc is not None: |
| del self._buf[loc:] |
| loc = prev_loc |
| |
| self._stack.append(Value.Key(loc)) |
| return loc |
| |
| def Null(self, key=None): |
| """Encodes None value.""" |
| if key: |
| self.Key(key) |
| self._stack.append(Value.Null()) |
| |
| @InMap |
| def Bool(self, value): |
| """Encodes boolean value. |
| |
| Args: |
| value: A boolean value. |
| """ |
| self._stack.append(Value.Bool(value)) |
| |
| @InMap |
| def Int(self, value, byte_width=0): |
| """Encodes signed integer value. |
| |
| Args: |
| value: A signed integer value. |
| byte_width: Number of bytes to use: 1, 2, 4, or 8. |
| """ |
| bit_width = BitWidth.I(value) if byte_width == 0 else BitWidth.B(byte_width) |
| self._stack.append(Value.Int(value, bit_width)) |
| |
| @InMap |
| def IndirectInt(self, value, byte_width=0): |
| """Encodes signed integer value indirectly. |
| |
| Args: |
| value: A signed integer value. |
| byte_width: Number of bytes to use: 1, 2, 4, or 8. |
| """ |
| bit_width = BitWidth.I(value) if byte_width == 0 else BitWidth.B(byte_width) |
| self._PushIndirect(value, Type.INDIRECT_INT, bit_width) |
| |
| @InMap |
| def UInt(self, value, byte_width=0): |
| """Encodes unsigned integer value. |
| |
| Args: |
| value: An unsigned integer value. |
| byte_width: Number of bytes to use: 1, 2, 4, or 8. |
| """ |
| bit_width = BitWidth.U(value) if byte_width == 0 else BitWidth.B(byte_width) |
| self._stack.append(Value.UInt(value, bit_width)) |
| |
| @InMap |
| def IndirectUInt(self, value, byte_width=0): |
| """Encodes unsigned integer value indirectly. |
| |
| Args: |
| value: An unsigned integer value. |
| byte_width: Number of bytes to use: 1, 2, 4, or 8. |
| """ |
| bit_width = BitWidth.U(value) if byte_width == 0 else BitWidth.B(byte_width) |
| self._PushIndirect(value, Type.INDIRECT_UINT, bit_width) |
| |
| @InMap |
| def Float(self, value, byte_width=0): |
| """Encodes floating point value. |
| |
| Args: |
| value: A floating point value. |
| byte_width: Number of bytes to use: 4 or 8. |
| """ |
| bit_width = BitWidth.F(value) if byte_width == 0 else BitWidth.B(byte_width) |
| self._stack.append(Value.Float(value, bit_width)) |
| |
| @InMap |
| def IndirectFloat(self, value, byte_width=0): |
| """Encodes floating point value indirectly. |
| |
| Args: |
| value: A floating point value. |
| byte_width: Number of bytes to use: 4 or 8. |
| """ |
| bit_width = BitWidth.F(value) if byte_width == 0 else BitWidth.B(byte_width) |
| self._PushIndirect(value, Type.INDIRECT_FLOAT, bit_width) |
| |
| def _StartVector(self): |
| """Starts vector construction.""" |
| return len(self._stack) |
| |
| def _EndVector(self, start, typed, fixed): |
| """Finishes vector construction by encodung its elements.""" |
| vec = self._CreateVector(self._stack[start:], typed, fixed) |
| del self._stack[start:] |
| self._stack.append(vec) |
| return vec.Value |
| |
| @contextlib.contextmanager |
| def Vector(self, key=None): |
| if key: |
| self.Key(key) |
| |
| try: |
| start = self._StartVector() |
| yield self |
| finally: |
| self._EndVector(start, typed=False, fixed=False) |
| |
| @InMap |
| def VectorFromElements(self, elements): |
| """Encodes sequence of any elements as a vector. |
| |
| Args: |
| elements: sequence of elements, they may have different types. |
| """ |
| with self.Vector(): |
| for e in elements: |
| self.Add(e) |
| |
| @contextlib.contextmanager |
| def TypedVector(self, key=None): |
| if key: |
| self.Key(key) |
| |
| try: |
| start = self._StartVector() |
| yield self |
| finally: |
| self._EndVector(start, typed=True, fixed=False) |
| |
| @InMap |
| def TypedVectorFromElements(self, elements, element_type=None): |
| """Encodes sequence of elements of the same type as typed vector. |
| |
| Args: |
| elements: Sequence of elements, they must be of the same type. |
| element_type: Suggested element type. Setting it to None means determining |
| correct value automatically based on the given elements. |
| """ |
| if isinstance(elements, array.array): |
| if elements.typecode == 'f': |
| self._WriteScalarVector(Type.FLOAT, 4, elements, fixed=False) |
| elif elements.typecode == 'd': |
| self._WriteScalarVector(Type.FLOAT, 8, elements, fixed=False) |
| elif elements.typecode in ('b', 'h', 'i', 'l', 'q'): |
| self._WriteScalarVector( |
| Type.INT, elements.itemsize, elements, fixed=False) |
| elif elements.typecode in ('B', 'H', 'I', 'L', 'Q'): |
| self._WriteScalarVector( |
| Type.UINT, elements.itemsize, elements, fixed=False) |
| else: |
| raise ValueError('unsupported array typecode: %s' % elements.typecode) |
| else: |
| add = self.Add if element_type is None else self.Adder(element_type) |
| with self.TypedVector(): |
| for e in elements: |
| add(e) |
| |
| @InMap |
| def FixedTypedVectorFromElements(self, |
| elements, |
| element_type=None, |
| byte_width=0): |
| """Encodes sequence of elements of the same type as fixed typed vector. |
| |
| Args: |
| elements: Sequence of elements, they must be of the same type. Allowed |
| types are `Type.INT`, `Type.UINT`, `Type.FLOAT`. Allowed number of |
| elements are 2, 3, or 4. |
| element_type: Suggested element type. Setting it to None means determining |
| correct value automatically based on the given elements. |
| byte_width: Number of bytes to use per element. For `Type.INT` and |
| `Type.UINT`: 1, 2, 4, or 8. For `Type.FLOAT`: 4 or 8. Setting it to 0 |
| means determining correct value automatically based on the given |
| elements. |
| """ |
| if not 2 <= len(elements) <= 4: |
| raise ValueError('only 2, 3, or 4 elements are supported') |
| |
| types = {type(e) for e in elements} |
| if len(types) != 1: |
| raise TypeError('all elements must be of the same type') |
| |
| type_, = types |
| |
| if element_type is None: |
| element_type = {int: Type.INT, float: Type.FLOAT}.get(type_) |
| if not element_type: |
| raise TypeError('unsupported element_type: %s' % type_) |
| |
| if byte_width == 0: |
| width = { |
| Type.UINT: BitWidth.U, |
| Type.INT: BitWidth.I, |
| Type.FLOAT: BitWidth.F |
| }[element_type] |
| byte_width = 1 << max(width(e) for e in elements) |
| |
| self._WriteScalarVector(element_type, byte_width, elements, fixed=True) |
| |
| def _StartMap(self): |
| """Starts map construction.""" |
| return len(self._stack) |
| |
| def _EndMap(self, start): |
| """Finishes map construction by encodung its elements.""" |
| # Interleaved keys and values on the stack. |
| stack = self._stack[start:] |
| |
| if len(stack) % 2 != 0: |
| raise RuntimeError('must be even number of keys and values') |
| |
| for key in stack[::2]: |
| if key.Type is not Type.KEY: |
| raise RuntimeError('all map keys must be of %s type' % Type.KEY) |
| |
| pairs = zip(stack[::2], stack[1::2]) # [(key, value), ...] |
| pairs = sorted(pairs, key=lambda pair: self._ReadKey(pair[0].Value)) |
| |
| del self._stack[start:] |
| for pair in pairs: |
| self._stack.extend(pair) |
| |
| keys = self._CreateVector(self._stack[start::2], typed=True, fixed=False) |
| values = self._CreateVector( |
| self._stack[start + 1::2], typed=False, fixed=False, keys=keys) |
| |
| del self._stack[start:] |
| self._stack.append(values) |
| return values.Value |
| |
| @contextlib.contextmanager |
| def Map(self, key=None): |
| if key: |
| self.Key(key) |
| |
| try: |
| start = self._StartMap() |
| yield self |
| finally: |
| self._EndMap(start) |
| |
| def MapFromElements(self, elements): |
| start = self._StartMap() |
| for k, v in elements.items(): |
| self.Key(k) |
| self.Add(v) |
| self._EndMap(start) |
| |
| def Adder(self, type_): |
| return { |
| Type.BOOL: self.Bool, |
| Type.INT: self.Int, |
| Type.INDIRECT_INT: self.IndirectInt, |
| Type.UINT: self.UInt, |
| Type.INDIRECT_UINT: self.IndirectUInt, |
| Type.FLOAT: self.Float, |
| Type.INDIRECT_FLOAT: self.IndirectFloat, |
| Type.KEY: self.Key, |
| Type.BLOB: self.Blob, |
| Type.STRING: self.String, |
| }[type_] |
| |
| @InMapForString |
| def Add(self, value): |
| """Encodes value of any supported type.""" |
| if value is None: |
| self.Null() |
| elif isinstance(value, bool): |
| self.Bool(value) |
| elif isinstance(value, int): |
| self.Int(value) |
| elif isinstance(value, float): |
| self.Float(value) |
| elif isinstance(value, str): |
| self.String(value) |
| elif isinstance(value, (bytes, bytearray)): |
| self.Blob(value) |
| elif isinstance(value, dict): |
| with self.Map(): |
| for k, v in value.items(): |
| self.Key(k) |
| self.Add(v) |
| elif isinstance(value, array.array): |
| self.TypedVectorFromElements(value) |
| elif _IsIterable(value): |
| self.VectorFromElements(value) |
| else: |
| raise TypeError('unsupported python type: %s' % type(value)) |
| |
| @property |
| def LastValue(self): |
| return self._stack[-1] |
| |
| @InMap |
| def ReuseValue(self, value): |
| self._stack.append(value) |
| |
| |
| def GetRoot(buf): |
| """Returns root `Ref` object for the given buffer.""" |
| if len(buf) < 3: |
| raise ValueError('buffer is too small') |
| byte_width = buf[-1] |
| return Ref.PackedType( |
| Buf(buf, -(2 + byte_width)), byte_width, packed_type=buf[-2]) |
| |
| |
| def Dumps(obj): |
| """Returns bytearray with the encoded python object.""" |
| fbb = Builder() |
| fbb.Add(obj) |
| return fbb.Finish() |
| |
| |
| def Loads(buf): |
| """Returns python object decoded from the buffer.""" |
| return GetRoot(buf).Value |