# cryptomator-utils: Python utilities for inspecting Cryptomator drives
# Copyright (C) 2024 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from .aes import aes_siv_decrypt, aes_siv_encrypt
from .b64url import b64url_decode, b64url_encode

from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.ciphers.modes import GCM
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, InvalidUnwrap

import base64
import getpass
import hashlib
import hmac
import json
import os
import struct
import sys


def _fatal(message):
    """
    Print the given message to stderr and terminate the process with exit status 1
    """

    print(message, file=sys.stderr)
    sys.exit(1)


def _hashed_directory_path(vault_path, hashed_directory_id):
    """
    Return the on-disk path of the directory with the given hashed directory ID

    The first 2 characters of the hash form the first path component under "d" and the remainder forms the second
    """

    return os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:])


def load_vault_config(vault_config_path):
    """
    Load and verify a vault.cryptomator configuration file and associated master key

    The master key file is located relative to the directory containing the vault configuration file,
    using the "kid" field of the JWT header. The vault password is requested interactively via getpass.

    On any validation failure (malformed JWT, unsupported settings, incorrect password, bad signature)
    an error is printed to stderr and the process exits with status 1.

    Returns (primary_master_key, hmac_master_key)
    """

    # ------------------
    # Parse vault config

    # A JWT never contains whitespace, so strip any stray trailing newline - it would otherwise
    # become part of the signature field and cause verification to fail
    with open(vault_config_path, 'r', encoding='utf-8') as vault_config_file:
        vault_config_data = vault_config_file.read().strip()

    # Split JWT header, payload and signature
    vault_config_jwt_parts = vault_config_data.split('.')
    if len(vault_config_jwt_parts) != 3:
        _fatal('Error: Invalid JWT (got {} parts, expected 3)'.format(len(vault_config_jwt_parts)))

    # Parse JWT header and payload
    try:
        vault_config_header = json.loads(b64url_decode(vault_config_jwt_parts[0]).decode('utf-8'))
        vault_config_payload = json.loads(b64url_decode(vault_config_jwt_parts[1]).decode('utf-8'))
    except json.JSONDecodeError:
        print('Error: Malformed JWT (invalid JSON)', file=sys.stderr)
        import traceback; traceback.print_exc()
        sys.exit(1)
    except ValueError:
        # Base64 decoding errors (binascii.Error) and UnicodeDecodeError are both subclasses of ValueError
        print('Error: Malformed JWT (invalid base64url or UTF-8 encoding)', file=sys.stderr)
        import traceback; traceback.print_exc()
        sys.exit(1)

    if not isinstance(vault_config_header, dict) or not isinstance(vault_config_payload, dict):
        _fatal('Error: Malformed JWT (header and payload must be JSON objects)')

    # Validate settings
    # Fields are read with .get() so that a missing field is reported as an error rather than raising KeyError
    if vault_config_header.get('typ') != 'JWT':
        _fatal('Error: Malformed JWT (no "typ" in header)')

    if vault_config_header.get('alg') != 'HS256':
        _fatal('Error: Unsupported JWT algorithm (got {}, expected HS256)'.format(vault_config_header.get('alg')))

    if vault_config_payload.get('format') != 8:  # Current Cryptomator vault format
        _fatal('Error: Unsupported vault format (got {}, expected 8)'.format(vault_config_payload.get('format')))

    if vault_config_payload.get('cipherCombo') != 'SIV_GCM':
        _fatal('Error: Unsupported vault cipher mode (got {}, expected SIV_GCM)'.format(vault_config_payload.get('cipherCombo')))

    # -------------
    # Read key file

    master_key_id = vault_config_header.get('kid')
    if not isinstance(master_key_id, str) or not master_key_id.startswith('masterkeyfile:'):
        _fatal('Error: Unsupported vault master key ID (got {}, expected masterkeyfile)'.format(master_key_id))

    # Master key file path is relative to the directory containing the vault config
    master_key_path = os.path.join(os.path.dirname(vault_config_path), master_key_id[len('masterkeyfile:'):])

    with open(master_key_path, 'r', encoding='utf-8') as master_key_file:
        master_key_config = json.load(master_key_file)

    # ----------
    # Derive KEK

    master_password = getpass.getpass()

    kek = Scrypt(
        base64.b64decode(master_key_config['scryptSalt']),
        32,  # Masterkey.SUBKEY_LEN_BYTES in cryptolib
        master_key_config['scryptCostParam'],
        master_key_config['scryptBlockSize'],
        1  # Scrypt.P in cryptolib
    ).derive(master_password.encode('utf-8'))

    # -----------------------------------------
    # Unwrap encryption keys using AES Key Wrap

    try:
        primary_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['primaryMasterKey']))
        hmac_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['hmacMasterKey']))
    except InvalidUnwrap:
        _fatal('Error: Incorrect password')

    # -------------------------------
    # Validate vault config signature

    # HMAC secret is the combined "masterkey" (primary_master_key + hmac_master_key) - see Masterkey class in cryptolib
    signed_data = (vault_config_jwt_parts[0] + '.' + vault_config_jwt_parts[1]).encode('utf-8')
    expected_signature = hmac.digest(primary_master_key + hmac_master_key, signed_data, 'SHA256')

    # Compare in constant time; both sides are encoded to bytes as compare_digest rejects non-ASCII str
    signature_matches = hmac.compare_digest(
        b64url_encode(expected_signature).encode('utf-8'),
        vault_config_jwt_parts[2].encode('utf-8')
    )
    if not signature_matches:
        _fatal('Error: Invalid vault configuration file signature')

    return primary_master_key, hmac_master_key


def hash_directory_id(primary_master_key, hmac_master_key, directory_id):
    """
    Compute a hashed encrypted directory ID for the given plaintext directory ID

    Returns the Base32-encoded SHA-1 digest of the AES-SIV encrypted directory ID, as a str
    """

    # Directory ID is encrypted with AES-SIV (no AAD) and hashed
    encrypted_directory_id = aes_siv_encrypt(primary_master_key, hmac_master_key, directory_id.encode('utf-8'), None)
    digest = hashlib.sha1(encrypted_directory_id).digest()

    return base64.b32encode(digest).decode('utf-8')


def directory_path_to_id(vault_path, primary_master_key, hmac_master_key, directory_path):
    """
    Recurse the drive to resolve the directory ID for the directory at the given plaintext path

    The path is "/"-separated; leading, trailing and repeated slashes are ignored.
    Returns the plaintext directory ID (the empty string for the root directory).
    """

    directory_path_parts = directory_path.strip('/').split('/')

    # Begin in root directory
    # The root directory in Cryptomator has an empty directory ID
    directory_id = ''

    # Traverse path
    for path_part in directory_path_parts:
        if not path_part:
            continue

        # Hash the current directory ID
        hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)

        # Look up the encrypted path_part in the current directory
        encrypted_filename = encrypt_filename(primary_master_key, hmac_master_key, directory_id, path_part)

        # Get the directory ID of the path_part subdirectory, which is stored in its dir.c9r file
        subdirectory_dir_file = os.path.join(_hashed_directory_path(vault_path, hashed_directory_id), encrypted_filename, 'dir.c9r')
        with open(subdirectory_dir_file, 'r', encoding='utf-8') as f:
            # Traverse to the new directory ID
            directory_id = f.read()

    return directory_id


def list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
    """
    Return a list of files and directories in the directory with the given plaintext directory ID

    Entries with shortened long filenames (.c9s) and unrecognised entries are skipped with a warning on stderr.
    If the stored dirid.c9r does not match directory_id, an error is printed and the process exits with status 1.
    """

    hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)

    # Check directory ID given by dirid.c9r matches the expected directory ID
    stored_directory_id = decrypt_file(vault_path, primary_master_key, hashed_directory_id, 'dirid.c9r')
    if stored_directory_id != directory_id.encode('utf-8'):
        # Decode leniently: a mismatching ID may not be valid UTF-8 and must not mask the real error
        _fatal('Error: Unexpected dirid.c9r (got "{}", expected "{}")'.format(stored_directory_id.decode('utf-8', errors='replace'), directory_id))

    # List directory contents
    directory_contents = []

    with os.scandir(_hashed_directory_path(vault_path, hashed_directory_id)) as entries:
        for entry in entries:
            if entry.name == 'dirid.c9r':
                continue
            elif entry.name.endswith('.c9r'):
                directory_contents.append(decrypt_filename(primary_master_key, hmac_master_key, directory_id, entry.name))
            elif entry.name.endswith('.c9s'):
                print('Warning: Unsupported entry with long filename "{}" - not yet implemented'.format(entry.name), file=sys.stderr)
            else:
                print('Warning: Unknown file "{}" - ignoring'.format(entry.name), file=sys.stderr)

    return directory_contents


def encrypt_filename(primary_master_key, hmac_master_key, directory_id, filename):
    """
    Return the encrypted filename for a given plaintext filename within a directory with the given plaintext directory ID

    The result is the padded base64url encoding of the ciphertext followed by the ".c9r" extension
    """

    # Filename is encrypted with AES-SIV, passing the plaintext directory ID as AAD
    ciphertext_filename = aes_siv_encrypt(primary_master_key, hmac_master_key, filename.encode('utf-8'), [directory_id.encode('utf-8')])

    return b64url_encode(ciphertext_filename, strip_padding=False) + '.c9r'


def decrypt_filename(primary_master_key, hmac_master_key, directory_id, encrypted_filename):
    """
    Return the plaintext filename for a given encrypted filename within a directory with the given plaintext directory ID

    Raises ValueError if encrypted_filename does not end with ".c9r"
    """

    if not encrypted_filename.endswith('.c9r'):
        raise ValueError('Encrypted filename must end with .c9r')

    # Reverse the AES-SIV encryption, passing the plaintext directory ID as AAD
    ciphertext_filename = b64url_decode(encrypted_filename[:-len('.c9r')])
    plaintext_filename = aes_siv_decrypt(primary_master_key, hmac_master_key, ciphertext_filename, [directory_id.encode('utf-8')]).decode('utf-8')

    return plaintext_filename


def decrypt_file(vault_path, primary_master_key, hashed_directory_id, filename):
    """
    Return the decrypted contents of the file given by the plaintext filename and hashed directory ID

    Note that filename is used as-is as the name of the file on disk.
    The whole file is read into memory and the plaintext is returned as bytes.
    """

    header_len = 68  # 12 byte nonce + 40 byte encrypted payload + 16 byte tag
    nonce_len = 12
    tag_len = 16
    chunk_len = 32*1024 + nonce_len + tag_len  # Each encrypted chunk is 32 KiB + 28 byte overhead

    # Read the encrypted contents
    encrypted_file_path = os.path.join(_hashed_directory_path(vault_path, hashed_directory_id), filename)
    with open(encrypted_file_path, 'rb') as f:
        ciphertext = f.read()

    # Read header
    ciphertext_header = ciphertext[:header_len]
    header_nonce = ciphertext_header[:nonce_len]
    header_payload = ciphertext_header[nonce_len:-tag_len]
    header_tag = ciphertext_header[-tag_len:]

    # Decrypt header to obtain the content key
    # The first 8 bytes of the plaintext header are skipped; the remainder is the content key
    plaintext_header = AESGCM(primary_master_key).decrypt(header_nonce, header_payload + header_tag, None)
    content_key = plaintext_header[8:]

    # Decrypt file in chunks corresponding to 32 KiB plaintext
    plaintext = bytearray()

    # Skip the header; chunks are numbered from 0
    for chunk_num, idx in enumerate(range(header_len, len(ciphertext), chunk_len)):
        ciphertext_chunk = ciphertext[idx:idx+chunk_len]
        chunk_nonce = ciphertext_chunk[:nonce_len]
        chunk_payload = ciphertext_chunk[nonce_len:-tag_len]
        chunk_tag = ciphertext_chunk[-tag_len:]

        # Chunk is encrypted with AES-GCM - chunk number (big-endian 64-bit) and header nonce are passed as AAD
        cipher = Cipher(AES(content_key), GCM(chunk_nonce, chunk_tag)).decryptor()
        cipher.authenticate_additional_data(struct.pack('>Q', chunk_num))
        cipher.authenticate_additional_data(header_nonce)
        plaintext_chunk = cipher.update(chunk_payload) + cipher.finalize()

        plaintext.extend(plaintext_chunk)

    return bytes(plaintext)