2024-05-27 22:23:29 +10:00

256 lines
10 KiB
Python

# cryptomator-utils: Python utilities for inspecting Cryptomator drives
# Copyright (C) 2024 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from .aes import aes_siv_decrypt, aes_siv_encrypt
from .b64url import b64url_decode, b64url_encode
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.ciphers.modes import GCM
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, InvalidUnwrap
import base64
import getpass
import hashlib
import hmac
import json
import os
import struct
import sys
def load_vault_config(vault_config_path):
    """
    Load and verify a vault.cryptomator configuration file and associated master key

    The master key file is located relative to the directory containing vault_config_path,
    using the "masterkeyfile:" key ID from the JWT header. The vault password is read
    interactively using getpass.

    On any validation failure (malformed JWT, unsupported settings, incorrect password,
    bad signature) an error is printed to stderr and the process exits with status 1.

    Returns (primary_master_key, hmac_master_key)
    """

    # ------------------
    # Parse vault config

    with open(vault_config_path, 'r', encoding='utf-8') as vault_config_file:
        # Surrounding whitespace (e.g. a trailing newline) is not part of the JWT
        # and would otherwise break the signature comparison below
        vault_config_data = vault_config_file.read().strip()

    # Split JWT header, payload and signature
    vault_config_jwt_parts = vault_config_data.split('.')
    if len(vault_config_jwt_parts) != 3:
        print('Error: Invalid JWT (got {} parts, expected 3)'.format(len(vault_config_jwt_parts)), file=sys.stderr)
        sys.exit(1)

    # Parse JWT header and payload
    try:
        vault_config_header = json.loads(b64url_decode(vault_config_jwt_parts[0]).decode('utf-8'))
        vault_config_payload = json.loads(b64url_decode(vault_config_jwt_parts[1]).decode('utf-8'))
    except json.JSONDecodeError:
        print('Error: Malformed JWT (invalid JSON)', file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)

    # Validate settings
    # .get is used so that a missing field is reported through the normal error path rather than as a KeyError
    if vault_config_header.get('typ') != 'JWT':
        print('Error: Malformed JWT (no "typ" in header)', file=sys.stderr)
        sys.exit(1)

    if vault_config_header.get('alg') != 'HS256':
        print('Error: Unsupported JWT algorithm (got {}, expected HS256)'.format(vault_config_header.get('alg')), file=sys.stderr)
        sys.exit(1)

    if vault_config_payload.get('format') != 8: # Current Cryptomator vault format
        print('Error: Unsupported vault format (got {}, expected 8)'.format(vault_config_payload.get('format')), file=sys.stderr)
        sys.exit(1)

    if vault_config_payload.get('cipherCombo') != 'SIV_GCM':
        print('Error: Unsupported vault cipher mode (got {}, expected SIV_GCM)'.format(vault_config_payload.get('cipherCombo')), file=sys.stderr)
        sys.exit(1)

    # -------------
    # Read key file

    key_id = vault_config_header.get('kid')
    if not isinstance(key_id, str) or not key_id.startswith('masterkeyfile:'):
        print('Error: Unsupported vault master key ID (got {}, expected masterkeyfile)'.format(key_id), file=sys.stderr)
        sys.exit(1)

    # The master key file path is resolved relative to the directory holding the vault config
    master_key_path = os.path.join(os.path.split(vault_config_path)[0], key_id[len('masterkeyfile:'):])

    with open(master_key_path, 'r', encoding='utf-8') as master_key_file:
        master_key_config = json.load(master_key_file)

    # ----------
    # Derive KEK

    master_password = getpass.getpass()

    kek = Scrypt(
        base64.b64decode(master_key_config['scryptSalt']),
        32, # Masterkey.SUBKEY_LEN_BYTES in cryptolib
        master_key_config['scryptCostParam'],
        master_key_config['scryptBlockSize'],
        1 # Scrypt.P in cryptolib
    ).derive(master_password.encode('utf-8'))

    # -----------------------------------------
    # Unwrap encryption keys using AES Key Wrap

    try:
        primary_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['primaryMasterKey']))
        hmac_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['hmacMasterKey']))
    except InvalidUnwrap:
        # A wrong password yields a wrong KEK, which fails the key wrap integrity check
        print('Error: Incorrect password', file=sys.stderr)
        sys.exit(1)

    # -------------------------------
    # Validate vault config signature

    # HMAC secret is the combined "masterkey" (primary_master_key + hmac_master_key) - see Masterkey class in cryptolib
    signing_input = (vault_config_jwt_parts[0] + '.' + vault_config_jwt_parts[1]).encode('utf-8')
    expected_signature = hmac.digest(primary_master_key + hmac_master_key, signing_input, 'SHA256')

    # Compare as bytes in constant time; bytes also avoids compare_digest rejecting non-ASCII str input
    if not hmac.compare_digest(b64url_encode(expected_signature).encode('utf-8'), vault_config_jwt_parts[2].encode('utf-8')):
        print('Error: Invalid vault configuration file signature', file=sys.stderr)
        sys.exit(1)

    return primary_master_key, hmac_master_key
def hash_directory_id(primary_master_key, hmac_master_key, directory_id):
    """
    Return the hashed encrypted form of a plaintext directory ID

    The UTF-8 directory ID is encrypted with AES-SIV, the ciphertext is hashed with SHA-1,
    and the digest is returned as a Base32 string
    """

    # AES-SIV encrypt the directory ID (None is passed for the associated data argument)
    siv_ciphertext = aes_siv_encrypt(primary_master_key, hmac_master_key, directory_id.encode('utf-8'), None)

    # SHA-1 the ciphertext, then Base32-encode the raw digest
    sha1_digest = hashlib.sha1(siv_ciphertext).digest()
    return base64.b32encode(sha1_digest).decode('utf-8')
def directory_path_to_id(vault_path, primary_master_key, hmac_master_key, directory_path):
    """
    Walk the vault from its root and return the directory ID of the directory at the given plaintext path

    Leading and trailing slashes are ignored, as are empty path components
    """

    # The root directory in Cryptomator has an empty directory ID
    current_id = ''

    # Drop empty components so that '' and '/' both resolve to the root
    components = [part for part in directory_path.strip('/').split('/') if part]

    for component in components:
        # Hashed ID of the directory we are currently in
        parent_hash = hash_directory_id(primary_master_key, hmac_master_key, current_id)

        # Encrypted name of the child within the current directory
        child_name = encrypt_filename(primary_master_key, hmac_master_key, current_id, component)

        # The child's dir.c9r file holds the directory ID of the child
        dir_file_path = os.path.join(vault_path, 'd', parent_hash[:2], parent_hash[2:], child_name, 'dir.c9r')
        with open(dir_file_path, 'r') as dir_file:
            # Descend into the child
            current_id = dir_file.read()

    return current_id
def list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
    """
    Return a list of files and directories in the directory with the given plaintext directory ID

    The directory's dirid.c9r file is decrypted first and must match directory_id, otherwise
    an error is printed to stderr and the process exits with status 1.

    Entries ending in .c9s (long filenames) and unrecognised entries are skipped with a warning on stderr.
    """

    hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)

    # Check directory ID given by dirid.c9r matches the expected directory ID
    stored_directory_id = decrypt_file(vault_path, primary_master_key, hashed_directory_id, 'dirid.c9r')
    if stored_directory_id != directory_id.encode('utf-8'):
        # errors='replace' so that non-UTF-8 contents cannot mask this error with a UnicodeDecodeError
        print('Error: Unexpected dirid.c9r (got "{}", expected "{}")'.format(stored_directory_id.decode('utf-8', errors='replace'), directory_id), file=sys.stderr)
        sys.exit(1)

    # List directory contents
    directory_contents = []
    encrypted_directory_path = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:])

    # Use scandir as a context manager so the iterator is closed even if decrypting a filename raises
    with os.scandir(encrypted_directory_path) as entries:
        for entry in entries:
            if entry.name == 'dirid.c9r':
                # Holds this directory's own ID (checked above) - not a directory entry
                continue
            elif entry.name.endswith('.c9r'):
                directory_contents.append(decrypt_filename(primary_master_key, hmac_master_key, directory_id, entry.name))
            elif entry.name.endswith('.c9s'):
                print('Warning: Unsupported entry with long filename "{}" - not yet implemented'.format(entry.name), file=sys.stderr)
            else:
                print('Warning: Unknown file "{}" - ignoring'.format(entry.name), file=sys.stderr)

    return directory_contents
def encrypt_filename(primary_master_key, hmac_master_key, directory_id, filename):
    """
    Return the encrypted on-disk name (ending in .c9r) for a plaintext filename
    within the directory with the given plaintext directory ID
    """

    name_bytes = filename.encode('utf-8')

    # The plaintext directory ID is passed to AES-SIV as AAD
    associated_data = [directory_id.encode('utf-8')]
    siv_ciphertext = aes_siv_encrypt(primary_master_key, hmac_master_key, name_bytes, associated_data)

    # Base64url padding is retained in the encrypted name
    encoded_name = b64url_encode(siv_ciphertext, strip_padding=False)
    return encoded_name + '.c9r'
def decrypt_filename(primary_master_key, hmac_master_key, directory_id, encrypted_filename):
    """
    Return the plaintext filename for an encrypted .c9r name found in the directory
    with the given plaintext directory ID

    Raises ValueError if encrypted_filename does not end with .c9r
    """

    suffix = '.c9r'
    if not encrypted_filename.endswith(suffix):
        raise ValueError('Encrypted filename must end with .c9r')

    # Drop the extension and decode the remainder to obtain the AES-SIV ciphertext
    encoded_name = encrypted_filename[:-len(suffix)]
    siv_ciphertext = b64url_decode(encoded_name)

    # Undo the AES-SIV encryption; the plaintext directory ID is the AAD
    associated_data = [directory_id.encode('utf-8')]
    name_bytes = aes_siv_decrypt(primary_master_key, hmac_master_key, siv_ciphertext, associated_data)

    return name_bytes.decode('utf-8')
def decrypt_file(vault_path, primary_master_key, hashed_directory_id, filename):
    """
    Return the decrypted contents (as bytes) of a file in the vault

    The file is located by its hashed directory ID and the name it has on disk within that directory.
    The whole file is read into memory before decryption.
    """

    # Layout sizes in bytes, as used by the slicing below
    NONCE_LEN = 12
    TAG_LEN = 16
    HEADER_LEN = 68
    CHUNK_LEN = 32*1024 + 28 # 32 KiB of payload plus 28 bytes of nonce and tag

    # Read the encrypted contents
    file_path = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], filename)
    with open(file_path, 'rb') as encrypted_file:
        data = encrypted_file.read()

    # Split the header into nonce, payload and tag
    header = data[:HEADER_LEN]
    header_nonce = header[:NONCE_LEN]
    header_payload = header[NONCE_LEN:-TAG_LEN]
    header_tag = header[-TAG_LEN:]

    # The header is AES-GCM encrypted under the primary master key
    # The content key is everything after the first 8 bytes of the decrypted header
    decrypted_header = AESGCM(primary_master_key).decrypt(header_nonce, header_payload + header_tag, None)
    content_key = decrypted_header[8:]

    # Decrypt the body one chunk at a time, starting immediately after the header
    decrypted_chunks = []
    for chunk_index, offset in enumerate(range(HEADER_LEN, len(data), CHUNK_LEN)):
        chunk = data[offset:offset + CHUNK_LEN]
        nonce = chunk[:NONCE_LEN]
        payload = chunk[NONCE_LEN:-TAG_LEN]
        tag = chunk[-TAG_LEN:]

        # Chunk is encrypted with AES-GCM - chunk number and header nonce are passed as AAD
        # cryptography.hazmat.primitives.ciphers.aead.AESGCM does not support multiple AAD so must do this manually
        decryptor = Cipher(AES(content_key), GCM(nonce, tag)).decryptor()
        decryptor.authenticate_additional_data(struct.pack('>Q', chunk_index))
        decryptor.authenticate_additional_data(header_nonce)

        decrypted_chunks.append(decryptor.update(payload) + decryptor.finalize())

    return b''.join(decrypted_chunks)