# pycsla-binary: Python implementation of CSLA .NET binary serialisation # Copyright (C) 2025 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. from datetime import datetime from decimal import Decimal import io import struct from typing import List from .decimal import decimal_to_int32 from .known_types import CslaKnownTypes from .serialization_info import SerializationInfo class CslaBinaryWriter: """Writes SerializationInfo objects into binary-serialised CSLA data""" @classmethod def write_to_bytes(cls, serialisation_infos: List[SerializationInfo]) -> bytes: stream = io.BytesIO() writer = cls(stream) writer.write(serialisation_infos) return stream.getvalue() # -------------- # Implementation def __init__(self, stream: io.BufferedIOBase): self.stream = stream self.keywords_dictionary = [] def write(self, serialisation_infos: List[SerializationInfo]): # Reverse of CslaBinaryReader.Read self.write_int32(len(serialisation_infos)) for info in serialisation_infos: # Write ReferenceID self.write_int32(info.reference_id) # Write TypeName self.write_object_system_string(info.type_name) # Write children self.write_int32(len(info.children)) for child in info.children: self.write_object_system_string(child.name) self.write_object_bool(child.is_dirty) self.write_object_int32(child.reference_id) # Write field values self.write_int32(len(info.values)) for value in info.values: self.write_object_system_string(value.name) self.write_object_system_string(value.enum_type_name or '') self.write_object_bool(value.is_dirty) self.write_object(value.value) def write_7bit_encoded_int(self, value): # BinaryWriter.Write7BitEncodedInt # "The integer of the value parameter is written out seven bits at a time, starting with the seven least-significant bits. The high bit of a byte indicates whether there are more bytes to be written after this one." while True: value_7lsb = value & 0b01111111 value >>= 7 if value == 0: # Final byte self.stream.write(bytes([value_7lsb])) return else: # Further bytes remaining self.stream.write(bytes([value_7lsb | 0b10000000])) def write_int32(self, value): # BinaryWriter.WriteInt32 self.stream.write(struct.pack('<i', value)) def write_int64(self, value): # BinaryWriter.WriteInt64 self.stream.write(struct.pack('<q', value)) def write_string(self, value): # BinaryWriter.WriteString - "The string is prefixed with the length, encoded as an integer seven bits at a time." encoded_string = value.encode('utf-8') self.write_7bit_encoded_int(len(encoded_string)) self.stream.write(encoded_string) def write_object(self, value): # CslaBinaryWriter.Write(...) if value is None: return self.write_object_null() if isinstance(value, bool): return self.write_object_bool(value) if isinstance(value, int): return self.write_object_int32(value) if isinstance(value, Decimal): return self.write_object_decimal(value) if isinstance(value, datetime): return self.write_object_datetime(value) if isinstance(value, str): return self.write_object_string(value) if isinstance(value, bytes): return self.write_object_bytearray(value) raise NotImplementedError('CslaBinaryWriter.Write not implemented for type {}'.format(type(value).__name__)) def write_object_bool(self, value): # CslaBinaryWriter.Write(bool) self.stream.write(bytes([CslaKnownTypes.Boolean.value, 1 if value else 0])) def write_object_bytearray(self, value): # CslaBinaryWriter.Write(byte[]) self.stream.write(bytes([CslaKnownTypes.ByteArray.value])) self.write_int32(len(value)) self.stream.write(value) def write_object_datetime(self, value): # CslaBinaryWriter.Write(DateTime) timestamp_unix = value.timestamp() timestamp = timestamp_unix * 10000000 # Convert seconds to ticks (10^9 / 100) timestamp += 621355968000000000 # Add unix epoch in .NET ticks - https://gist.github.com/kristopherjohnson/397d0f74213a0087f1a1 timestamp = int(timestamp) # Number of 100-nanosecond intervals that have elapsed since January 1, 0001 at 00:00:00.000 self.stream.write(bytes([CslaKnownTypes.DateTime.value])) self.write_int64(timestamp) def write_object_decimal(self, value): # CslaBinaryWriter.Write(Decimal) self.stream.write(bytes([CslaKnownTypes.Decimal.value])) int32_repr = decimal_to_int32(value) self.write_int32(len(int32_repr)) # Should be 4 always for part in int32_repr: self.write_int32(part) def write_object_int32(self, value): # CslaBinaryWriter.Write(int) self.stream.write(bytes([CslaKnownTypes.Int32.value])) self.write_int32(value) def write_object_null(self): self.stream.write(bytes([CslaKnownTypes.Null.value])) def write_object_string(self, value): # CslaBinaryWriter.Write(string) self.stream.write(bytes([CslaKnownTypes.String.value])) self.write_string(value) def write_object_system_string(self, value): # CslaBinaryWriter.WriteSystemString if value in self.keywords_dictionary: # Dictionary key already exists dictionary_key = self.keywords_dictionary.index(value) self.stream.write(bytes([CslaKnownTypes.StringDictionaryKey.value])) self.write_int32(dictionary_key) else: # New dictionary key dictionary_key = len(self.keywords_dictionary) self.keywords_dictionary.append(value) self.stream.write(bytes([CslaKnownTypes.StringWithDictionaryKey.value])) self.write_string(value) self.write_int32(dictionary_key)