-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathautomated_puppeteering.py
110 lines (89 loc) · 3.95 KB
/
automated_puppeteering.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from pydub import AudioSegment
from pydispatch import dispatcher
from scipy.io import wavfile
import pygame
import numpy as np
import time
import io
from typing import Tuple, List, Any
class AutomatedPuppeteering:
    """Drive a puppet's mouth from the loudness of an audio file.

    The audio is split into fixed-length windows, a normalized RMS level is
    computed for each window, and while the file plays through
    ``pygame.mixer.music`` a ``keyEvent`` signal (key ``'x'``) is dispatched
    per window with ``val=1`` (level above threshold) or ``val=0``.
    """

    def __init__(self, pygame_instance: Any, threshold: float = 0.15, interval_ms: int = 25) -> None:
        """Store the playback backend and the analysis settings.

        Args:
            pygame_instance: Object exposing ``mixer.music`` and ``time.wait``
                (used only inside ``monitor_audio``).
            threshold: Normalized level (0-1) above which the mouth opens.
            interval_ms: Length of one analysis/update window in milliseconds.

        Raises:
            ValueError: If ``threshold`` is not an ``int`` or ``float``.
        """
        self.pygame = pygame_instance
        # Ensure threshold is numeric
        if not isinstance(threshold, (int, float)):
            raise ValueError("Threshold must be a numeric value!")
        self.threshold: float = threshold  # Audio level threshold to open/close mouth
        self.interval_ms: int = interval_ms  # Time interval to monitor audio (in ms)

    def normalize_rms(self, rms: float, max_rms: float) -> float:
        """Scale ``rms`` by ``max_rms`` and cap the result at 1.0.

        Args:
            rms: Raw RMS level of one window.
            max_rms: Reference level that maps to 1.0.

        Returns:
            ``rms / max_rms`` capped at 1.0, or 0.0 when ``max_rms`` is not a
            positive number (e.g. a completely silent clip).
        """
        # `not (x > 0)` also catches NaN, which would otherwise propagate.
        if not max_rms > 0:
            return 0.0
        return float(min(rms / max_rms, 1.0))

    def calculate_rms(self, data: np.ndarray, sample_rate: int) -> List[float]:
        """Compute one normalized RMS value per ``interval_ms`` window.

        Args:
            data: Audio samples; windows are taken along the first axis.
            sample_rate: Samples per second, used to size the window.

        Returns:
            Normalized RMS levels, one per window, in playback order. Empty
            when ``data`` contains no samples.
        """
        # Convert once up front: avoids a per-window conversion and prevents
        # integer overflow in abs() for the most negative sample value
        # (e.g. abs(int16(-32768)) wraps back to -32768).
        samples = np.asarray(data, dtype=np.float64)
        if samples.size == 0:
            return []

        # Interval in samples; never below 1 so range() gets a valid step.
        window_size: int = max(1, int(sample_rate * (self.interval_ms / 1000.0)))
        # Peak absolute amplitude is the reference that maps to 1.0.
        max_rms: float = float(np.max(np.abs(samples)))

        rms_values: List[float] = []
        for start in range(0, len(samples), window_size):
            window = samples[start:start + window_size]
            rms = float(np.sqrt(np.mean(window ** 2)))
            if not np.isfinite(rms):
                rms = 0.0  # Replace NaN/inf values with 0
            rms_values.append(self.normalize_rms(rms, max_rms))
        return rms_values

    def load_audio_data(self, file_path: str) -> Tuple[int, np.ndarray]:
        """Load sample rate and samples from an mp3, ogg or wav file.

        The file is decoded with pydub, re-exported to an in-memory WAV and
        read back with ``scipy.io.wavfile``.

        Args:
            file_path: Path to the audio file; the extension selects the
                decoder and is matched case-insensitively.

        Returns:
            ``(sample_rate, data)`` as returned by ``wavfile.read``.

        Raises:
            ValueError: If the extension is not ``.mp3``, ``.ogg`` or ``.wav``.
        """
        lowered = file_path.lower()
        if lowered.endswith('.mp3'):
            audio = AudioSegment.from_mp3(file_path)
        elif lowered.endswith('.ogg'):
            audio = AudioSegment.from_ogg(file_path)
        elif lowered.endswith('.wav'):
            audio = AudioSegment.from_wav(file_path)
        else:
            raise ValueError(f"Unsupported file format: {file_path}")

        # Convert to raw audio data and retrieve sample rate
        wav_io = io.BytesIO()
        audio.export(wav_io, format="wav")
        wav_io.seek(0)
        sample_rate, data = wavfile.read(wav_io)
        return sample_rate, data

    def monitor_audio(self, file_path: str) -> None:
        """Play ``file_path`` and dispatch mouth events in step with it.

        Any exception is caught and reported with ``print``; nothing is
        re-raised to the caller.

        Args:
            file_path: Path to the audio file to analyse and play.
        """
        try:
            # Analyse first so a decoding failure never starts playback.
            sample_rate, data = self.load_audio_data(file_path)
            rms_values: List[float] = self.calculate_rms(data, sample_rate)
            duration: float = len(data) / sample_rate  # Audio duration in seconds
            interval_s: float = self.interval_ms / 1000.0

            # Play the audio file using pygame.mixer
            self.pygame.mixer.music.load(file_path)
            self.pygame.mixer.music.play()

            # Use a monotonic clock for scheduling to prevent drift
            start_time: float = time.monotonic()
            try:
                for iteration, rms in enumerate(rms_values, start=1):
                    # If playback duration is exceeded, break out of the loop
                    if time.monotonic() - start_time > duration:
                        break

                    # Trigger mouth movement based on the RMS threshold
                    mouth_state = 1 if rms > self.threshold else 0
                    dispatcher.send(signal="keyEvent", key='x', val=mouth_state)

                    # Sleep until the absolute target time of the next update
                    # rather than a fixed delay, so errors do not accumulate.
                    target_time = start_time + iteration * interval_s
                    sleep_duration = target_time - time.monotonic()
                    if sleep_duration > 0:
                        time.sleep(sleep_duration)
            finally:
                # Always leave the mouth closed, even if the last window was
                # loud, the loop broke early, or an error interrupted it.
                dispatcher.send(signal="keyEvent", key='x', val=0)

            # Wait for the music to finish without busy-waiting
            while self.pygame.mixer.music.get_busy():
                self.pygame.time.wait(10)  # Small wait to avoid a busy loop
        except Exception as e:
            print(f"Error processing audio file {file_path}: {e}")

    def play_audio_with_puppeting(self, file_path: str) -> None:
        """Announce playback, then run ``monitor_audio`` on ``file_path``.

        Args:
            file_path: Path to the audio file to play.
        """
        try:
            print(f"Playing audio from {file_path}...")  # Debug output
            self.monitor_audio(file_path)
        except Exception as e:
            print(f"Error playing {file_path}: {e}")