Compare commits

...

5 Commits

Author SHA1 Message Date
d2ffd10191
Implement read for Decimal 2025-04-21 18:14:44 +10:00
0d25c9b6cc
Implement read for ListOfInt 2025-04-21 18:14:32 +10:00
5801539bb0
Quality of life improvements
Add helper functions to directly read/write bytes
2025-04-21 18:14:15 +10:00
98bcd88cb4
Implement write for Boolean, ByteArray and String 2025-04-21 17:35:27 +10:00
1f3cafd8b3
Implement read for Int32 and StringDictionaryKey 2025-04-21 17:34:50 +10:00
4 changed files with 123 additions and 14 deletions

View File

@ -14,20 +14,32 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from decimal import Decimal
import io
import struct
from typing import List
from .decimal import int32_to_decimal
from .known_types import CslaKnownTypes
from .serialization_info import ChildData, FieldData, SerializationInfo
class CslaBinaryReader:
"""Reads binary-serialised CSLA data into SerializationInfo objects"""
@classmethod
def read_from_bytes(cls, data: bytes) -> List[SerializationInfo]:
stream = io.BytesIO(data)
reader = cls(stream)
return reader.read()
# --------------
# Implementation
def __init__(self, stream: io.BufferedIOBase):
self.stream = stream
self.keywords_dictionary = {}
def read(self):
def read(self) -> List[SerializationInfo]:
# CslaBinaryReader.Read
total_count = self.read_int32()
@ -102,6 +114,14 @@ class CslaBinaryReader:
shift += 7
def read_decimal(self):
length = self.read_int32()
if length != 4:
raise ValueError('Unexpected length of Decimal, expected 4, got {}'.format(length))
decimal_parts = [self.read_int32() for _ in range(length)]
return int32_to_decimal(decimal_parts)
def read_int32(self):
# BinaryReader.ReadInt32
return struct.unpack('<i', self.stream.read(4))[0]
@ -130,7 +150,7 @@ class CslaBinaryReader:
raise NotImplementedError()
if known_type == CslaKnownTypes.Int32.value:
raise NotImplementedError()
return self.read_int32()
if known_type == CslaKnownTypes.UInt32.value:
raise NotImplementedError()
@ -148,7 +168,7 @@ class CslaBinaryReader:
raise NotImplementedError()
if known_type == CslaKnownTypes.Decimal.value:
raise NotImplementedError()
return self.read_decimal()
if known_type == CslaKnownTypes.DateTime.value:
raise NotImplementedError()
@ -173,7 +193,8 @@ class CslaBinaryReader:
raise NotImplementedError()
if known_type == CslaKnownTypes.ListOfInt.value:
raise NotImplementedError()
length = self.read_int32()
return [self.read_int32() for _ in range(length)]
if known_type == CslaKnownTypes.Null.value:
raise NotImplementedError()
@ -185,7 +206,8 @@ class CslaBinaryReader:
return system_string
if known_type == CslaKnownTypes.StringDictionaryKey.value:
raise NotImplementedError()
dictionary_key = self.read_int32()
return self.keywords_dictionary[dictionary_key]
raise ValueError('Unexpected object tag {}'.format(known_type))

View File

@ -24,6 +24,16 @@ from .serialization_info import SerializationInfo
class CslaBinaryWriter:
"""Writes SerializationInfo objects into binary-serialised CSLA data"""
@classmethod
def write_to_bytes(cls, serialisation_infos: List[SerializationInfo]) -> bytes:
stream = io.BytesIO()
writer = cls(stream)
writer.write(serialisation_infos)
return stream.getvalue()
# --------------
# Implementation
def __init__(self, stream: io.BufferedIOBase):
self.stream = stream
self.keywords_dictionary = []
@ -82,17 +92,37 @@ class CslaBinaryWriter:
def write_object(self, value):
# CslaBinaryWriter.Write(...)
raise NotImplementedError('Writing objects of dynamic type is not yet implemented')
if isinstance(value, bool):
return self.write_object_bool(value)
if isinstance(value, bytes):
return self.write_object_bytearray(value)
if isinstance(value, str):
return self.write_object_string(value)
raise NotImplementedError('CslaBinaryWriter.Write not implemented for type {}'.format(type(value).__name__))
def write_object_bool(self, value):
# CslaBinaryWriter.Write(bool)
self.stream.write(bytes([CslaKnownTypes.Boolean.value, 1 if value else 0]))
def write_object_bytearray(self, value):
# CslaBinaryWriter.Write(byte[])
self.stream.write(bytes([CslaKnownTypes.ByteArray.value]))
self.write_int32(len(value))
self.stream.write(value)
def write_object_int32(self, value):
# CslaBinaryWriter.Write(int)
self.stream.write(bytes([CslaKnownTypes.Int32.value]))
self.write_int32(value)
def write_object_string(self, value):
# CslaBinaryWriter.Write(string)
self.stream.write(bytes([CslaKnownTypes.String.value]))
self.write_string(value)
def write_object_system_string(self, value):
# CslaBinaryWriter.WriteSystemString
if value in self.keywords_dictionary:

57
csla_binary/decimal.py Normal file
View File

@ -0,0 +1,57 @@
# pycsla-binary: Python implementation of CSLA .NET binary serialisation
# Copyright (C) 2025 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from decimal import Decimal, getcontext
from typing import List
def int32_to_decimal(decimal_parts: List[int]) -> Decimal:
# System.Decimal(Int32[])
# The binary representation of a Decimal number consists of a 1-bit sign, a 96-bit integer number, and a scaling factor used to divide the integer number and specify what portion of it is a decimal fraction. The scaling factor is implicitly the number 10, raised to an exponent ranging from 0 to 28.
# bits is a four-element long array of 32-bit signed integers.
# bits [0], bits [1], and bits [2] contain the low, middle, and high 32 bits of the 96-bit integer number.
# bits [3] contains the scale factor and sign, and consists of following parts:
# - Bits 0 to 15, the lower word, are unused and must be zero.
# - Bits 16 to 23 must contain an exponent between 0 and 28, which indicates the power of 10 to divide the integer number.
# - Bits 24 to 30 are unused and must be zero.
# - Bit 31 contains the sign; 0 meaning positive, and 1 meaning negative.
getcontext().prec = 29
if decimal_parts[3] & 0b1111111111111111 != 0:
raise ValueError('Invalid Decimal: Nonzero unused bits')
if decimal_parts[3] & 0b1111111000000000000000000000000 != 0:
raise ValueError('Invalid Decimal: Nonzero unused bits')
mantissa = (decimal_parts[2] << 64) | (decimal_parts[1] << 32) | decimal_parts[0]
exponent = (decimal_parts[3] >> 16) & 0b11111111
sign = (decimal_parts[3] >> 31) & 0b1
if exponent < 0 or exponent > 28:
raise ValueError('Invalid Decimal: Invalid exponent')
return Decimal(mantissa * (-1 if sign == 1 else 1)) / (Decimal(10) ** exponent)
def test_int32_to_decimal():
# https://learn.microsoft.com/en-us/dotnet/api/system.decimal.-ctor?view=net-9.0#system-decimal-ctor(system-int32())
assert int32_to_decimal([0x0, 0x0, 0x0, 0x0]) == Decimal('0')
assert int32_to_decimal([0x3B9ACA00, 0x0, 0x0, 0x0]) == Decimal('1000000000')
assert int32_to_decimal([0x0, 0x3B9ACA00, 0x0, 0x0]) == Decimal('4294967296000000000')
assert int32_to_decimal([0x0, 0x0, 0x3B9ACA00, 0x0]) == Decimal('18446744073709551616000000000')
assert int32_to_decimal([0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF, 0x0]) == Decimal('79228162514264337593543950335')
assert int32_to_decimal([0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF, 0x80000000]) == Decimal('-79228162514264337593543950335')
assert int32_to_decimal([0xFFFFFFFF, 0x0, 0x0, 0x100000]) == Decimal('0.0000004294967295')
assert int32_to_decimal([0xFFFFFFFF, 0x0, 0x0, 0x1C0000]) == Decimal('0.0000000000000000004294967295')
assert int32_to_decimal([0xF0000, 0xF0000, 0xF0000, 0xF0000]) == Decimal('18133887298.441562272235520')

View File

@ -20,23 +20,23 @@ from typing import Any, List
@dataclass
class ChildData:
# SerializationInfo.ChildData
name: str
is_dirty: bool
reference_id: int
name: str = ''
is_dirty: bool = False
reference_id: int = -1
@dataclass
class FieldData:
# SerializationInfo.FieldData
name: str
enum_type_name: str
is_dirty: bool
value: Any
name: str = ''
enum_type_name: str = None
is_dirty: bool = False
value: Any = None
@dataclass
class SerializationInfo:
"""Format-agnostic representation of serialised CSLA object"""
reference_id: int = 0
reference_id: int = -1
type_name: str = ''
children: List[ChildData] = field(default_factory=list)
values: List[FieldData] = field(default_factory=list)