20200611, large files, new options

Signed-off-by: dfirfpi <francesco.picasso@gmail.com>
This commit is contained in:
dfirfpi 2020-06-11 00:08:31 +02:00
parent e36167743d
commit a14390724e
No known key found for this signature in database
GPG Key ID: EC61291E5446B028
2 changed files with 100 additions and 30 deletions

View File

@ -12,7 +12,7 @@ The script *assumes* that backups are encrypted with a user-provided password. A
```
usage: kobackupdec.py [-h] [-v] password backup_path dest_path
Huawei KoBackup decryptor version 20190729
Huawei KoBackup decryptor version 20200611
positional arguments:
password user password for the backup
@ -20,8 +20,10 @@ positional arguments:
dest_path decrypted backup folder
optional arguments:
-h, --help show this help message and exit
-v, --verbose verbose level, -v to -vvv
-h, --help show this help message and exit
-e, --expandtar expand tar files
-w, --writable do not set RO pemission on decrypted data
-v, --verbose verbose level, -v to -vvv
```
- `password`, is the user provided password.

View File

@ -4,6 +4,9 @@
# Huawei KoBackup backups decryptor.
#
# Version History
# - 20200611: added 'expandtar' option, to avoid automatic expansion of TARs
# added 'writable' option, to allow user RW on decrypted files
# large TAR files are not managed in chunk but not expanded
# - 20200607: merged empty CheckMsg, update folder_to_media_type by @realSnoopy
# - 20200406: merged pull by @lp4n6, related to files and folders permissions
# - 20200405: added Python minor version check and note (thanks @lp4n6)
@ -57,11 +60,13 @@ from Crypto.Hash import HMAC
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Util import Counter
VERSION = '20200607'
VERSION = '20200611'
# Disabling check on doc strings and naming convention.
# pylint: disable=C0111,C0103
MAX_FILE_SIZE = 536870912 # Files larger than that needs to be 'chuncked'.
# --- DecryptMaterial ---------------------------------------------------------
class DecryptMaterial:
@ -164,6 +169,7 @@ class Decryptor:
count = 5000
dklen = 32
chunk_size = 1024*1024*64
def __init__(self, password):
'''Initialize the object by setting a password.'''
@ -308,6 +314,28 @@ class Decryptor:
decryptor = AES.new(key, mode=AES.MODE_CTR, counter=counter_obj)
return decryptor.decrypt(data)
def decrypt_large_package(self, dec_material, data):
if not self._good:
logging.warning('well, it is hard to decrypt with a wrong key.')
if not dec_material.encMsgV3:
logging.error('cannot decrypt with an empty encMsgV3!')
return None
salt = dec_material.encMsgV3[:32]
counter_iv = dec_material.encMsgV3[32:]
key = PBKDF2(self._bkey, salt, Decryptor.dklen, Decryptor.count,
Decryptor.prf, hmac_hash_module=None)
counter_obj = Counter.new(128, initial_value=int.from_bytes(
counter_iv, byteorder='big'), little_endian=False)
decryptor = AES.new(key, mode=AES.MODE_CTR, counter=counter_obj)
data_view = memoryview(data)
for x in range(0, len(data), self.chunk_size):
yield decryptor.decrypt(data_view[x:x+self.chunk_size])
def decrypt_file(self, dec_material, data):
if not self._good:
logging.warning('well, it is hard to decrypt with a wrong key.')
@ -658,9 +686,22 @@ def decrypt_entry(decrypt_info, entry, type_info, search=False):
logging.warning('entry %s has no decrypt material!', skey)
return cleartext
# --- decrypt_large_entry -----------------------------------------------------
def decrypt_large_entry(decrypt_info, entry, type_info, search=False):
skey = entry.stem
decrypt_material = decrypt_info.get_decrypt_material(skey, type_info,
search)
if decrypt_material:
for x in decrypt_info.decryptor.decrypt_large_package(
decrypt_material, entry.read_bytes()):
yield x
else:
logging.warning('entry %s has no decrypt material!', skey)
# --- decrypt_files_in_root ---------------------------------------------------
def decrypt_files_in_root(decrypt_info, path_in, path_out):
def decrypt_files_in_root(decrypt_info, path_in, path_out, expandtar):
data_apk_dir = path_out.absolute().joinpath('data/app')
data_app_dir = path_out.absolute().joinpath('data/data')
@ -695,18 +736,33 @@ def decrypt_files_in_root(decrypt_info, path_in, path_out):
else:
logging.warning('unable to decrypt entry %s', entry.name)
elif extension == '.tar':
elif extension == '.tar' and entry.stat().st_size < MAX_FILE_SIZE:
cleartext = decrypt_entry(decrypt_info, entry,
DecryptInfo.info_type.FILE)
if cleartext:
if cleartext and expandtar:
with tarfile.open(fileobj=io.BytesIO(cleartext)) as tar_data:
if os.name == 'nt':
tar_extract_win(tar_data, data_app_dir)
else:
tar_data.extractall(path=data_app_dir)
elif cleartext:
logging.info('Not expanding TAR file %s', entry.name)
dest_file = data_app_dir.joinpath(entry.name)
dest_file.parent.mkdir(0o755, parents=True, exist_ok=True)
dest_file.write_bytes(cleartext)
else:
logging.warning('unable to decrypt entry %s', entry.name)
elif extension == '.tar' and entry.stat().st_size >= MAX_FILE_SIZE:
logging.info('Decrypting LARGE entry %s', entry.name)
logging.info('TAR will not be expanded')
dest_file = data_app_dir.joinpath(entry.name)
dest_file.parent.mkdir(0o755, parents=True, exist_ok=True)
with open(dest_file, 'wb') as fd:
for x in decrypt_large_entry(decrypt_info, entry,
DecryptInfo.info_type.FILE):
fd.write(x)
else:
logging.warning('entry %s unmanged, copying it', entry.name)
dest_file = data_unk_dir.joinpath(entry.name)
@ -715,7 +771,7 @@ def decrypt_files_in_root(decrypt_info, path_in, path_out):
# --- decrypt_files_in_folder -------------------------------------------------
def decrypt_files_in_folder(decrypt_info, folder, path_out):
def decrypt_files_in_folder(decrypt_info, folder, path_out, expandtar):
folder_to_media_type = {'movies': 'video', 'pictures': 'photo',
'audios': 'audio', }
@ -723,6 +779,12 @@ def decrypt_files_in_folder(decrypt_info, folder, path_out):
media_out_dir = path_out.absolute().joinpath('storage')
media_unk_dir = path_out.absolute().joinpath('unknown')
# Dirty 'hack' to see if an XML file is inside the folder with IVs
# needed to decrypt .enc files... Not tested for side effects.
xml_files = folder.glob('*.xml')
for entry in xml_files:
parse_generic_xml(entry, decrypt_info)
for entry in folder.glob('**/*'):
if entry.is_dir():
continue
@ -773,7 +835,7 @@ def decrypt_files_in_folder(decrypt_info, folder, path_out):
if cleartext:
dest_file = media_out_dir.joinpath(entry.relative_to(folder))
dest_file.parent.mkdir(0o755, parents=True, exist_ok=True)
if entry.suffix.lower() == '.tar':
if entry.suffix.lower() == '.tar' and expandtar:
with tarfile.open(fileobj=io.BytesIO(cleartext)) as tdata:
if os.name == 'nt':
tar_extract_win(tdata, dest_file.parent)
@ -796,7 +858,7 @@ def decrypt_files_in_folder(decrypt_info, folder, path_out):
# --- decrypt_backup ----------------------------------------------------------
def decrypt_backup(password, path_in, path_out):
def decrypt_backup(password, path_in, path_out, expandtar):
decrypt_info = parse_info_xml(path_in.joinpath('info.xml'), password)
if not decrypt_info:
@ -814,15 +876,15 @@ def decrypt_backup(password, path_in, path_out):
logging.debug(decrypt_info.dump())
decrypt_files_in_root(decrypt_info, path_in, path_out)
decrypt_files_in_root(decrypt_info, path_in, path_out, expandtar)
for entry in path_in.glob('*'):
if entry.is_dir():
decrypt_files_in_folder(decrypt_info, entry, path_out)
decrypt_files_in_folder(decrypt_info, entry, path_out, expandtar)
# --- decrypt_media -----------------------------------------------------------
def decrypt_media(password, path_in, path_out):
def decrypt_media(password, path_in, path_out, expandtar):
# [TODO][TBR] Should parse media.db sqlite.
@ -844,11 +906,11 @@ def decrypt_media(password, path_in, path_out):
for entry in subfolder.glob('*'):
if entry.is_dir():
decrypt_files_in_folder(decrypt_info, entry, path_out)
decrypt_files_in_folder(decrypt_info, entry, path_out, expandtar)
# --- main --------------------------------------------------------------------
def main(password, backup_path_in, dest_path_out):
def main(password, backup_path_in, dest_path_out, expandtar, writable):
logging.info('searching backup in [%s]', backup_path_in)
@ -870,7 +932,7 @@ def main(password, backup_path_in, dest_path_out):
if files_folder:
logging.info('got info.xml, going to decrypt backup files')
decrypt_backup(password, files_folder, dest_path_out)
decrypt_backup(password, files_folder, dest_path_out, expandtar)
media_folder = None
if backup_path_in.joinpath('media').is_dir():
@ -880,17 +942,19 @@ def main(password, backup_path_in, dest_path_out):
logging.info('No media folder found.')
if media_folder:
decrypt_media(password, media_folder, dest_path_out)
decrypt_media(password, media_folder, dest_path_out, expandtar)
logging.info('setting all decrypted files to read-only')
for entry in dest_path_out.glob('**/*'):
# Set read-only permission if entry is a file.
if os.path.isfile(entry):
os.chmod(entry, 0o444)
# *nix directories require execute permission to read/traverse
elif os.path.isdir(entry):
os.chmod(entry, 0o555)
if writable:
logging.info('Not setting read-only on decrypted files')
else:
logging.info('setting all decrypted files to read-only')
for entry in dest_path_out.glob('**/*'):
# Set read-only permission if entry is a file.
if os.path.isfile(entry):
os.chmod(entry, 0o444)
# *nix directories require execute permission to read/traverse
elif os.path.isdir(entry):
os.chmod(entry, 0o555)
# --- entry point and parameters checks ---------------------------------------
@ -907,6 +971,10 @@ if __name__ == '__main__':
parser.add_argument('password', help='user password for the backup')
parser.add_argument('backup_path', help='backup folder')
parser.add_argument('dest_path', help='decrypted backup folder')
parser.add_argument('-e', '--expandtar', action='store_true',
help='expand tar files')
parser.add_argument('-w', '--writable', action='store_true',
help='do not set RO pemission on decrypted data')
parser.add_argument('-v', '--verbose', action='count',
help='verbose level, -v to -vvv')
args = parser.parse_args()
@ -932,8 +1000,8 @@ if __name__ == '__main__':
dest_path = pathlib.Path(args.dest_path)
if dest_path.is_dir():
sys.exit('Destination folder already exists!')
# Make directory with read and execute permission (=read and traverse)
dest_path.mkdir(0o755,parents=True)
main(user_password, backup_path, dest_path)
# Make directory with read and execute permission (=read and traverse)
dest_path.mkdir(0o755, parents=True)
main(user_password, backup_path, dest_path, args.expandtar, args.writable)