""" # api.py usage ` python api.py -a 127.0.0.1 -p 9880 -c GPT_SoVITS/configs/tts_infer.yaml ` ## 执行参数: `-a` - `绑定地址, 默认"127.0.0.1"` `-p` - `绑定端口, 默认9880, 可在 config.py 中指定` `-c` - `TTS配置文件路径, 默认"GPT_SoVITS/configs/tts_infer.yaml"` ## 调用: ### 推理 endpoint: `/tts` GET: `http://127.0.0.1:9880/tts?ref_audio_path=123.wav&prompt_text=一二三。&prompt_language=zh&text=先帝创业未半而中道崩殂,今天下三分,益州疲弊,此诚危急存亡之秋也。&text_language=zh` POST: ```json { "text": "", # str.(required) text to be synthesized "text_lang: "", # str.(required) language of the text to be synthesized "ref_audio_path": "", # str.(required) reference audio path. "prompt_text": "", # str.(optional) prompt text for the reference audio "prompt_lang": "", # str.(required) language of the prompt text for the reference audio "top_k": 5, # int.(optional) top k sampling "top_p": 1, # float.(optional) top p sampling "temperature": 1, # float.(optional) temperature for sampling "text_split_method": "cut0", # str.(optional) text split method, see text_segmentation_method.py for details. "batch_size": 1, # int.(optional) batch size for inference "batch_threshold": 0.75, # float.(optional) threshold for batch splitting. "split_bucket: True, # bool.(optional) whether to split the batch into multiple buckets. "speed_factor":1.0, # float.(optional) control the speed of the synthesized audio. "fragment_interval":0.3, # float.(optional) to control the interval of the audio fragment. "seed": -1, # int.(optional) random seed for reproducibility. "media_type": "wav", # str.(optional) media type of the output audio, support "wav", "raw", "ogg", "aac". "streaming_mode": False, # bool.(optional) whether to return a streaming response. 
} ``` RESP: 成功: 直接返回 wav 音频流, http code 200 失败: 返回包含错误信息的 json, http code 400 ### 命令控制 endpoint: `/control` command: "restart": 重新运行 "exit": 结束运行 GET: `http://127.0.0.1:9880/control?command=restart` POST: ```json { "command": "restart" } ``` RESP: 成功: 返回"success", http code 200 失败: 返回包含错误信息的 json, http code 400 ### 切换GPT模型 endpoint: `/set_gpt_weights` GET: `http://127.0.0.1:9880/set_gpt_weights?weights_path=GPT_SoVITS/pretrained_models/s1bert25hz-2kh-longer-epoch=68e-step=50232.ckpt` RESP: 成功: 返回"success", http code 200 失败: 返回包含错误信息的 json, http code 400 ### 切换Sovits模型 endpoint: `/set_sovits_weights` GET: `http://127.0.0.1:9880/set_sovits_weights?weights_path=GPT_SoVITS/pretrained_models/s2G488k.pth` RESP: 成功: 返回"success", http code 200 失败: 返回包含错误信息的 json, http code 400 ### """ from asyncio import sleep import os import sys now_dir = os.getcwd() sys.path.append(now_dir) sys.path.append("%s/GPT_SoVITS" % (now_dir)) import argparse import io # import os import subprocess # import sys import wave import signal import numpy as np import torch import soundfile as sf from fastapi import FastAPI, Request, HTTPException from fastapi.responses import StreamingResponse, JSONResponse from fastapi import FastAPI, UploadFile, File import uvicorn from io import BytesIO from tools.i18n.i18n import I18nAuto from GPT_SoVITS.TTS_infer_pack.TTS import TTS, TTS_Config from fastapi.responses import StreamingResponse from pydub import AudioSegment # print(sys.path) i18n = I18nAuto() parser = argparse.ArgumentParser(description="GPT-SoVITS api") # parser.add_argument("-api_c", "--api_config", type=str, default="GPT_SoVITS/configs/api_config.yaml", help="api_config路径") parser.add_argument("-tts_c", "--tts_config", type=str, default="GPT_SoVITS/configs/tts_infer.yaml", help="tts_infer路径") # parser.add_argument("-d", "--device", type=str, default="cpu", help="cuda / cpu / mps") parser.add_argument("-a", "--bind_addr", type=str, default="127.0.0.1", help="default: 127.0.0.1") 
parser.add_argument("-p", "--port", type=int, default=9880, help="default: 9880")
args = parser.parse_args()
config_path = args.tts_config
# device = args.device
port = args.port
host = args.bind_addr
# Kept so that the "restart" control command can re-execute with the same arguments.
argv = sys.argv

# Fall back to the default config when an empty path was given on the command line.
# The directory name uses an underscore, matching the argparse default.
if config_path in [None, ""]:
    config_path = "GPT_SoVITS/configs/tts_infer.yaml"

tts_config = TTS_Config(config_path)
tts_pipeline = TTS(tts_config)

APP = FastAPI()

# Languages accepted for the "text_lang" request field.
_SUPPORTED_TEXT_LANGS = ("auto", "en", "zh", "ja", "all_zh", "all_ja")
# Output containers accepted for the "media_type" request field.
_SUPPORTED_MEDIA_TYPES = ("wav", "raw", "ogg", "aac")


### modify from https://github.com/RVC-Boss/GPT-SoVITS/pull/894/files
def pack_ogg(io_buffer: BytesIO, data: np.ndarray, rate: int):
    """Encode `data` as single-channel Ogg into `io_buffer` and return the buffer."""
    with sf.SoundFile(io_buffer, mode='w', samplerate=rate, channels=1, format='ogg') as audio_file:
        audio_file.write(data)
    return io_buffer


def pack_raw(io_buffer: BytesIO, data: np.ndarray, rate: int):
    """Append the raw bytes of `data` to `io_buffer` and return the buffer.

    `rate` is unused; it is accepted so all pack_* helpers share one signature.
    """
    io_buffer.write(data.tobytes())
    return io_buffer


def pack_wav(io_buffer: BytesIO, data: np.ndarray, rate: int):
    """Write `data` as a WAV file into `io_buffer` and return the buffer.

    The supplied buffer is used (like the other pack_* helpers) instead of
    being silently replaced by a new one.
    """
    sf.write(io_buffer, data, rate, format='wav')
    return io_buffer


def pack_aac(io_buffer: BytesIO, data: np.ndarray, rate: int):
    """Encode `data` to AAC (ADTS) by piping it through ffmpeg.

    The bytes of `data` are fed to ffmpeg as s16le PCM
    (assumes `data` holds int16 samples -- TODO confirm against the pipeline).

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
        FileNotFoundError: if the `ffmpeg` executable cannot be found.
    """
    process = subprocess.Popen([
        'ffmpeg',
        '-f', 's16le',      # input: 16-bit signed little-endian PCM
        '-ar', str(rate),   # sample rate
        '-ac', '1',         # mono
        '-i', 'pipe:0',     # read the input from stdin
        '-c:a', 'aac',      # AAC audio encoder
        '-b:a', '192k',     # bitrate
        '-vn',              # no video
        '-f', 'adts',       # output as an AAC ADTS stream
        'pipe:1'            # write the output to stdout
    ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate(input=data.tobytes())
    if process.returncode != 0:
        # Surface encoder failures instead of returning empty/garbage audio.
        raise RuntimeError(
            "ffmpeg aac encoding failed (exit code %d): %s"
            % (process.returncode, err.decode(errors="replace").strip())
        )
    io_buffer.write(out)
    return io_buffer


def pack_audio(io_buffer: BytesIO, data: np.ndarray, rate: int, media_type: str):
    """Encode `data` in the container named by `media_type` and rewind the buffer.

    "ogg", "aac" and "wav" use their dedicated packers; any other value is
    written as raw sample bytes.

    Returns:
        The buffer holding the encoded audio, positioned at offset 0.
    """
    if media_type == "ogg":
        io_buffer = pack_ogg(io_buffer, data, rate)
    elif media_type == "aac":
        io_buffer = pack_aac(io_buffer, data, rate)
    elif media_type == "wav":
        io_buffer = pack_wav(io_buffer, data, rate)
    else:
        io_buffer = pack_raw(io_buffer, data, rate)
    io_buffer.seek(0)
    return io_buffer


# from https://huggingface.co/spaces/coqui/voice-chat-with-mistral/blob/main/app.py
def wave_header_chunk(frame_input=b"", channels=1, sample_width=2, sample_rate=32000):
    """Return a WAV header followed by `frame_input`.

    This should be the first chunk of a streamed wav file. Later chunks should
    not carry a header (otherwise artifacts are audible at each chunk start).
    """
    wav_buf = io.BytesIO()
    with wave.open(wav_buf, "wb") as vfout:
        vfout.setnchannels(channels)
        vfout.setsampwidth(sample_width)
        vfout.setframerate(sample_rate)
        vfout.writeframes(frame_input)

    wav_buf.seek(0)
    return wav_buf.read()


def handle_control(command: str):
    """Execute a process-control command.

    "restart" replaces the current process with a fresh interpreter running the
    original argv; "exit" sends SIGTERM to this process and exits.
    Any other command is ignored.
    """
    if command == "restart":
        os.execl(sys.executable, sys.executable, *argv)
    elif command == "exit":
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)


def tts_handle(req: dict):
    """
    Text to speech handler.

    Args:
        req (dict):
            {
                "text": "",                   # str.(required) text to be synthesized
                "text_lang": "",              # str.(required) language of the text to be synthesized
                "ref_audio_path": "",         # str.(required) reference audio path
                "prompt_text": "",            # str.(optional) prompt text for the reference audio
                "prompt_lang": "",            # str.(required) language of the prompt text for the reference audio
                "top_k": 5,                   # int. top k sampling
                "top_p": 1,                   # float. top p sampling
                "temperature": 1,             # float. temperature for sampling
                "text_split_method": "cut0",  # str. text split method, see text_segmentation_method.py for details.
                "batch_size": 1,              # int. batch size for inference
                "batch_threshold": 0.75,      # float. threshold for batch splitting.
                "split_bucket": True,         # bool. whether to split the batch into multiple buckets.
                "speed_factor": 1.0,          # float. control the speed of the synthesized audio.
                "fragment_interval": 0.3,     # float. to control the interval of the audio fragment.
                "seed": -1,                   # int. random seed for reproducibility.
                "media_type": "wav",          # str. media type of the output audio, support "wav", "raw", "ogg", "aac".
                "streaming_mode": False,      # bool. whether to return a streaming response.
            }
    returns:
        StreamingResponse: audio stream response on success.
        JSONResponse (http code 400): when validation or synthesis fails.
    """
    text = req.get("text", "")
    text_lang = req.get("text_lang", "")
    ref_audio_path = req.get("ref_audio_path", "")
    streaming_mode = req.get("streaming_mode", False)
    media_type = req.get("media_type", "wav")

    if streaming_mode:
        # Ask the pipeline to yield audio fragment by fragment.
        req["return_fragment"] = True

    if ref_audio_path in [None, ""]:
        return JSONResponse(status_code=400, content={"message": "ref_audio_path is required"})
    if text in [None, ""]:
        return JSONResponse(status_code=400, content={"message": "text is required"})
    if text_lang in [None, ""]:
        return JSONResponse(status_code=400, content={"message": "text_lang is required"})
    # The isinstance guard keeps a non-string JSON value from crashing on .lower().
    if not isinstance(text_lang, str) or text_lang.lower() not in _SUPPORTED_TEXT_LANGS:
        return JSONResponse(status_code=400, content={"message": f"text_lang: {text_lang} is not supported"})
    if media_type not in _SUPPORTED_MEDIA_TYPES:
        return JSONResponse(status_code=400, content={"message": "media_type is not supported"})

    try:
        tts_generator = tts_pipeline.run(req)

        if streaming_mode:
            def streaming_generator(tts_generator, media_type):
                # Only wav streams need a header; it is written once, using the
                # sample rate reported with the first fragment.
                header_pending = media_type == "wav"
                # After the header, wav fragments are sent as raw samples.
                chunk_type = "raw" if media_type == "wav" else media_type
                for sr, chunk in tts_generator:
                    if header_pending:
                        yield wave_header_chunk(sample_rate=sr)
                        header_pending = False
                    yield pack_audio(BytesIO(), chunk, sr, chunk_type).getvalue()
                if header_pending:
                    # No fragment was produced: still emit a (default) header.
                    yield wave_header_chunk()

            if media_type in ["wav", "raw"]:
                _media_type = f"audio/x-{media_type}"
            else:
                _media_type = f"audio/{media_type}"
            return StreamingResponse(streaming_generator(tts_generator, media_type), media_type=_media_type)

        sr, audio_data = next(tts_generator)
        audio_bytes = pack_audio(BytesIO(), audio_data, sr, media_type).getvalue()
        # Send the payload as one chunk; iterating a BytesIO would split the
        # binary audio at every newline byte.
        return StreamingResponse(iter((audio_bytes,)), media_type=f"audio/{media_type}")
    except Exception as e:
        return JSONResponse(status_code=400, content={"message": "tts failed", "Exception": str(e)})


@APP.get("/control")
async def control(command: str = None):
    """GET /control: run a process-control command ("restart" or "exit")."""
    if command is None:
        return JSONResponse(status_code=400, content={"message": "command is required"})
    handle_control(command)


@APP.get("/tts")
async def tts_get_endpoint(
    text: str = None,
    text_lang: str = None,
    ref_audio_path: str = None,
    prompt_lang: str = None,
    prompt_text: str = "",
    top_k: int = 5,
    top_p: float = 1,
    temperature: float = 1,
    text_split_method: str = "cut0",
    batch_size: int = 1,
    batch_threshold: float = 0.75,
    split_bucket: bool = True,
    speed_factor: float = 1.0,
    fragment_interval: float = 0.3,
    seed: int = -1,
    media_type: str = "wav",
    streaming_mode: bool = False,
):
    """GET /tts: build the request dict from query parameters and run tts_handle."""
    req = {
        "text": text,
        # Lower-case only when a value was supplied, so that a missing parameter
        # reaches the validation in tts_handle instead of raising here.
        "text_lang": text_lang.lower() if text_lang else text_lang,
        "ref_audio_path": ref_audio_path,
        "prompt_text": prompt_text,
        "prompt_lang": prompt_lang.lower() if prompt_lang else prompt_lang,
        "top_k": top_k,
        "top_p": top_p,
        "temperature": temperature,
        "text_split_method": text_split_method,
        "batch_size": int(batch_size),
        "batch_threshold": float(batch_threshold),
        "speed_factor": float(speed_factor),
        "split_bucket": split_bucket,
        "fragment_interval": fragment_interval,
        "seed": seed,
        "media_type": media_type,
        "streaming_mode": streaming_mode,
    }
    return tts_handle(req)


@APP.post("/tts")
async def tts_post_endpoint(request: Request):
    """POST /tts: pass the JSON request body to tts_handle."""
    req = await request.json()
    return tts_handle(req)


@APP.get("/set_refer_audio")
async def set_refer_aduio(refer_audio_path: str = None):
    """GET /set_refer_audio: set the pipeline's reference audio from a path."""
    try:
        tts_pipeline.set_ref_audio(refer_audio_path)
    except Exception as e:
        return JSONResponse(status_code=400, content={"message": "set refer audio failed", "Exception": str(e)})
    return JSONResponse(status_code=200, content={"message": "success"})


# @APP.post("/set_refer_audio")
# async def set_refer_aduio_post(audio_file: UploadFile = File(...)):
#     try:
#         # check the file type to make sure it is an audio file
#         if not audio_file.content_type.startswith("audio/"):
#             return JSONResponse(status_code=400, content={"message": "file type is not supported"})
#
#         os.makedirs("uploaded_audio", exist_ok=True)
#         save_path = os.path.join("uploaded_audio", audio_file.filename)
#         # save the audio file to a directory on the server
#         with open(save_path , "wb") as buffer:
#             buffer.write(await audio_file.read())
#
#         tts_pipeline.set_ref_audio(save_path)
#     except Exception as e:
#         return JSONResponse(status_code=400, content={"message": f"set refer audio failed", "Exception": str(e)})
#     return JSONResponse(status_code=200, content={"message": "success"})


@APP.get("/set_gpt_weights")
async def set_gpt_weights(weights_path: str = None):
    """GET /set_gpt_weights: load GPT (t2s) weights from `weights_path`."""
    try:
        if weights_path in ["", None]:
            return JSONResponse(status_code=400, content={"message": "gpt weight path is required"})
        tts_pipeline.init_t2s_weights(weights_path)
    except Exception as e:
        return JSONResponse(status_code=400, content={"message": "change gpt weight failed", "Exception": str(e)})

    return JSONResponse(status_code=200, content={"message": "success"})


@APP.get("/set_sovits_weights")
async def set_sovits_weights(weights_path: str = None):
    """GET /set_sovits_weights: load SoVITS (vits) weights from `weights_path`."""
    try:
        if weights_path in ["", None]:
            return JSONResponse(status_code=400, content={"message": "sovits weight path is required"})
        tts_pipeline.init_vits_weights(weights_path)
    except Exception as e:
        return JSONResponse(status_code=400, content={"message": "change sovits weight failed", "Exception": str(e)})
    return JSONResponse(status_code=200, content={"message": "success"})


if __name__ == "__main__":
    try:
        uvicorn.run(APP, host=host, port=port, workers=1)
    except KeyboardInterrupt:
        # Make sure the whole process goes down on Ctrl-C.
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)