From 6c14b8812a95909df520ac3826945eb3a3ccdb10 Mon Sep 17 00:00:00 2001 From: haofanurusai <90292544+haofanurusai@users.noreply.github.com> Date: Mon, 2 Jun 2025 21:12:12 +0800 Subject: [PATCH 1/2] Update gui_v1.py to use AudioIoProcess --- gui_v1.py | 131 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 110 insertions(+), 21 deletions(-) diff --git a/gui_v1.py b/gui_v1.py index c5e7179aa..38bf182ed 100644 --- a/gui_v1.py +++ b/gui_v1.py @@ -79,6 +79,8 @@ def run(self): import time import traceback from multiprocessing import Queue, cpu_count + from tools.audio_io_process import AudioIoProcess + from multiprocessing.shared_memory import SharedMemory from queue import Empty import librosa @@ -145,7 +147,16 @@ def __init__(self) -> None: self.output_devices = None self.input_devices_indices = None self.output_devices_indices = None - self.stream = None + self.audio_proc = None + self.in_mem = None + self.out_mem = None + self.in_buf = None + self.out_buf = None + self.in_ptr = None + self.out_ptr = None + self.play_ptr = None + self.in_evt = None + self.stop_evt = None self.update_devices() self.launcher() @@ -601,9 +612,9 @@ def event_handler(self): } with open("configs/inuse/config.json", "w") as j: json.dump(settings, j) - if self.stream is not None: + if self.audio_proc is not None: self.delay_time = ( - self.stream.latency[-1] + self.audio_proc.get_latency() + values["block_time"] + values["crossfade_length"] + 0.01 @@ -635,7 +646,7 @@ def event_handler(self): self.gui_config.f0method = event elif event == "I_noise_reduce": self.gui_config.I_noise_reduce = values["I_noise_reduce"] - if self.stream is not None: + if self.audio_proc is not None: self.delay_time += ( 1 if values["I_noise_reduce"] else -1 ) * min(values["crossfade_length"], 0.04) @@ -823,36 +834,80 @@ def start_stream(self): "WASAPI" in self.gui_config.sg_hostapi and self.gui_config.sg_wasapi_exclusive ): - extra_settings = sd.WasapiSettings(exclusive=True) + wasapi_exclusive = 
True else: - extra_settings = None - self.stream = sd.Stream( - callback=self.audio_callback, - blocksize=self.block_frame, - samplerate=self.gui_config.samplerate, - channels=self.gui_config.channels, - dtype="float32", - extra_settings=extra_settings, + wasapi_exclusive = False + self.audio_proc = AudioIoProcess( + input_device=sd.default.device[0], + output_device=sd.default.device[1], + input_audio_block_size = self.block_frame, + sample_rate = self.gui_config.samplerate, + channel_num=self.gui_config.channels, + is_input_wasapi_exclusive=wasapi_exclusive, + is_output_wasapi_exclusive=wasapi_exclusive, + is_device_combined = True + # TODO: Add control UI to allow devices with different type API & different WASAPI settings ) - self.stream.start() + self.in_mem = SharedMemory(name=self.audio_proc.get_in_mem_name()) + self.out_mem = SharedMemory(name=self.audio_proc.get_out_mem_name()) + self.in_buf = np.ndarray( + self.audio_proc.get_np_shape(), + dtype=self.audio_proc.get_np_dtype(), + buffer=self.in_mem.buf, + order='C' + ) + self.out_buf = np.ndarray( + self.audio_proc.get_np_shape(), + dtype=self.audio_proc.get_np_dtype(), + buffer=self.out_mem.buf, + order='C' + ) + self.in_ptr, \ + self.out_ptr, \ + self.play_ptr, \ + self.in_evt, \ + self.stop_evt = self.audio_proc.get_ptrs_and_events() + + self.audio_proc.start() + + def audio_loop(): + while flag_vc: + self.audio_infer(self.block_frame << 1) + + threading.Thread( + target=audio_loop, + daemon=True + ).start() def stop_stream(self): global flag_vc if flag_vc: flag_vc = False - if self.stream is not None: - self.stream.abort() - self.stream.close() - self.stream = None + if self.audio_proc is not None: + print("Exiting") + self.stop_evt.set() + self.in_mem.close() + self.out_mem.close() + self.audio_proc.join() + self.audio_proc = None - def audio_callback( - self, indata: np.ndarray, outdata: np.ndarray, frames, times, status + def audio_infer( + self, buf_size:int # 2 * self.block_frame ): """ 音频处理 """ 
global flag_vc + + self.in_evt.wait() + rptr = self.in_ptr.value + self.in_evt.clear() + start_time = time.perf_counter() + + rend = rptr + self.block_frame + indata = np.copy(self.in_buf[rptr:rend]) + indata = librosa.to_mono(indata.T) if self.gui_config.threhold > -60: indata = np.append(self.rms_buffer, indata) @@ -995,13 +1050,47 @@ def audio_callback( self.sola_buffer[:] = infer_wav[ self.block_frame : self.block_frame + self.sola_buffer_frame ] - outdata[:] = ( + outdata = ( infer_wav[: self.block_frame] .repeat(self.gui_config.channels, 1) .t() .cpu() .numpy() ) + + # 装填输出缓冲 + start = self.out_ptr.value + play_pos = self.play_ptr.value + + # 计算播放进度差(写指针距离播放指针的帧数) + delta = (start - play_pos + buf_size) % buf_size + + if delta < self.block_frame: + # 装填赶不上播放,导致播放进度追上来了, + # 此时已产生无法挽回的破音, + # 只好直接卡着播放指针写入,保证接下来的尽快放出来 + print("[W] Output underrun") + write_pos = play_pos + else: + # 否则按块对齐 + write_pos = (start + self.block_frame) % buf_size + + # 写入共享缓冲区 + end = (write_pos + self.block_frame) % buf_size + if end > write_pos: + self.out_buf[write_pos:end] = outdata + else: + first = buf_size - write_pos + self.out_buf[write_pos:] = outdata[:first] + self.out_buf[:end] = outdata[first:] + + # 更新写指针 + self.out_ptr.value = write_pos + + if self.in_evt.is_set(): + print("[W] Input overrun") + self.in_evt.clear() + total_time = time.perf_counter() - start_time if flag_vc: self.window["infer_time"].update(int(total_time * 1000)) From 5816e64952a3cd428a0f45492344367eb6216ada Mon Sep 17 00:00:00 2001 From: haofanurusai <90292544+haofanurusai@users.noreply.github.com> Date: Mon, 2 Jun 2025 21:13:45 +0800 Subject: [PATCH 2/2] multiprocess for async audio input/output --- tools/audio_io_process.py | 168 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 tools/audio_io_process.py diff --git a/tools/audio_io_process.py b/tools/audio_io_process.py new file mode 100644 index 000000000..0fef9ed12 --- /dev/null +++ 
"""Audio I/O worker process (tools/audio_io_process.py).

The sounddevice stream(s) run in a child process. Audio is exchanged with the
parent through two shared-memory ring buffers, each two blocks long.
"""
import signal
from multiprocessing import Event, Process, Value
from multiprocessing.shared_memory import SharedMemory
from typing import Optional

import numpy as np


class AudioIoProcess(Process):
    """Child process that owns the audio stream(s).

    Captured audio is written into the input ring buffer; every time one
    block-sized half has been filled, ``in_ptr`` is set to the start of that
    half and ``in_evt`` is set.  Playback reads the output ring buffer at
    ``play_ptr``.  ``out_ptr`` is only stored here; per the original notes it
    is written by the parent to mark its next write position.

    Args:
        input_device: Input device passed to ``sounddevice.default.device``.
        output_device: Output device passed to ``sounddevice.default.device``.
        input_audio_block_size: Frames per processing block; each ring buffer
            holds two such blocks.
        sample_rate: Stream sample rate.
        channel_num: Number of channels of both buffers and streams.
        is_device_combined: Open one duplex stream when true, otherwise a
            separate input and output stream.
        is_input_wasapi_exclusive: Request WASAPI exclusive mode for input.
        is_output_wasapi_exclusive: Request WASAPI exclusive mode for output.
    """

    def __init__(self,
                 input_device,
                 output_device,
                 input_audio_block_size: int,
                 sample_rate: int,
                 channel_num: int = 2,
                 is_device_combined: bool = True,
                 is_input_wasapi_exclusive: bool = False,
                 is_output_wasapi_exclusive: bool = False
                 ):
        super().__init__()
        self.in_dev = input_device
        self.out_dev = output_device
        self.block_size: int = input_audio_block_size
        self.buf_size: int = self.block_size << 1  # double buffering
        self.sample_rate: int = sample_rate
        self.channels: int = channel_num
        self.is_device_combined: bool = is_device_combined
        self.is_input_wasapi_exclusive: bool = is_input_wasapi_exclusive
        self.is_output_wasapi_exclusive: bool = is_output_wasapi_exclusive

        # Set by this process: start of the block that was just filled.
        self.in_ptr = Value('i', 0)
        # Set by the parent: its next expected write position.
        self.out_ptr = Value('i', 0)
        # Set by this process: how far playback has advanced.
        self.play_ptr = Value('i', 0)
        # Set by this process whenever a full input block is available.
        self.in_evt = Event()
        # Set by the parent to stop all audio activity.
        self.stop_evt = Event()
        # Set by this process once `latency` is valid (or start-up failed),
        # so get_latency() never hands out a placeholder value.
        self.ready_evt = Event()

        self.latency = Value('d', 0.0)

        self.buf_shape: tuple = (self.buf_size, self.channels)
        self.buf_dtype: np.dtype = np.float32
        self.buf_nbytes: int = int(
            np.prod(self.buf_shape) * np.dtype(self.buf_dtype).itemsize)

        self.in_mem = SharedMemory(create=True, size=self.buf_nbytes)
        self.out_mem = SharedMemory(create=True, size=self.buf_nbytes)
        self.in_mem_name: str = self.in_mem.name
        self.out_mem_name: str = self.out_mem.name

        # NumPy views over the shared memory; only created inside run().
        self.in_buf = None
        self.out_buf = None

    def get_in_mem_name(self) -> str:
        """Return the name of the input shared-memory segment."""
        return self.in_mem_name

    def get_out_mem_name(self) -> str:
        """Return the name of the output shared-memory segment."""
        return self.out_mem_name

    def get_np_shape(self) -> tuple:
        """Return the ``(frames, channels)`` shape of both ring buffers."""
        return self.buf_shape

    def get_np_dtype(self) -> np.dtype:
        """Return the sample dtype of both ring buffers."""
        return self.buf_dtype

    def get_ptrs_and_events(self):
        """Return ``(in_ptr, out_ptr, play_ptr, in_evt, stop_evt)``."""
        return (
            self.in_ptr,
            self.out_ptr,
            self.play_ptr,
            self.in_evt,
            self.stop_evt,
        )

    def get_latency(self, timeout: float = 5.0) -> float:
        """Return the stream latency in seconds reported by the child.

        If the process has been started, waits up to ``timeout`` seconds for
        the child to open its stream, because ``start()`` returns before the
        stream exists.  Returns ``0.0`` if no latency was ever reported.
        """
        if self.pid is not None:  # not started yet -> nothing to wait for
            self.ready_evt.wait(timeout)
        return self.latency.value

    @staticmethod
    def _ring_read(ring, start: int, out) -> int:
        """Fill ``out`` from ``ring`` beginning at ``start``, wrapping around.

        Returns the ring position following the last frame read.
        """
        size = len(ring)
        total = len(out)
        pos = start
        done = 0
        while done < total:
            n = min(total - done, size - pos)
            out[done:done + n] = ring[pos:pos + n]
            done += n
            pos = (pos + n) % size
        return pos

    @staticmethod
    def _ring_write(ring, start: int, data) -> int:
        """Copy ``data`` into ``ring`` beginning at ``start``, wrapping around.

        A chunk longer than the ring overwrites its own oldest frames.
        Returns the ring position following the last frame written.
        """
        size = len(ring)
        total = len(data)
        pos = start
        done = 0
        while done < total:
            n = min(total - done, size - pos)
            ring[pos:pos + n] = data[done:done + n]
            done += n
            pos = (pos + n) % size
        return pos

    @staticmethod
    def _completed_block_start(prev_pos: int, frames: int,
                               block_size: int) -> Optional[int]:
        """Return the start of the block completed by a write, if any.

        ``prev_pos`` is the write position before ``frames`` frames were
        added to a ring of two blocks.  Returns ``0`` or ``block_size`` when
        the write crossed a block boundary, else ``None``.
        """
        first_block = prev_pos // block_size
        last_block = (prev_pos + frames) // block_size
        if last_block == first_block:
            return None
        return ((last_block - 1) % 2) * block_size

    @staticmethod
    def _last_latency(latency) -> float:
        """Normalise a sounddevice latency value to one float.

        Accepts either an ``(input, output)`` pair, of which the last element
        is used, or a bare number.
        """
        if isinstance(latency, (tuple, list)):
            return float(latency[-1])
        return float(latency)

    def _serve_audio(self):
        """Open the stream(s) and block until ``stop_evt`` is set."""
        # Imported here, not at module level, so this module can be imported
        # on a machine without an audio backend.
        import sounddevice as sd

        sd.default.device = (self.in_dev, self.out_dev)

        rec_pos = 0  # write position inside the input ring buffer

        def output_callback(outdata, frames, time_info, status):
            self.play_ptr.value = self._ring_read(
                self.out_buf, self.play_ptr.value, outdata)

        def input_callback(indata, frames, time_info, status):
            nonlocal rec_pos
            # Store the captured frames.
            prev_pos = rec_pos
            rec_pos = self._ring_write(self.in_buf, prev_pos, indata)

            # Tell the parent which half of the buffer is ready.
            ready = self._completed_block_start(
                prev_pos, frames, self.block_size)
            if ready is not None:
                self.in_ptr.value = ready
                self.in_evt.set()

        def combined_callback(indata, outdata, frames, time_info, status):
            output_callback(outdata, frames, time_info, status)  # sound first
            input_callback(indata, frames, time_info, status)

        in_exclusive = self.is_input_wasapi_exclusive
        out_exclusive = self.is_output_wasapi_exclusive
        exclusive_settings = (
            sd.WasapiSettings(exclusive=True)
            if in_exclusive or out_exclusive else None
        )
        common = dict(
            samplerate=self.sample_rate,
            channels=self.channels,
            dtype=self.buf_dtype,
            latency='low',
        )

        if self.is_device_combined:
            # A duplex stream is exclusive only if both directions ask for it.
            duplex_settings = (
                exclusive_settings
                if in_exclusive and out_exclusive else None
            )
            with sd.Stream(
                extra_settings=duplex_settings,
                callback=combined_callback,
                **common
            ) as stream:
                self.latency.value = self._last_latency(stream.latency)
                self.ready_evt.set()
                self.stop_evt.wait()
                self.out_buf.fill(0.0)
        else:
            with sd.InputStream(
                extra_settings=exclusive_settings if in_exclusive else None,
                callback=input_callback,
                **common
            ) as si, sd.OutputStream(
                extra_settings=exclusive_settings if out_exclusive else None,
                callback=output_callback,
                **common
            ) as so:
                self.latency.value = (
                    self._last_latency(si.latency)
                    + self._last_latency(so.latency)
                )
                self.ready_evt.set()
                self.stop_evt.wait()
                self.out_buf.fill(0.0)

    def run(self):
        """Child entry point: attach the buffers, serve audio, clean up."""
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        in_mem = None
        out_mem = None
        try:
            in_mem = SharedMemory(name=self.in_mem_name)
            self.in_buf = np.ndarray(
                self.buf_shape, dtype=self.buf_dtype,
                buffer=in_mem.buf, order='C')
            self.in_buf.fill(0.0)

            out_mem = SharedMemory(name=self.out_mem_name)
            self.out_buf = np.ndarray(
                self.buf_shape, dtype=self.buf_dtype,
                buffer=out_mem.buf, order='C')
            self.out_buf.fill(0.0)

            self._serve_audio()
        finally:
            # Unblock a parent waiting in get_latency() even on failure.
            self.ready_evt.set()
            # Drop the array views before the memory behind them goes away.
            self.in_buf = None
            self.out_buf = None
            # Release the shared memory on every exit path.
            attached = [m for m in (in_mem, out_mem) if m is not None]
            for mem in attached:
                mem.close()
            for mem in attached:
                mem.unlink()