Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/bydantic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
dynamic_field,
mapped_field,
Bitfield,
BitfieldConfig,
ValueMapper,
Scale,
IntScale,
Expand All @@ -43,6 +44,7 @@
"lit_str_field",
"dynamic_field",
"Bitfield",
"BitfieldConfig",
"ValueMapper",
"Scale",
"IntScale",
Expand Down
27 changes: 26 additions & 1 deletion src/bydantic/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass, field as dataclass_field

from typing_extensions import dataclass_transform, TypeVar as TypeVarDefault, Self
import typing as t
import inspect
Expand Down Expand Up @@ -1116,6 +1118,20 @@ class Foo(bd.Bitfield):
ContextT = TypeVarDefault("ContextT", default=None)


@dataclass()
class BitfieldConfig:
"""
A configuration object for the bitfield. This can be used to
configure the bitfield's behavior, such as whether to reorder
bits when serializing and deserializing.
"""

reorder_bits: t.Sequence[int] = dataclass_field(default_factory=list)
"""
A list of bit positions to reorder when serializing and deserializing.
"""


@dataclass_transform(
kw_only_default=True,
field_specifiers=(
Expand Down Expand Up @@ -1146,6 +1162,13 @@ class Bitfield(t.Generic[ContextT]):

__BYDANTIC_CONTEXT_STR__: t.ClassVar[str] = "ctx"

bitfield_config: t.ClassVar[BitfieldConfig] = BitfieldConfig()
"""
A configuration object for the bitfield. This can be used to
configure the bitfield's behavior, such as whether to reorder
bits when serializing and deserializing.
"""

ctx: ContextT | None = None
"""
A context object that can be referenced by dynamic fields while
Expand Down Expand Up @@ -1389,6 +1412,8 @@ def __bydantic_read_stream__(
):
proxy: AttrProxy = AttrProxy({cls.__BYDANTIC_CONTEXT_STR__: ctx})

stream = stream.reorder(cls.bitfield_config.reorder_bits)

for name, field in cls.__bydantic_fields__.items():
try:
value, stream = _read_bftype(
Expand Down Expand Up @@ -1426,7 +1451,7 @@ def __bydantic_write_stream__(
e, self.__class__.__name__, name
) from e

return stream
return stream.unreorder(self.bitfield_config.reorder_bits)


def _read_bftype(
Expand Down
43 changes: 43 additions & 0 deletions src/bydantic/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,43 @@
import typing as t


def _make_pairs(order: t.Sequence[int], size: int):
if not all(i < size for i in order) or not all(i >= 0 for i in order):
raise ValueError(
f"some indices in the reordering are out-of-bounds"
)

order_set = frozenset(order)

if len(order_set) != len(order):
raise ValueError(
f"duplicate indices in reordering"
)

return zip(
range(size),
(*order, *(i for i in range(size) if i not in order_set))
)


def reorder_bits(data: t.Sequence[bool], order: t.Sequence[int]) -> t.Tuple[bool, ...]:
if not order:
return tuple(data)

pairs = _make_pairs(order, len(data))

return tuple(data[i] for _, i in pairs)


def unreorder_bits(data: t.Sequence[bool], order: t.Sequence[int]) -> t.Tuple[bool, ...]:
if not order:
return tuple(data)

pairs = sorted(_make_pairs(order, len(data)), key=lambda x: x[1])

return tuple(data[i] for i, _ in pairs)


def bytes_to_bits(data: t.ByteString) -> t.Tuple[bool, ...]:
return tuple(
bit for byte in data for bit in uint_to_bits(byte, 8)
Expand Down Expand Up @@ -63,6 +100,9 @@ def as_bits(self) -> t.Tuple[bool, ...]:
def as_bytes(self) -> bytes:
return bits_to_bytes(self._bits)

def unreorder(self, order: t.Sequence[int]) -> BitstreamWriter:
return BitstreamWriter(unreorder_bits(self._bits, order))


class BitstreamReader:
_bits: t.Tuple[bool, ...]
Expand Down Expand Up @@ -118,6 +158,9 @@ def as_bits(self) -> t.Tuple[bool, ...]:
def as_bytes(self) -> bytes:
return self.take_bytes(self.bytes_remaining())[0]

def reorder(self, order: t.Sequence[int]) -> BitstreamReader:
return BitstreamReader(reorder_bits(self._bits, order))


class AttrProxy(t.Mapping[str, t.Any]):
_data: t.Dict[str, t.Any]
Expand Down
30 changes: 30 additions & 0 deletions tests/test_reorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import bydantic as bd
import typing as t
from bydantic.utils import (
reorder_bits,
unreorder_bits
)


def test_bit_reorder():
b = tuple(i == "1" for i in "101100")
order = [1, 3, 5]

assert reorder_bits(b, order) == tuple(i == "1" for i in "010110")
assert unreorder_bits(reorder_bits(b, order), order) == b


def test_basic_reorder():
class Work(bd.Bitfield):
a: int = bd.uint_field(4)
b: t.List[int] = bd.list_field(bd.uint_field(3), 4)
c: str = bd.str_field(n_bytes=3)
d: bytes = bd.bytes_field(n_bytes=4)

bitfield_config = bd.BitfieldConfig(
reorder_bits=[*range(56, 56+16)]
)

work = Work(a=1, b=[1, 2, 3, 4], c="abc", d=b"abcd")
assert work.to_bytes() == b'abcabcd\x12\x9c'
assert Work.from_bytes_exact(work.to_bytes()) == work