# Streaming Avatar Dev Log - v5: Real-Time Streaming with WebRTC

## Overview

In this post we build the piece that streams the lip-sync frames generated by MuseTalk to the client in real time over WebRTC.

## WebRTC Basics

### What is WebRTC?

**Web Real-Time Communication** - a standard API for peer-to-peer communication between browsers.

```
Features:
- Ultra-low latency: 100-500ms
- P2P: reduces server load
- Encryption: DTLS-SRTP by default
- Adaptive: quality adjusts to network conditions
```

### Signaling Flow

```
┌──────────┐               ┌──────────┐               ┌──────────┐
│  Client  │               │  Server  │               │  Avatar  │
│ (Browser)│               │(Signaling)│              │  (GPU)   │
└────┬─────┘               └────┬─────┘               └────┬─────┘
     │                          │                          │
     │ 1. Connect WebSocket     │                          │
     │─────────────────────────>│                          │
     │                          │                          │
     │ 2. Request Avatar Session│                          │
     │─────────────────────────>│                          │
     │                          │ 3. Initialize Avatar     │
     │                          │─────────────────────────>│
     │                          │                          │
     │                          │ 4. Avatar Ready          │
     │                          │<─────────────────────────│
     │                          │                          │
     │ 5. Create Offer (SDP)    │                          │
     │─────────────────────────>│                          │
     │                          │ 6. Forward to Avatar     │
     │                          │─────────────────────────>│
     │                          │                          │
     │                          │ 7. Create Answer         │
     │                          │<─────────────────────────│
     │ 8. Receive Answer        │                          │
     │<─────────────────────────│                          │
     │                          │                          │
     │ 9. ICE Candidates Exchange                          │
     │<───────────────────────────────────────────────────>│
     │                          │                          │
     │ 10. P2P Connection Established                      │
     │<─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─>│
     │                          │                          │
     │ [Video/Audio Stream]     │                          │
     │<════════════════════════════════════════════════════│
     │                          │                          │
```

## 1. LiveKit Server Setup

### Why LiveKit?

- Open source (Apache 2.0)
- SFU architecture (scales well)
- Simulcast support
- Complete Python/JS SDKs

### Installation

```bash
# Run with Docker
docker run -d \
  --name livekit \
  -p 7880:7880 \
  -p 7881:7881 \
  -p 7882:7882/udp \
  -e LIVEKIT_KEYS="devkey: secret" \
  livekit/livekit-server \
  --dev
```

### Configuration File

```yaml
# livekit.yaml
port: 7880

rtc:
  port_range_start: 50000
  port_range_end: 60000
  use_external_ip: true

keys:
  APIKey: secretAPIKey

room:
  auto_create: true
  empty_timeout: 300

turn:
  enabled: true
  domain: turn.yourdomain.com
  tls_port: 5349
```
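Whatever key/secret pair the backend signs tokens with has to match what the server was started with: the `--dev` container above uses `devkey` / `secret`, and the `keys:` block in `livekit.yaml` is the production-style equivalent. Below is a minimal sketch of minting a viewer token with the `livekit-api` Python package, assuming those dev credentials and a placeholder room name:

```python
# mint_token.py - sanity check that the server keys and backend credentials line up.
# Assumes the dev credentials from the Docker command above (devkey / secret);
# "avatar_test" is just a placeholder room name.
from livekit import api

token = (
    api.AccessToken(api_key="devkey", api_secret="secret")
    .with_identity("viewer-1")
    .with_name("viewer")
    .with_grants(api.VideoGrants(room_join=True, room="avatar_test"))
)

print(token.to_jwt())  # paste into any LiveKit client to verify the setup
```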
## 2. Backend Implementation

### Signaling Server

```python
# src/streaming/signaling_server.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from livekit import api

from src.streaming.avatar_session import AvatarSession

app = FastAPI()

# LiveKit API client
lk_api = api.LiveKitAPI(
    url="http://localhost:7880",
    api_key="devkey",
    api_secret="secret"
)


class SignalingServer:
    def __init__(self):
        self.sessions: dict[str, "AvatarSession"] = {}

    async def create_session(
        self,
        session_id: str,
        avatar_path: str
    ) -> dict:
        """Create a new avatar session."""
        # Create a LiveKit room
        room = await lk_api.room.create_room(
            api.CreateRoomRequest(
                name=f"avatar_{session_id}",
                empty_timeout=300,
                max_participants=2
            )
        )

        # Generate a participant token for the viewer
        token = api.AccessToken(
            api_key="devkey",
            api_secret="secret"
        )
        token.with_identity(session_id)
        token.with_name("viewer")
        token.with_grants(api.VideoGrants(
            room_join=True,
            room=room.name
        ))

        # Initialize the avatar session
        session = AvatarSession(
            session_id=session_id,
            room_name=room.name,
            avatar_path=avatar_path
        )
        self.sessions[session_id] = session

        return {
            "session_id": session_id,
            "room_name": room.name,
            "token": token.to_jwt(),
            "livekit_url": "wss://localhost:7880"
        }

    async def handle_websocket(self, ws: WebSocket, session_id: str):
        """Handle a WebSocket connection."""
        await ws.accept()

        try:
            while True:
                data = await ws.receive_json()

                if data["type"] == "start_avatar":
                    result = await self.create_session(
                        session_id,
                        data["avatar_path"]
                    )
                    await ws.send_json(result)

                elif data["type"] == "send_text":
                    session = self.sessions.get(session_id)
                    if session:
                        await session.process_text(data["text"])

                elif data["type"] == "stop":
                    session = self.sessions.pop(session_id, None)
                    if session:
                        await session.cleanup()
                    break

        except WebSocketDisconnect:
            session = self.sessions.pop(session_id, None)
            if session:
                await session.cleanup()


signaling = SignalingServer()


@app.websocket("/ws/{session_id}")
async def websocket_endpoint(ws: WebSocket, session_id: str):
    await signaling.handle_websocket(ws, session_id)
```
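The WebSocket protocol above is just three message types (`start_avatar`, `send_text`, `stop`). A minimal sketch of a client exercising it from Python, using the third-party `websockets` package; the port (8000, the default uvicorn port), session id, and avatar path are placeholder values:

```python
# ws_client_demo.py - drive the signaling protocol by hand (sketch, not project code)
import asyncio
import json

import websockets


async def main():
    # Endpoint shape matches the FastAPI route above: /ws/{session_id}
    async with websockets.connect("ws://localhost:8000/ws/demo-session") as ws:
        # 1. Ask the server to spin up an avatar session
        await ws.send(json.dumps({
            "type": "start_avatar",
            "avatar_path": "/avatars/demo.png"   # placeholder path
        }))
        session_info = json.loads(await ws.recv())
        print("room:", session_info["room_name"])

        # 2. Send text for the avatar to speak
        await ws.send(json.dumps({"type": "send_text", "text": "Hello there!"}))

        # 3. Tear the session down
        await ws.send(json.dumps({"type": "stop"}))


asyncio.run(main())
```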
### Avatar Session Class

```python
# src/streaming/avatar_session.py
import asyncio

from livekit import rtc

from src.lipsync.realtime_engine import RealtimeLipSyncEngine
from src.tts.streaming_tts import StreamingTTS
from src.llm.gemini_client import GeminiLLM


class AvatarSession:
    def __init__(
        self,
        session_id: str,
        room_name: str,
        avatar_path: str
    ):
        self.session_id = session_id
        self.room_name = room_name

        # AI components
        self.llm = GeminiLLM()
        self.tts = StreamingTTS()
        self.lipsync = RealtimeLipSyncEngine()

        # Load the avatar
        self.avatar_data = self.lipsync.preprocess_avatar(avatar_path)

        # LiveKit connection
        self.room = rtc.Room()
        self.video_source = rtc.VideoSource(640, 480)
        self.audio_source = rtc.AudioSource(16000, 1)

        self.running = False

    async def connect_to_room(self):
        """Connect to the LiveKit room."""
        # Mints a token for the avatar participant (helper not shown here)
        token = self._generate_avatar_token()

        await self.room.connect(
            "ws://localhost:7880",
            token
        )

        # Publish the video track
        video_track = rtc.LocalVideoTrack.create_video_track(
            "avatar_video",
            self.video_source
        )
        await self.room.local_participant.publish_track(
            video_track,
            rtc.TrackPublishOptions(
                source=rtc.TrackSource.SOURCE_CAMERA
            )
        )

        # Publish the audio track
        audio_track = rtc.LocalAudioTrack.create_audio_track(
            "avatar_audio",
            self.audio_source
        )
        await self.room.local_participant.publish_track(
            audio_track,
            rtc.TrackPublishOptions(
                source=rtc.TrackSource.SOURCE_MICROPHONE
            )
        )

    async def process_text(self, text: str):
        """Process a text input and stream the avatar's response."""
        self.running = True

        # 1. Generate the LLM response (streaming)
        response_text = ""
        async for chunk in self.llm.generate_stream(text):
            response_text += chunk

            # 2. TTS synthesis (chunked) - once enough text has accumulated
            if len(response_text) > 20:
                audio_chunk = await self.tts.synthesize(response_text)
                response_text = ""

                # 3. Generate lip-sync frames
                frames = await self.lipsync.process_audio_chunk(
                    self.avatar_data,
                    audio_chunk
                )

                # 4. Stream frames and audio
                for frame, audio in zip(frames, self._split_audio(audio_chunk)):
                    if not self.running:
                        break

                    # Push a video frame
                    video_frame = rtc.VideoFrame(
                        width=frame.shape[1],
                        height=frame.shape[0],
                        type=rtc.VideoBufferType.RGBA,
                        data=frame.tobytes()
                    )
                    self.video_source.capture_frame(video_frame)

                    # Push an audio frame
                    audio_frame = rtc.AudioFrame(
                        data=audio.tobytes(),
                        sample_rate=16000,
                        num_channels=1,
                        samples_per_channel=len(audio)
                    )
                    await self.audio_source.capture_frame(audio_frame)

                    await asyncio.sleep(1 / 25)  # pace output at 25 FPS

        # Process any remaining text
        if response_text:
            await self._process_remaining(response_text)

    async def cleanup(self):
        """Release resources."""
        self.running = False
        await self.room.disconnect()
```
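`process_text` relies on a `_split_audio` helper that isn't shown above. Here is a module-level sketch of what it could look like, assuming the audio chunk is a 1-D int16 NumPy array at the 16 kHz / 25 FPS settings used in the streaming loop (16000 / 25 = 640 samples per video frame); the function name and padding behaviour are my assumptions:

```python
import numpy as np


def split_audio(audio_chunk: np.ndarray, sample_rate: int = 16000, fps: int = 25) -> list[np.ndarray]:
    """Split a mono int16 audio chunk into one slice per video frame (sketch).

    At 16 kHz and 25 FPS each video frame corresponds to 640 audio samples.
    """
    samples_per_frame = sample_rate // fps  # 640
    slices = []
    for start in range(0, len(audio_chunk), samples_per_frame):
        piece = audio_chunk[start:start + samples_per_frame]
        # Zero-pad the last slice so every frame gets a full audio buffer
        if len(piece) < samples_per_frame:
            piece = np.pad(piece, (0, samples_per_frame - len(piece)))
        slices.append(piece)
    return slices
```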
## 3. Frontend Implementation

### React Component

```typescript
// components/StreamingAvatar.tsx
import {
  forwardRef,
  useEffect,
  useImperativeHandle,
  useRef,
  useState,
} from 'react';
import {
  Room,
  RoomEvent,
  VideoPresets,
  Track,
  RemoteTrack,
  RemoteTrackPublication,
} from 'livekit-client';

interface StreamingAvatarProps {
  avatarId: string;
  onReady?: () => void;
  onError?: (error: Error) => void;
}

export interface StreamingAvatarHandle {
  sendText: (text: string) => void;
}

export const StreamingAvatar = forwardRef<StreamingAvatarHandle, StreamingAvatarProps>(
  function StreamingAvatar({ avatarId, onReady, onError }, ref) {
    const videoRef = useRef<HTMLVideoElement>(null);
    const audioRef = useRef<HTMLAudioElement>(null);
    const [room, setRoom] = useState<Room | null>(null);
    const [isConnected, setIsConnected] = useState(false);
    const wsRef = useRef<WebSocket | null>(null);

    useEffect(() => {
      initializeSession();
      return () => cleanup();
    }, [avatarId]);

    async function initializeSession() {
      try {
        // 1. Open the signaling WebSocket
        const ws = new WebSocket(`wss://api.yourdomain.com/ws/${avatarId}`);
        wsRef.current = ws;

        ws.onopen = () => {
          // Request a new avatar session
          ws.send(JSON.stringify({
            type: 'start_avatar',
            avatar_path: `/avatars/${avatarId}.png`
          }));
        };

        ws.onmessage = async (event) => {
          const data = JSON.parse(event.data);
          if (data.token) {
            // 2. Connect to LiveKit
            await connectToRoom(data.livekit_url, data.token);
          }
        };

        ws.onerror = () => {
          onError?.(new Error('WebSocket connection failed'));
        };
      } catch (error) {
        onError?.(error as Error);
      }
    }

    async function connectToRoom(url: string, token: string) {
      const newRoom = new Room({
        adaptiveStream: true,
        dynacast: true,
        videoCaptureDefaults: {
          resolution: VideoPresets.h720.resolution,
        },
      });

      // Track subscription events
      newRoom.on(RoomEvent.TrackSubscribed, handleTrackSubscribed);
      newRoom.on(RoomEvent.TrackUnsubscribed, handleTrackUnsubscribed);
      newRoom.on(RoomEvent.Connected, () => {
        setIsConnected(true);
        onReady?.();
      });
      newRoom.on(RoomEvent.Disconnected, () => {
        setIsConnected(false);
      });

      await newRoom.connect(url, token);
      setRoom(newRoom);
    }

    function handleTrackSubscribed(
      track: RemoteTrack,
      publication: RemoteTrackPublication
    ) {
      if (track.kind === Track.Kind.Video) {
        // Attach the video track
        track.attach(videoRef.current!);
      } else if (track.kind === Track.Kind.Audio) {
        // Attach the audio track
        track.attach(audioRef.current!);
      }
    }

    function handleTrackUnsubscribed(track: RemoteTrack) {
      track.detach();
    }

    function sendText(text: string) {
      wsRef.current?.send(JSON.stringify({
        type: 'send_text',
        text
      }));
    }

    // Expose sendText so parents can drive the avatar through a ref
    useImperativeHandle(ref, () => ({ sendText }));

    function cleanup() {
      wsRef.current?.send(JSON.stringify({ type: 'stop' }));
      wsRef.current?.close();
      room?.disconnect();
    }

    return (
      <div className="streaming-avatar">
        {/* LiveKit video/audio tracks attach to these elements */}
        <video ref={videoRef} autoPlay playsInline />
        <audio ref={audioRef} autoPlay />
        {!isConnected && <div className="loading">Connecting…</div>}
      </div>
    );
  }
);
```

### Usage Example

```typescript
// pages/avatar-demo.tsx
import { useRef, useState } from 'react';
import { StreamingAvatar } from '@/components/StreamingAvatar';

export default function AvatarDemo() {
  const [inputText, setInputText] = useState('');
  const avatarRef = useRef<{ sendText: (text: string) => void }>(null);

  return (
    <div className="avatar-demo">
      <h1>Streaming Avatar Demo</h1>

      <StreamingAvatar
        ref={avatarRef}
        avatarId="demo"
        onReady={() => console.log('Avatar ready!')}
        onError={(e) => console.error(e)}
      />

      <input
        value={inputText}
        onChange={(e) => setInputText(e.target.value)}
        placeholder="Say something..."
      />
      <button onClick={() => avatarRef.current?.sendText(inputText)}>
        Send
      </button>
    </div>
  );
}
```

## 4. Network Optimization

### Adaptive Bitrate

```python
# src/streaming/adaptive_bitrate.py
from dataclasses import dataclass
from enum import Enum


class QualityLevel(Enum):
    LOW = "low"        # 360p, 500kbps
    MEDIUM = "medium"  # 480p, 1Mbps
    HIGH = "high"      # 720p, 2.5Mbps


@dataclass
class StreamConfig:
    width: int
    height: int
    fps: int
    bitrate: int


QUALITY_CONFIGS = {
    QualityLevel.LOW: StreamConfig(640, 360, 25, 500_000),
    QualityLevel.MEDIUM: StreamConfig(854, 480, 25, 1_000_000),
    QualityLevel.HIGH: StreamConfig(1280, 720, 30, 2_500_000),
}


class AdaptiveBitrateController:
    def __init__(self, initial_quality: QualityLevel = QualityLevel.MEDIUM):
        self.current_quality = initial_quality
        self.packet_loss_history = []
        self.rtt_history = []

    def update_stats(self, packet_loss: float, rtt_ms: float):
        """Update network statistics (packet loss in %, RTT in ms)."""
        self.packet_loss_history.append(packet_loss)
        self.rtt_history.append(rtt_ms)

        # Keep only the last 10 samples
        self.packet_loss_history = self.packet_loss_history[-10:]
        self.rtt_history = self.rtt_history[-10:]

        # Re-evaluate the quality level
        self._adjust_quality()

    def _adjust_quality(self):
        avg_loss = sum(self.packet_loss_history) / len(self.packet_loss_history)
        avg_rtt = sum(self.rtt_history) / len(self.rtt_history)

        if avg_loss > 5 or avg_rtt > 300:
            self._decrease_quality()
        elif avg_loss < 1 and avg_rtt < 100:
            self._increase_quality()

    def _decrease_quality(self):
        levels = list(QualityLevel)
        current_idx = levels.index(self.current_quality)
        if current_idx > 0:
            self.current_quality = levels[current_idx - 1]

    def _increase_quality(self):
        levels = list(QualityLevel)
        current_idx = levels.index(self.current_quality)
        if current_idx < len(levels) - 1:
            self.current_quality = levels[current_idx + 1]

    def get_config(self) -> StreamConfig:
        return QUALITY_CONFIGS[self.current_quality]
```
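A quick usage sketch of the controller above; the packet-loss and RTT numbers are made-up sample values, not real measurements:

```python
from src.streaming.adaptive_bitrate import AdaptiveBitrateController, QualityLevel

controller = AdaptiveBitrateController(initial_quality=QualityLevel.MEDIUM)

# Clean network: low loss and RTT step the quality up.
controller.update_stats(packet_loss=0.5, rtt_ms=80)
print(controller.current_quality)   # QualityLevel.HIGH

# Congestion: sustained loss above 5% steps it back down.
for _ in range(5):
    controller.update_stats(packet_loss=8.0, rtt_ms=350)
print(controller.current_quality)   # QualityLevel.LOW after sustained congestion
print(controller.get_config())      # StreamConfig for the current level
```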
## 5. Testing

### Integration Test

```python
# tests/test_streaming.py
import pytest

from src.streaming.avatar_session import AvatarSession


@pytest.mark.asyncio
async def test_full_streaming_pipeline():
    """Exercise the full streaming pipeline end to end."""
    session = AvatarSession(
        session_id="test-session",
        room_name="test-room",
        avatar_path="test_assets/avatar.png"
    )

    # Connect
    await session.connect_to_room()

    # Send text
    await session.process_text("안녕하세요, 테스트입니다.")

    # Clean up
    await session.cleanup()
```

### Latency Measurement

```python
# benchmarks/measure_latency.py
import time


async def measure_e2e_latency():
    """Measure end-to-end latency from sending text to receiving the first frame."""
    # 1. Send the text
    send_time = time.time()

    # 2. Wait until the first video frame arrives
    first_frame_time = await wait_for_first_frame()

    latency = first_frame_time - send_time
    print(f"E2E Latency: {latency*1000:.0f}ms")

    return latency

# Target: < 700ms
```
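The benchmark assumes a `wait_for_first_frame()` helper that isn't shown. One minimal way to implement it, assuming whatever consumes the incoming video track sets an `asyncio.Event` from its frame callback; the helper names and wiring below are illustrative, not part of the original code:

```python
import asyncio
import time

_first_frame_event = asyncio.Event()
_first_frame_time: float | None = None


def mark_first_frame():
    """Call this from the video receive callback when the first frame lands."""
    global _first_frame_time
    if not _first_frame_event.is_set():
        _first_frame_time = time.time()
        _first_frame_event.set()


async def wait_for_first_frame(timeout: float = 5.0) -> float:
    """Block until the first frame has been observed and return its timestamp."""
    await asyncio.wait_for(_first_frame_event.wait(), timeout)
    assert _first_frame_time is not None
    return _first_frame_time
```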
## Next Steps (v6)

In the next post we integrate the TTS engine and the LLM into a natural conversation system.

---

## References

- [LiveKit Documentation](https://docs.livekit.io/)
- [WebRTC for the Curious](https://webrtcforthecurious.com/)
- [aiortc - Python WebRTC](https://github.com/aiortc/aiortc)

*This series consists of 10 posts in total.*