diff --git a/README.md b/README.md index f106e29..dea9279 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ The script *assumes* that backups are encrypted with a user-provided password. A ``` usage: kobackupdec.py [-h] [-v] password backup_path dest_path -Huawei KoBackup decryptor version 20190729 +Huawei KoBackup decryptor version 20200611 positional arguments: password user password for the backup @@ -20,8 +20,10 @@ positional arguments: dest_path decrypted backup folder optional arguments: - -h, --help show this help message and exit - -v, --verbose verbose level, -v to -vvv + -h, --help show this help message and exit + -e, --expandtar expand tar files + -w, --writable do not set RO pemission on decrypted data + -v, --verbose verbose level, -v to -vvv ``` - `password`, is the user provided password. diff --git a/kobackupdec.py b/kobackupdec.py index 0aefbc1..e2f6c9c 100644 --- a/kobackupdec.py +++ b/kobackupdec.py @@ -4,6 +4,9 @@ # Huawei KoBackup backups decryptor. # # Version History +# - 20200611: added 'expandtar' option, to avoid automatic expansion of TARs +# added 'writable' option, to allow user RW on decrypted files +# large TAR files are not managed in chunk but not expanded # - 20200607: merged empty CheckMsg, update folder_to_media_type by @realSnoopy # - 20200406: merged pull by @lp4n6, related to files and folders permissions # - 20200405: added Python minor version check and note (thanks @lp4n6) @@ -57,11 +60,13 @@ from Crypto.Hash import HMAC from Crypto.Protocol.KDF import PBKDF2 from Crypto.Util import Counter -VERSION = '20200607' +VERSION = '20200611' # Disabling check on doc strings and naming convention. # pylint: disable=C0111,C0103 +MAX_FILE_SIZE = 536870912 # Files larger than that needs to be 'chuncked'. + # --- DecryptMaterial --------------------------------------------------------- class DecryptMaterial: @@ -164,6 +169,7 @@ class Decryptor: count = 5000 dklen = 32 + chunk_size = 1024*1024*64 def __init__(self, password): '''Initialize the object by setting a password.''' @@ -308,6 +314,28 @@ class Decryptor: decryptor = AES.new(key, mode=AES.MODE_CTR, counter=counter_obj) return decryptor.decrypt(data) + def decrypt_large_package(self, dec_material, data): + if not self._good: + logging.warning('well, it is hard to decrypt with a wrong key.') + + if not dec_material.encMsgV3: + logging.error('cannot decrypt with an empty encMsgV3!') + return None + + salt = dec_material.encMsgV3[:32] + counter_iv = dec_material.encMsgV3[32:] + + key = PBKDF2(self._bkey, salt, Decryptor.dklen, Decryptor.count, + Decryptor.prf, hmac_hash_module=None) + + counter_obj = Counter.new(128, initial_value=int.from_bytes( + counter_iv, byteorder='big'), little_endian=False) + + decryptor = AES.new(key, mode=AES.MODE_CTR, counter=counter_obj) + data_view = memoryview(data) + for x in range(0, len(data), self.chunk_size): + yield decryptor.decrypt(data_view[x:x+self.chunk_size]) + def decrypt_file(self, dec_material, data): if not self._good: logging.warning('well, it is hard to decrypt with a wrong key.') @@ -658,9 +686,22 @@ def decrypt_entry(decrypt_info, entry, type_info, search=False): logging.warning('entry %s has no decrypt material!', skey) return cleartext +# --- decrypt_large_entry ----------------------------------------------------- + +def decrypt_large_entry(decrypt_info, entry, type_info, search=False): + skey = entry.stem + decrypt_material = decrypt_info.get_decrypt_material(skey, type_info, + search) + if decrypt_material: + for x in decrypt_info.decryptor.decrypt_large_package( + decrypt_material, entry.read_bytes()): + yield x + else: + logging.warning('entry %s has no decrypt material!', skey) + # --- decrypt_files_in_root --------------------------------------------------- -def decrypt_files_in_root(decrypt_info, path_in, path_out): +def decrypt_files_in_root(decrypt_info, path_in, path_out, expandtar): data_apk_dir = path_out.absolute().joinpath('data/app') data_app_dir = path_out.absolute().joinpath('data/data') @@ -695,18 +736,33 @@ def decrypt_files_in_root(decrypt_info, path_in, path_out): else: logging.warning('unable to decrypt entry %s', entry.name) - elif extension == '.tar': + elif extension == '.tar' and entry.stat().st_size < MAX_FILE_SIZE: cleartext = decrypt_entry(decrypt_info, entry, DecryptInfo.info_type.FILE) - if cleartext: + if cleartext and expandtar: with tarfile.open(fileobj=io.BytesIO(cleartext)) as tar_data: if os.name == 'nt': tar_extract_win(tar_data, data_app_dir) else: tar_data.extractall(path=data_app_dir) + elif cleartext: + logging.info('Not expanding TAR file %s', entry.name) + dest_file = data_app_dir.joinpath(entry.name) + dest_file.parent.mkdir(0o755, parents=True, exist_ok=True) + dest_file.write_bytes(cleartext) else: logging.warning('unable to decrypt entry %s', entry.name) + elif extension == '.tar' and entry.stat().st_size >= MAX_FILE_SIZE: + logging.info('Decrypting LARGE entry %s', entry.name) + logging.info('TAR will not be expanded') + dest_file = data_app_dir.joinpath(entry.name) + dest_file.parent.mkdir(0o755, parents=True, exist_ok=True) + with open(dest_file, 'wb') as fd: + for x in decrypt_large_entry(decrypt_info, entry, + DecryptInfo.info_type.FILE): + fd.write(x) + else: logging.warning('entry %s unmanged, copying it', entry.name) dest_file = data_unk_dir.joinpath(entry.name) @@ -715,7 +771,7 @@ def decrypt_files_in_root(decrypt_info, path_in, path_out): # --- decrypt_files_in_folder ------------------------------------------------- -def decrypt_files_in_folder(decrypt_info, folder, path_out): +def decrypt_files_in_folder(decrypt_info, folder, path_out, expandtar): folder_to_media_type = {'movies': 'video', 'pictures': 'photo', 'audios': 'audio', } @@ -723,6 +779,12 @@ def decrypt_files_in_folder(decrypt_info, folder, path_out): media_out_dir = path_out.absolute().joinpath('storage') media_unk_dir = path_out.absolute().joinpath('unknown') + # Dirty 'hack' to see if an XML file is inside the folder with IVs + # needed to decrypt .enc files... Not tested for side effects. + xml_files = folder.glob('*.xml') + for entry in xml_files: + parse_generic_xml(entry, decrypt_info) + for entry in folder.glob('**/*'): if entry.is_dir(): continue @@ -773,7 +835,7 @@ def decrypt_files_in_folder(decrypt_info, folder, path_out): if cleartext: dest_file = media_out_dir.joinpath(entry.relative_to(folder)) dest_file.parent.mkdir(0o755, parents=True, exist_ok=True) - if entry.suffix.lower() == '.tar': + if entry.suffix.lower() == '.tar' and expandtar: with tarfile.open(fileobj=io.BytesIO(cleartext)) as tdata: if os.name == 'nt': tar_extract_win(tdata, dest_file.parent) @@ -796,7 +858,7 @@ def decrypt_files_in_folder(decrypt_info, folder, path_out): # --- decrypt_backup ---------------------------------------------------------- -def decrypt_backup(password, path_in, path_out): +def decrypt_backup(password, path_in, path_out, expandtar): decrypt_info = parse_info_xml(path_in.joinpath('info.xml'), password) if not decrypt_info: @@ -814,15 +876,15 @@ def decrypt_backup(password, path_in, path_out): logging.debug(decrypt_info.dump()) - decrypt_files_in_root(decrypt_info, path_in, path_out) + decrypt_files_in_root(decrypt_info, path_in, path_out, expandtar) for entry in path_in.glob('*'): if entry.is_dir(): - decrypt_files_in_folder(decrypt_info, entry, path_out) + decrypt_files_in_folder(decrypt_info, entry, path_out, expandtar) # --- decrypt_media ----------------------------------------------------------- -def decrypt_media(password, path_in, path_out): +def decrypt_media(password, path_in, path_out, expandtar): # [TODO][TBR] Should parse media.db sqlite. @@ -844,11 +906,11 @@ def decrypt_media(password, path_in, path_out): for entry in subfolder.glob('*'): if entry.is_dir(): - decrypt_files_in_folder(decrypt_info, entry, path_out) + decrypt_files_in_folder(decrypt_info, entry, path_out, expandtar) # --- main -------------------------------------------------------------------- -def main(password, backup_path_in, dest_path_out): +def main(password, backup_path_in, dest_path_out, expandtar, writable): logging.info('searching backup in [%s]', backup_path_in) @@ -870,7 +932,7 @@ def main(password, backup_path_in, dest_path_out): if files_folder: logging.info('got info.xml, going to decrypt backup files') - decrypt_backup(password, files_folder, dest_path_out) + decrypt_backup(password, files_folder, dest_path_out, expandtar) media_folder = None if backup_path_in.joinpath('media').is_dir(): @@ -880,17 +942,19 @@ def main(password, backup_path_in, dest_path_out): logging.info('No media folder found.') if media_folder: - decrypt_media(password, media_folder, dest_path_out) + decrypt_media(password, media_folder, dest_path_out, expandtar) - logging.info('setting all decrypted files to read-only') - for entry in dest_path_out.glob('**/*'): - # Set read-only permission if entry is a file. - if os.path.isfile(entry): - os.chmod(entry, 0o444) - - # *nix directories require execute permission to read/traverse - elif os.path.isdir(entry): - os.chmod(entry, 0o555) + if writable: + logging.info('Not setting read-only on decrypted files') + else: + logging.info('setting all decrypted files to read-only') + for entry in dest_path_out.glob('**/*'): + # Set read-only permission if entry is a file. + if os.path.isfile(entry): + os.chmod(entry, 0o444) + # *nix directories require execute permission to read/traverse + elif os.path.isdir(entry): + os.chmod(entry, 0o555) # --- entry point and parameters checks --------------------------------------- @@ -907,6 +971,10 @@ if __name__ == '__main__': parser.add_argument('password', help='user password for the backup') parser.add_argument('backup_path', help='backup folder') parser.add_argument('dest_path', help='decrypted backup folder') + parser.add_argument('-e', '--expandtar', action='store_true', + help='expand tar files') + parser.add_argument('-w', '--writable', action='store_true', + help='do not set RO pemission on decrypted data') parser.add_argument('-v', '--verbose', action='count', help='verbose level, -v to -vvv') args = parser.parse_args() @@ -932,8 +1000,8 @@ if __name__ == '__main__': dest_path = pathlib.Path(args.dest_path) if dest_path.is_dir(): sys.exit('Destination folder already exists!') - - # Make directory with read and execute permission (=read and traverse) - dest_path.mkdir(0o755,parents=True) - main(user_password, backup_path, dest_path) + # Make directory with read and execute permission (=read and traverse) + dest_path.mkdir(0o755, parents=True) + + main(user_password, backup_path, dest_path, args.expandtar, args.writable)