mirror of
https://github.com/imgyh/tiktok.git
synced 2025-06-18 16:09:22 +08:00
优化用户所有作品数据的获取流程,支持 link 参数传入个人主页地址
1. 支持 link 参数传入个人主页地址, 格式为: https://www.douyin.com/user/MS4wLjABAAAAKT9wejag0ZFC12jlj_lPVYKNJtE8WGQlN42Omq 可直接从网页版地址栏获取 2. 用户主页作品数据的获取和下载流程优化: - 放弃 timeout 时间限制, 设置请求重试次数限制, 可设置为无限重试, 超过请求重试次数后, 尝试下载已经获取成功的作品而不是直接报错退出 - 通过 requests.Session() 发送请求以维持请求状态,似乎能提高接口访问成功率 - 以用户名创建一个单独的文件夹以存放该用户的下载文件
This commit is contained in:
parent
def3dd77e7
commit
d712e84a17
85
TikTok.py
85
TikTok.py
@ -38,8 +38,14 @@ class TikTok(object):
|
||||
'referer': 'https://www.douyin.com/',
|
||||
'Cookie': f"msToken={self.utils.generate_random_str(107)}; ttwid={self.utils.getttwid()}; odin_tt=324fb4ea4a89c0c05827e18a1ed9cf9bf8a17f7705fcc793fec935b637867e2a5a9b8168c885554d029919117a18ba69; passport_csrf_token=f61602fc63757ae0e4fd9d6bdcee4810;"
|
||||
}
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update(self.headers)
|
||||
# 用于设置重复请求某个接口的最大时间
|
||||
self.timeout = 10
|
||||
# 用于设置重复请求某个接口的最大重试次数, 设置为 0 则重试无限次
|
||||
self.retries_max = 20
|
||||
# 用于设置重复请求某个接口的重试等待时间
|
||||
self.retry_wait = 1
|
||||
|
||||
|
||||
# 从分享链接中提取网址
|
||||
@ -54,7 +60,7 @@ class TikTok(object):
|
||||
key_type = None
|
||||
|
||||
try:
|
||||
r = requests.get(url=url, headers=self.headers)
|
||||
r = self.session.get(url=url)
|
||||
except Exception as e:
|
||||
print('[ 错误 ]:输入链接有误!\r')
|
||||
return key_type, key
|
||||
@ -63,6 +69,7 @@ class TikTok(object):
|
||||
# 作品 第一步解析出来的链接是share/video/{aweme_id}
|
||||
# https://www.iesdouyin.com/share/video/7037827546599263488/?region=CN&mid=6939809470193126152&u_code=j8a5173b&did=MS4wLjABAAAA1DICF9-A9M_CiGqAJZdsnig5TInVeIyPdc2QQdGrq58xUgD2w6BqCHovtqdIDs2i&iid=MS4wLjABAAAAomGWi4n2T0H9Ab9x96cUZoJXaILk4qXOJlJMZFiK6b_aJbuHkjN_f0mBzfy91DX1&with_sec_did=1&titleType=title&schema_type=37&from_ssr=1&utm_source=copy&utm_campaign=client_share&utm_medium=android&app=aweme
|
||||
# 用户 第一步解析出来的链接是share/user/{sec_uid}
|
||||
# 如果传入 link 参数是 https://www.douyin.com/user/<sec_uid> 格式的,解析出来的链接是 /user/{sec_uid}
|
||||
# https://www.iesdouyin.com/share/user/MS4wLjABAAAA06y3Ctu8QmuefqvUSU7vr0c_ZQnCqB0eaglgkelLTek?did=MS4wLjABAAAA1DICF9-A9M_CiGqAJZdsnig5TInVeIyPdc2QQdGrq58xUgD2w6BqCHovtqdIDs2i&iid=MS4wLjABAAAAomGWi4n2T0H9Ab9x96cUZoJXaILk4qXOJlJMZFiK6b_aJbuHkjN_f0mBzfy91DX1&with_sec_did=1&sec_uid=MS4wLjABAAAA06y3Ctu8QmuefqvUSU7vr0c_ZQnCqB0eaglgkelLTek&from_ssr=1&u_code=j8a5173b&timestamp=1674540164&ecom_share_track_params=%7B%22is_ec_shopping%22%3A%221%22%2C%22secuid%22%3A%22MS4wLjABAAAA-jD2lukp--I21BF8VQsmYUqJDbj3FmU-kGQTHl2y1Cw%22%2C%22enter_from%22%3A%22others_homepage%22%2C%22share_previous_page%22%3A%22others_homepage%22%7D&utm_source=copy&utm_campaign=client_share&utm_medium=android&app=aweme
|
||||
# 合集
|
||||
# https://www.douyin.com/collection/7093490319085307918
|
||||
@ -77,6 +84,15 @@ class TikTok(object):
|
||||
for one in re.finditer(r'user\/([\d\D]*)', str(r.request.path_url)):
|
||||
key = one.group(1)
|
||||
key_type = "user"
|
||||
elif "/user/" in urlstr:
|
||||
# 链接是 www.douyin.com/user/<sec_uid>
|
||||
if '?' in r.request.path_url:
|
||||
for one in re.finditer(r'user\/([\d\D]*)([?])', str(r.request.path_url)):
|
||||
key = one.group(1)
|
||||
else:
|
||||
for one in re.finditer(r'user\/([\d\D]*)', str(r.request.path_url)):
|
||||
key = one.group(1)
|
||||
key_type = "user"
|
||||
elif "/share/video/" in urlstr:
|
||||
# 获取作品 aweme_id
|
||||
key = re.findall('video/(\d+)?', urlstr)[0]
|
||||
@ -97,7 +113,7 @@ class TikTok(object):
|
||||
key1 = re.findall('reflow/(\d+)?', urlstr)[0]
|
||||
url = self.urls.LIVE2 + self.utils.getXbogus(
|
||||
f'live_id=1&room_id={key1}&app_id=1128')
|
||||
res = requests.get(url, headers=self.headers)
|
||||
res = self.session.get(url)
|
||||
resjson = json.loads(res.text)
|
||||
key = resjson['data']['room']['owner']['web_rid']
|
||||
key_type = "live"
|
||||
@ -127,7 +143,7 @@ class TikTok(object):
|
||||
jx_url = self.urls.POST_DETAIL + self.utils.getXbogus(
|
||||
url=f'aweme_id={aweme_id}&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333')
|
||||
|
||||
raw = requests.get(url=jx_url, headers=self.headers).text
|
||||
raw = self.session.get(url=jx_url).text
|
||||
datadict = json.loads(raw)
|
||||
if datadict is not None and datadict['aweme_detail'] is not None and datadict["status_code"] == 0:
|
||||
break
|
||||
@ -171,17 +187,25 @@ class TikTok(object):
|
||||
print("[ 提示 ]:正在获取所有作品数据请稍后...\r")
|
||||
print("[ 提示 ]:会进行多次请求,等待时间较长...\r\n")
|
||||
times = 0
|
||||
# self.session.get('https://www.douyin.com')
|
||||
self.session.get('https://www.douyin.com/user/' + sec_uid)
|
||||
# referer 需要指定为如下格式
|
||||
self.session.headers['referer'] = 'https://www.douyin.com/user/' + sec_uid
|
||||
username = ""
|
||||
is_retries_max = False
|
||||
while True:
|
||||
times = times + 1
|
||||
print("[ 提示 ]:正在对 [主页] 进行第 " + str(times) + " 次请求...\r")
|
||||
|
||||
start = time.time() # 开始时间
|
||||
retries_count = 0
|
||||
while True:
|
||||
retries_count += 1
|
||||
if self.retries_max != 0 and retries_count > self.retries_max:
|
||||
is_retries_max = True
|
||||
break
|
||||
# 接口不稳定, 有时服务器不返回数据, 需要重新获取
|
||||
try:
|
||||
if mode == "post":
|
||||
# referer 需要指定为如下格式
|
||||
self.headers['referer'] = 'https://www.douyin.com/user/' + sec_uid
|
||||
url = self.urls.USER_POST + self.utils.getXbogus(
|
||||
url=f'device_platform=webapp&aid=6383&os_version=10&version_name=17.4.0&sec_user_id={sec_uid}&count={count}&max_cursor={max_cursor}')
|
||||
elif mode == "like":
|
||||
@ -190,19 +214,24 @@ class TikTok(object):
|
||||
else:
|
||||
print("[ 错误 ]:模式选择错误, 仅支持post、like、mix, 请检查后重新运行!\r")
|
||||
return None
|
||||
|
||||
res = requests.get(url=url, headers=self.headers)
|
||||
# req = requests.Request('GET', url)
|
||||
# prepped = req.prepare()
|
||||
# print("referer:", prepped.headers.get("referer"), prepped.headers)
|
||||
# res = self.session.send(prepped)
|
||||
res = self.session.get(url=url)
|
||||
datadict = json.loads(res.text)
|
||||
print('[ 提示 ]:本次请求返回 ' + str(len(datadict["aweme_list"])) + ' 条数据\r')
|
||||
print('[ 提示 ]:开始对 ' + str(len(datadict["aweme_list"])) + ' 条数据请求作品详情\r\n')
|
||||
if datadict is not None and datadict["status_code"] == 0:
|
||||
break
|
||||
except Exception as e:
|
||||
end = time.time() # 结束时间
|
||||
if end - start > self.timeout:
|
||||
raise RuntimeError("重复请求该接口" + str(self.timeout) + "s, 仍然未获取到数据")
|
||||
except Exception:
|
||||
time.sleep(self.retry_wait)
|
||||
print("[ 警告 ]:接口未返回数据, 正在重新请求!\r")
|
||||
|
||||
if is_retries_max:
|
||||
print("\r\n[ 提示 ]: 超出重试次数限制, 尝试返回已经成功获取的作品数据...\r\n")
|
||||
break
|
||||
if not username:
|
||||
username = datadict["aweme_list"][0]["author"]["nickname"]
|
||||
for aweme in datadict["aweme_list"]:
|
||||
# 获取 aweme_id
|
||||
aweme_id = aweme["aweme_id"]
|
||||
@ -226,8 +255,10 @@ class TikTok(object):
|
||||
break
|
||||
else:
|
||||
print("\r\n[ 提示 ]:[主页] 第 " + str(times) + " 次请求成功...\r\n")
|
||||
|
||||
return awemeList
|
||||
if not awemeList:
|
||||
print("\r\n[ 错误 ]:[主页] 获取作品数据为空...\r\n")
|
||||
raise Exception("获取作品数据为空")
|
||||
return awemeList, username
|
||||
|
||||
def getLiveInfo(self, web_rid: str, option=True):
|
||||
if option:
|
||||
@ -242,7 +273,7 @@ class TikTok(object):
|
||||
live_api = self.urls.LIVE + self.utils.getXbogus(
|
||||
url=f'aid=6383&device_platform=web&web_rid={web_rid}')
|
||||
|
||||
response = requests.get(live_api, headers=self.headers)
|
||||
response = self.session.get(live_api)
|
||||
live_json = json.loads(response.text)
|
||||
if live_json != {} and live_json['status_code'] == 0:
|
||||
break
|
||||
@ -351,7 +382,7 @@ class TikTok(object):
|
||||
url = self.urls.USER_MIX + self.utils.getXbogus(
|
||||
url=f'device_platform=webapp&aid=6383&os_version=10&version_name=17.4.0&mix_id={mix_id}&cursor={cursor}&count={count}')
|
||||
|
||||
res = requests.get(url=url, headers=self.headers)
|
||||
res = self.session.get(url=url)
|
||||
datadict = json.loads(res.text)
|
||||
print('[ 提示 ]:本次请求返回 ' + str(len(datadict["aweme_list"])) + ' 条数据\r')
|
||||
print('[ 提示 ]:开始对 ' + str(len(datadict["aweme_list"])) + ' 条数据请求作品详情\r\n')
|
||||
@ -415,7 +446,7 @@ class TikTok(object):
|
||||
url = self.urls.USER_MIX_LIST + self.utils.getXbogus(
|
||||
url=f'device_platform=webapp&aid=6383&os_version=10&version_name=17.4.0&sec_user_id={sec_uid}&count={count}&cursor={cursor}')
|
||||
|
||||
res = requests.get(url=url, headers=self.headers)
|
||||
res = self.session.get(url=url)
|
||||
datadict = json.loads(res.text)
|
||||
print('[ 提示 ]:本次请求返回 ' + str(len(datadict["mix_infos"])) + ' 条数据\r')
|
||||
print('[ 提示 ]:开始对 ' + str(len(datadict["mix_infos"])) + ' 条数据请求作品详情\r\n')
|
||||
@ -475,7 +506,7 @@ class TikTok(object):
|
||||
url = self.urls.MUSIC + self.utils.getXbogus(
|
||||
url=f'device_platform=webapp&aid=6383&os_version=10&version_name=17.4.0&music_id={music_id}&cursor={cursor}&count={count}')
|
||||
|
||||
res = requests.get(url=url, headers=self.headers)
|
||||
res = self.session.get(url=url)
|
||||
datadict = json.loads(res.text)
|
||||
print('[ 提示 ]:本次请求返回 ' + str(len(datadict["aweme_list"])) + ' 条数据\r')
|
||||
print('[ 提示 ]:开始对 ' + str(len(datadict["aweme_list"])) + ' 条数据请求作品详情\r\n')
|
||||
@ -514,8 +545,8 @@ class TikTok(object):
|
||||
return awemeList
|
||||
|
||||
# 来自 https://blog.csdn.net/weixin_43347550/article/details/105248223
|
||||
def progressBarDownload(self, url, filepath,desc):
|
||||
response = requests.get(url, stream=True, headers=self.headers)
|
||||
def progressBarDownload(self, url, filepath, desc):
|
||||
response = self.session.get(url, stream=True)
|
||||
chunk_size = 1024 # 每次下载的数据大小
|
||||
content_size = int(response.headers['content-length']) # 下载文件总大小
|
||||
try:
|
||||
@ -568,7 +599,7 @@ class TikTok(object):
|
||||
video_path = os.path.join(aweme_path, file_name + ".mp4")
|
||||
|
||||
if os.path.exists(video_path):
|
||||
# print("[ 提示 ]:视频已存在为您跳过...\r\n")
|
||||
print(f"[ 视频 ]: {file_name} 已存在为您跳过...")
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
@ -663,11 +694,13 @@ class TikTok(object):
|
||||
except Exception as e:
|
||||
print("[ 错误 ]:下载作品时出错\r\n")
|
||||
|
||||
def userDownload(self, awemeList: list, music=True, cover=True, avatar=True, resjson=True, savePath=os.getcwd(), thread=5):
|
||||
def userDownload(self, awemeList: list, music=True, cover=True, avatar=True, resjson=True, savePath=os.getcwd(), thread=5, username=""):
|
||||
if awemeList is None:
|
||||
return
|
||||
if not os.path.exists(savePath):
|
||||
os.mkdir(savePath)
|
||||
valid_folder_name = re.sub(r'[<>:"/\|\?\*\\]', "_", username)
|
||||
user_path = os.path.join(savePath, valid_folder_name)
|
||||
if not os.path.exists(user_path):
|
||||
os.makedirs(user_path, exist_ok=True)
|
||||
|
||||
self.tpool = ThreadPoolExecutor(thread)
|
||||
self.alltask = []
|
||||
@ -676,7 +709,7 @@ class TikTok(object):
|
||||
for aweme in awemeList:
|
||||
# print("[ 提示 ]:正在下载 [%s] 的作品 %s/%s\r\n"
|
||||
# % (aweme["author"]["nickname"], str(ind + 1), len(awemeList)))
|
||||
self.awemeDownload(awemeDict=aweme, music=music, cover=cover, avatar=avatar, resjson=resjson, savePath=savePath,usingThread=True)
|
||||
self.awemeDownload(awemeDict=aweme, music=music, cover=cover, avatar=avatar, resjson=resjson, savePath=user_path, usingThread=True)
|
||||
|
||||
wait(self.alltask, return_when=ALL_COMPLETED)
|
||||
end = time.time() # 结束时间
|
||||
|
@ -23,7 +23,7 @@ from TikTokUtils import Utils
|
||||
def argument():
|
||||
parser = argparse.ArgumentParser(description='抖音批量下载工具 使用帮助')
|
||||
parser.add_argument("--link", "-l",
|
||||
help="作品(视频或图集)、直播、合集、音乐集合、个人主页抖音分享链接(删除文案, 保证只有URL, https://v.douyin.com/kcvMpuN/)",
|
||||
help="作品(视频或图集)、直播、合集、音乐集合、个人主页抖音分享链接(删除文案, 保证只有URL, https://v.douyin.com/kcvMpuN/)、个人主页地址(https://www.douyin.com/user/MS4wLjABAAAAKT9wejag0ZFC12jlj_lPVYKNJtE8WGQlN42Omq)",
|
||||
type=str, required=True)
|
||||
parser.add_argument("--path", "-p", help="下载保存位置",
|
||||
type=str, required=True)
|
||||
@ -60,9 +60,9 @@ def main():
|
||||
if key is None or key_type is None:
|
||||
return
|
||||
elif key_type == "user" and args.mode != 'mix':
|
||||
datalist = tk.getUserInfo(key, args.mode, 35, args.number)
|
||||
datalist, username = tk.getUserInfo(key, args.mode, 35, args.number)
|
||||
tk.userDownload(awemeList=datalist, music=args.music, cover=args.cover, avatar=args.avatar, resjson=args.json,
|
||||
savePath=args.path, thread=args.thread)
|
||||
savePath=args.path, thread=args.thread, username=username)
|
||||
elif key_type == "user" and args.mode == 'mix':
|
||||
if not os.path.exists(args.path):
|
||||
os.mkdir(args.path)
|
||||
|
Loading…
x
Reference in New Issue
Block a user