#!/usr/bin/env python3
# cryptomator-utils: Python utilities for inspecting Cryptomator drives
# Copyright (C) 2024 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from Crypto.Cipher import AES
from cryptography.hazmat.primitives.ciphers.aead import AESSIV
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, InvalidUnwrap

import base64
import getpass
import hashlib
import hmac
import json
import os
import struct
import sys

# Layout of an encrypted file as read by decrypt_file():
# a 68-byte header (12-byte nonce, 40-byte payload, 16-byte tag) followed by
# chunks of (12-byte nonce, up to 32 KiB payload, 16-byte tag).
GCM_NONCE_SIZE = 12
GCM_TAG_SIZE = 16
FILE_HEADER_SIZE = 68
CHUNK_PAYLOAD_SIZE = 32 * 1024
CHUNK_SIZE = GCM_NONCE_SIZE + CHUNK_PAYLOAD_SIZE + GCM_TAG_SIZE

MASTER_KEY_FILE_PREFIX = 'masterkeyfile:'
ENCRYPTED_NAME_SUFFIX = '.c9r'


def main():
    """Print the decrypted names of the entries in one directory of a vault.

    Command line: ``<script> /path/to/vault.cryptomator /plaintext/path/within/drive``.
    Prompts for the vault password (via load_vault_config), walks the plaintext
    path one component at a time starting from the root directory ID (the empty
    string), and prints one decrypted filename per line.
    Exits with status 1 on usage errors or if a path component cannot be found.
    """
    if len(sys.argv) < 3:
        # Fix: the placeholder was previously never substituted
        print('Usage: {} /path/to/vault.cryptomator /plaintext/path/within/drive'.format(sys.argv[0]))
        sys.exit(1)

    vault_config_path = sys.argv[1]
    target_directory = sys.argv[2]
    vault_path = os.path.split(vault_config_path)[0]

    # Load vault config
    primary_master_key, hmac_master_key = load_vault_config(vault_config_path)

    target_directory_parts = target_directory.strip('/').split('/')

    # Traverse root directory
    directory_id = ''

    # Traverse path
    for path_part in target_directory_parts:
        if not path_part:
            # Empty component (e.g. target is "/" or contains "//")
            continue

        hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)
        subdirectory_dir_file = os.path.join(
            vault_path,
            'd',
            hashed_directory_id[:2],
            hashed_directory_id[2:],
            encrypt_filename(primary_master_key, hmac_master_key, directory_id, path_part),
            'dir.c9r'
        )

        # dir.c9r holds the directory ID of the subdirectory
        try:
            with open(subdirectory_dir_file, 'r') as f:
                directory_id = f.read()
        except (FileNotFoundError, NotADirectoryError):
            print('Error: Directory "{}" not found in vault'.format(path_part))
            sys.exit(1)

    for filename in list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
        print(filename)


def load_vault_config(vault_config_path):
    """Validate the vault configuration JWT and unlock the vault's master keys.

    Reads the JWT at vault_config_path, checks its header and payload
    (typ JWT, alg HS256, format 8, cipherCombo SIV_GCM, kid masterkeyfile:...),
    reads the referenced master key file from the same directory, prompts for
    the password, derives the key-encryption key with scrypt, unwraps both
    master keys and finally verifies the JWT's HMAC-SHA256 signature.

    Returns a tuple (primary_master_key, hmac_master_key) of bytes.
    Prints an error and exits with status 1 if any check fails.
    """
    # ------------------
    # Parse vault config

    with open(vault_config_path, 'r') as vault_config_file:
        # Tolerate surrounding whitespace (e.g. a trailing newline), which is never part of a JWT
        vault_config_data = vault_config_file.read().strip()

    vault_config_jwt_parts = vault_config_data.split('.')
    if len(vault_config_jwt_parts) != 3:
        print('Error: Invalid JWT (got {} parts, expected 3)'.format(len(vault_config_jwt_parts)))
        sys.exit(1)

    try:
        vault_config_header = json.loads(b64url_decode(vault_config_jwt_parts[0]).decode('utf-8'))
        vault_config_payload = json.loads(b64url_decode(vault_config_jwt_parts[1]).decode('utf-8'))
    except json.JSONDecodeError:
        print('Error: Malformed JWT (invalid JSON)')
        import traceback
        traceback.print_exc()
        sys.exit(1)
    except ValueError:
        # binascii.Error (bad Base64) and UnicodeDecodeError are both ValueError subclasses
        print('Error: Malformed JWT (invalid Base64 or UTF-8)')
        import traceback
        traceback.print_exc()
        sys.exit(1)

    if not isinstance(vault_config_header, dict) or not isinstance(vault_config_payload, dict):
        print('Error: Malformed JWT (header and payload must be JSON objects)')
        sys.exit(1)

    # Use .get() throughout so that a missing field is reported rather than raising KeyError
    if vault_config_header.get('typ') != 'JWT':
        print('Error: Malformed JWT (no "typ" in header)')
        sys.exit(1)

    if vault_config_header.get('alg') != 'HS256':
        print('Error: Unsupported JWT algorithm (got {}, expected HS256)'.format(vault_config_header.get('alg')))
        sys.exit(1)

    if vault_config_payload.get('format') != 8:
        print('Error: Unsupported vault format (got {}, expected 8)'.format(vault_config_payload.get('format')))
        sys.exit(1)

    if vault_config_payload.get('cipherCombo') != 'SIV_GCM':
        print('Error: Unsupported vault cipher mode (got {}, expected SIV_GCM)'.format(vault_config_payload.get('cipherCombo')))
        sys.exit(1)

    # -------------
    # Read key file

    key_id = vault_config_header.get('kid')
    if not isinstance(key_id, str) or not key_id.startswith(MASTER_KEY_FILE_PREFIX):
        print('Error: Unsupported vault master key ID (got {}, expected masterkeyfile)'.format(key_id))
        sys.exit(1)

    # The key file path is resolved relative to the directory containing the vault config
    master_key_path = os.path.join(os.path.split(vault_config_path)[0], key_id[len(MASTER_KEY_FILE_PREFIX):])

    with open(master_key_path, 'r') as master_key_file:
        master_key_config = json.load(master_key_file)

    # ----------
    # Derive KEK

    master_password = getpass.getpass()

    kek = Scrypt(
        base64.b64decode(master_key_config['scryptSalt']),
        32,  # Masterkey.SUBKEY_LEN_BYTES in cryptolib
        master_key_config['scryptCostParam'],
        master_key_config['scryptBlockSize'],
        1  # Scrypt.P in cryptolib
    ).derive(master_password.encode('utf-8'))

    # ----------------------
    # Unwrap encryption keys

    try:
        primary_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['primaryMasterKey']))
        hmac_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['hmacMasterKey']))
    except InvalidUnwrap:
        # A wrong password yields a wrong KEK, which fails the key unwrap integrity check
        print('Error: Incorrect password')
        sys.exit(1)

    # -------------------------------
    # Validate vault config signature

    # HMAC secret is the combined "masterkey" (primary_master_key + hmac_master_key) - see Masterkey class in cryptolib
    signed_data = (vault_config_jwt_parts[0] + '.' + vault_config_jwt_parts[1]).encode('utf-8')
    expected_signature = hmac.digest(primary_master_key + hmac_master_key, signed_data, 'SHA256')

    # Compare as bytes: compare_digest rejects non-ASCII str, and the JWT part is untrusted input
    signatures_match = hmac.compare_digest(
        b64url_encode(expected_signature).encode('utf-8'),
        vault_config_jwt_parts[2].encode('utf-8')
    )
    if not signatures_match:
        print('Error: Invalid vault configuration file signature')
        sys.exit(1)

    return primary_master_key, hmac_master_key


def list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
    """Return the decrypted names of the entries in the directory with the given ID.

    First decrypts the directory's dirid.c9r and checks that it contains
    directory_id; prints an error and exits with status 1 on mismatch.
    Entries ending in .c9r are decrypted; .c9s entries (long filenames) and
    unrecognised entries only produce a warning and are omitted.

    Returns a list of str, in the order yielded by os.scandir.
    """
    hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)

    # Check dirid.c9r matches expected
    stored_directory_id = decrypt_file(vault_path, primary_master_key, hashed_directory_id, 'dirid.c9r')
    if stored_directory_id != directory_id.encode('utf-8'):
        # errors='replace' so that a corrupt ID cannot crash the error report itself
        print('Error: Unexpected dirid.c9r (got "{}", expected "{}")'.format(stored_directory_id.decode('utf-8', errors='replace'), directory_id))
        sys.exit(1)

    # List directory contents
    directory_contents = []
    directory_path = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:])

    # Context manager closes the scandir iterator promptly
    with os.scandir(directory_path) as entries:
        for entry in entries:
            if entry.name == 'dirid.c9r':
                continue
            elif entry.name.endswith('.c9r'):
                directory_contents.append(decrypt_filename(primary_master_key, hmac_master_key, directory_id, entry.name))
            elif entry.name.endswith('.c9s'):
                print('Warning: Unsupported entry with long filename "{}" - not yet implemented'.format(entry.name))
            else:
                print('Warning: Unknown file "{}" - ignoring'.format(entry.name))

    return directory_contents


def hash_directory_id(primary_master_key, hmac_master_key, directory_id):
    """Return the hashed form of a directory ID used to locate it under the vault's d/ folder.

    The ID is AES-SIV encrypted (no associated data), hashed with SHA-1 and
    Base32 encoded. Callers use the first 2 characters as the first-level
    folder name and the remainder as the second-level folder name.
    """
    encrypted_directory_id = aes_siv_encrypt(primary_master_key, hmac_master_key, directory_id.encode('utf-8'), None)
    hashed_directory_id = base64.b32encode(hashlib.sha1(encrypted_directory_id).digest()).decode('utf-8')
    return hashed_directory_id


def encrypt_filename(primary_master_key, hmac_master_key, directory_id, filename):
    """Return the on-disk name (ending in .c9r) for a plaintext filename.

    The name is AES-SIV encrypted with the parent directory's ID as associated
    data, then URL-safe Base64 encoded with padding retained.
    """
    encrypted_name = aes_siv_encrypt(primary_master_key, hmac_master_key, filename.encode('utf-8'), [directory_id.encode('utf-8')])
    return b64url_encode(encrypted_name, strip_padding=False) + ENCRYPTED_NAME_SUFFIX


def decrypt_filename(primary_master_key, hmac_master_key, directory_id, encrypted_filename):
    """Return the plaintext filename for an on-disk name ending in .c9r.

    Inverse of encrypt_filename; directory_id is the ID of the parent directory.
    Raises ValueError if encrypted_filename does not end with .c9r.
    """
    if not encrypted_filename.endswith(ENCRYPTED_NAME_SUFFIX):
        raise ValueError('Encrypted filename must end with .c9r')

    ciphertext_filename = b64url_decode(encrypted_filename[:-len(ENCRYPTED_NAME_SUFFIX)])
    plaintext_filename = aes_siv_decrypt(primary_master_key, hmac_master_key, ciphertext_filename, [directory_id.encode('utf-8')]).decode('utf-8')
    return plaintext_filename


def decrypt_file(vault_path, primary_master_key, hashed_directory_id, filename):
    """Decrypt a whole file stored in the given (hashed) directory and return its plaintext bytes.

    filename is the on-disk name within the directory. The entire file is read
    into memory. The header is decrypted with primary_master_key using AES-GCM;
    each subsequent chunk is decrypted using the content key from the header.

    Raises ValueError if the file is too short to contain a header or if any
    authentication tag fails verification.
    """
    encrypted_file_path = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], filename)

    with open(encrypted_file_path, 'rb') as f:
        ciphertext = f.read()

    if len(ciphertext) < FILE_HEADER_SIZE:
        raise ValueError('Encrypted file is too short to contain a header (got {} bytes, expected at least {})'.format(len(ciphertext), FILE_HEADER_SIZE))

    # Read header
    ciphertext_header = ciphertext[:FILE_HEADER_SIZE]
    header_nonce = ciphertext_header[:GCM_NONCE_SIZE]
    header_payload = ciphertext_header[GCM_NONCE_SIZE:-GCM_TAG_SIZE]
    header_tag = ciphertext_header[-GCM_TAG_SIZE:]

    # Decrypt header
    cipher = AES.new(primary_master_key, AES.MODE_GCM, nonce=header_nonce)
    plaintext_header = cipher.decrypt_and_verify(header_payload, header_tag)
    # The first 8 bytes of the header payload are skipped; the remainder is the per-file content key
    content_key = plaintext_header[8:]

    # Decrypt file in chunks
    plaintext = bytearray()
    for chunk_num, idx in enumerate(range(FILE_HEADER_SIZE, len(ciphertext), CHUNK_SIZE)):
        ciphertext_chunk = ciphertext[idx:idx + CHUNK_SIZE]
        chunk_nonce = ciphertext_chunk[:GCM_NONCE_SIZE]
        chunk_payload = ciphertext_chunk[GCM_NONCE_SIZE:-GCM_TAG_SIZE]
        chunk_tag = ciphertext_chunk[-GCM_TAG_SIZE:]

        cipher = AES.new(content_key, AES.MODE_GCM, nonce=chunk_nonce)
        # Associated data is the chunk number followed by the header nonce
        cipher.update(struct.pack('>Q', chunk_num))  # Contrary to documentation, this is a 64-bit not 32-bit integer (longToBigEndianByteArray in cryptolib: Long.SIZE)
        cipher.update(header_nonce)
        plaintext_chunk = cipher.decrypt_and_verify(chunk_payload, chunk_tag)
        plaintext.extend(plaintext_chunk)

    return bytes(plaintext)


def aes_siv_encrypt(primary_master_key, hmac_master_key, plaintext, associated_data):
    """AES-SIV encrypt plaintext and return tag + ciphertext as bytes.

    The SIV key is hmac_master_key + primary_master_key. associated_data is
    None or a list of bytes. Non-empty plaintext is handled by cryptography's
    AESSIV; empty plaintext falls back to PyCryptodome.

    Raises ValueError for empty plaintext combined with more than one
    associated data item or with an empty first item, as the PyCryptodome
    fallback cannot express those.
    """
    siv_key = hmac_master_key + primary_master_key

    if not plaintext:
        # Must use PyCryptodome
        # https://github.com/pyca/cryptography/issues/10958 - cryptography AESSIV does not accept empty plaintext (e.g. root directory)

        if associated_data:
            if len(associated_data) > 1:
                # Incompatible with PyCryptodome
                raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with >1 associated data')

            # Only one associated data - equals the nonce
            if not associated_data[0]:
                # Incompatible with PyCryptodome
                raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with zero-length associated data')

            ciphertext, tag = AES.new(siv_key, AES.MODE_SIV, nonce=associated_data[0]).encrypt_and_digest(plaintext)
            return tag + ciphertext

        ciphertext, tag = AES.new(siv_key, AES.MODE_SIV).encrypt_and_digest(plaintext)
        return tag + ciphertext

    # Otherwise use cryptography
    tag_and_ciphertext = AESSIV(siv_key).encrypt(plaintext, associated_data)
    return tag_and_ciphertext


def aes_siv_decrypt(primary_master_key, hmac_master_key, tag_and_ciphertext, associated_data):
    """AES-SIV decrypt tag + ciphertext (as produced by aes_siv_encrypt) and return the plaintext bytes.

    Uses cryptography's AESSIV with key hmac_master_key + primary_master_key;
    associated_data is None or a list of bytes.
    """
    return AESSIV(hmac_master_key + primary_master_key).decrypt(tag_and_ciphertext, associated_data)


def b64url_decode(s):
    """Decode a URL-safe Base64 str, with or without '=' padding, and return bytes."""
    # Number of '=' needed to pad the length up to a multiple of 4 (0 to 3)
    padding_len = -len(s) % 4
    return base64.urlsafe_b64decode(s + '=' * padding_len)


def b64url_encode(s, strip_padding=True):
    """Encode bytes as a URL-safe Base64 str, removing '=' padding unless strip_padding is False."""
    result = base64.urlsafe_b64encode(s)
    if strip_padding:
        result = result.rstrip(b'=')
    return result.decode('utf-8')


if __name__ == '__main__':
    main()