上传Ufile Auth-Server for Python

This commit is contained in:
Joshua Yin 2019-01-09 15:45:08 +08:00
parent fb67a48999
commit 14144fe98a
6 changed files with 150 additions and 0 deletions

View File

@ -0,0 +1 @@
__version__ = '1.0.0'

View File

@ -0,0 +1,55 @@
import json
from flask import Flask
from flask import request
from server.authroization import Authroizator
app = Flask(__name__)
@app.route('/applyAuth', methods=['POST'])
def applyAuth():
error = None
print("[request]:%s" % request)
headers = request.headers
print("[headers]:%s" % headers)
data = request.get_data()
print("[body]:%s" % data)
checkRes = checkRequestParams(data)
if not checkRes[0]:
return checkRes[1]
json_data = checkRes[1]
auth = Authroizator(json_data)
return auth.calculateAuthSignature()
@app.route('/applyPrivateUrlAuth', methods=['POST'])
def applyPrivateUrlAuth():
error = None
print("[request]:%s" % request)
headers = request.headers
print("[headers]:%s" % headers)
data = request.get_data()
print("[body]:%s" % data)
checkRes = checkRequestParams(data)
if not checkRes[0]:
return checkRes[1]
json_data = checkRes[1]
auth = Authroizator(json_data)
return auth.calculatePrivateUrlAuthroization()
def checkRequestParams(data):
# 参数为空
if data is None or data == b'':
return False, "Request body is null!"
return True, json.loads(data.decode('utf-8'))
if __name__ == '__main__':
app.run(host='localhost', port='8000', debug=True)

View File

@ -0,0 +1,91 @@
import base64
import hmac
from hashlib import sha1
PUBLIC_KEY = 'Kf6owXtNZSimr0rR7w9ew6Iclm1w73QB8+jUkiV7hXgBYtV5BNWN1LlNUko='
PRIVATE_KEY = '9K91tK7hcpCFL+90HwVk8lGUwJrqqjzfUouDD47RLrs9f1Umt4gXx7LWwB4kE7um'
class Authroizator:
json_data = {}
def __init__(self, json_data):
self.json_data = json_data
print("[json_data]:%s" % self.json_data)
def calculateAuthSignature(self):
check = self.__checkAuthRequiredParams()
if not check[0]:
return check[1]
method = self.json_data.get('method')
bucket = self.json_data.get('bucket')
content_type = self.json_data.get('content_type')
if content_type is None:
content_type = ''
content_md5 = self.json_data.get('content_md5')
if content_md5 is None:
content_md5 = ''
date = self.json_data.get('date')
if date is None:
date = ''
key = self.json_data.get('key')
if key is None:
key = ''
content = method + '\n' + content_md5 + '\n' + content_type + '\n' + date + '\n'
content += '/' + bucket + '/' + key
signature = self.__signature(content)
return 'UCloud ' + PUBLIC_KEY + ':' + signature
def calculatePrivateUrlAuthroization(self):
check = self.__checkPrivateUrlAuthRequiredParams()
if not check[0]:
return check[1]
method = self.json_data.get('method')
bucket = self.json_data.get('bucket')
key = self.json_data.get('key')
expires = str(self.json_data.get('expires'))
content = method + '\n\n\n' + expires + '\n'
content += '/' + bucket + '/' + key
return self.__signature(content)
def __checkAuthRequiredParams(self):
if self.json_data is None:
return False, "Request body json is null"
method = self.json_data.get('method')
if method is None or method == '':
return False, "'method' is required!"
bucket = self.json_data.get('bucket')
if bucket is None or bucket == '':
return False, "'bucket' is required!"
return True, ''
def __checkPrivateUrlAuthRequiredParams(self):
if self.json_data is None:
return False, "Request body json is null"
method = self.json_data.get('method')
if method is None or method == '':
return False, "'method' is required!"
bucket = self.json_data.get('bucket')
if bucket is None or bucket == '':
return False, "'bucket' is required!"
key = self.json_data.get('key')
if key is None or key == '':
return False, "'key' is required!"
expires = self.json_data.get('expires')
if expires is None or str(expires) == '':
return False, "'expires' is required!"
return True, ''
def __signature(self, content):
print(content)
hmac_res = hmac.new(PRIVATE_KEY.encode(), content.encode(), sha1).digest()
return base64.standard_b64encode(hmac_res).decode('utf-8')

View File

@ -0,0 +1,3 @@
home = /usr/local/bin
include-system-site-packages = true
version = 3.7.2