From 16309ff44c0808f6cc596bc214e65eac563ca3cd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=88=98=E6=82=A6?=
Date: Sat, 3 Feb 2024 11:46:58 +0800
Subject: [PATCH 1/2] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=89=8B=E5=8A=A8?=
 =?UTF-8?q?=E8=AF=AD=E4=B9=89=E5=AD=97=E5=B9=95=E8=AF=AD=E9=9F=B3=E5=88=87?=
 =?UTF-8?q?=E5=88=86=E5=B7=A5=E5=85=B7(=E5=A4=9A=E8=A7=92=E8=89=B2)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

添加手动语义字幕语音切分工具(多角色)
---
 webui.py | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 97 insertions(+)

diff --git a/webui.py b/webui.py
index 771893ee..1890708f 100644
--- a/webui.py
+++ b/webui.py
@@ -6,6 +6,14 @@ import platform
 import psutil
 import signal
 
+from modelscope.pipelines import pipeline
+from modelscope.utils.constant import Tasks
+from videoclipper import VideoClipper
+import librosa
+import soundfile as sf
+import numpy as np
+import random
+
 warnings.filterwarnings("ignore")
 torch.manual_seed(233333)
 tmp = os.path.join(now_dir, "TEMP")
@@ -64,6 +72,60 @@ gpu_infos = []
 mem = []
 if_gpu_ok = False
 
+# 字幕语音切分
+inference_pipeline = pipeline(
+    task=Tasks.auto_speech_recognition,
+    model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
+    vad_model='damo/speech_fsmn_vad_zh-cn-16k-common-pytorch',
+    punc_model='damo/punc_ct-transformer_zh-cn-common-vocab272727-pytorch',
+    ncpu=16,
+)
+sd_pipeline = pipeline(
+    task='speaker-diarization',
+    model='damo/speech_campplus_speaker-diarization_common',
+    model_revision='v1.0.0'
+)
+audio_clipper = VideoClipper(inference_pipeline, sd_pipeline)
+
+def audio_change(audio):
+
+    print(audio)
+
+    sf.write('./output_44100.wav', audio[1], audio[0], 'PCM_24')
+
+    y, sr = librosa.load('./output_44100.wav', sr=16000)
+
+    # sf.write('./output_16000.wav', y, sr, 'PCM_24')
+
+    # arr = np.array(y, dtype=np.int32)
+
+    # y, sr = librosa.load('./output_16000.wav', sr=16000)
+
+    audio_data = np.array(y)
+
+    print(y, sr)
+
+    return (16000,audio_data)
+
+def write_list(text,audio):
+
+    random_number = random.randint(10000, 99999)
+
+    wav_name = f'./output/slicer_opt/sample_{random_number}.wav'
+
+    sf.write(wav_name, audio[1], audio[0], 'PCM_24')
+
+    text = text.replace("#",",")
+
+    with open("./output/asr_opt/slicer_opt.list","a",encoding="utf-8")as f:f.write(f"\n{wav_name}|slicer_opt|zh|{text}")
+
+def audio_recog(audio_input, sd_switch):
+    print(audio_input)
+    return audio_clipper.recog(audio_input, sd_switch)
+
+def audio_clip(dest_text, audio_spk_input, start_ost, end_ost, state):
+    return audio_clipper.clip(dest_text, start_ost, end_ost, state, dest_spk=audio_spk_input)
+
 # 判断是否有能用来训练和加速推理的N卡
 if torch.cuda.is_available() or ngpu != 0:
     for i in range(ngpu):
@@ -648,6 +710,41 @@ with gr.Blocks(title="GPT-SoVITS WebUI") as app:
         with gr.Row():
             if_uvr5 = gr.Checkbox(label=i18n("是否开启UVR5-WebUI"),show_label=True)
             uvr5_info = gr.Textbox(label=i18n("UVR5进程输出信息"))
+        gr.Markdown(value=i18n("0.5b-手动语义字幕语音切分工具"))
+        audio_state = gr.State()
+        with gr.Row():
+            with gr.Column():
+                # oaudio_input = gr.Audio(label="🔊音频输入 44100hz Audio Input",type="filepath")
+                # rec_audio = gr.Button("👂重新采样")
+                audio_input = gr.Audio(label="🔊音频输入 16000hz Audio Input")
+                audio_sd_switch = gr.Radio(["no", "yes"], label="👥是否区分说话人 Recognize Speakers", value='no')
+                recog_button1 = gr.Button("👂识别 Recognize")
+                audio_text_output = gr.Textbox(label="✏️识别结果 Recognition Result")
+                audio_srt_output = gr.Textbox(label="📖SRT字幕内容 SRT Subtitles")
+            with gr.Column():
+                audio_text_input = gr.Textbox(label="✏️待裁剪文本 Text to Clip (多段文本使用'#'连接)")
+                audio_spk_input = gr.Textbox(label="✏️待裁剪说话人 Speaker to Clip (多个说话人使用'#'连接)")
+                with gr.Row():
+                    audio_start_ost = gr.Slider(minimum=-500, maximum=1000, value=0, step=50, label="⏪开始位置偏移 Start Offset (ms)")
+                    audio_end_ost = gr.Slider(minimum=-500, maximum=1000, value=0, step=50, label="⏩结束位置偏移 End Offset (ms)")
+                with gr.Row():
+                    clip_button1 = gr.Button("✂️裁剪 Clip")
+                    write_button1 = gr.Button("写入转写文件")
+                audio_output = gr.Audio(label="🔊裁剪结果 Audio Clipped")
+                audio_mess_output = gr.Textbox(label="ℹ️裁剪信息 Clipping Log")
+                audio_srt_clip_output = gr.Textbox(label="📖裁剪部分SRT字幕内容 Clipped SRT Subtitles")
+
+        audio_input.change(inputs=audio_input, outputs=audio_input, fn=audio_change)
+
+        write_button1.click(write_list,[audio_text_input,audio_output],[])
+
+        # rec_audio.click(re_write,[oaudio_input],[rec_audio])
+        recog_button1.click(audio_recog,
+                            inputs=[audio_input, audio_sd_switch],
+                            outputs=[audio_text_output, audio_srt_output, audio_state])
+        clip_button1.click(audio_clip,
+                           inputs=[audio_text_input, audio_spk_input, audio_start_ost, audio_end_ost, audio_state],
+                           outputs=[audio_output, audio_mess_output, audio_srt_clip_output])
         gr.Markdown(value=i18n("0b-语音切分工具"))
         with gr.Row():
             with gr.Row():

From 66341adf5583a5a762e2c45fefc054617d8b5ea9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=88=98=E6=82=A6?=
Date: Sat, 3 Feb 2024 11:48:21 +0800
Subject: [PATCH 2/2] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=89=8B=E5=8A=A8?=
 =?UTF-8?q?=E8=AF=AD=E4=B9=89=E5=AD=97=E5=B9=95=E8=AF=AD=E9=9F=B3=E5=88=87?=
 =?UTF-8?q?=E5=88=86=E5=B7=A5=E5=85=B7(=E5=A4=9A=E8=A7=92=E8=89=B2)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

添加手动语义字幕语音切分工具(多角色)
---
 argparse_tools.py |  83 ++++++++++++
 subtitle_utils.py | 130 ++++++++++++++++++
 trans_utils.py    |  82 ++++++++++++
 videoclipper.py   | 329 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 624 insertions(+)
 create mode 100644 argparse_tools.py
 create mode 100644 subtitle_utils.py
 create mode 100644 trans_utils.py
 create mode 100644 videoclipper.py

diff --git a/argparse_tools.py b/argparse_tools.py
new file mode 100644
index 00000000..fc621ed4
--- /dev/null
+++ b/argparse_tools.py
@@ -0,0 +1,83 @@
+import argparse
+from pathlib import Path
+
+import yaml
+import sys
+
+
+class ArgumentParser(argparse.ArgumentParser):
+    """Simple implementation of ArgumentParser supporting config file
+
+    This class is originated from https://github.com/bw2/ConfigArgParse,
+    but it lacks some of the features that ConfigArgParse has.
+
+    - Not supporting multiple config files
+    - Automatically adding "--config" as an option.
+    - Not supporting any formats other than yaml
+    - Not checking argument type
+
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.add_argument("--config", help="Give config file in yaml format")
+
+    def parse_known_args(self, args=None, namespace=None):
+        # Once parsing for setting from "--config"
+        _args, _ = super().parse_known_args(args, namespace)
+        if _args.config is not None:
+            if not Path(_args.config).exists():
+                self.error(f"No such file: {_args.config}")
+
+            with open(_args.config, "r", encoding="utf-8") as f:
+                d = yaml.safe_load(f)
+            if not isinstance(d, dict):
+                self.error(f"Config file has non dict value: {_args.config}")
+
+            for key in d:
+                for action in self._actions:
+                    if key == action.dest:
+                        break
+                else:
+                    self.error(f"unrecognized arguments: {key} (from {_args.config})")
+
+            # NOTE(kamo): Ignore "--config" from a config file
+            # NOTE(kamo): Unlike "configargparse", this module doesn't check type.
+            #   i.e. We can set any type value regardless of argument type.
+            self.set_defaults(**d)
+        return super().parse_known_args(args, namespace)
+
+
+def get_commandline_args():
+    extra_chars = [
+        " ",
+        ";",
+        "&",
+        "(",
+        ")",
+        "|",
+        "^",
+        "<",
+        ">",
+        "?",
+        "*",
+        "[",
+        "]",
+        "$",
+        "`",
+        '"',
+        "\\",
+        "!",
+        "{",
+        "}",
+    ]
+
+    # Escape the extra characters for shell
+    argv = [
+        arg.replace("'", "'\\''")
+        if all(char not in arg for char in extra_chars)
+        else "'" + arg.replace("'", "'\\''") + "'"
+        for arg in sys.argv
+    ]
+
+    return sys.executable + " " + " ".join(argv)
\ No newline at end of file
diff --git a/subtitle_utils.py b/subtitle_utils.py
new file mode 100644
index 00000000..705f13e0
--- /dev/null
+++ b/subtitle_utils.py
@@ -0,0 +1,130 @@
+def time_convert(ms):
+    ms = int(ms)
+    tail = ms % 1000
+    s = ms // 1000
+    mi = s // 60
+    s = s % 60
+    h = mi // 60
+    mi = mi % 60
+    h = "00" if h == 0 else str(h)
+    mi = "00" if mi == 0 else str(mi)
+    s = "00" if s == 0 else str(s)
+    tail = str(tail)
+    if len(h) == 1: h = '0' + h
+    if len(mi) == 1: mi = '0' + mi
+    if len(s) == 1: s = '0' + s
+    return "{}:{}:{},{}".format(h, mi, s, tail)
+
+
+class Text2SRT():
+    def __init__(self, text_seg, ts_list, offset=0):
+        self.token_list = [i for i in text_seg.split() if len(i)]
+        self.ts_list = ts_list
+        start, end = ts_list[0][0] - offset, ts_list[-1][1] - offset
+        self.start_sec, self.end_sec = start, end
+        self.start_time = time_convert(start)
+        self.end_time = time_convert(end)
+    def text(self):
+        res = ""
+        for word in self.token_list:
+            if '\u4e00' <= word <= '\u9fff':
+                res += word
+            else:
+                res += " " + word
+        return res
+    def len(self):
+        return len(self.token_list)
+    def srt(self, acc_ost=0.0):
+        return "{} --> {}\n{}\n".format(
+            time_convert(self.start_sec+acc_ost*1000),
+            time_convert(self.end_sec+acc_ost*1000),
+            self.text())
+    def time(self, acc_ost=0.0):
+        return (self.start_sec/1000+acc_ost, self.end_sec/1000+acc_ost)
+
+def distribute_spk(sentence_list, sd_time_list):
+    sd_sentence_list = []
+    for d in sentence_list:
+        sentence_start = d['ts_list'][0][0]
+        sentence_end = d['ts_list'][-1][1]
+        sentence_spk = 0
+        max_overlap = 0
+        for sd_time in sd_time_list:
+            spk_st, spk_ed, spk = sd_time
+            spk_st = spk_st*1000
+            spk_ed = spk_ed*1000
+            overlap = max(
+                min(sentence_end, spk_ed) - max(sentence_start, spk_st), 0)
+            if overlap > max_overlap:
+                max_overlap = overlap
+                sentence_spk = spk
+        d['spk'] = sentence_spk
+        sd_sentence_list.append(d)
+    return sd_sentence_list
+
+def generate_srt(sentence_list):
+    srt_total = ''
+    for i, d in enumerate(sentence_list):
+        t2s = Text2SRT(d['text_seg'], d['ts_list'])
+        if 'spk' in d:
+            srt_total += "{} spk{}\n{}".format(i, d['spk'], t2s.srt())
+        else:
+            srt_total += "{}\n{}".format(i, t2s.srt())
+    return srt_total
+
+def generate_srt_clip(sentence_list, start, end, begin_index=0, time_acc_ost=0.0):
+    start, end = int(start * 1000), int(end * 1000)
+    srt_total = ''
+    cc = 1 + begin_index
+    subs = []
+    for i, d in enumerate(sentence_list):
+        if d['ts_list'][-1][1] <= start:
+            continue
+        if d['ts_list'][0][0] >= end:
+            break
+        # parts in between
+        if (d['ts_list'][-1][1] <= end and d['ts_list'][0][0] > start) or (d['ts_list'][-1][1] == end and d['ts_list'][0][0] == start):
+            t2s = Text2SRT(d['text_seg'], d['ts_list'], offset=start)
+            srt_total += "{}\n{}".format(cc, t2s.srt(time_acc_ost))
+            subs.append((t2s.time(time_acc_ost), t2s.text()))
+            cc += 1
+            continue
+        if d['ts_list'][0][0] <= start:
+            if not d['ts_list'][-1][1] > end:
+                for j, ts in enumerate(d['ts_list']):
+                    if ts[1] > start:
+                        break
+                _text = " ".join(d['text_seg'].split()[j:])
+                _ts = d['ts_list'][j:]
+            else:
+                for j, ts in enumerate(d['ts_list']):
+                    if ts[1] > start:
+                        _start = j
+                        break
+                for j, ts in enumerate(d['ts_list']):
+                    if ts[1] > end:
+                        _end = j
+                        break
+                _text = " ".join(d['text_seg'].split()[_start:_end])
+                _ts = d['ts_list'][_start:_end]
+            if len(_ts):
+                t2s = Text2SRT(_text, _ts, offset=start)
+                srt_total += "{}\n{}".format(cc, t2s.srt(time_acc_ost))
+                subs.append((t2s.time(time_acc_ost), t2s.text()))
+                cc += 1
+            continue
+        if d['ts_list'][-1][1] > end:
+            for j, ts in enumerate(d['ts_list']):
+                if ts[1] > end:
+                    break
+            _text = " ".join(d['text_seg'].split()[:j])
+            _ts = d['ts_list'][:j]
+            if len(_ts):
+                t2s = Text2SRT(_text, _ts, offset=start)
+                srt_total += "{}\n{}".format(cc, t2s.srt(time_acc_ost))
+                subs.append(
+                    (t2s.time(time_acc_ost), t2s.text())
+                )
+                cc += 1
+            continue
+    return srt_total, subs, cc
diff --git a/trans_utils.py b/trans_utils.py
new file mode 100644
index 00000000..a630249c
--- /dev/null
+++ b/trans_utils.py
@@ -0,0 +1,82 @@
+PUNC_LIST = [',', '。', '!', '?', '、']
+
+
+def pre_proc(text):
+    res = ''
+    for i in range(len(text)):
+        if text[i] in PUNC_LIST:
+            continue
+        if '\u4e00' <= text[i] <= '\u9fff':
+            if len(res) and res[-1] != " ":
+                res += ' ' + text[i]+' '
+            else:
+                res += text[i]+' '
+        else:
+            res += text[i]
+    if res[-1] == ' ':
+        res = res[:-1]
+    return res
+
+def proc(raw_text, timestamp, dest_text):
+    # simple matching
+    ld = len(dest_text.split())
+    mi, ts = [], []
+    offset = 0
+    while True:
+        fi = raw_text.find(dest_text, offset, len(raw_text))
+        # import pdb; pdb.set_trace()
+        ti = raw_text[:fi].count(' ')
+        if fi == -1:
+            break
+        offset = fi + ld
+        mi.append(fi)
+        ts.append([timestamp[ti][0]*16, timestamp[ti+ld-1][1]*16])
+    # import pdb; pdb.set_trace()
+    return ts
+
+def proc_spk(dest_spk, sd_sentences):
+    ts = []
+    for d in sd_sentences:
+        d_start = d['ts_list'][0][0]
+        d_end = d['ts_list'][-1][1]
+        spkid=dest_spk[3:]
+        if str(d['spk']) == spkid and d_end-d_start>999:
+            ts.append([d['start']*16, d['end']*16])
+    return ts
+
+def generate_vad_data(data, sd_sentences, sr=16000):
+    assert len(data.shape) == 1
+    vad_data = []
+    for d in sd_sentences:
+        d_start = round(d['ts_list'][0][0]/1000, 2)
+        d_end = round(d['ts_list'][-1][1]/1000, 2)
+        vad_data.append([d_start, d_end, data[int(d_start * sr):int(d_end * sr)]])
+    return vad_data
+
+def write_state(output_dir, state):
+    for key in ['/recog_res_raw', '/timestamp', '/sentences']:
+        with open(output_dir+key, 'w') as fout:
+            fout.write(str(state[key[1:]]))
+    if 'sd_sentences' in state:
+        with open(output_dir+'/sd_sentences', 'w') as fout:
+            fout.write(str(state['sd_sentences']))
+
+import os
+def load_state(output_dir):
+    state = {}
+    with open(output_dir+'/recog_res_raw') as fin:
+        line = fin.read()
+        state['recog_res_raw'] = line
+    with open(output_dir+'/timestamp') as fin:
+        line = fin.read()
+        state['timestamp'] = eval(line)
+    with open(output_dir+'/sentences') as fin:
+        line = fin.read()
+        state['sentences'] = eval(line)
+    if os.path.exists(output_dir+'/sd_sentences'):
+        with open(output_dir+'/sd_sentences') as fin:
+            line = fin.read()
+            state['sd_sentences'] = eval(line)
+    return state
+
+    
\ No newline at end of file
diff --git a/videoclipper.py b/videoclipper.py
new file mode 100644
index 00000000..66cc79cc
--- /dev/null
+++ b/videoclipper.py
@@ -0,0 +1,329 @@
+import sys
+import copy
+import librosa
+import logging
+import argparse
+import numpy as np
+import soundfile as sf
+import moviepy.editor as mpy
+# from modelscope.pipelines import pipeline
+# from modelscope.utils.constant import Tasks
+from subtitle_utils import generate_srt, generate_srt_clip, distribute_spk
+from trans_utils import pre_proc, proc, write_state, load_state, proc_spk, generate_vad_data
+from argparse_tools import ArgumentParser, get_commandline_args
+
+from moviepy.editor import *
+from moviepy.video.tools.subtitles import SubtitlesClip
+
+
+class VideoClipper():
+    def __init__(self, asr_pipeline, sd_pipeline=None):
+        logging.warning("Initializing VideoClipper.")
+        self.asr_pipeline = asr_pipeline
+        self.sd_pipeline = sd_pipeline
+
+    def recog(self, audio_input, sd_switch='no', state=None):
+        if state is None:
+            state = {}
+        sr, data = audio_input
+        assert sr == 16000, "16kHz sample rate required, {} given.".format(sr)
+        if len(data.shape) == 2:  # multi-channel wav input
+            logging.warning("Input wav shape: {}, only first channel reserved.".format(data.shape))
+            data = data[:,0]
+        state['audio_input'] = (sr, data)
+        data = data.astype(np.float64)
+        rec_result = self.asr_pipeline(audio_in=data)
+        if sd_switch == 'yes':
+            vad_data = generate_vad_data(data.astype(np.float32), rec_result['sentences'], sr)
+            sd_result = self.sd_pipeline(audio=vad_data, batch_size=1)
+            rec_result['sd_sentences'] = distribute_spk(rec_result['sentences'], sd_result['text'])
+            res_srt = generate_srt(rec_result['sd_sentences'])
+            state['sd_sentences'] = rec_result['sd_sentences']
+        else:
+            res_srt = generate_srt(rec_result['sentences'])
+        state['recog_res_raw'] = rec_result['text_postprocessed']
+        state['timestamp'] = rec_result['time_stamp']
+        state['sentences'] = rec_result['sentences']
+        res_text = rec_result['text']
+        return res_text, res_srt, state
+
+    def clip(self, dest_text, start_ost, end_ost, state, dest_spk=None):
+        # get from state
+        audio_input = state['audio_input']
+        recog_res_raw = state['recog_res_raw']
+        timestamp = state['timestamp']
+        sentences = state['sentences']
+        sr, data = audio_input
+        data = data.astype(np.float64)
+
+        all_ts = []
+        if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
+            for _dest_text in dest_text.split('#'):
+                _dest_text = pre_proc(_dest_text)
+                ts = proc(recog_res_raw, timestamp, _dest_text)
+                for _ts in ts: all_ts.append(_ts)
+        else:
+            for _dest_spk in dest_spk.split('#'):
+                ts = proc_spk(_dest_spk, state['sd_sentences'])
+                for _ts in ts: all_ts.append(_ts)
+        ts = all_ts
+        # ts.sort()
+        srt_index = 0
+        clip_srt = ""
+        if len(ts):
+            start, end = ts[0]
+            start = min(max(0, start+start_ost*16), len(data))
+            end = min(max(0, end+end_ost*16), len(data))
+            res_audio = data[start:end]
+            start_end_info = "from {} to {}".format(start/16000, end/16000)
+            srt_clip, _, srt_index = generate_srt_clip(sentences, start/16000.0, end/16000.0, begin_index=srt_index)
+            clip_srt += srt_clip
+            for _ts in ts[1:]:  # multiple sentence input or multiple output matched
+                start, end = _ts
+                start = min(max(0, start+start_ost*16), len(data))
+                end = min(max(0, end+end_ost*16), len(data))
+                start_end_info += ", from {} to {}".format(start/16000, end/16000)
+                res_audio = np.concatenate([res_audio, data[start:end]], -1)
+                srt_clip, _, srt_index = generate_srt_clip(sentences, start/16000.0, end/16000.0, begin_index=srt_index-1)
+                clip_srt += srt_clip
+        if len(ts):
+            message = "{} periods found in the speech: ".format(len(ts)) + start_end_info
+        else:
+            message = "No period found in the speech, return raw speech. You may check the recognition result and try other destination text."
+            res_audio = data
+        return (sr, res_audio), message, clip_srt
+
+    def video_recog(self, vedio_filename, sd_switch='no'):
+        vedio_filename = vedio_filename
+        clip_video_file = vedio_filename[:-4] + '_clip.mp4'
+        video = mpy.VideoFileClip(vedio_filename)
+        audio_file = vedio_filename[:-3] + 'wav'
+        video.audio.write_audiofile(audio_file)
+        wav = librosa.load(audio_file, sr=16000)[0]
+        state = {
+            'vedio_filename': vedio_filename,
+            'clip_video_file': clip_video_file,
+            'video': video,
+        }
+        # res_text, res_srt = self.recog((16000, wav), state)
+        return self.recog((16000, wav), sd_switch, state)
+
+    def video_clip(self, dest_text, start_ost, end_ost, state, font_size=32, font_color='white', add_sub=False, dest_spk=None):
+        # get from state
+        recog_res_raw = state['recog_res_raw']
+        timestamp = state['timestamp']
+        sentences = state['sentences']
+        video = state['video']
+        clip_video_file = state['clip_video_file']
+        vedio_filename = state['vedio_filename']
+
+        all_ts = []
+        srt_index = 0
+        if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
+            for _dest_text in dest_text.split('#'):
+                _dest_text = pre_proc(_dest_text)
+                ts = proc(recog_res_raw, timestamp, _dest_text)
+                for _ts in ts: all_ts.append(_ts)
+        else:
+            for _dest_spk in dest_spk.split('#'):
+                ts = proc_spk(_dest_spk, state['sd_sentences'])
+                for _ts in ts: all_ts.append(_ts)
+        time_acc_ost = 0.0
+        ts = all_ts
+        # ts.sort()
+        clip_srt = ""
+        if len(ts):
+            start, end = ts[0][0] / 16000, ts[0][1] / 16000
+            srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index, time_acc_ost=time_acc_ost)
+            start, end = start+start_ost/1000.0, end+end_ost/1000.0
+            video_clip = video.subclip(start, end)
+            start_end_info = "from {} to {}".format(start, end)
+            clip_srt += srt_clip
+            if add_sub:
+                generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
+                subtitles = SubtitlesClip(subs, generator)
+                video_clip = CompositeVideoClip([video_clip, subtitles.set_pos(('center','bottom'))])
+            concate_clip = [video_clip]
+            time_acc_ost += end+end_ost/1000.0 - (start+start_ost/1000.0)
+            for _ts in ts[1:]:
+                start, end = _ts[0] / 16000, _ts[1] / 16000
+                srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index-1, time_acc_ost=time_acc_ost)
+                chi_subs = []
+                sub_starts = subs[0][0][0]
+                for sub in subs:
+                    chi_subs.append(((sub[0][0]-sub_starts, sub[0][1]-sub_starts), sub[1]))
+                start, end = start+start_ost/1000.0, end+end_ost/1000.0
+                _video_clip = video.subclip(start, end)
+                start_end_info += ", from {} to {}".format(start, end)
+                clip_srt += srt_clip
+                if add_sub:
+                    generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
+                    subtitles = SubtitlesClip(chi_subs, generator)
+                    _video_clip = CompositeVideoClip([_video_clip, subtitles.set_pos(('center','bottom'))])
+                # _video_clip.write_videofile("debug.mp4", audio_codec="aac")
+                concate_clip.append(copy.copy(_video_clip))
+                time_acc_ost += end+end_ost/1000.0 - (start+start_ost/1000.0)
+            message = "{} periods found in the audio: ".format(len(ts)) + start_end_info
+            logging.warning("Concatenating...")
+            if len(concate_clip) > 1:
+                video_clip = concatenate_videoclips(concate_clip)
+            video_clip.write_videofile(clip_video_file, audio_codec="aac")
+        else:
+            clip_video_file = vedio_filename
+            message = "No period found in the audio, return raw speech. You may check the recognition result and try other destination text."
+            srt_clip = ''
+        return clip_video_file, message, clip_srt
+
+
+def get_parser():
+    parser = ArgumentParser(
+        description="ClipVideo Argument",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+    parser.add_argument(
+        "--stage",
+        type=int,
+        choices=(1, 2),
+        help="Stage, 1 for recognizing and 2 for clipping",
+        required=True
+    )
+    parser.add_argument(
+        "--file",
+        type=str,
+        default=None,
+        help="Input file path",
+        required=True
+    )
+    parser.add_argument(
+        "--sd_switch",
+        type=str,
+        choices=("no", "yes"),
+        default="no",
+        help="Turn on the speaker diarization or not",
+    )
+    parser.add_argument(
+        "--output_dir",
+        type=str,
+        default='./output',
+        help="Output files path",
+    )
+    parser.add_argument(
+        "--dest_text",
+        type=str,
+        default=None,
+        help="Destination text string for clipping",
+    )
+    parser.add_argument(
+        "--dest_spk",
+        type=str,
+        default=None,
+        help="Destination spk id for clipping",
+    )
+    parser.add_argument(
+        "--start_ost",
+        type=int,
+        default=0,
+        help="Offset time in ms at beginning for clipping"
+    )
+    parser.add_argument(
+        "--end_ost",
+        type=int,
+        default=0,
+        help="Offset time in ms at ending for clipping"
+    )
+    parser.add_argument(
+        "--output_file",
+        type=str,
+        default=None,
+        help="Output file path"
+    )
+    return parser
+
+
+def runner(stage, file, sd_switch, output_dir, dest_text, dest_spk, start_ost, end_ost, output_file, config=None):
+    audio_suffixs = ['wav']
+    video_suffixs = ['mp4']
+    if file[-3:] in audio_suffixs:
+        mode = 'audio'
+    elif file[-3:] in video_suffixs:
+        mode = 'video'
+    else:
+        logging.error("Unsupported file format: {}".format(file))
+    while output_dir.endswith('/'):
+        output_dir = output_dir[:-1]
+    if stage == 1:
+        from modelscope.pipelines import pipeline
+        from modelscope.utils.constant import Tasks
+        # initialize modelscope asr pipeline
+        logging.warning("Initializing modelscope asr pipeline.")
+        inference_pipeline = pipeline(
+            task=Tasks.auto_speech_recognition,
+            model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
+            vad_model='damo/speech_fsmn_vad_zh-cn-16k-common-pytorch',
+            punc_model='damo/punc_ct-transformer_zh-cn-common-vocab272727-pytorch',
+            output_dir=output_dir,
+        )
+        sd_pipeline = pipeline(
+            task='speaker-diarization',
+            model='damo/speech_campplus_speaker-diarization_common',
+            model_revision='v1.0.0'
+        )
+        audio_clipper = VideoClipper(inference_pipeline, sd_pipeline)
+        if mode == 'audio':
+            logging.warning("Recognizing audio file: {}".format(file))
+            wav, sr = librosa.load(file, sr=16000)
+            res_text, res_srt, state = audio_clipper.recog((sr, wav), sd_switch)
+        if mode == 'video':
+            logging.warning("Recognizing video file: {}".format(file))
+            res_text, res_srt, state = audio_clipper.video_recog(file, sd_switch)
+        total_srt_file = output_dir + '/total.srt'
+        with open(total_srt_file, 'w') as fout:
+            fout.write(res_srt)
+        logging.warning("Write total subtitle to {}".format(total_srt_file))
+        write_state(output_dir, state)
+        logging.warning("Recognition succeeded. You can copy the text segment from below and use stage 2.")
+        print(res_text)
+    if stage == 2:
+        audio_clipper = VideoClipper(None)
+        if mode == 'audio':
+            state = load_state(output_dir)
+            wav, sr = librosa.load(file, sr=16000)
+            state['audio_input'] = (sr, wav)
+            (sr, audio), message, srt_clip = audio_clipper.clip(dest_text, start_ost, end_ost, state, dest_spk=dest_spk)
+            if output_file is None:
+                output_file = output_dir + '/result.wav'
+            clip_srt_file = output_file[:-3] + 'srt'
+            logging.warning(message)
+            sf.write(output_file, audio, 16000)
+            assert output_file.endswith('.wav'), "output_file must end with '.wav'"
+            logging.warning("Save clipped wav file to {}".format(output_file))
+            with open(clip_srt_file, 'w') as fout:
+                fout.write(srt_clip)
+            logging.warning("Write clipped subtitle to {}".format(clip_srt_file))
+        if mode == 'video':
+            state = load_state(output_dir)
+            state['vedio_filename'] = file
+            if output_file is None:
+                state['clip_video_file'] = file[:-4] + '_clip.mp4'
+            else:
+                state['clip_video_file'] = output_file
+            clip_srt_file = state['clip_video_file'][:-3] + 'srt'
+            state['video'] = mpy.VideoFileClip(file)
+            clip_video_file, message, srt_clip = audio_clipper.video_clip(dest_text, start_ost, end_ost, state, dest_spk=dest_spk)
+            logging.warning("Clipping Log: {}".format(message))
+            logging.warning("Save clipped mp4 file to {}".format(clip_video_file))
+            with open(clip_srt_file, 'w') as fout:
+                fout.write(srt_clip)
+            logging.warning("Write clipped subtitle to {}".format(clip_srt_file))
+
+
+def main(cmd=None):
+    print(get_commandline_args(), file=sys.stderr)
+    parser = get_parser()
+    args = parser.parse_args(cmd)
+    kwargs = vars(args)
+    runner(**kwargs)
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
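
Usage note: a minimal sketch of how the VideoClipper module added by this patch can be driven directly from Python, mirroring the wiring in webui.py above. The audio path and the destination text below are placeholders, and the ModelScope model downloads are assumed to succeed.

# sketch.py -- illustrative only, not part of the patch
import librosa
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from videoclipper import VideoClipper

# same ASR pipeline as the one instantiated in webui.py
asr_pipeline = pipeline(
    task=Tasks.auto_speech_recognition,
    model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
    vad_model='damo/speech_fsmn_vad_zh-cn-16k-common-pytorch',
    punc_model='damo/punc_ct-transformer_zh-cn-common-vocab272727-pytorch',
)
clipper = VideoClipper(asr_pipeline)  # sd_pipeline is only needed when sd_switch='yes'

# recog() expects (sample_rate, samples) at 16 kHz mono
wav, sr = librosa.load('sample.wav', sr=16000)  # placeholder path
text, srt, state = clipper.recog((sr, wav), sd_switch='no')

# clip() matches dest_text against the recognition result; offsets are in milliseconds
(sr, clipped), message, clip_srt = clipper.clip('要裁剪的文本', start_ost=0, end_ost=100, state=state)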