Add PreparedCpuStage data class and refactor prepare_cpu_stage_profiled_async method in PrepareCoordinator for improved CPU profiling. Introduce prepare_gpu_stage_profiled_async method to streamline GPU stage preparation using the new data class, enhancing overall performance and maintainability.

This commit is contained in:
baicai-1145 2026-03-11 05:29:30 +08:00
parent 6a427b4f54
commit 06d6b67f73
2 changed files with 2955 additions and 719 deletions

View File

@@ -33,6 +33,20 @@ class ProfiledResult:
return max(0.0, (self.finished_at - self.started_at) * 1000.0)
@dataclass
class PreparedCpuStage:
spec: SchedulerRequestSpec
prepare_submit_at: float
prepare_start: float
prompt_text: str
text: str
prepare_admission_wait_ms: float
current_inflight: int
peak_inflight: int
prompt_cpu_profiled: ProfiledResult
target_cpu_profiled: ProfiledResult
class PrepareCoordinator:
def __init__(self, tts: Any):
self.tts = tts
@@ -216,11 +230,16 @@ class PrepareCoordinator:
async def _run_ref_audio_stage(self, ref_audio_path: str) -> ProfiledResult:
return await self._run_on_executor(self.ref_audio_executor, self.tts.extract_ref_audio_bundle, ref_audio_path)
async def prepare_state_profiled_async(
def _release_split_stage_slot(self) -> None:
self._mark_leave()
if self._inflight_semaphore is not None:
self._inflight_semaphore.release()
async def prepare_cpu_stage_profiled_async(
self,
spec: SchedulerRequestSpec,
prepare_submit_at: float,
) -> tuple[T2SRequestState, float, float]:
) -> PreparedCpuStage:
admission_start = time.perf_counter()
if self._inflight_semaphore is not None:
await self._inflight_semaphore.acquire()
@@ -230,17 +249,38 @@
prompt_text = normalize_sentence(spec.prompt_text, spec.prompt_lang)
text = spec.text.strip("\n")
try:
text_pair_start = time.perf_counter()
prompt_cpu_task = asyncio.create_task(self._run_text_cpu_stage(prompt_text, spec.prompt_lang))
target_cpu_task = asyncio.create_task(self._run_text_cpu_stage(text, spec.text_lang))
ref_audio_task = asyncio.create_task(self._run_ref_audio_stage(str(spec.ref_audio_path)))
prompt_cpu_profiled, target_cpu_profiled = await asyncio.gather(prompt_cpu_task, target_cpu_task)
return PreparedCpuStage(
spec=spec,
prepare_submit_at=float(prepare_submit_at),
prepare_start=float(prepare_start),
prompt_text=prompt_text,
text=text,
prepare_admission_wait_ms=float(prepare_admission_wait_ms),
current_inflight=int(current_inflight),
peak_inflight=int(peak_inflight),
prompt_cpu_profiled=prompt_cpu_profiled,
target_cpu_profiled=target_cpu_profiled,
)
except Exception:
self._release_split_stage_slot()
raise
async def prepare_gpu_stage_profiled_async(
self,
cpu_stage: PreparedCpuStage,
) -> tuple[T2SRequestState, float, float]:
try:
text_pair_start = time.perf_counter()
ref_audio_task = asyncio.create_task(self._run_ref_audio_stage(str(cpu_stage.spec.ref_audio_path)))
text_feature_pair_task = asyncio.create_task(
self._run_text_feature_pair_stage(
prompt_cpu_profiled.result,
target_cpu_profiled.result,
prompt_cpu_profiled.run_ms,
target_cpu_profiled.run_ms,
cpu_stage.prompt_cpu_profiled.result,
cpu_stage.target_cpu_profiled.result,
cpu_stage.prompt_cpu_profiled.run_ms,
cpu_stage.target_cpu_profiled.run_ms,
)
)
(prompt_feature_profiled, target_feature_profiled), ref_audio_profiled = await asyncio.gather(
@@ -250,18 +290,18 @@
text_pair_end = time.perf_counter()
state = build_request_state_from_parts(
tts=self.tts,
spec=spec,
prompt_text=prompt_text,
text=text,
spec=cpu_stage.spec,
prompt_text=cpu_stage.prompt_text,
text=cpu_stage.text,
prompt_result=prompt_feature_profiled.result,
target_result=target_feature_profiled.result,
ref_audio_bundle=ref_audio_profiled.result,
prepare_start=prepare_start,
prepare_sync_start=prepare_start,
prepare_start=cpu_stage.prepare_start,
prepare_sync_start=cpu_stage.prepare_start,
profile_overrides={
"executor_queue_ms": max(0.0, (prepare_start - prepare_submit_at) * 1000.0),
"prepare_admission_wait_ms": prepare_admission_wait_ms,
"executor_run_wall_ms": max(0.0, (time.perf_counter() - prepare_start) * 1000.0),
"executor_queue_ms": max(0.0, (cpu_stage.prepare_start - cpu_stage.prepare_submit_at) * 1000.0),
"prepare_admission_wait_ms": cpu_stage.prepare_admission_wait_ms,
"executor_run_wall_ms": max(0.0, (time.perf_counter() - cpu_stage.prepare_start) * 1000.0),
"text_feature_pair_ms": max(0.0, (text_pair_end - text_pair_start) * 1000.0),
"prompt_text_parallel_future_wait_ms": 0.0,
"prompt_text_parallel_future_executor_queue_ms": 0.0,
@@ -269,26 +309,32 @@
"prompt_text_parallel_future_finish_after_submit_ms": 0.0,
"prompt_text_parallel_future_queue_tail_after_target_ms": 0.0,
"prompt_text_parallel_future_run_tail_after_target_ms": 0.0,
"prompt_text_cpu_queue_ms": prompt_cpu_profiled.queue_ms,
"prompt_text_cpu_run_ms": prompt_cpu_profiled.run_ms,
"prompt_text_cpu_queue_ms": cpu_stage.prompt_cpu_profiled.queue_ms,
"prompt_text_cpu_run_ms": cpu_stage.prompt_cpu_profiled.run_ms,
"prompt_text_feature_queue_ms": prompt_feature_profiled.queue_ms,
"prompt_text_feature_run_ms": prompt_feature_profiled.run_ms,
"text_cpu_queue_ms": target_cpu_profiled.queue_ms,
"text_cpu_run_ms": target_cpu_profiled.run_ms,
"text_cpu_queue_ms": cpu_stage.target_cpu_profiled.queue_ms,
"text_cpu_run_ms": cpu_stage.target_cpu_profiled.run_ms,
"text_feature_queue_ms": target_feature_profiled.queue_ms,
"text_feature_run_ms": target_feature_profiled.run_ms,
"ref_audio_task_queue_ms": ref_audio_profiled.queue_ms,
"ref_audio_task_run_ms": ref_audio_profiled.run_ms,
"worker_prepare_inflight_on_enter": float(current_inflight),
"worker_prepare_peak_inflight": float(peak_inflight),
"worker_prepare_inflight_on_enter": float(cpu_stage.current_inflight),
"worker_prepare_peak_inflight": float(cpu_stage.peak_inflight),
},
)
prepare_exec_finished_at = time.perf_counter()
state.prepare_profile["executor_run_wall_ms"] = max(
0.0, (prepare_exec_finished_at - prepare_start) * 1000.0
0.0, (prepare_exec_finished_at - cpu_stage.prepare_start) * 1000.0
)
return state, prepare_start, prepare_exec_finished_at
return state, cpu_stage.prepare_start, prepare_exec_finished_at
finally:
self._mark_leave()
if self._inflight_semaphore is not None:
self._inflight_semaphore.release()
self._release_split_stage_slot()
async def prepare_state_profiled_async(
self,
spec: SchedulerRequestSpec,
prepare_submit_at: float,
) -> tuple[T2SRequestState, float, float]:
cpu_stage = await self.prepare_cpu_stage_profiled_async(spec, prepare_submit_at)
return await self.prepare_gpu_stage_profiled_async(cpu_stage)

File diff suppressed because it is too large Load Diff