fix(tiktok): 修复失效API,增加X-Bogus

暂时使用https://github.com/Johnserf-Seed/TikTokDownload提供的API来获取X-Bogus
This commit is contained in:
imgyh 2023-02-11 15:18:38 +08:00
parent 9a6749bffc
commit 53d3b5875e
7 changed files with 542 additions and 344 deletions

387
TikTok.py
View File

@ -9,7 +9,7 @@
@Github :https://github.com/imgyh
@Mail :admin@imgyh.com
-------------------------------------------------
Change Log :
Change Log : 2023/02/11 修改接口
-------------------------------------------------
'''
@ -20,206 +20,23 @@ import time
import os
import copy
import TikTokUtils
from TikTokUtils import Utils
from TikTokUrls import Urls
from TikTokResult import Result
'''
作品详情
https://www.iesdouyin.com/aweme/v1/web/aweme/detail/?aweme_id=%s&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333
1080p视频
https://aweme.snssdk.com/aweme/v1/play/?video_id=%s&ratio=1080p&line=0
主页作品
https://www.iesdouyin.com/aweme/v1/web/aweme/post/?sec_user_id=%s&count=%s&max_cursor=%s&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333
主页喜欢
https://www.iesdouyin.com/web/api/v2/aweme/like/?sec_uid=%s&count=%s&max_cursor=%s&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333
'''
class TikTok(object):
def __init__(self):
self.urls = Urls()
self.utils = Utils()
self.result = Result()
self.headers = {
'user-agent': 'Mozilla/5.0 (Linux; Android 8.0; Pixel 2 Build/OPD3.170816.012) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Mobile Safari/537.36 Edg/87.0.664.66',
'Cookie': 'msToken=%s' % TikTokUtils.generate_random_str(107)
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36',
'referer':'https://www.douyin.com/',
'Cookie': 'msToken=%s;odin_tt=324fb4ea4a89c0c05827e18a1ed9cf9bf8a17f7705fcc793fec935b637867e2a5a9b8168c885554d029919117a18ba69;' % self.utils.generate_random_str(107)
}
# 作者信息
self.authorDict = {
"avatar_thumb": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"avatar": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover_url": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
# 喜欢的作品数
"favoriting_count": "",
# 粉丝数
"follower_count": "",
# 关注数
"following_count": "",
# 昵称
"nickname": "",
# 是否允许下载
"prevent_download": "",
# 用户 url id
"sec_uid": "",
# 是否私密账号
"secret": "",
# 短id
"short_id": "",
# 签名
"signature": "",
# 总获赞数
"total_favorited": "",
# 用户id
"uid": "",
# 用户自定义唯一id 抖音号
"unique_id": "",
# 年龄
"user_age": "",
}
# 图片信息
self.picDict = {
"height": "",
"mask_url_list": "",
"uri": "",
"url_list": [],
"width": ""
}
# 音乐信息
self.musicDict = {
"cover_hd": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover_large": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover_medium": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover_thumb": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
# 音乐作者抖音号
"owner_handle": "",
# 音乐作者id
"owner_id": "",
# 音乐作者昵称
"owner_nickname": "",
"play_url": {
"height": "",
"uri": "",
"url_key": "",
"url_list": [],
"width": ""
},
# 音乐名字
"title": "",
}
# 视频信息
self.videoDict = {
"play_addr": {
"uri": "",
"url_list": "",
},
"cover_original_scale": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"dynamic_cover": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"origin_cover": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
}
}
# 作品信息
self.awemeDict = {
# 作品创建时间
"create_time":"",
# awemeType=0 视频, awemeType=1 图集
"awemeType": "",
# 作品 id
"aweme_id": "",
# 作者信息
"author": self.authorDict,
# 作品描述
"desc": "",
# 图片
"images": [],
# 音乐
"music": self.musicDict,
# 视频
"video": self.videoDict,
# 作品信息统计
"statistics": {
"admire_count": "",
"collect_count": "",
"comment_count": "",
"digg_count": "",
"play_count": "",
"share_count": ""
}
}
# 用户作品信息
self.awemeList = []
# 直播信息
self.liveDict = {
# 是否在播
"status": "",
# 直播标题
"title": "",
# 观看人数
"user_count": "",
# 昵称
"nickname": "",
# sec_uid
"sec_uid": "",
# 直播间观看状态
"display_long": "",
# 推流
"flv_pull_url": "",
# 分区
"partition": "",
"sub_partition": ""
}
# 从分享链接中提取网址
def getShareLink(self, string):
@ -269,97 +86,25 @@ class TikTok(object):
return key_type, key
# 将得到的json数据dataRaw精简成自己定义的数据dataNew
# 转换得到的数据
def dataConvert(self, awemeType, dataNew, dataRaw):
for item in dataNew:
try:
# 作品创建时间
if item == "create_time":
dataNew['create_time'] = time.strftime(
"%Y-%m-%d %H.%M.%S", time.localtime(dataRaw['create_time']))
continue
# 设置 awemeType
if item == "awemeType":
dataNew["awemeType"] = awemeType
continue
# 当 解析的链接 是图片时
if item == "images":
if awemeType == 1:
for image in dataRaw[item]:
for i in image:
self.picDict[i] = image[i]
# 字典要深拷贝
self.awemeDict["images"].append(copy.deepcopy(self.picDict))
continue
# 当 解析的链接 是视频时
if item == "video":
if awemeType == 0:
self.dataConvert(awemeType, dataNew[item], dataRaw[item])
continue
# 将小头像放大
if item == "avatar":
for i in dataNew[item]:
if i == "url_list":
for j in self.awemeDict["author"]["avatar_thumb"]["url_list"]:
dataNew[item][i].append(j.replace("100x100", "1080x1080"))
elif i == "uri":
dataNew[item][i] = self.awemeDict["author"]["avatar_thumb"][i].replace("100x100",
"1080x1080")
else:
dataNew[item][i] = self.awemeDict["author"]["avatar_thumb"][i]
continue
# 原来的json是[{}] 而我们的是 {}
if item == "cover_url":
self.dataConvert(awemeType, dataNew[item], dataRaw[item][0])
continue
# 根据 uri 获取 1080p 视频
if item == "play_addr":
dataNew[item]["uri"] = dataRaw["bit_rate"][0]["play_addr"]["uri"]
# 使用 这个api 可以获得1080p
dataNew[item]["url_list"] = "https://aweme.snssdk.com/aweme/v1/play/?video_id=%s&ratio=1080p&line=0" \
% dataNew[item]["uri"]
continue
# 常规 递归遍历 字典
if isinstance(dataNew[item], dict):
self.dataConvert(awemeType, dataNew[item], dataRaw[item])
else:
# 赋值
dataNew[item] = dataRaw[item]
except Exception as e:
print("[ 警告 ]:转换数据时在接口中未找到 %s\r" % (item))
def clearDict(self, data):
for item in data:
# 常规 递归遍历 字典
if isinstance(data[item], dict):
self.clearDict(data[item])
elif isinstance(data[item], list):
data[item] = []
else:
data[item] = ""
# 传入 aweme_id
# 返回 数据 字典
def getAwemeInfo(self, aweme_id):
if aweme_id is None:
return None
# 官方接口
# 旧接口22/12/23失效
# jx_url = f'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={self.aweme_id[i]}'
# 23/01/11
# 此ies domian暂时不需要xg参数
# 单作品接口返回 'aweme_detail'
# 主页作品接口返回 'aweme_list'->['aweme_detail']
jx_url = f'https://www.iesdouyin.com/aweme/v1/web/aweme/detail/?aweme_id={aweme_id}&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333'
raw = requests.get(url=jx_url, headers=self.headers).text
datadict = json.loads(raw)
jx_url = self.urls.POST_DETAIL + self.utils.getXbogus(
url=f'aweme_id={aweme_id}&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333')
try:
raw = requests.get(url=jx_url, headers=self.headers).text
datadict = json.loads(raw)
except Exception as e:
print("[ 错误 ]:接口未返回数据, 请检查后重新运行!\r")
return None
# 清空self.awemeDict
self.clearDict(self.awemeDict)
self.result.clearDict(self.result.awemeDict)
if datadict['aweme_detail'] is None:
print('[ 错误 ]:作品不存在, 请检查后重新运行!\r')
@ -374,20 +119,16 @@ class TikTok(object):
print("[ 警告 ]:接口中未找到 images\r")
# 转换成我们自己的格式
self.dataConvert(awemeType, self.awemeDict, datadict['aweme_detail'])
self.result.dataConvert(awemeType, self.result.awemeDict, datadict['aweme_detail'])
return self.awemeDict
return self.result.awemeDict, datadict
# 传入 url 支持 https://www.iesdouyin.com 与 https://v.douyin.com
# mode : post | like 模式选择 like为用户点赞 post为用户发布
def getUserInfo(self, sec_uid, mode="post", count=35):
if sec_uid is None:
return None
# 旧接口于22/12/23失效
# post_url = 'https://www.iesdouyin.com/web/api/v2/aweme/post/?sec_uid=%s&count=35&max_cursor=0&aid=1128&_signature=PDHVOQAAXMfFyj02QEpGaDwx1S&dytk=' % (
# self.sec)
# 23/1/11
# 暂时使用不需要xg的接口
max_cursor = 0
self.awemeList = []
@ -395,28 +136,32 @@ class TikTok(object):
while True:
if mode == "post":
post_url = 'https://www.iesdouyin.com/aweme/v1/web/aweme/post/?sec_user_id=%s&count=%s&max_cursor=%s&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333' % (
sec_uid, count, max_cursor)
url = self.urls.USER_POST + self.utils.getXbogus(
url=f'sec_user_id={sec_uid}&count={count}&max_cursor={max_cursor}&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333')
elif mode == "like":
post_url = 'https://www.iesdouyin.com/web/api/v2/aweme/like/?sec_uid=%s&count=%s&max_cursor=%s&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333' % (
sec_uid, count, max_cursor)
url = self.urls.USER_FAVORITE_A + self.utils.getXbogus(
url=f'sec_user_id={sec_uid}&count={count}&max_cursor={max_cursor}&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333')
else:
print("[ 错误 ]:模式选择错误, 仅支持post和like, 请检查后重新运行!\r")
return None
res = requests.get(url=post_url, headers=self.headers)
datadict = json.loads(res.text)
try:
res = requests.get(url=url, headers=self.headers)
datadict = json.loads(res.text)
except Exception as e:
print("[ 错误 ]:接口未返回数据, 请检查后重新运行!\r")
return None
if not datadict["aweme_list"]:
print("[ 错误 ]:未找到数据, 请检查后重新运行!\r")
return None
for aweme in datadict["aweme_list"]:
# 获取 aweme_id 使用这个接口 https://www.iesdouyin.com/aweme/v1/web/aweme/detail/
# 获取 aweme_id
aweme_id = aweme["aweme_id"]
# 深拷贝 dict 不然list里面全是同样的数据
self.awemeList.append(copy.deepcopy(self.getAwemeInfo(aweme_id)))
# time.sleep(0.5)
datanew, dataraw = self.getAwemeInfo(aweme_id)
self.awemeList.append(copy.deepcopy(datanew))
# 更新 max_cursor
max_cursor = datadict["max_cursor"]
@ -431,76 +176,79 @@ class TikTok(object):
# web_rid = live_url.replace('https://live.douyin.com/', '')
live_api = 'https://live.douyin.com/webcast/web/enter/?aid=6383&web_rid=%s' % (web_rid)
live_api = 'https://live.douyin.com/webcast/room/web/enter/?aid=6383&device_platform=web&web_rid=%s' % (web_rid)
# 必须用这个 headers
headers = {
'Cookie': 'msToken=tsQyL2_m4XgtIij2GZfyu8XNXBfTGELdreF1jeIJTyktxMqf5MMIna8m1bv7zYz4pGLinNP2TvISbrzvFubLR8khwmAVLfImoWo3Ecnl_956MgOK9kOBdwM=; odin_tt=6db0a7d68fd2147ddaf4db0b911551e472d698d7b84a64a24cf07c49bdc5594b2fb7a42fd125332977218dd517a36ec3c658f84cebc6f806032eff34b36909607d5452f0f9d898810c369cd75fd5fb15; ttwid=1%7CfhiqLOzu_UksmD8_muF_TNvFyV909d0cw8CSRsmnbr0%7C1662368529%7C048a4e969ec3570e84a5faa3518aa7e16332cfc7fbcb789780135d33a34d94d2'
}
response = requests.get(live_api, headers=headers)
live_json = json.loads(response.text)
try:
response = requests.get(live_api, headers=headers)
live_json = json.loads(response.text)
except Exception as e:
print("[ 错误 ]:接口未返回数据, 请检查后重新运行!\r")
return None
if live_json == {} or live_json['status_code'] != 0:
print("[ 警告 ]:接口未返回信息\r")
return None
# 清空字典
self.clearDict(self.liveDict)
self.result.clearDict(self.result.liveDict)
# 是否在播
self.liveDict["status"] = live_json['data']['data'][0]['status']
self.result.liveDict["status"] = live_json['data']['data'][0]['status']
if self.liveDict["status"] == 4:
if self.result.liveDict["status"] == 4:
print('[ 📺 ]:当前直播已结束,正在退出')
return self.liveDict
return self.result.liveDict
# 直播标题
self.liveDict["title"] = live_json['data']['data'][0]['title']
self.result.liveDict["title"] = live_json['data']['data'][0]['title']
# 观看人数
self.liveDict["user_count"] = live_json['data']['data'][0]['user_count_str']
self.result.liveDict["user_count"] = live_json['data']['data'][0]['user_count_str']
# 昵称
self.liveDict["nickname"] = live_json['data']['data'][0]['owner']['nickname']
self.result.liveDict["nickname"] = live_json['data']['data'][0]['owner']['nickname']
# sec_uid
self.liveDict["sec_uid"] = live_json['data']['data'][0]['owner']['sec_uid']
self.result.liveDict["sec_uid"] = live_json['data']['data'][0]['owner']['sec_uid']
# 直播间观看状态
self.liveDict["display_long"] = live_json['data']['data'][0]['room_view_stats']['display_long']
self.result.liveDict["display_long"] = live_json['data']['data'][0]['room_view_stats']['display_long']
# 推流
self.liveDict["flv_pull_url"] = live_json['data']['data'][0]['stream_url']['flv_pull_url']
self.result.liveDict["flv_pull_url"] = live_json['data']['data'][0]['stream_url']['flv_pull_url']
try:
# 分区
self.liveDict["partition"] = live_json['data']['partition_road_map']['partition']['title']
self.liveDict["sub_partition"] = live_json['data']['partition_road_map']['sub_partition']['partition'][
self.result.liveDict["partition"] = live_json['data']['partition_road_map']['partition']['title']
self.result.liveDict["sub_partition"] = live_json['data']['partition_road_map']['sub_partition']['partition'][
'title']
except Exception as e:
self.liveDict["partition"] = ''
self.liveDict["sub_partition"] = ''
self.result.liveDict["partition"] = ''
self.result.liveDict["sub_partition"] = ''
info = '[ 💻 ]:直播间:%s 当前%s 主播:%s 分区:%s-%s\r' % (
self.liveDict["title"], self.liveDict["display_long"], self.liveDict["nickname"],
self.liveDict["partition"], self.liveDict["sub_partition"])
self.result.liveDict["title"], self.result.liveDict["display_long"], self.result.liveDict["nickname"],
self.result.liveDict["partition"], self.result.liveDict["sub_partition"])
print(info)
flv = []
print('[ 🎦 ]:直播间清晰度')
for i, f in enumerate(self.liveDict["flv_pull_url"].keys()):
for i, f in enumerate(self.result.liveDict["flv_pull_url"].keys()):
print('[ %s ]: %s' % (i, f))
flv.append(f)
rate = int(input('[ 🎬 ]输入数字选择推流清晰度:'))
# 显示清晰度列表
print('[ %s ]:%s' % (flv[rate], self.liveDict["flv_pull_url"][flv[rate]]))
print('[ %s ]:%s' % (flv[rate], self.result.liveDict["flv_pull_url"][flv[rate]]))
print('[ 📺 ]:复制链接使用下载工具下载')
return self.liveDict
return self.result.liveDict
# 来自 https://blog.csdn.net/weixin_43347550/article/details/105248223
def progressBarDownload(self, url, filepath):
@ -531,9 +279,12 @@ class TikTok(object):
def awemeDownload(self, awemeDict: dict, music=True, cover=True, avatar=True, savePath=os.getcwd()):
if awemeDict is None:
return
if not os.path.exists(savePath):
os.mkdir(savePath)
try:
# 使用作品 创建时间+描述 当文件夹
file_name = TikTokUtils.replaceStr(awemeDict["create_time"] + awemeDict["desc"])
file_name = self.utils.replaceStr(awemeDict["create_time"] + awemeDict["desc"])
aweme_path = os.path.join(savePath, file_name)
if not os.path.exists(aweme_path):
os.mkdir(aweme_path)
@ -577,7 +328,7 @@ class TikTok(object):
# 下载 音乐
if music:
print("[ 提示 ]:正在下载音乐...\r")
music_name = TikTokUtils.replaceStr(awemeDict["music"]["title"])
music_name = self.utils.replaceStr(awemeDict["music"]["title"])
music_path = os.path.join(aweme_path, music_name + ".mp3")
if os.path.exists(music_path):
@ -632,7 +383,7 @@ class TikTok(object):
% (aweme["author"]["nickname"], str(ind + 1), len(awemeList)))
self.awemeDownload(aweme, music, cover, avatar, savePath)
time.sleep(0.5)
# time.sleep(0.5)
if __name__ == "__main__":

View File

@ -52,8 +52,8 @@ def main():
tk.userDownload(awemeList=datalist, music=args.music, cover=args.cover, avatar=args.avatar,
savePath=args.path)
elif key_type == "aweme":
datadict = tk.getAwemeInfo(key)
tk.awemeDownload(awemeDict=datadict, music=args.music, cover=args.cover, avatar=args.avatar,
datanew, dataraw = tk.getAwemeInfo(key)
tk.awemeDownload(awemeDict=datanew, music=args.music, cover=args.cover, avatar=args.avatar,
savePath=args.path)
elif key_type == "live":
live_json = tk.getLiveInfo(key)

272
TikTokResult.py Normal file
View File

@ -0,0 +1,272 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Description:TikTok.py
@Date :2023/02/11 13:06:23
@Author :imgyh
@version :1.0
@Github :https://github.com/imgyh
@Mail :admin@imgyh.com
-------------------------------------------------
Change Log :
-------------------------------------------------
'''
import time
import copy
class Result(object):
def __init__(self):
# 作者信息
self.authorDict = {
"avatar_thumb": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"avatar": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover_url": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
# 喜欢的作品数
"favoriting_count": "",
# 粉丝数
"follower_count": "",
# 关注数
"following_count": "",
# 昵称
"nickname": "",
# 是否允许下载
"prevent_download": "",
# 用户 url id
"sec_uid": "",
# 是否私密账号
"secret": "",
# 短id
"short_id": "",
# 签名
"signature": "",
# 总获赞数
"total_favorited": "",
# 用户id
"uid": "",
# 用户自定义唯一id 抖音号
"unique_id": "",
# 年龄
"user_age": "",
}
# 图片信息
self.picDict = {
"height": "",
"mask_url_list": "",
"uri": "",
"url_list": [],
"width": ""
}
# 音乐信息
self.musicDict = {
"cover_hd": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover_large": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover_medium": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover_thumb": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
# 音乐作者抖音号
"owner_handle": "",
# 音乐作者id
"owner_id": "",
# 音乐作者昵称
"owner_nickname": "",
"play_url": {
"height": "",
"uri": "",
"url_key": "",
"url_list": [],
"width": ""
},
# 音乐名字
"title": "",
}
# 视频信息
self.videoDict = {
"play_addr": {
"uri": "",
"url_list": "",
},
"cover_original_scale": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"dynamic_cover": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"origin_cover": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
},
"cover": {
"height": "",
"uri": "",
"url_list": [],
"width": ""
}
}
# 作品信息
self.awemeDict = {
# 作品创建时间
"create_time": "",
# awemeType=0 视频, awemeType=1 图集
"awemeType": "",
# 作品 id
"aweme_id": "",
# 作者信息
"author": self.authorDict,
# 作品描述
"desc": "",
# 图片
"images": [],
# 音乐
"music": self.musicDict,
# 视频
"video": self.videoDict,
# 作品信息统计
"statistics": {
"admire_count": "",
"collect_count": "",
"comment_count": "",
"digg_count": "",
"play_count": "",
"share_count": ""
}
}
# 用户作品信息
self.awemeList = []
# 直播信息
self.liveDict = {
# 是否在播
"status": "",
# 直播标题
"title": "",
# 观看人数
"user_count": "",
# 昵称
"nickname": "",
# sec_uid
"sec_uid": "",
# 直播间观看状态
"display_long": "",
# 推流
"flv_pull_url": "",
# 分区
"partition": "",
"sub_partition": ""
}
# 将得到的json数据dataRaw精简成自己定义的数据dataNew
# 转换得到的数据
def dataConvert(self, awemeType, dataNew, dataRaw):
for item in dataNew:
try:
# 作品创建时间
if item == "create_time":
dataNew['create_time'] = time.strftime(
"%Y-%m-%d %H.%M.%S", time.localtime(dataRaw['create_time']))
continue
# 设置 awemeType
if item == "awemeType":
dataNew["awemeType"] = awemeType
continue
# 当 解析的链接 是图片时
if item == "images":
if awemeType == 1:
for image in dataRaw[item]:
for i in image:
self.picDict[i] = image[i]
# 字典要深拷贝
self.awemeDict["images"].append(copy.deepcopy(self.picDict))
continue
# 当 解析的链接 是视频时
if item == "video":
if awemeType == 0:
self.dataConvert(awemeType, dataNew[item], dataRaw[item])
continue
# 将小头像放大
if item == "avatar":
for i in dataNew[item]:
if i == "url_list":
for j in self.awemeDict["author"]["avatar_thumb"]["url_list"]:
dataNew[item][i].append(j.replace("100x100", "1080x1080"))
elif i == "uri":
dataNew[item][i] = self.awemeDict["author"]["avatar_thumb"][i].replace("100x100",
"1080x1080")
else:
dataNew[item][i] = self.awemeDict["author"]["avatar_thumb"][i]
continue
# 原来的json是[{}] 而我们的是 {}
if item == "cover_url":
self.dataConvert(awemeType, dataNew[item], dataRaw[item][0])
continue
# 根据 uri 获取 1080p 视频
if item == "play_addr":
dataNew[item]["uri"] = dataRaw["bit_rate"][0]["play_addr"]["uri"]
# 使用 这个api 可以获得1080p
dataNew[item]["url_list"] = "https://aweme.snssdk.com/aweme/v1/play/?video_id=%s&ratio=1080p&line=0" \
% dataNew[item]["uri"]
continue
# 常规 递归遍历 字典
if isinstance(dataNew[item], dict):
self.dataConvert(awemeType, dataNew[item], dataRaw[item])
else:
# 赋值
dataNew[item] = dataRaw[item]
except Exception as e:
print("[ 警告 ]:转换数据时在接口中未找到 %s\r" % (item))
def clearDict(self, data):
for item in data:
# 常规 递归遍历 字典
if isinstance(data[item], dict):
self.clearDict(data[item])
elif isinstance(data[item], list):
data[item] = []
else:
data[item] = ""

59
TikTokTest.py Normal file
View File

@ -0,0 +1,59 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Description:TikTok.py
@Date :2023/02/11 13:06:23
@Author :imgyh
@version :1.0
@Github :https://github.com/imgyh
@Mail :admin@imgyh.com
-------------------------------------------------
Change Log :
-------------------------------------------------
'''
from TikTok import TikTok
def getAwemeInfo():
share_link_video = "3.56 uSy:/ 复制打开抖音,看看【小透明的作品】没有女朋友就用我的吧哈哈哈哈 # 表情包锁屏 https://v.douyin.com/BugmVVD/"
share_link_pic = "8.20 MJI:/ 复制打开抖音,看看【舍溪的图文作品】我又来放图集啦~还有你们要的小可爱大图也放啦~# ... https://v.douyin.com/BugrFTN/"
tk = TikTok()
url = tk.getShareLink(share_link_pic)
key_type, key = tk.getKey(url)
datanew, dataraw = tk.getAwemeInfo(key)
print(datanew)
def getUserInfo():
share_link_post = "1- 长按复制此条消息打开抖音搜索查看TA的更多作品。 https://v.douyin.com/BupCppt/"
share_link_like = "2- 长按复制此条消息打开抖音搜索查看TA的更多作品。 https://v.douyin.com/BusJrfr/"
tk = TikTok()
url = tk.getShareLink(share_link_like)
key_type, key = tk.getKey(url)
awemeList = tk.getUserInfo(key, mode="like", count=35)
print(awemeList)
def getLiveInfo():
live_link = "https://live.douyin.com/40768897856"
tk = TikTok()
url = tk.getShareLink(live_link)
key_type, key = tk.getKey(url)
live_json = tk.getLiveInfo(key)
print(live_json)
if __name__ == "__main__":
# getAwemeInfo()
# getUserInfo()
getLiveInfo()
pass
# 视频
# python TikTokCommand.py -l https://v.douyin.com/BugmVVD/ -p /mnt/c/project/test1
# 图集
# python TikTokCommand.py -l https://v.douyin.com/BugrFTN/ -p /mnt/c/project/test2
# 主页作品
# python TikTokCommand.py -l https://v.douyin.com/BupCppt/ -p /mnt/c/project/test3
# 主页喜欢
# python TikTokCommand.py -l https://v.douyin.com/BusJrfr/ -p /mnt/c/project/test4 -M like

80
TikTokUrls.py Normal file
View File

@ -0,0 +1,80 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Description:TikTok.py
@Date :2023/02/11 13:06:23
@Author :imgyh
@version :1.0
@Github :https://github.com/imgyh
@Mail :admin@imgyh.com
-------------------------------------------------
Change Log :
-------------------------------------------------
'''
class Urls(object):
def __init__(self):
# https://langyue.cc/APIdocV1.0.html
######################################### WEB #########################################
# 首页推荐
self.TAB_FEED = 'https://www.douyin.com/aweme/v1/web/tab/feed/?'
# 用户短信息给多少个用户secid就返回多少的用户信息
self.USER_SHORT_INFO = 'https://www.douyin.com/aweme/v1/web/im/user/info/?'
# 用户详细信息
self.USER_DETAIL = 'https://www.douyin.com/aweme/v1/web/user/profile/other/?'
# 用户作品
self.USER_POST = 'https://www.douyin.com/aweme/v1/web/aweme/post/?'
# 作品信息
self.POST_DETAIL = 'https://www.douyin.com/aweme/v1/web/aweme/detail/?'
# 用户喜欢A
self.USER_FAVORITE_A = 'https://www.douyin.com/aweme/v1/web/aweme/favorite/?'
# 用户喜欢B
self.USER_FAVORITE_B = 'https://www.iesdouyin.com/web/api/v2/aweme/like/?'
# 用户历史
self.USER_HISTORY = 'https://www.douyin.com/aweme/v1/web/history/read/?'
# 用户收藏
self.USER_COLLECTION = 'https://www.douyin.com/aweme/v1/web/aweme/listcollection/?'
# 用户评论
self.COMMENT = 'https://www.douyin.com/aweme/v1/web/comment/list/?'
# 首页朋友作品
self.FRIEND_FEED = 'https://www.douyin.com/aweme/v1/web/familiar/feed/?'
# 关注用户作品
self.FOLLOW_FEED = 'https://www.douyin.com/aweme/v1/web/follow/feed/?'
# X-Bogus Path
self.GET_XB_PATH = 'http://47.115.200.238/xg/path?url='
# X-Bogus Dict
self.GET_XB_DICT = 'http://47.115.200.238/xg/dict/?params='
# X-Bogus Login
self.GET_XB_LOGIN = 'http://47.115.200.238/login'
# X-Bogus Register
self.GET_XB_REGISTER = 'http://47.115.200.238/register'
# X-Bogus Token
self.GET_XB_TOKEN = 'http://47.115.200.238/token'
#######################################################################################
######################################### APP #########################################
# X-Gorgon Path
self.GET_XG_LOGIN = 'http://47.115.200.238/xog/path?url='
#######################################################################################
if __name__ == '__main__':
Urls()

View File

@ -15,34 +15,70 @@ Change Log :
import random
import re
from urllib.parse import urlencode, unquote
import json
import requests
from TikTokUrls import Urls
def generate_random_str(randomlength=16):
"""
根据传入长度产生随机字符串
"""
random_str = ''
base_str = 'ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789='
length = len(base_str) - 1
for _ in range(randomlength):
random_str += base_str[random.randint(0, length)]
return random_str
class Utils(object):
def __init__(self):
pass
def generate_random_str(self, randomlength=16):
"""
根据传入长度产生随机字符串
"""
random_str = ''
base_str = 'ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789='
length = len(base_str) - 1
for _ in range(randomlength):
random_str += base_str[random.randint(0, length)]
return random_str
def replaceStr(filenamestr: str):
"""
替换非法字符缩短字符长度使其能成为文件名
"""
# 匹配 汉字 字母 数字 空格
match = "([0-9A-Za-z\u4e00-\u9fa5 -._]+)"
def replaceStr(self, filenamestr: str):
"""
替换非法字符缩短字符长度使其能成为文件名
"""
# 匹配 汉字 字母 数字 空格
match = "([0-9A-Za-z\u4e00-\u9fa5 -._]+)"
result = re.findall(match, filenamestr)
result = re.findall(match, filenamestr)
result = "".join(result).strip()
if len(result) > 80:
result = result[:80]
# 去除前后空格
return result
result = "".join(result).strip()
if len(result) > 80:
result = result[:80]
# 去除前后空格
return result
def getXbogus(self, url, cookie=None, referer="https://www.douyin.com/"):
urls = Urls()
headers = {
"cookie": cookie,
"referer": referer,
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"
}
try:
if isinstance(url, dict):
params = eval(unquote(url, 'utf-8'))
url = urlencode(params, safe="=")
response = json.loads(requests.post(
urls.GET_XB_DICT + url,
headers=headers).text)
if isinstance(url, str):
url = url.replace('&', '%26')
response = json.loads(requests.post(
urls.GET_XB_PATH + url,
headers=headers).text)
else:
print('[ 提示 ]:传入的参数有误')
except Exception as e:
print('[ 错误 ]:%s' % e)
params = response["result"][0]["paramsencode"]
xb = response["result"][0]["X-Bogus"]["0"]
# print('[ 调试 ]:%s' % self.params)
return params #, xb
if __name__ == "__main__":

View File

@ -22,8 +22,8 @@ def work(share_link):
url = tk.getShareLink(share_link)
key_type, key = tk.getKey(url)
datadict = tk.getAwemeInfo(key)
return datadict
datanew, dataraw = tk.getAwemeInfo(key)
return datanew
app = Flask(__name__)