diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..8d35cb3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+__pycache__
+*.pyc
diff --git a/lib_cryptomator_utils/__init__.py b/lib_cryptomator_utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/lib_cryptomator_utils/aes.py b/lib_cryptomator_utils/aes.py
new file mode 100644
index 0000000..07cbb76
--- /dev/null
+++ b/lib_cryptomator_utils/aes.py
@@ -0,0 +1,55 @@
+# cryptomator-utils: Python utilities for inspecting Cryptomator drives
+# Copyright (C) 2024 Lee Yingtong Li (RunasSudo)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from Crypto.Cipher import AES
+from cryptography.hazmat.primitives.ciphers.aead import AESSIV
+
+def aes_siv_encrypt(primary_master_key, hmac_master_key, plaintext, associated_data):
+ """
+ Encrypt the given bytes using AES-SIV
+ """
+
+ if len(plaintext) == 0:
+ # Must use PyCryptodome
+ # https://github.com/pyca/cryptography/issues/10958 - cryptography AESSIV does not accept empty plaintext (e.g. root directory has empty directory ID)
+ if associated_data:
+ if len(associated_data) > 1:
+ # Incompatible with PyCryptodome
+ raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with >1 associated data')
+
+ if not associated_data[0]:
+ # Incompatible with PyCryptodome
+ raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with zero-length associated data')
+
+ # If there is only one associated data, this is equivalent to the nonce, so we can use PyCryptodome
+ ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV, nonce=associated_data[0]).encrypt_and_digest(plaintext)
+ return tag + ciphertext
+
+ # Zero-length plaintext with no AAD - encrypt with PyCryptodome
+ ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV).encrypt_and_digest(plaintext)
+ return tag + ciphertext
+
+ # In all other cases, use cryptography AESSIV
+ tag_and_ciphertext = AESSIV(hmac_master_key + primary_master_key).encrypt(plaintext, associated_data)
+ return tag_and_ciphertext
+
+def aes_siv_decrypt(primary_master_key, hmac_master_key, tag_and_ciphertext, associated_data):
+ """
+ Decrypt the given AES-SIV ciphertext
+ """
+
+ # Use cryptography AESSIV
+ return AESSIV(hmac_master_key + primary_master_key).decrypt(tag_and_ciphertext, associated_data)
diff --git a/lib_cryptomator_utils/b64url.py b/lib_cryptomator_utils/b64url.py
new file mode 100644
index 0000000..3232ca5
--- /dev/null
+++ b/lib_cryptomator_utils/b64url.py
@@ -0,0 +1,35 @@
+# cryptomator-utils: Python utilities for inspecting Cryptomator drives
+# Copyright (C) 2024 Lee Yingtong Li (RunasSudo)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import base64
+
+def b64url_decode(s):
+ """
+ Decode the Base64URL (optional padding) encoded string
+ """
+
+ padding_len = -(len(s) % -4)
+ return base64.urlsafe_b64decode(s + '=' * padding_len)
+
+def b64url_encode(s, strip_padding=True):
+ """
+ Encode the bytes with Base64URL, optionally stripping padding
+ """
+
+ result = base64.urlsafe_b64encode(s)
+ if strip_padding:
+ result = result.rstrip(b'=')
+ return result.decode('utf-8')
diff --git a/lib_cryptomator_utils/cryptomator.py b/lib_cryptomator_utils/cryptomator.py
new file mode 100644
index 0000000..f808e2d
--- /dev/null
+++ b/lib_cryptomator_utils/cryptomator.py
@@ -0,0 +1,223 @@
+# cryptomator-utils: Python utilities for inspecting Cryptomator drives
+# Copyright (C) 2024 Lee Yingtong Li (RunasSudo)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from .aes import aes_siv_decrypt, aes_siv_encrypt
+from .b64url import b64url_decode, b64url_encode
+
+from Crypto.Cipher import AES
+from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
+from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, InvalidUnwrap
+
+import base64
+import getpass
+import hashlib
+import hmac
+import json
+import os
+import struct
+import sys
+
+def load_vault_config(vault_config_path):
+ """
+ Load and verify a vault.cryptomator configuration file and associated master key
+
+ Returns (primary_master_key, hmac_master_key)
+ """
+
+ # ------------------
+ # Parse vault config
+
+ with open(vault_config_path, 'r') as vault_config_file:
+ vault_config_data = vault_config_file.read()
+
+ # Split JWT header, payload and signature
+ vault_config_jwt_parts = vault_config_data.split('.')
+
+ if len(vault_config_jwt_parts) != 3:
+ print('Error: Invalid JWT (got {} parts, expected 3)'.format(len(vault_config_jwt_parts)))
+ sys.exit(1)
+
+ # Parse JWT header and payload
+ try:
+ vault_config_header = json.loads(b64url_decode(vault_config_jwt_parts[0]).decode('utf-8'))
+ vault_config_payload = json.loads(b64url_decode(vault_config_jwt_parts[1]).decode('utf-8'))
+ except json.JSONDecodeError as ex:
+ print('Error: Malformed JWT (invalid JSON)')
+ import traceback; traceback.print_exc()
+ sys.exit(1)
+
+ # Validate settings
+ if vault_config_header['typ'] != 'JWT':
+ print('Error: Malformed JWT (no "typ" in header)')
+ sys.exit(1)
+ if vault_config_header['alg'] != 'HS256':
+ print('Error: Unsupported JWT algorithm (got {}, expected HS256)'.format(vault_config_header['alg']))
+ sys.exit(1)
+ if vault_config_payload['format'] != 8: # Current Cryptomator vault format
+ print('Error: Unsupported vault format (got {}, expected 8)'.format(vault_config_payload['format']))
+ sys.exit(1)
+ if vault_config_payload['cipherCombo'] != 'SIV_GCM':
+ print('Error: Unsupported vault cipher mode (got {}, expected SIV_GCM)'.format(vault_config_payload['cipherCombo']))
+ sys.exit(1)
+
+ # -------------
+ # Read key file
+
+ if not vault_config_header['kid'].startswith('masterkeyfile:'):
+ print('Error: Unsupported vault master key ID (got {}, expected masterkeyfile)'.format(vault_config_header['kid']))
+ sys.exit(1)
+
+ master_key_path = os.path.join(os.path.split(vault_config_path)[0], vault_config_header['kid'][len('masterkeyfile:'):])
+
+ with open(master_key_path, 'r') as master_key_file:
+ master_key_config = json.load(master_key_file)
+
+ # ----------
+ # Derive KEK
+
+ master_password = getpass.getpass()
+
+ kek = Scrypt(
+ base64.b64decode(master_key_config['scryptSalt']),
+ 32, # Masterkey.SUBKEY_LEN_BYTES in cryptolib
+ master_key_config['scryptCostParam'],
+ master_key_config['scryptBlockSize'],
+ 1 # Scrypt.P in cryptolib
+ ).derive(master_password.encode('utf-8'))
+
+ # -----------------------------------------
+ # Unwrap encryption keys using AES Key Wrap
+
+ try:
+ primary_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['primaryMasterKey']))
+ hmac_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['hmacMasterKey']))
+ except InvalidUnwrap:
+ print('Error: Incorrect password')
+ sys.exit(1)
+
+ # -------------------------------
+ # Validate vault config signature
+
+ # HMAC secret is the combined "masterkey" (primary_master_key + hmac_master_key) - see Masterkey class in cryptolib
+ expected_signature = hmac.digest(primary_master_key + hmac_master_key, (vault_config_jwt_parts[0] + '.' + vault_config_jwt_parts[1]).encode('utf-8'), 'SHA256')
+
+ if b64url_encode(expected_signature) != vault_config_jwt_parts[2]:
+ print('Error: Invalid vault configuration file signature')
+ sys.exit(1)
+
+ return primary_master_key, hmac_master_key
+
+def hash_directory_id(primary_master_key, hmac_master_key, directory_id):
+ """
+ Compute a hashed encrypted directory ID for the given plaintext directory ID
+ """
+
+ # Directory ID is encrypted with AES-SIV and hashed
+ encrypted_directory_id = aes_siv_encrypt(primary_master_key, hmac_master_key, directory_id.encode('utf-8'), None)
+ hashed_directory_id = base64.b32encode(hashlib.sha1(encrypted_directory_id).digest()).decode('utf-8')
+ return hashed_directory_id
+
+def list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
+ """
+ Return a list of files and directories in the directory with the given plaintext directory ID
+ """
+
+ hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)
+
+ # Check directory ID given by dirid.c9r matches the expected directory ID
+ stored_directory_id = decrypt_file(vault_path, primary_master_key, hashed_directory_id, 'dirid.c9r')
+ if stored_directory_id != directory_id.encode('utf-8'):
+ print('Error: Unexpected dirid.c9r (got "{}", expected "{}")'.format(stored_directory_id.decode('utf-8'), directory_id))
+ sys.exit(1)
+
+ # List directory contents
+ directory_contents = []
+
+ for entry in os.scandir(os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:])):
+ if entry.name == 'dirid.c9r':
+ continue
+ elif entry.name.endswith('.c9r'):
+ directory_contents.append(decrypt_filename(primary_master_key, hmac_master_key, directory_id, entry.name))
+ elif entry.name.endswith('.c9s'):
+ print('Warning: Unsupported entry with long filename "{}" - not yet implemented'.format(entry.name))
+ else:
+ print('Warning: Unknown file "{}" - ignoring'.format(entry.name))
+
+ return directory_contents
+
+def encrypt_filename(primary_master_key, hmac_master_key, directory_id, filename):
+ """
+ Return the encrypted filename for a given plaintext filename within a directory with the given plaintext directory ID
+ """
+
+ # Filename is encrypted with AES-SIV, passing the plaintext directory ID as AAD
+ return b64url_encode(aes_siv_encrypt(primary_master_key, hmac_master_key, filename.encode('utf-8'), [directory_id.encode('utf-8')]), strip_padding=False) + '.c9r'
+
+def decrypt_filename(primary_master_key, hmac_master_key, directory_id, encrypted_filename):
+ """
+ Return the plaintext filename for a given encrypted filename within a directory with the given plaintext directory ID
+ """
+
+ if not encrypted_filename.endswith('.c9r'):
+ raise ValueError('Encrypted filename must end with .c9r')
+
+ # Reverse the AES-SIV encryption, passing the plaintext directory ID as AAD
+ ciphertext_filename = b64url_decode(encrypted_filename[:-len('.c9r')])
+ plaintext_filename = aes_siv_decrypt(primary_master_key, hmac_master_key, ciphertext_filename, [directory_id.encode('utf-8')]).decode('utf-8')
+ return plaintext_filename
+
+def decrypt_file(vault_path, primary_master_key, hashed_directory_id, filename):
+ """
+ Return the decrypted contents of the file given by the plaintext filename and hashed directory ID
+ """
+
+ # Read the encrypted contents
+ encrypted_file_path = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], filename)
+ with open(encrypted_file_path, 'rb') as f:
+ ciphertext = f.read()
+
+ # Read header
+ ciphertext_header = ciphertext[:68]
+ header_nonce = ciphertext_header[:12]
+ header_payload = ciphertext_header[12:-16]
+ header_tag = ciphertext_header[-16:]
+
+ # Decrypt header to obtain the content key
+ cipher = AES.new(primary_master_key, AES.MODE_GCM, nonce=header_nonce)
+ plaintext_header = cipher.decrypt_and_verify(header_payload, header_tag)
+ content_key = plaintext_header[8:]
+
+ # Decrypt file in chunks corresponding to 32 KiB plaintext
+ plaintext = bytearray()
+
+ for chunk_num, idx in enumerate(range(68, len(ciphertext), 32*1024 + 28)): # Skip 68 byte header; each encrypted chunk is 32 KiB + 28 byte overhead
+ ciphertext_chunk = ciphertext[idx:idx+32*1024+28]
+ chunk_nonce = ciphertext_chunk[:12]
+ chunk_payload = ciphertext_chunk[12:-16]
+ chunk_tag = ciphertext_chunk[-16:]
+
+ # Chunk is encrypted with AES-GCM
+ cipher = AES.new(content_key, AES.MODE_GCM, nonce=chunk_nonce)
+
+ # Chunk number and header nonce are passed as AAD
+ # Contrary to the Cryptomator documentation, chunk number is a 64-bit not 32-bit integer - https://github.com/cryptomator/docs/pull/54
+ cipher.update(struct.pack('>Q', chunk_num))
+ cipher.update(header_nonce)
+
+ plaintext_chunk = cipher.decrypt_and_verify(chunk_payload, chunk_tag)
+ plaintext.extend(plaintext_chunk)
+
+ return bytes(plaintext)
diff --git a/list_directory.py b/list_directory.py
index 2e92a2c..50970af 100755
--- a/list_directory.py
+++ b/list_directory.py
@@ -16,19 +16,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from Crypto.Cipher import AES
+from lib_cryptomator_utils.cryptomator import encrypt_filename, hash_directory_id, list_directory, load_vault_config
-from cryptography.hazmat.primitives.ciphers.aead import AESSIV
-from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
-from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, InvalidUnwrap
-
-import base64
-import getpass
-import hashlib
-import hmac
-import json
import os
-import struct
import sys
def main():
@@ -36,17 +26,19 @@ def main():
print('Usage: {} /path/to/vault.cryptomator /plaintext/path/within/drive')
sys.exit(1)
+ # Parse CLI arguments
vault_config_path = sys.argv[1]
target_directory = sys.argv[2]
vault_path = os.path.split(vault_config_path)[0]
- # Load vault config
+ # Load vault config (asks for password)
primary_master_key, hmac_master_key = load_vault_config(vault_config_path)
target_directory_parts = target_directory.strip('/').split('/')
- # Traverse root directory
+ # Begin in root directory
+ # The root directory in Cryptomator has an empty directory ID
directory_id = ''
# Traverse path
@@ -54,213 +46,23 @@ def main():
if not path_part:
continue
+ # Hash the current directory ID
hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)
- subdirectory_dir_file = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], encrypt_filename(primary_master_key, hmac_master_key, directory_id, path_part), 'dir.c9r')
+
+ # Look up the encrypted path_part in the current directory
+ encrypted_filename = encrypt_filename(primary_master_key, hmac_master_key, directory_id, path_part)
+
+ # Get the directory ID of the part_path
+ subdirectory_dir_file = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], encrypted_filename, 'dir.c9r')
with open(subdirectory_dir_file, 'r') as f:
new_directory_id = f.read()
+ # Traverse to the new directory ID
directory_id = new_directory_id
- for filename in list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
+ # Now we have reached the requested directory, so print directory listing
+ for filename in sorted(list_directory(vault_path, primary_master_key, hmac_master_key, directory_id)):
print(filename)
-def load_vault_config(vault_config_path):
- # ------------------
- # Parse vault config
-
- with open(vault_config_path, 'r') as vault_config_file:
- vault_config_data = vault_config_file.read()
-
- vault_config_jwt_parts = vault_config_data.split('.')
-
- if len(vault_config_jwt_parts) != 3:
- print('Error: Invalid JWT (got {} parts, expected 3)'.format(len(vault_config_jwt_parts)))
- sys.exit(1)
-
- try:
- vault_config_header = json.loads(b64url_decode(vault_config_jwt_parts[0]).decode('utf-8'))
- vault_config_payload = json.loads(b64url_decode(vault_config_jwt_parts[1]).decode('utf-8'))
- except json.JSONDecodeError as ex:
- print('Error: Malformed JWT (invalid JSON)')
- import traceback; traceback.print_exc()
- sys.exit(1)
-
- if vault_config_header['typ'] != 'JWT':
- print('Error: Malformed JWT (no "typ" in header)')
- sys.exit(1)
-
- if vault_config_header['alg'] != 'HS256':
- print('Error: Unsupported JWT algorithm (got {}, expected HS256)'.format(vault_config_header['alg']))
- sys.exit(1)
-
- if vault_config_payload['format'] != 8:
- print('Error: Unsupported vault format (got {}, expected 8)'.format(vault_config_payload['format']))
- sys.exit(1)
-
- if vault_config_payload['cipherCombo'] != 'SIV_GCM':
- print('Error: Unsupported vault cipher mode (got {}, expected SIV_GCM)'.format(vault_config_payload['cipherCombo']))
- sys.exit(1)
-
- # -------------
- # Read key file
-
- if not vault_config_header['kid'].startswith('masterkeyfile:'):
- print('Error: Unsupported vault master key ID (got {}, expected masterkeyfile)'.format(vault_config_header['kid']))
- sys.exit(1)
-
- master_key_path = os.path.join(os.path.split(vault_config_path)[0], vault_config_header['kid'][len('masterkeyfile:'):])
-
- with open(master_key_path, 'r') as master_key_file:
- master_key_config = json.load(master_key_file)
-
- # ----------
- # Derive KEK
-
- master_password = getpass.getpass()
-
- kek = Scrypt(
- base64.b64decode(master_key_config['scryptSalt']),
- 32, # Masterkey.SUBKEY_LEN_BYTES in cryptolib
- master_key_config['scryptCostParam'],
- master_key_config['scryptBlockSize'],
- 1 # Scrypt.P in cryptolib
- ).derive(master_password.encode('utf-8'))
-
- # ----------------------
- # Unwrap encryption keys
-
- try:
- primary_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['primaryMasterKey']))
- hmac_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['hmacMasterKey']))
- except InvalidUnwrap:
- print('Error: Incorrect password')
- sys.exit(1)
-
- # -------------------------------
- # Validate vault config signature
-
- # HMAC secret is the combined "masterkey" (primary_master_key + hmac_master_key) - see Masterkey class in cryptolib
- expected_signature = hmac.digest(primary_master_key + hmac_master_key, (vault_config_jwt_parts[0] + '.' + vault_config_jwt_parts[1]).encode('utf-8'), 'SHA256')
-
- if b64url_encode(expected_signature) != vault_config_jwt_parts[2]:
- print('Error: Invalid vault configuration file signature')
- sys.exit(1)
-
- return primary_master_key, hmac_master_key
-
-def list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
- hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)
-
- # Check dirid.c9r matches expected
- stored_directory_id = decrypt_file(vault_path, primary_master_key, hashed_directory_id, 'dirid.c9r')
- if stored_directory_id != directory_id.encode('utf-8'):
- print('Error: Unexpected dirid.c9r (got "{}", expected "{}")'.format(stored_directory_id.decode('utf-8'), directory_id))
- sys.exit(1)
-
- # List directory contents
- directory_contents = []
-
- for entry in os.scandir(os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:])):
- if entry.name == 'dirid.c9r':
- continue
- elif entry.name.endswith('.c9r'):
- directory_contents.append(decrypt_filename(primary_master_key, hmac_master_key, directory_id, entry.name))
- elif entry.name.endswith('.c9s'):
- print('Warning: Unsupported entry with long filename "{}" - not yet implemented'.format(entry.name))
- else:
- print('Warning: Unknown file "{}" - ignoring'.format(entry.name))
-
- return directory_contents
-
-def hash_directory_id(primary_master_key, hmac_master_key, directory_id):
- encrypted_directory_id = aes_siv_encrypt(primary_master_key, hmac_master_key, directory_id.encode('utf-8'), None)
- hashed_directory_id = base64.b32encode(hashlib.sha1(encrypted_directory_id).digest()).decode('utf-8')
- return hashed_directory_id
-
-def encrypt_filename(primary_master_key, hmac_master_key, directory_id, filename):
- return b64url_encode(aes_siv_encrypt(primary_master_key, hmac_master_key, filename.encode('utf-8'), [directory_id.encode('utf-8')]), strip_padding=False) + '.c9r'
-
-def decrypt_filename(primary_master_key, hmac_master_key, directory_id, encrypted_filename):
- if not encrypted_filename.endswith('.c9r'):
- raise ValueError('Encrypted filename must end with .c9r')
-
- ciphertext_filename = b64url_decode(encrypted_filename[:-len('.c9r')])
- plaintext_filename = aes_siv_decrypt(primary_master_key, hmac_master_key, ciphertext_filename, [directory_id.encode('utf-8')]).decode('utf-8')
- return plaintext_filename
-
-def decrypt_file(vault_path, primary_master_key, hashed_directory_id, filename):
- encrypted_file_path = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], filename)
-
- with open(encrypted_file_path, 'rb') as f:
- ciphertext = f.read()
-
- # Read header
- ciphertext_header = ciphertext[:68]
- header_nonce = ciphertext_header[:12]
- header_payload = ciphertext_header[12:-16]
- header_tag = ciphertext_header[-16:]
-
- # Decrypt header
- cipher = AES.new(primary_master_key, AES.MODE_GCM, nonce=header_nonce)
- plaintext_header = cipher.decrypt_and_verify(header_payload, header_tag)
-
- content_key = plaintext_header[8:]
-
- # Decrypt file in chunks
- plaintext = bytearray()
-
- for chunk_num, idx in enumerate(range(68, len(ciphertext), 32*1024 + 28)):
- ciphertext_chunk = ciphertext[idx:idx+32*1024+28]
- chunk_nonce = ciphertext_chunk[:12]
- chunk_payload = ciphertext_chunk[12:-16]
- chunk_tag = ciphertext_chunk[-16:]
-
- cipher = AES.new(content_key, AES.MODE_GCM, nonce=chunk_nonce)
- cipher.update(struct.pack('>Q', chunk_num)) # Contrary to documentation, this is a 64-bit not 32-bit integer (longToBigEndianByteArray in cryptolib: Long.SIZE)
- cipher.update(header_nonce)
- plaintext_chunk = cipher.decrypt_and_verify(chunk_payload, chunk_tag)
-
- plaintext.extend(plaintext_chunk)
-
- return bytes(plaintext)
-
-def aes_siv_encrypt(primary_master_key, hmac_master_key, plaintext, associated_data):
- if len(plaintext) == 0:
- # Must use PyCryptodome
- # https://github.com/pyca/cryptography/issues/10958 - cryptography AESSIV does not accept empty plaintext (e.g. root directory)
- if associated_data:
- if len(associated_data) > 1:
- # Incompatible with PyCryptodome
- raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with >1 associated data')
-
- # Only one associated data - equals the nonce
-
- if not associated_data[0]:
- # Incompatible with PyCryptodome
- raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with zero-length associated data')
-
- ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV, nonce=associated_data[0]).encrypt_and_digest(plaintext)
- return tag + ciphertext
-
- ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV).encrypt_and_digest(plaintext)
- return tag + ciphertext
-
- # Otherwise use cryptography
- tag_and_ciphertext = AESSIV(hmac_master_key + primary_master_key).encrypt(plaintext, associated_data)
- return tag_and_ciphertext
-
-def aes_siv_decrypt(primary_master_key, hmac_master_key, tag_and_ciphertext, associated_data):
- return AESSIV(hmac_master_key + primary_master_key).decrypt(tag_and_ciphertext, associated_data)
-
-def b64url_decode(s):
- padding_len = -(len(s) % -4)
- return base64.urlsafe_b64decode(s + '=' * padding_len)
-
-def b64url_encode(s, strip_padding=True):
- result = base64.urlsafe_b64encode(s)
- if strip_padding:
- result = result.rstrip(b'=')
- return result.decode('utf-8')
-
if __name__ == '__main__':
main()