commit f0cf05824a95700b2d18246193a8bb844ab0c0dc Author: RunasSudo Date: Sun May 26 00:30:15 2024 +1000 Initial commit diff --git a/list_directory.py b/list_directory.py new file mode 100755 index 0000000..2e92a2c --- /dev/null +++ b/list_directory.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python3 + +# cryptomator-utils: Python utilities for inspecting Cryptomator drives +# Copyright (C) 2024 Lee Yingtong Li (RunasSudo) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from Crypto.Cipher import AES + +from cryptography.hazmat.primitives.ciphers.aead import AESSIV +from cryptography.hazmat.primitives.kdf.scrypt import Scrypt +from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, InvalidUnwrap + +import base64 +import getpass +import hashlib +import hmac +import json +import os +import struct +import sys + +def main(): + if len(sys.argv) < 3: + print('Usage: {} /path/to/vault.cryptomator /plaintext/path/within/drive') + sys.exit(1) + + vault_config_path = sys.argv[1] + target_directory = sys.argv[2] + + vault_path = os.path.split(vault_config_path)[0] + + # Load vault config + primary_master_key, hmac_master_key = load_vault_config(vault_config_path) + + target_directory_parts = target_directory.strip('/').split('/') + + # Traverse root directory + directory_id = '' + + # Traverse path + for path_part in target_directory_parts: + if not path_part: + continue + + hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id) + subdirectory_dir_file = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], encrypt_filename(primary_master_key, hmac_master_key, directory_id, path_part), 'dir.c9r') + with open(subdirectory_dir_file, 'r') as f: + new_directory_id = f.read() + + directory_id = new_directory_id + + for filename in list_directory(vault_path, primary_master_key, hmac_master_key, directory_id): + print(filename) + +def load_vault_config(vault_config_path): + # ------------------ + # Parse vault config + + with open(vault_config_path, 'r') as vault_config_file: + vault_config_data = vault_config_file.read() + + vault_config_jwt_parts = vault_config_data.split('.') + + if len(vault_config_jwt_parts) != 3: + print('Error: Invalid JWT (got {} parts, expected 3)'.format(len(vault_config_jwt_parts))) + sys.exit(1) + + try: + vault_config_header = json.loads(b64url_decode(vault_config_jwt_parts[0]).decode('utf-8')) + vault_config_payload = json.loads(b64url_decode(vault_config_jwt_parts[1]).decode('utf-8')) + except json.JSONDecodeError as ex: + print('Error: Malformed JWT (invalid JSON)') + import traceback; traceback.print_exc() + sys.exit(1) + + if vault_config_header['typ'] != 'JWT': + print('Error: Malformed JWT (no "typ" in header)') + sys.exit(1) + + if vault_config_header['alg'] != 'HS256': + print('Error: Unsupported JWT algorithm (got {}, expected HS256)'.format(vault_config_header['alg'])) + sys.exit(1) + + if vault_config_payload['format'] != 8: + print('Error: Unsupported vault format (got {}, expected 8)'.format(vault_config_payload['format'])) + sys.exit(1) + + if vault_config_payload['cipherCombo'] != 'SIV_GCM': + print('Error: Unsupported vault cipher mode (got {}, expected SIV_GCM)'.format(vault_config_payload['cipherCombo'])) + sys.exit(1) + + # ------------- + # Read key file + + if not vault_config_header['kid'].startswith('masterkeyfile:'): + print('Error: Unsupported vault master key ID (got {}, expected masterkeyfile)'.format(vault_config_header['kid'])) + sys.exit(1) + + master_key_path = os.path.join(os.path.split(vault_config_path)[0], vault_config_header['kid'][len('masterkeyfile:'):]) + + with open(master_key_path, 'r') as master_key_file: + master_key_config = json.load(master_key_file) + + # ---------- + # Derive KEK + + master_password = getpass.getpass() + + kek = Scrypt( + base64.b64decode(master_key_config['scryptSalt']), + 32, # Masterkey.SUBKEY_LEN_BYTES in cryptolib + master_key_config['scryptCostParam'], + master_key_config['scryptBlockSize'], + 1 # Scrypt.P in cryptolib + ).derive(master_password.encode('utf-8')) + + # ---------------------- + # Unwrap encryption keys + + try: + primary_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['primaryMasterKey'])) + hmac_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['hmacMasterKey'])) + except InvalidUnwrap: + print('Error: Incorrect password') + sys.exit(1) + + # ------------------------------- + # Validate vault config signature + + # HMAC secret is the combined "masterkey" (primary_master_key + hmac_master_key) - see Masterkey class in cryptolib + expected_signature = hmac.digest(primary_master_key + hmac_master_key, (vault_config_jwt_parts[0] + '.' + vault_config_jwt_parts[1]).encode('utf-8'), 'SHA256') + + if b64url_encode(expected_signature) != vault_config_jwt_parts[2]: + print('Error: Invalid vault configuration file signature') + sys.exit(1) + + return primary_master_key, hmac_master_key + +def list_directory(vault_path, primary_master_key, hmac_master_key, directory_id): + hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id) + + # Check dirid.c9r matches expected + stored_directory_id = decrypt_file(vault_path, primary_master_key, hashed_directory_id, 'dirid.c9r') + if stored_directory_id != directory_id.encode('utf-8'): + print('Error: Unexpected dirid.c9r (got "{}", expected "{}")'.format(stored_directory_id.decode('utf-8'), directory_id)) + sys.exit(1) + + # List directory contents + directory_contents = [] + + for entry in os.scandir(os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:])): + if entry.name == 'dirid.c9r': + continue + elif entry.name.endswith('.c9r'): + directory_contents.append(decrypt_filename(primary_master_key, hmac_master_key, directory_id, entry.name)) + elif entry.name.endswith('.c9s'): + print('Warning: Unsupported entry with long filename "{}" - not yet implemented'.format(entry.name)) + else: + print('Warning: Unknown file "{}" - ignoring'.format(entry.name)) + + return directory_contents + +def hash_directory_id(primary_master_key, hmac_master_key, directory_id): + encrypted_directory_id = aes_siv_encrypt(primary_master_key, hmac_master_key, directory_id.encode('utf-8'), None) + hashed_directory_id = base64.b32encode(hashlib.sha1(encrypted_directory_id).digest()).decode('utf-8') + return hashed_directory_id + +def encrypt_filename(primary_master_key, hmac_master_key, directory_id, filename): + return b64url_encode(aes_siv_encrypt(primary_master_key, hmac_master_key, filename.encode('utf-8'), [directory_id.encode('utf-8')]), strip_padding=False) + '.c9r' + +def decrypt_filename(primary_master_key, hmac_master_key, directory_id, encrypted_filename): + if not encrypted_filename.endswith('.c9r'): + raise ValueError('Encrypted filename must end with .c9r') + + ciphertext_filename = b64url_decode(encrypted_filename[:-len('.c9r')]) + plaintext_filename = aes_siv_decrypt(primary_master_key, hmac_master_key, ciphertext_filename, [directory_id.encode('utf-8')]).decode('utf-8') + return plaintext_filename + +def decrypt_file(vault_path, primary_master_key, hashed_directory_id, filename): + encrypted_file_path = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], filename) + + with open(encrypted_file_path, 'rb') as f: + ciphertext = f.read() + + # Read header + ciphertext_header = ciphertext[:68] + header_nonce = ciphertext_header[:12] + header_payload = ciphertext_header[12:-16] + header_tag = ciphertext_header[-16:] + + # Decrypt header + cipher = AES.new(primary_master_key, AES.MODE_GCM, nonce=header_nonce) + plaintext_header = cipher.decrypt_and_verify(header_payload, header_tag) + + content_key = plaintext_header[8:] + + # Decrypt file in chunks + plaintext = bytearray() + + for chunk_num, idx in enumerate(range(68, len(ciphertext), 32*1024 + 28)): + ciphertext_chunk = ciphertext[idx:idx+32*1024+28] + chunk_nonce = ciphertext_chunk[:12] + chunk_payload = ciphertext_chunk[12:-16] + chunk_tag = ciphertext_chunk[-16:] + + cipher = AES.new(content_key, AES.MODE_GCM, nonce=chunk_nonce) + cipher.update(struct.pack('>Q', chunk_num)) # Contrary to documentation, this is a 64-bit not 32-bit integer (longToBigEndianByteArray in cryptolib: Long.SIZE) + cipher.update(header_nonce) + plaintext_chunk = cipher.decrypt_and_verify(chunk_payload, chunk_tag) + + plaintext.extend(plaintext_chunk) + + return bytes(plaintext) + +def aes_siv_encrypt(primary_master_key, hmac_master_key, plaintext, associated_data): + if len(plaintext) == 0: + # Must use PyCryptodome + # https://github.com/pyca/cryptography/issues/10958 - cryptography AESSIV does not accept empty plaintext (e.g. root directory) + if associated_data: + if len(associated_data) > 1: + # Incompatible with PyCryptodome + raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with >1 associated data') + + # Only one associated data - equals the nonce + + if not associated_data[0]: + # Incompatible with PyCryptodome + raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with zero-length associated data') + + ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV, nonce=associated_data[0]).encrypt_and_digest(plaintext) + return tag + ciphertext + + ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV).encrypt_and_digest(plaintext) + return tag + ciphertext + + # Otherwise use cryptography + tag_and_ciphertext = AESSIV(hmac_master_key + primary_master_key).encrypt(plaintext, associated_data) + return tag_and_ciphertext + +def aes_siv_decrypt(primary_master_key, hmac_master_key, tag_and_ciphertext, associated_data): + return AESSIV(hmac_master_key + primary_master_key).decrypt(tag_and_ciphertext, associated_data) + +def b64url_decode(s): + padding_len = -(len(s) % -4) + return base64.urlsafe_b64decode(s + '=' * padding_len) + +def b64url_encode(s, strip_padding=True): + result = base64.urlsafe_b64encode(s) + if strip_padding: + result = result.rstrip(b'=') + return result.decode('utf-8') + +if __name__ == '__main__': + main()