Refactor and add comments
This commit is contained in:
parent
f0cf05824a
commit
5040f11fc9
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
__pycache__
|
||||||
|
*.pyc
|
0
lib_cryptomator_utils/__init__.py
Normal file
0
lib_cryptomator_utils/__init__.py
Normal file
55
lib_cryptomator_utils/aes.py
Normal file
55
lib_cryptomator_utils/aes.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
# cryptomator-utils: Python utilities for inspecting Cryptomator drives
|
||||||
|
# Copyright (C) 2024 Lee Yingtong Li (RunasSudo)
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
from Crypto.Cipher import AES
|
||||||
|
from cryptography.hazmat.primitives.ciphers.aead import AESSIV
|
||||||
|
|
||||||
|
def aes_siv_encrypt(primary_master_key, hmac_master_key, plaintext, associated_data):
|
||||||
|
"""
|
||||||
|
Encrypt the given bytes using AES-SIV
|
||||||
|
"""
|
||||||
|
|
||||||
|
if len(plaintext) == 0:
|
||||||
|
# Must use PyCryptodome
|
||||||
|
# https://github.com/pyca/cryptography/issues/10958 - cryptography AESSIV does not accept empty plaintext (e.g. root directory has empty directory ID)
|
||||||
|
if associated_data:
|
||||||
|
if len(associated_data) > 1:
|
||||||
|
# Incompatible with PyCryptodome
|
||||||
|
raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with >1 associated data')
|
||||||
|
|
||||||
|
if not associated_data[0]:
|
||||||
|
# Incompatible with PyCryptodome
|
||||||
|
raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with zero-length associated data')
|
||||||
|
|
||||||
|
# If there is only one associated data, this is equivalent to the nonce, so we can use PyCryptodome
|
||||||
|
ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV, nonce=associated_data[0]).encrypt_and_digest(plaintext)
|
||||||
|
return tag + ciphertext
|
||||||
|
|
||||||
|
# Zero-length plaintext with no AAD - encrypt with PyCryptodome
|
||||||
|
ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV).encrypt_and_digest(plaintext)
|
||||||
|
return tag + ciphertext
|
||||||
|
|
||||||
|
# In all other cases, use cryptography AESSIV
|
||||||
|
tag_and_ciphertext = AESSIV(hmac_master_key + primary_master_key).encrypt(plaintext, associated_data)
|
||||||
|
return tag_and_ciphertext
|
||||||
|
|
||||||
|
def aes_siv_decrypt(primary_master_key, hmac_master_key, tag_and_ciphertext, associated_data):
|
||||||
|
"""
|
||||||
|
Decrypt the given AES-SIV ciphertext
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Use cryptography AESSIV
|
||||||
|
return AESSIV(hmac_master_key + primary_master_key).decrypt(tag_and_ciphertext, associated_data)
|
35
lib_cryptomator_utils/b64url.py
Normal file
35
lib_cryptomator_utils/b64url.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
# cryptomator-utils: Python utilities for inspecting Cryptomator drives
|
||||||
|
# Copyright (C) 2024 Lee Yingtong Li (RunasSudo)
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import base64
|
||||||
|
|
||||||
|
def b64url_decode(s):
|
||||||
|
"""
|
||||||
|
Decode the Base64URL (optional padding) encoded string
|
||||||
|
"""
|
||||||
|
|
||||||
|
padding_len = -(len(s) % -4)
|
||||||
|
return base64.urlsafe_b64decode(s + '=' * padding_len)
|
||||||
|
|
||||||
|
def b64url_encode(s, strip_padding=True):
|
||||||
|
"""
|
||||||
|
Encode the bytes with Base64URL, optionally stripping padding
|
||||||
|
"""
|
||||||
|
|
||||||
|
result = base64.urlsafe_b64encode(s)
|
||||||
|
if strip_padding:
|
||||||
|
result = result.rstrip(b'=')
|
||||||
|
return result.decode('utf-8')
|
223
lib_cryptomator_utils/cryptomator.py
Normal file
223
lib_cryptomator_utils/cryptomator.py
Normal file
@ -0,0 +1,223 @@
|
|||||||
|
# cryptomator-utils: Python utilities for inspecting Cryptomator drives
|
||||||
|
# Copyright (C) 2024 Lee Yingtong Li (RunasSudo)
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
from .aes import aes_siv_decrypt, aes_siv_encrypt
|
||||||
|
from .b64url import b64url_decode, b64url_encode
|
||||||
|
|
||||||
|
from Crypto.Cipher import AES
|
||||||
|
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
|
||||||
|
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, InvalidUnwrap
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import getpass
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import struct
|
||||||
|
import sys
|
||||||
|
|
||||||
|
def load_vault_config(vault_config_path):
|
||||||
|
"""
|
||||||
|
Load and verify a vault.cryptomator configuration file and associated master key
|
||||||
|
|
||||||
|
Returns (primary_master_key, hmac_master_key)
|
||||||
|
"""
|
||||||
|
|
||||||
|
# ------------------
|
||||||
|
# Parse vault config
|
||||||
|
|
||||||
|
with open(vault_config_path, 'r') as vault_config_file:
|
||||||
|
vault_config_data = vault_config_file.read()
|
||||||
|
|
||||||
|
# Split JWT header, payload and signature
|
||||||
|
vault_config_jwt_parts = vault_config_data.split('.')
|
||||||
|
|
||||||
|
if len(vault_config_jwt_parts) != 3:
|
||||||
|
print('Error: Invalid JWT (got {} parts, expected 3)'.format(len(vault_config_jwt_parts)))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Parse JWT header and payload
|
||||||
|
try:
|
||||||
|
vault_config_header = json.loads(b64url_decode(vault_config_jwt_parts[0]).decode('utf-8'))
|
||||||
|
vault_config_payload = json.loads(b64url_decode(vault_config_jwt_parts[1]).decode('utf-8'))
|
||||||
|
except json.JSONDecodeError as ex:
|
||||||
|
print('Error: Malformed JWT (invalid JSON)')
|
||||||
|
import traceback; traceback.print_exc()
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Validate settings
|
||||||
|
if vault_config_header['typ'] != 'JWT':
|
||||||
|
print('Error: Malformed JWT (no "typ" in header)')
|
||||||
|
sys.exit(1)
|
||||||
|
if vault_config_header['alg'] != 'HS256':
|
||||||
|
print('Error: Unsupported JWT algorithm (got {}, expected HS256)'.format(vault_config_header['alg']))
|
||||||
|
sys.exit(1)
|
||||||
|
if vault_config_payload['format'] != 8: # Current Cryptomator vault format
|
||||||
|
print('Error: Unsupported vault format (got {}, expected 8)'.format(vault_config_payload['format']))
|
||||||
|
sys.exit(1)
|
||||||
|
if vault_config_payload['cipherCombo'] != 'SIV_GCM':
|
||||||
|
print('Error: Unsupported vault cipher mode (got {}, expected SIV_GCM)'.format(vault_config_payload['cipherCombo']))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# -------------
|
||||||
|
# Read key file
|
||||||
|
|
||||||
|
if not vault_config_header['kid'].startswith('masterkeyfile:'):
|
||||||
|
print('Error: Unsupported vault master key ID (got {}, expected masterkeyfile)'.format(vault_config_header['kid']))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
master_key_path = os.path.join(os.path.split(vault_config_path)[0], vault_config_header['kid'][len('masterkeyfile:'):])
|
||||||
|
|
||||||
|
with open(master_key_path, 'r') as master_key_file:
|
||||||
|
master_key_config = json.load(master_key_file)
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# Derive KEK
|
||||||
|
|
||||||
|
master_password = getpass.getpass()
|
||||||
|
|
||||||
|
kek = Scrypt(
|
||||||
|
base64.b64decode(master_key_config['scryptSalt']),
|
||||||
|
32, # Masterkey.SUBKEY_LEN_BYTES in cryptolib
|
||||||
|
master_key_config['scryptCostParam'],
|
||||||
|
master_key_config['scryptBlockSize'],
|
||||||
|
1 # Scrypt.P in cryptolib
|
||||||
|
).derive(master_password.encode('utf-8'))
|
||||||
|
|
||||||
|
# -----------------------------------------
|
||||||
|
# Unwrap encryption keys using AES Key Wrap
|
||||||
|
|
||||||
|
try:
|
||||||
|
primary_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['primaryMasterKey']))
|
||||||
|
hmac_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['hmacMasterKey']))
|
||||||
|
except InvalidUnwrap:
|
||||||
|
print('Error: Incorrect password')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# -------------------------------
|
||||||
|
# Validate vault config signature
|
||||||
|
|
||||||
|
# HMAC secret is the combined "masterkey" (primary_master_key + hmac_master_key) - see Masterkey class in cryptolib
|
||||||
|
expected_signature = hmac.digest(primary_master_key + hmac_master_key, (vault_config_jwt_parts[0] + '.' + vault_config_jwt_parts[1]).encode('utf-8'), 'SHA256')
|
||||||
|
|
||||||
|
if b64url_encode(expected_signature) != vault_config_jwt_parts[2]:
|
||||||
|
print('Error: Invalid vault configuration file signature')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
return primary_master_key, hmac_master_key
|
||||||
|
|
||||||
|
def hash_directory_id(primary_master_key, hmac_master_key, directory_id):
|
||||||
|
"""
|
||||||
|
Compute a hashed encrypted directory ID for the given plaintext directory ID
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Directory ID is encrypted with AES-SIV and hashed
|
||||||
|
encrypted_directory_id = aes_siv_encrypt(primary_master_key, hmac_master_key, directory_id.encode('utf-8'), None)
|
||||||
|
hashed_directory_id = base64.b32encode(hashlib.sha1(encrypted_directory_id).digest()).decode('utf-8')
|
||||||
|
return hashed_directory_id
|
||||||
|
|
||||||
|
def list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
|
||||||
|
"""
|
||||||
|
Return a list of files and directories in the directory with the given plaintext directory ID
|
||||||
|
"""
|
||||||
|
|
||||||
|
hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)
|
||||||
|
|
||||||
|
# Check directory ID given by dirid.c9r matches the expected directory ID
|
||||||
|
stored_directory_id = decrypt_file(vault_path, primary_master_key, hashed_directory_id, 'dirid.c9r')
|
||||||
|
if stored_directory_id != directory_id.encode('utf-8'):
|
||||||
|
print('Error: Unexpected dirid.c9r (got "{}", expected "{}")'.format(stored_directory_id.decode('utf-8'), directory_id))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# List directory contents
|
||||||
|
directory_contents = []
|
||||||
|
|
||||||
|
for entry in os.scandir(os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:])):
|
||||||
|
if entry.name == 'dirid.c9r':
|
||||||
|
continue
|
||||||
|
elif entry.name.endswith('.c9r'):
|
||||||
|
directory_contents.append(decrypt_filename(primary_master_key, hmac_master_key, directory_id, entry.name))
|
||||||
|
elif entry.name.endswith('.c9s'):
|
||||||
|
print('Warning: Unsupported entry with long filename "{}" - not yet implemented'.format(entry.name))
|
||||||
|
else:
|
||||||
|
print('Warning: Unknown file "{}" - ignoring'.format(entry.name))
|
||||||
|
|
||||||
|
return directory_contents
|
||||||
|
|
||||||
|
def encrypt_filename(primary_master_key, hmac_master_key, directory_id, filename):
|
||||||
|
"""
|
||||||
|
Return the encrypted filename for a given plaintext filename within a directory with the given plaintext directory ID
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Filename is encrypted with AES-SIV, passing the plaintext directory ID as AAD
|
||||||
|
return b64url_encode(aes_siv_encrypt(primary_master_key, hmac_master_key, filename.encode('utf-8'), [directory_id.encode('utf-8')]), strip_padding=False) + '.c9r'
|
||||||
|
|
||||||
|
def decrypt_filename(primary_master_key, hmac_master_key, directory_id, encrypted_filename):
|
||||||
|
"""
|
||||||
|
Return the plaintext filename for a given encrypted filename within a directory with the given plaintext directory ID
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not encrypted_filename.endswith('.c9r'):
|
||||||
|
raise ValueError('Encrypted filename must end with .c9r')
|
||||||
|
|
||||||
|
# Reverse the AES-SIV encryption, passing the plaintext directory ID as AAD
|
||||||
|
ciphertext_filename = b64url_decode(encrypted_filename[:-len('.c9r')])
|
||||||
|
plaintext_filename = aes_siv_decrypt(primary_master_key, hmac_master_key, ciphertext_filename, [directory_id.encode('utf-8')]).decode('utf-8')
|
||||||
|
return plaintext_filename
|
||||||
|
|
||||||
|
def decrypt_file(vault_path, primary_master_key, hashed_directory_id, filename):
|
||||||
|
"""
|
||||||
|
Return the decrypted contents of the file given by the plaintext filename and hashed directory ID
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Read the encrypted contents
|
||||||
|
encrypted_file_path = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], filename)
|
||||||
|
with open(encrypted_file_path, 'rb') as f:
|
||||||
|
ciphertext = f.read()
|
||||||
|
|
||||||
|
# Read header
|
||||||
|
ciphertext_header = ciphertext[:68]
|
||||||
|
header_nonce = ciphertext_header[:12]
|
||||||
|
header_payload = ciphertext_header[12:-16]
|
||||||
|
header_tag = ciphertext_header[-16:]
|
||||||
|
|
||||||
|
# Decrypt header to obtain the content key
|
||||||
|
cipher = AES.new(primary_master_key, AES.MODE_GCM, nonce=header_nonce)
|
||||||
|
plaintext_header = cipher.decrypt_and_verify(header_payload, header_tag)
|
||||||
|
content_key = plaintext_header[8:]
|
||||||
|
|
||||||
|
# Decrypt file in chunks corresponding to 32 KiB plaintext
|
||||||
|
plaintext = bytearray()
|
||||||
|
|
||||||
|
for chunk_num, idx in enumerate(range(68, len(ciphertext), 32*1024 + 28)): # Skip 68 byte header; each encrypted chunk is 32 KiB + 28 byte overhead
|
||||||
|
ciphertext_chunk = ciphertext[idx:idx+32*1024+28]
|
||||||
|
chunk_nonce = ciphertext_chunk[:12]
|
||||||
|
chunk_payload = ciphertext_chunk[12:-16]
|
||||||
|
chunk_tag = ciphertext_chunk[-16:]
|
||||||
|
|
||||||
|
# Chunk is encrypted with AES-GCM
|
||||||
|
cipher = AES.new(content_key, AES.MODE_GCM, nonce=chunk_nonce)
|
||||||
|
|
||||||
|
# Chunk number and header nonce are passed as AAD
|
||||||
|
# Contrary to the Cryptomator documentation, chunk number is a 64-bit not 32-bit integer - https://github.com/cryptomator/docs/pull/54
|
||||||
|
cipher.update(struct.pack('>Q', chunk_num))
|
||||||
|
cipher.update(header_nonce)
|
||||||
|
|
||||||
|
plaintext_chunk = cipher.decrypt_and_verify(chunk_payload, chunk_tag)
|
||||||
|
plaintext.extend(plaintext_chunk)
|
||||||
|
|
||||||
|
return bytes(plaintext)
|
@ -16,19 +16,9 @@
|
|||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
from Crypto.Cipher import AES
|
from lib_cryptomator_utils.cryptomator import encrypt_filename, hash_directory_id, list_directory, load_vault_config
|
||||||
|
|
||||||
from cryptography.hazmat.primitives.ciphers.aead import AESSIV
|
|
||||||
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
|
|
||||||
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, InvalidUnwrap
|
|
||||||
|
|
||||||
import base64
|
|
||||||
import getpass
|
|
||||||
import hashlib
|
|
||||||
import hmac
|
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
import struct
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -36,17 +26,19 @@ def main():
|
|||||||
print('Usage: {} /path/to/vault.cryptomator /plaintext/path/within/drive')
|
print('Usage: {} /path/to/vault.cryptomator /plaintext/path/within/drive')
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Parse CLI arguments
|
||||||
vault_config_path = sys.argv[1]
|
vault_config_path = sys.argv[1]
|
||||||
target_directory = sys.argv[2]
|
target_directory = sys.argv[2]
|
||||||
|
|
||||||
vault_path = os.path.split(vault_config_path)[0]
|
vault_path = os.path.split(vault_config_path)[0]
|
||||||
|
|
||||||
# Load vault config
|
# Load vault config (asks for password)
|
||||||
primary_master_key, hmac_master_key = load_vault_config(vault_config_path)
|
primary_master_key, hmac_master_key = load_vault_config(vault_config_path)
|
||||||
|
|
||||||
target_directory_parts = target_directory.strip('/').split('/')
|
target_directory_parts = target_directory.strip('/').split('/')
|
||||||
|
|
||||||
# Traverse root directory
|
# Begin in root directory
|
||||||
|
# The root directory in Cryptomator has an empty directory ID
|
||||||
directory_id = ''
|
directory_id = ''
|
||||||
|
|
||||||
# Traverse path
|
# Traverse path
|
||||||
@ -54,213 +46,23 @@ def main():
|
|||||||
if not path_part:
|
if not path_part:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# Hash the current directory ID
|
||||||
hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)
|
hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)
|
||||||
subdirectory_dir_file = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], encrypt_filename(primary_master_key, hmac_master_key, directory_id, path_part), 'dir.c9r')
|
|
||||||
|
# Look up the encrypted path_part in the current directory
|
||||||
|
encrypted_filename = encrypt_filename(primary_master_key, hmac_master_key, directory_id, path_part)
|
||||||
|
|
||||||
|
# Get the directory ID of the part_path
|
||||||
|
subdirectory_dir_file = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], encrypted_filename, 'dir.c9r')
|
||||||
with open(subdirectory_dir_file, 'r') as f:
|
with open(subdirectory_dir_file, 'r') as f:
|
||||||
new_directory_id = f.read()
|
new_directory_id = f.read()
|
||||||
|
|
||||||
|
# Traverse to the new directory ID
|
||||||
directory_id = new_directory_id
|
directory_id = new_directory_id
|
||||||
|
|
||||||
for filename in list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
|
# Now we have reached the requested directory, so print directory listing
|
||||||
|
for filename in sorted(list_directory(vault_path, primary_master_key, hmac_master_key, directory_id)):
|
||||||
print(filename)
|
print(filename)
|
||||||
|
|
||||||
def load_vault_config(vault_config_path):
|
|
||||||
# ------------------
|
|
||||||
# Parse vault config
|
|
||||||
|
|
||||||
with open(vault_config_path, 'r') as vault_config_file:
|
|
||||||
vault_config_data = vault_config_file.read()
|
|
||||||
|
|
||||||
vault_config_jwt_parts = vault_config_data.split('.')
|
|
||||||
|
|
||||||
if len(vault_config_jwt_parts) != 3:
|
|
||||||
print('Error: Invalid JWT (got {} parts, expected 3)'.format(len(vault_config_jwt_parts)))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
try:
|
|
||||||
vault_config_header = json.loads(b64url_decode(vault_config_jwt_parts[0]).decode('utf-8'))
|
|
||||||
vault_config_payload = json.loads(b64url_decode(vault_config_jwt_parts[1]).decode('utf-8'))
|
|
||||||
except json.JSONDecodeError as ex:
|
|
||||||
print('Error: Malformed JWT (invalid JSON)')
|
|
||||||
import traceback; traceback.print_exc()
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if vault_config_header['typ'] != 'JWT':
|
|
||||||
print('Error: Malformed JWT (no "typ" in header)')
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if vault_config_header['alg'] != 'HS256':
|
|
||||||
print('Error: Unsupported JWT algorithm (got {}, expected HS256)'.format(vault_config_header['alg']))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if vault_config_payload['format'] != 8:
|
|
||||||
print('Error: Unsupported vault format (got {}, expected 8)'.format(vault_config_payload['format']))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if vault_config_payload['cipherCombo'] != 'SIV_GCM':
|
|
||||||
print('Error: Unsupported vault cipher mode (got {}, expected SIV_GCM)'.format(vault_config_payload['cipherCombo']))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# -------------
|
|
||||||
# Read key file
|
|
||||||
|
|
||||||
if not vault_config_header['kid'].startswith('masterkeyfile:'):
|
|
||||||
print('Error: Unsupported vault master key ID (got {}, expected masterkeyfile)'.format(vault_config_header['kid']))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
master_key_path = os.path.join(os.path.split(vault_config_path)[0], vault_config_header['kid'][len('masterkeyfile:'):])
|
|
||||||
|
|
||||||
with open(master_key_path, 'r') as master_key_file:
|
|
||||||
master_key_config = json.load(master_key_file)
|
|
||||||
|
|
||||||
# ----------
|
|
||||||
# Derive KEK
|
|
||||||
|
|
||||||
master_password = getpass.getpass()
|
|
||||||
|
|
||||||
kek = Scrypt(
|
|
||||||
base64.b64decode(master_key_config['scryptSalt']),
|
|
||||||
32, # Masterkey.SUBKEY_LEN_BYTES in cryptolib
|
|
||||||
master_key_config['scryptCostParam'],
|
|
||||||
master_key_config['scryptBlockSize'],
|
|
||||||
1 # Scrypt.P in cryptolib
|
|
||||||
).derive(master_password.encode('utf-8'))
|
|
||||||
|
|
||||||
# ----------------------
|
|
||||||
# Unwrap encryption keys
|
|
||||||
|
|
||||||
try:
|
|
||||||
primary_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['primaryMasterKey']))
|
|
||||||
hmac_master_key = aes_key_unwrap(kek, base64.b64decode(master_key_config['hmacMasterKey']))
|
|
||||||
except InvalidUnwrap:
|
|
||||||
print('Error: Incorrect password')
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# -------------------------------
|
|
||||||
# Validate vault config signature
|
|
||||||
|
|
||||||
# HMAC secret is the combined "masterkey" (primary_master_key + hmac_master_key) - see Masterkey class in cryptolib
|
|
||||||
expected_signature = hmac.digest(primary_master_key + hmac_master_key, (vault_config_jwt_parts[0] + '.' + vault_config_jwt_parts[1]).encode('utf-8'), 'SHA256')
|
|
||||||
|
|
||||||
if b64url_encode(expected_signature) != vault_config_jwt_parts[2]:
|
|
||||||
print('Error: Invalid vault configuration file signature')
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
return primary_master_key, hmac_master_key
|
|
||||||
|
|
||||||
def list_directory(vault_path, primary_master_key, hmac_master_key, directory_id):
|
|
||||||
hashed_directory_id = hash_directory_id(primary_master_key, hmac_master_key, directory_id)
|
|
||||||
|
|
||||||
# Check dirid.c9r matches expected
|
|
||||||
stored_directory_id = decrypt_file(vault_path, primary_master_key, hashed_directory_id, 'dirid.c9r')
|
|
||||||
if stored_directory_id != directory_id.encode('utf-8'):
|
|
||||||
print('Error: Unexpected dirid.c9r (got "{}", expected "{}")'.format(stored_directory_id.decode('utf-8'), directory_id))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# List directory contents
|
|
||||||
directory_contents = []
|
|
||||||
|
|
||||||
for entry in os.scandir(os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:])):
|
|
||||||
if entry.name == 'dirid.c9r':
|
|
||||||
continue
|
|
||||||
elif entry.name.endswith('.c9r'):
|
|
||||||
directory_contents.append(decrypt_filename(primary_master_key, hmac_master_key, directory_id, entry.name))
|
|
||||||
elif entry.name.endswith('.c9s'):
|
|
||||||
print('Warning: Unsupported entry with long filename "{}" - not yet implemented'.format(entry.name))
|
|
||||||
else:
|
|
||||||
print('Warning: Unknown file "{}" - ignoring'.format(entry.name))
|
|
||||||
|
|
||||||
return directory_contents
|
|
||||||
|
|
||||||
def hash_directory_id(primary_master_key, hmac_master_key, directory_id):
|
|
||||||
encrypted_directory_id = aes_siv_encrypt(primary_master_key, hmac_master_key, directory_id.encode('utf-8'), None)
|
|
||||||
hashed_directory_id = base64.b32encode(hashlib.sha1(encrypted_directory_id).digest()).decode('utf-8')
|
|
||||||
return hashed_directory_id
|
|
||||||
|
|
||||||
def encrypt_filename(primary_master_key, hmac_master_key, directory_id, filename):
|
|
||||||
return b64url_encode(aes_siv_encrypt(primary_master_key, hmac_master_key, filename.encode('utf-8'), [directory_id.encode('utf-8')]), strip_padding=False) + '.c9r'
|
|
||||||
|
|
||||||
def decrypt_filename(primary_master_key, hmac_master_key, directory_id, encrypted_filename):
|
|
||||||
if not encrypted_filename.endswith('.c9r'):
|
|
||||||
raise ValueError('Encrypted filename must end with .c9r')
|
|
||||||
|
|
||||||
ciphertext_filename = b64url_decode(encrypted_filename[:-len('.c9r')])
|
|
||||||
plaintext_filename = aes_siv_decrypt(primary_master_key, hmac_master_key, ciphertext_filename, [directory_id.encode('utf-8')]).decode('utf-8')
|
|
||||||
return plaintext_filename
|
|
||||||
|
|
||||||
def decrypt_file(vault_path, primary_master_key, hashed_directory_id, filename):
|
|
||||||
encrypted_file_path = os.path.join(vault_path, 'd', hashed_directory_id[:2], hashed_directory_id[2:], filename)
|
|
||||||
|
|
||||||
with open(encrypted_file_path, 'rb') as f:
|
|
||||||
ciphertext = f.read()
|
|
||||||
|
|
||||||
# Read header
|
|
||||||
ciphertext_header = ciphertext[:68]
|
|
||||||
header_nonce = ciphertext_header[:12]
|
|
||||||
header_payload = ciphertext_header[12:-16]
|
|
||||||
header_tag = ciphertext_header[-16:]
|
|
||||||
|
|
||||||
# Decrypt header
|
|
||||||
cipher = AES.new(primary_master_key, AES.MODE_GCM, nonce=header_nonce)
|
|
||||||
plaintext_header = cipher.decrypt_and_verify(header_payload, header_tag)
|
|
||||||
|
|
||||||
content_key = plaintext_header[8:]
|
|
||||||
|
|
||||||
# Decrypt file in chunks
|
|
||||||
plaintext = bytearray()
|
|
||||||
|
|
||||||
for chunk_num, idx in enumerate(range(68, len(ciphertext), 32*1024 + 28)):
|
|
||||||
ciphertext_chunk = ciphertext[idx:idx+32*1024+28]
|
|
||||||
chunk_nonce = ciphertext_chunk[:12]
|
|
||||||
chunk_payload = ciphertext_chunk[12:-16]
|
|
||||||
chunk_tag = ciphertext_chunk[-16:]
|
|
||||||
|
|
||||||
cipher = AES.new(content_key, AES.MODE_GCM, nonce=chunk_nonce)
|
|
||||||
cipher.update(struct.pack('>Q', chunk_num)) # Contrary to documentation, this is a 64-bit not 32-bit integer (longToBigEndianByteArray in cryptolib: Long.SIZE)
|
|
||||||
cipher.update(header_nonce)
|
|
||||||
plaintext_chunk = cipher.decrypt_and_verify(chunk_payload, chunk_tag)
|
|
||||||
|
|
||||||
plaintext.extend(plaintext_chunk)
|
|
||||||
|
|
||||||
return bytes(plaintext)
|
|
||||||
|
|
||||||
def aes_siv_encrypt(primary_master_key, hmac_master_key, plaintext, associated_data):
|
|
||||||
if len(plaintext) == 0:
|
|
||||||
# Must use PyCryptodome
|
|
||||||
# https://github.com/pyca/cryptography/issues/10958 - cryptography AESSIV does not accept empty plaintext (e.g. root directory)
|
|
||||||
if associated_data:
|
|
||||||
if len(associated_data) > 1:
|
|
||||||
# Incompatible with PyCryptodome
|
|
||||||
raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with >1 associated data')
|
|
||||||
|
|
||||||
# Only one associated data - equals the nonce
|
|
||||||
|
|
||||||
if not associated_data[0]:
|
|
||||||
# Incompatible with PyCryptodome
|
|
||||||
raise ValueError('Cannot encrypt zero-length plaintext with AES-SIV with zero-length associated data')
|
|
||||||
|
|
||||||
ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV, nonce=associated_data[0]).encrypt_and_digest(plaintext)
|
|
||||||
return tag + ciphertext
|
|
||||||
|
|
||||||
ciphertext, tag = AES.new(hmac_master_key + primary_master_key, AES.MODE_SIV).encrypt_and_digest(plaintext)
|
|
||||||
return tag + ciphertext
|
|
||||||
|
|
||||||
# Otherwise use cryptography
|
|
||||||
tag_and_ciphertext = AESSIV(hmac_master_key + primary_master_key).encrypt(plaintext, associated_data)
|
|
||||||
return tag_and_ciphertext
|
|
||||||
|
|
||||||
def aes_siv_decrypt(primary_master_key, hmac_master_key, tag_and_ciphertext, associated_data):
|
|
||||||
return AESSIV(hmac_master_key + primary_master_key).decrypt(tag_and_ciphertext, associated_data)
|
|
||||||
|
|
||||||
def b64url_decode(s):
|
|
||||||
padding_len = -(len(s) % -4)
|
|
||||||
return base64.urlsafe_b64decode(s + '=' * padding_len)
|
|
||||||
|
|
||||||
def b64url_encode(s, strip_padding=True):
|
|
||||||
result = base64.urlsafe_b64encode(s)
|
|
||||||
if strip_padding:
|
|
||||||
result = result.rstrip(b'=')
|
|
||||||
return result.decode('utf-8')
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
Loading…
Reference in New Issue
Block a user