-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlipsync_module.py
92 lines (78 loc) · 2.84 KB
/
lipsync_module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# lipsynv_module.py
# 音频响度检测实现口型同步, 源代码见live2d-py仓库: https://github.com/Arkueid/live2d-py/blob/main/package/live2d/utils/lipsync.py
import wave
import time
import numpy as np
from io import BytesIO
import logging
# 获取根记录器
logger = logging.getLogger("lipsync_module")
class WavHandler:
def __init__(self):
# 每个通道的采样帧数
self.numFrames: int = 0
# 采样率,帧/秒
self.sampleRate: int = 0
self.sampleWidth: int = 0
# 通道数
self.numChannels: int = 0
# 数据
self.pcmData: np.ndarray = None
# 已经读取的帧数
self.lastOffset: int = 0
# 当前rms值
self.currentRms: float = 0
# 开始读取的时间
self.startTime: float = -1
def Start(self, audio_data: bytes) -> None:
"""接收并处理音频数据
Args:
audio_data (bytes): 音频序列
"""
self.ReleasePcmData()
try:
audio_stream = BytesIO(audio_data)
with wave.open(audio_stream, "rb") as wav:
self.numFrames = wav.getnframes()
self.sampleRate = wav.getframerate()
self.sampleWidth = wav.getsampwidth()
self.numChannels = wav.getnchannels()
self.pcmData = np.frombuffer(
wav.readframes(self.numFrames), dtype=np.int16
)
self.pcmData = self.pcmData / np.max(np.abs(self.pcmData))
self.pcmData = self.pcmData.reshape(-1, self.numChannels).T
self.startTime = time.time()
self.lastOffset = 0
except Exception as e:
self.ReleasePcmData()
logger.error(f"音频数据加载失败: {e}")
def ReleasePcmData(self):
"""释放pcm数据"""
if self.pcmData is not None:
del self.pcmData
self.pcmData = None
def GetRms(self) -> float:
"""返回语音响度数据
Returns:
float: 语音响度, 范围[0,1]
"""
return self.currentRms
def Update(self) -> bool:
"""更新音频帧位置,并计算当前音频段的 RMS
Returns:
bool: 返回是否更新成功
"""
if self.pcmData is None or self.lastOffset >= self.numFrames:
return False
currentTime = time.time() - self.startTime
currentOffset = int(currentTime * self.sampleRate)
if currentOffset == self.lastOffset:
return True
currentOffset = min(currentOffset, self.numFrames)
dataFragment = self.pcmData[:, self.lastOffset : currentOffset].astype(
np.float32
)
self.currentRms = np.sqrt(np.mean(np.square(dataFragment)))
self.lastOffset = currentOffset
return True