# Streaming Avatar Dev Log - v7: Interactive Conversation System

## Overview

We add user voice input (STT) so the avatar supports two-way, real-time conversation, completing a fully interactive avatar system.

## Full System Flow

```
┌──────────────────────────────────────────────────────────────────────────┐
│                         Interactive Avatar System                        │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  [User]                                                       [Avatar]   │
│    │                                                             │       │
│    │ 🎤 Voice                                                    │       │
│    ▼                                                             │       │
│  ┌───────┐   WebRTC    ┌──────────────────────────────┐          │       │
│  │  Mic  │────────────▶│          STT Engine          │          │       │
│  └───────┘             │          (Whisper)           │          │       │
│                        └────────────┬─────────────────┘          │       │
│                                     │ text                       │       │
│                                     ▼                            │       │
│                        ┌──────────────────────────────┐          │       │
│                        │          LLM Engine          │          │       │
│                        │        (Gemini Flash)        │          │       │
│                        └────────────┬─────────────────┘          │       │
│                                     │ response                   │       │
│                                     ▼                            │       │
│                        ┌──────────────────────────────┐          │       │
│                        │          TTS Engine          │          │       │
│                        │     (Google/ElevenLabs)      │          │       │
│                        └────────────┬─────────────────┘          │       │
│                                     │ audio                      │       │
│                                     ▼                            │       │
│                        ┌──────────────────────────────┐          │       │
│                        │       Lip Sync Engine        │──────────┘       │
│                        │          (MuseTalk)          │                  │
│                        └────────────┬─────────────────┘                  │
│                                     │ video                              │
│  ┌───────┐             ┌────────────▼─────────────────┐                  │
│  │Screen │◀────────────│      Video/Audio Stream      │                  │
│  └───────┘   WebRTC    └──────────────────────────────┘                  │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘
```

## 1. STT Engine (Whisper)

### Faster-Whisper Setup

```python
# src/stt/whisper_engine.py
from faster_whisper import WhisperModel
import numpy as np
import asyncio
from typing import AsyncIterator
from dataclasses import dataclass


@dataclass
class TranscriptionResult:
    text: str
    confidence: float
    language: str
    is_final: bool


class WhisperSTTEngine:
    """STT engine based on Faster-Whisper."""

    def __init__(
        self,
        model_size: str = "base",  # tiny, base, small, medium, large-v3
        device: str = "cuda",
        compute_type: str = "float16",
        language: str = "ko"
    ):
        self.model = WhisperModel(
            model_size,
            device=device,
            compute_type=compute_type
        )
        self.language = language
        self.sample_rate = 16000

    async def transcribe(
        self,
        audio: np.ndarray
    ) -> TranscriptionResult:
        """Transcribe an audio buffer to text."""
        # Run the blocking inference in a thread pool
        loop = asyncio.get_event_loop()
        segments, info = await loop.run_in_executor(
            None,
            lambda: self.model.transcribe(
                audio,
                language=self.language,
                beam_size=5,
                vad_filter=True,
                vad_parameters={
                    "min_silence_duration_ms": 500,
                    "speech_pad_ms": 200
                }
            )
        )

        text = " ".join([segment.text for segment in segments])

        return TranscriptionResult(
            text=text.strip(),
            confidence=info.language_probability,
            language=info.language,
            is_final=True
        )

    async def transcribe_stream(
        self,
        audio_stream: AsyncIterator[np.ndarray],
        chunk_duration_sec: float = 2.0
    ) -> AsyncIterator[TranscriptionResult]:
        """Streaming speech recognition over an async audio stream."""
        buffer = np.array([], dtype=np.float32)
        chunk_samples = int(self.sample_rate * chunk_duration_sec)

        async for audio_chunk in audio_stream:
            buffer = np.concatenate([buffer, audio_chunk])

            # Transcribe once enough audio has accumulated
            if len(buffer) >= chunk_samples:
                result = await self.transcribe(buffer)

                if result.text:
                    yield result

                # Keep the second half of the buffer as overlap
                buffer = buffer[chunk_samples // 2:]
```
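
The engine can be exercised on its own before wiring it into the pipeline. A minimal usage sketch — the WAV filename and the `soundfile` dependency are assumptions; any 16 kHz mono float32 array in [-1, 1] works:

```python
# Standalone usage sketch for WhisperSTTEngine.
# "test_clip_16k.wav" and the soundfile dependency are assumptions.
import asyncio

import soundfile as sf

from src.stt.whisper_engine import WhisperSTTEngine


async def main():
    engine = WhisperSTTEngine(model_size="base", language="ko")

    # Load a short 16 kHz mono clip as float32 samples
    audio, sample_rate = sf.read("test_clip_16k.wav", dtype="float32")
    assert sample_rate == 16000, "resample to 16 kHz before transcribing"

    result = await engine.transcribe(audio)
    print(result.text, result.confidence, result.language)


asyncio.run(main())
```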
### VAD (Voice Activity Detection)

```python
# src/stt/vad.py
import webrtcvad
import numpy as np
from collections import deque
from typing import AsyncIterator


class VoiceActivityDetector:
    """Voice activity detection based on WebRTC VAD."""

    def __init__(
        self,
        sample_rate: int = 16000,
        frame_duration_ms: int = 30,
        aggressiveness: int = 3,  # 0-3
        speech_pad_ms: int = 300,
        min_speech_ms: int = 250
    ):
        self.vad = webrtcvad.Vad(aggressiveness)
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        self.frame_size = int(sample_rate * frame_duration_ms / 1000)

        # Padding configuration
        self.speech_pad_frames = int(speech_pad_ms / frame_duration_ms)
        self.min_speech_frames = int(min_speech_ms / frame_duration_ms)

        # State
        self.is_speaking = False
        self.speech_buffer = deque(maxlen=self.speech_pad_frames)
        self.num_voiced = 0
        self.num_unvoiced = 0

    def _frame_to_bytes(self, frame: np.ndarray) -> bytes:
        """Convert a float32 frame to 16-bit PCM bytes."""
        return (frame * 32767).astype(np.int16).tobytes()

    async def process_stream(
        self,
        audio_stream: AsyncIterator[np.ndarray]
    ) -> AsyncIterator[np.ndarray]:
        """Yield only the voiced portions of the incoming audio stream."""
        buffer = np.array([], dtype=np.float32)

        async for audio_chunk in audio_stream:
            buffer = np.concatenate([buffer, audio_chunk])

            # Process frame by frame
            while len(buffer) >= self.frame_size:
                frame = buffer[:self.frame_size]
                buffer = buffer[self.frame_size:]

                # Run WebRTC VAD on the frame
                frame_bytes = self._frame_to_bytes(frame)
                is_speech = self.vad.is_speech(frame_bytes, self.sample_rate)

                if is_speech:
                    self.num_voiced += 1
                    self.num_unvoiced = 0
                else:
                    self.num_unvoiced += 1

                if not self.is_speaking:
                    # Keep a short rolling buffer so the start of speech
                    # is not clipped (leading padding)
                    self.speech_buffer.append(frame)

                    # Detect speech start
                    if self.num_voiced >= self.min_speech_frames:
                        self.is_speaking = True
                        yield np.concatenate(list(self.speech_buffer))
                        self.speech_buffer.clear()
                else:
                    # Detect speech end after enough trailing silence
                    if self.num_unvoiced >= self.speech_pad_frames:
                        self.is_speaking = False
                        self.num_voiced = 0
                    else:
                        # Still speaking: forward the frame
                        yield frame
```

## 2. Conversation State Management

### Conversation State Machine

```python
# src/conversation/state_machine.py
from enum import Enum, auto
from dataclasses import dataclass
from typing import Optional
import asyncio


class ConversationState(Enum):
    IDLE = auto()          # Waiting
    LISTENING = auto()     # Listening to the user
    PROCESSING = auto()    # AI is processing
    SPEAKING = auto()      # Avatar is speaking
    INTERRUPTING = auto()  # User barge-in


@dataclass
class StateContext:
    current_state: ConversationState
    user_text: Optional[str] = None
    avatar_text: Optional[str] = None
    interrupt_requested: bool = False


class ConversationStateMachine:
    """Manages the conversation state and notifies listeners of transitions."""

    def __init__(self):
        self.state = ConversationState.IDLE
        self.context = StateContext(current_state=self.state)
        self.state_lock = asyncio.Lock()
        self.listeners = []

    async def transition_to(
        self,
        new_state: ConversationState,
        **kwargs
    ):
        """Transition to a new state and update the context."""
        async with self.state_lock:
            old_state = self.state
            self.state = new_state
            self.context.current_state = new_state

            for key, value in kwargs.items():
                setattr(self.context, key, value)

            # Notify listeners
            for listener in self.listeners:
                await listener(old_state, new_state, self.context)

    def on_state_change(self, callback):
        """Register a state-change listener."""
        self.listeners.append(callback)

    async def handle_user_speech_start(self):
        """The user started speaking."""
        if self.state == ConversationState.SPEAKING:
            # Barge-in while the avatar is speaking
            await self.transition_to(
                ConversationState.INTERRUPTING,
                interrupt_requested=True
            )
        else:
            await self.transition_to(ConversationState.LISTENING)

    async def handle_user_speech_end(self, text: str):
        """The user finished speaking."""
        await self.transition_to(
            ConversationState.PROCESSING,
            user_text=text
        )

    async def handle_avatar_start_speaking(self, text: str):
        """The avatar started speaking."""
        await self.transition_to(
            ConversationState.SPEAKING,
            avatar_text=text,
            interrupt_requested=False
        )

    async def handle_avatar_done_speaking(self):
        """The avatar finished speaking."""
        await self.transition_to(ConversationState.IDLE)
```

### Interrupt Handling (Barge-In)

```python
# src/conversation/interrupt_handler.py
import asyncio
from typing import AsyncIterator, Awaitable, Callable

from src.conversation.state_machine import ConversationStateMachine


class InterruptHandler:
    """Wraps the avatar's output stream and stops it when the user barges in."""

    def __init__(
        self,
        state_machine: ConversationStateMachine,
        on_interrupt: Callable[[], Awaitable[None]]
    ):
        self.state_machine = state_machine
        self.on_interrupt = on_interrupt

    async def start_speaking(
        self,
        stream: AsyncIterator
    ) -> AsyncIterator:
        """Relay the avatar stream, aborting as soon as an interrupt is requested."""
        try:
            async for item in stream:
                # Check for barge-in before forwarding each item
                if self.state_machine.context.interrupt_requested:
                    await self.on_interrupt()
                    break
                yield item
        except asyncio.CancelledError:
            # Clean shutdown if the consumer cancels the stream
            pass
```
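
With the state machine and interrupt handler defined, the barge-in path is worth tracing once by hand. The sketch below is illustrative only (not part of the pipeline code): it drives the state machine directly and shows where `interrupt_requested` — the flag `InterruptHandler` polls — gets set.

```python
# Driving ConversationStateMachine by hand to illustrate the barge-in flow.
import asyncio

from src.conversation.state_machine import ConversationStateMachine


async def demo():
    sm = ConversationStateMachine()

    async def log_transition(old_state, new_state, context):
        print(f"{old_state.name} -> {new_state.name} "
              f"(interrupt_requested={context.interrupt_requested})")

    sm.on_state_change(log_transition)

    # A normal turn: the user speaks, then the avatar answers
    await sm.handle_user_speech_start()            # IDLE -> LISTENING
    await sm.handle_user_speech_end("Hello")       # LISTENING -> PROCESSING
    await sm.handle_avatar_start_speaking("Hi!")   # PROCESSING -> SPEAKING

    # Barge-in: the user starts talking while the avatar is still speaking
    await sm.handle_user_speech_start()            # SPEAKING -> INTERRUPTING
    assert sm.context.interrupt_requested          # picked up by InterruptHandler

    await sm.handle_avatar_done_speaking()         # -> IDLE


asyncio.run(demo())
```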
## 3. Conversation Controller

```python
# src/conversation/controller.py
import asyncio
import numpy as np
from typing import Any, AsyncIterator
from dataclasses import dataclass

from src.stt.whisper_engine import WhisperSTTEngine
from src.stt.vad import VoiceActivityDetector
from src.pipeline.avatar_pipeline import AvatarPipeline, AvatarFrame
from src.conversation.state_machine import (
    ConversationStateMachine,
    ConversationState
)
from src.conversation.interrupt_handler import InterruptHandler


@dataclass
class ConversationEvent:
    event_type: str  # "state_change", "user_speech", "avatar_frame"
    data: Any


class ConversationController:
    """Orchestrates the full conversation flow: audio in, avatar frames out."""

    def __init__(
        self,
        avatar_pipeline: AvatarPipeline,
        stt_engine: WhisperSTTEngine
    ):
        self.avatar_pipeline = avatar_pipeline
        self.stt = stt_engine
        self.vad = VoiceActivityDetector()

        self.state_machine = ConversationStateMachine()
        self.interrupt_handler = InterruptHandler(
            self.state_machine,
            self._on_interrupt
        )

        self.audio_queue: asyncio.Queue = asyncio.Queue()
        self.event_queue: asyncio.Queue = asyncio.Queue()
        self.running = False

    async def start(self):
        """Start the conversation loop."""
        self.running = True

        # Register the state-change listener
        self.state_machine.on_state_change(self._on_state_change)

        # Start the background audio-processing task
        asyncio.create_task(self._process_audio())

    async def stop(self):
        """Stop the conversation."""
        self.running = False

    async def feed_audio(self, audio_chunk: bytes):
        """Feed raw audio data (float32 PCM) from the client."""
        await self.audio_queue.put(audio_chunk)

    async def get_events(self) -> AsyncIterator[ConversationEvent]:
        """Async stream of conversation events."""
        while self.running:
            event = await self.event_queue.get()
            yield event

    async def _process_audio(self):
        """Audio processing loop: VAD -> STT -> avatar response."""
        async def audio_generator():
            while self.running:
                chunk = await self.audio_queue.get()
                yield np.frombuffer(chunk, dtype=np.float32)

        # Filter to voiced segments with VAD
        speech_stream = self.vad.process_stream(audio_generator())

        # Convert speech to text
        async for result in self.stt.transcribe_stream(speech_stream):
            if result.is_final and result.text:
                # Handle the recognized user utterance
                await self._handle_user_input(result.text)

    async def _handle_user_input(self, text: str):
        """Handle a recognized user utterance."""
        await self.state_machine.handle_user_speech_end(text)

        # Publish the transcript as an event
        await self.event_queue.put(ConversationEvent(
            event_type="user_speech",
            data={"text": text}
        ))

        # Generate the avatar response
        await self._generate_avatar_response(text)

    async def _generate_avatar_response(self, user_input: str):
        """Generate and stream the avatar response."""
        await self.state_machine.handle_avatar_start_speaking("")

        # Stream with barge-in support
        async def response_generator():
            async for frame in self.avatar_pipeline.process_input(user_input):
                yield frame

        try:
            async for frame in self.interrupt_handler.start_speaking(
                response_generator()
            ):
                await self.event_queue.put(ConversationEvent(
                    event_type="avatar_frame",
                    data=frame
                ))
        finally:
            await self.state_machine.handle_avatar_done_speaking()

    async def _on_interrupt(self):
        """Called when the user interrupts the avatar."""
        await self.event_queue.put(ConversationEvent(
            event_type="interrupt",
            data={}
        ))

    async def _on_state_change(self, old_state, new_state, context):
        """Forward state changes as events."""
        await self.event_queue.put(ConversationEvent(
            event_type="state_change",
            data={
                "old_state": old_state.name,
                "new_state": new_state.name
            }
        ))
```
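
The controller only exposes `feed_audio()` and `get_events()`; the transport that connects it to the browser is not shown in this post. A minimal sketch of what that bridge could look like, assuming a FastAPI WebSocket endpoint — the framework, route, and `create_conversation_controller` factory are all assumptions:

```python
# Hypothetical WebSocket bridge between the browser and ConversationController.
# FastAPI, the route, and create_conversation_controller() are assumptions.
import asyncio
import json

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@app.websocket("/conversation/{session_id}")
async def conversation_ws(websocket: WebSocket, session_id: str):
    await websocket.accept()

    controller = create_conversation_controller(session_id)  # hypothetical factory
    await controller.start()

    async def forward_events():
        # Relay state changes and transcripts as JSON text messages.
        # avatar_frame payloads would need their own binary encoding.
        async for event in controller.get_events():
            if isinstance(event.data, dict):
                await websocket.send_text(json.dumps({
                    "event_type": event.event_type,
                    "data": event.data
                }))

    forward_task = asyncio.create_task(forward_events())
    try:
        while True:
            # The browser sends raw float32 PCM chunks as binary messages
            chunk = await websocket.receive_bytes()
            await controller.feed_audio(chunk)
    except WebSocketDisconnect:
        pass
    finally:
        forward_task.cancel()
        await controller.stop()
```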
## 4. Frontend Integration

### React Hook

```typescript
// hooks/useConversation.ts
import { useState, useCallback, useEffect, useRef } from 'react';

interface ConversationState {
  state: 'idle' | 'listening' | 'processing' | 'speaking';
  transcript: string;
  avatarText: string;
}

export function useConversation(sessionId: string) {
  const [state, setState] = useState<ConversationState>({
    state: 'idle',
    transcript: '',
    avatarText: ''
  });

  const wsRef = useRef<WebSocket | null>(null);
  const audioContextRef = useRef<AudioContext | null>(null);
  const streamRef = useRef<MediaStream | null>(null);

  // Start the microphone stream
  const startListening = useCallback(async () => {
    const stream = await navigator.mediaDevices.getUserMedia({
      audio: {
        sampleRate: 16000,
        channelCount: 1,
        echoCancellation: true,
        noiseSuppression: true
      }
    });

    streamRef.current = stream;
    audioContextRef.current = new AudioContext({ sampleRate: 16000 });

    const source = audioContextRef.current.createMediaStreamSource(stream);
    const processor = audioContextRef.current.createScriptProcessor(
      4096, 1, 1
    );

    processor.onaudioprocess = (e) => {
      const audioData = e.inputBuffer.getChannelData(0);
      const buffer = new Float32Array(audioData);

      // Send raw float32 PCM to the server over WebSocket
      wsRef.current?.send(buffer.buffer);
    };

    source.connect(processor);
    processor.connect(audioContextRef.current.destination);

    setState(prev => ({ ...prev, state: 'listening' }));
  }, []);

  // Stop the microphone stream
  const stopListening = useCallback(() => {
    streamRef.current?.getTracks().forEach(track => track.stop());
    audioContextRef.current?.close();

    setState(prev => ({ ...prev, state: 'idle' }));
  }, []);

  // Handle WebSocket events from the server
  useEffect(() => {
    const ws = new WebSocket(`wss://api.domain.com/conversation/${sessionId}`);
    wsRef.current = ws;

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      switch (data.event_type) {
        case 'state_change':
          setState(prev => ({
            ...prev,
            state: data.data.new_state.toLowerCase()
          }));
          break;

        case 'user_speech':
          setState(prev => ({ ...prev, transcript: data.data.text }));
          break;

        case 'avatar_frame':
          // Video frames are handled by a separate component
          break;
      }
    };

    return () => ws.close();
  }, [sessionId]);

  return {
    state,
    startListening,
    stopListening,
    isListening: state.state === 'listening'
  };
}
```
### UI Component

```typescript
// components/ConversationUI.tsx
import { useConversation } from '@/hooks/useConversation';
import { StreamingAvatar } from './StreamingAvatar';

export function ConversationUI({ sessionId }: { sessionId: string }) {
  const { state, startListening, stopListening, isListening } =
    useConversation(sessionId);

  return (
    <div className="conversation-ui">
      {/* Avatar video */}
      <StreamingAvatar sessionId={sessionId} />

      {/* Status indicator */}
      <div className="status">
        {state.state === 'listening' && '🎤 Listening...'}
        {state.state === 'processing' && '🤔 Thinking...'}
        {state.state === 'speaking' && '🗣️ Speaking...'}
        {state.state === 'idle' && '😊 Ready'}
      </div>

      {/* Subtitles */}
      <div className="subtitles">
        {state.transcript && (
          <p className="user-line">You: {state.transcript}</p>
        )}
        {state.avatarText && (
          <p className="avatar-line">Avatar: {state.avatarText}</p>
        )}
      </div>

      {/* Mic button */}
      <button onClick={isListening ? stopListening : startListening}>
        {isListening ? 'Stop' : 'Talk'}
      </button>
    </div>
  );
}
```
## 5. Tests

```python
# tests/test_conversation.py
import pytest
import asyncio

from src.conversation.controller import ConversationController


@pytest.mark.asyncio
async def test_full_conversation_flow():
    """End-to-end conversation flow test."""
    # mock_pipeline, mock_stt, and generate_test_audio are test helpers
    # assumed to be provided elsewhere in the test suite.
    controller = ConversationController(
        avatar_pipeline=mock_pipeline,
        stt_engine=mock_stt
    )

    await controller.start()

    # Simulate voice input
    audio_data = generate_test_audio("안녕하세요")
    await controller.feed_audio(audio_data)

    # Collect events
    events = []
    async for event in controller.get_events():
        events.append(event)
        if len(events) >= 5:
            break

    await controller.stop()

    # Assertions
    event_types = [e.event_type for e in events]
    assert "user_speech" in event_types
    assert "avatar_frame" in event_types
```

## Next Steps (v8)

In v8 we optimize the performance of the whole system to bring end-to-end latency under 700 ms.

---

*This series consists of 10 posts in total.*