# Qwen-Image-Layered 클라우드 전환 (8/10): 하이브리드 아키텍처 구현

## 왜 하이브리드인가?

단일 클라우드 의존의 위험:

- Vertex AI 장애 시 서비스 중단
- API Rate Limit 도달 시 요청 실패
- 특정 지역에서 네트워크 이슈

**해결책**: 여러 백엔드를 조합한 **폴백 체인**

## 폴백 전략

```
1차: Vertex AI (Primary)
  ↓ 실패 시
2차: Hugging Face Inference Endpoints (Secondary)
  ↓ 실패 시
3차: 로컬 GPU (Tertiary, 선택적)
  ↓ 실패 시
4차: 에러 반환
```

## 추상화 레이어 설계

### 1. 인터페이스 정의

```python
# models/base_decomposer.py
from abc import ABC, abstractmethod
from typing import List
from PIL import Image


class BaseDecomposer(ABC):
    """이미지 분해 추상 인터페이스"""

    @abstractmethod
    async def decompose(
        self,
        image_path: str,
        num_layers: int,
        resolution: int
    ) -> List[Image.Image]:
        """이미지를 레이어로 분해"""
        pass

    @abstractmethod
    def is_available(self) -> bool:
        """백엔드 사용 가능 여부"""
        pass

    @abstractmethod
    def get_priority(self) -> int:
        """우선순위 (낮을수록 우선)"""
        pass
```

### 2. Vertex AI 구현

```python
# models/vertex_ai_decomposer.py
from models.base_decomposer import BaseDecomposer
from google.cloud import aiplatform
import base64
from PIL import Image
import io


class VertexAIDecomposer(BaseDecomposer):
    def __init__(self):
        self.endpoint = aiplatform.Endpoint(ENDPOINT_NAME)
        self._available = True

    async def decompose(self, image_path, num_layers, resolution):
        try:
            # Base64 인코딩
            with open(image_path, "rb") as f:
                image_b64 = base64.b64encode(f.read()).decode()

            # Vertex AI 호출
            response = self.endpoint.predict(
                instances=[{
                    "image": image_b64,
                    "layers": num_layers,
                    "resolution": resolution
                }],
                timeout=300
            )

            # 레이어 디코딩
            layers = []
            for layer_b64 in response.predictions[0]["layers"]:
                layer_bytes = base64.b64decode(layer_b64)
                layers.append(Image.open(io.BytesIO(layer_bytes)))

            self._available = True  # 성공 시 available 유지
            return layers

        except Exception as e:
            self._available = False  # 실패 시 temporary unavailable
            raise

    def is_available(self) -> bool:
        return self._available

    def get_priority(self) -> int:
        return 1  # 최우선
```

### 3. Hugging Face 구현

```python
# models/hf_decomposer.py
from models.base_decomposer import BaseDecomposer
import os
import requests
import base64
from PIL import Image
import io


class HuggingFaceDecomposer(BaseDecomposer):
    def __init__(self):
        self.endpoint_url = "https://xxxxx.endpoints.huggingface.cloud"
        self.api_token = os.getenv("HF_API_TOKEN")
        self._available = True

    async def decompose(self, image_path, num_layers, resolution):
        try:
            # Base64 인코딩
            with open(image_path, "rb") as f:
                image_b64 = base64.b64encode(f.read()).decode()

            # Hugging Face API 호출
            # NOTE: requests.post는 블로킹 호출이라 async 함수 안에서
            # 이벤트 루프를 멈춘다 — 프로덕션에서는 httpx.AsyncClient 검토
            response = requests.post(
                self.endpoint_url,
                headers={"Authorization": f"Bearer {self.api_token}"},
                json={
                    "inputs": image_b64,
                    "parameters": {
                        "layers": num_layers,
                        "resolution": resolution
                    }
                },
                timeout=300
            )
            response.raise_for_status()

            # 레이어 디코딩
            layers_b64 = response.json()["layers"]
            layers = []
            for layer_b64 in layers_b64:
                layer_bytes = base64.b64decode(layer_b64)
                layers.append(Image.open(io.BytesIO(layer_bytes)))

            self._available = True
            return layers

        except Exception as e:
            self._available = False
            raise

    def is_available(self) -> bool:
        return self._available

    def get_priority(self) -> int:
        return 2  # 두 번째 우선순위
```

### 4. 로컬 GPU 구현

```python
# models/local_decomposer.py
from models.base_decomposer import BaseDecomposer
from PIL import Image
import torch


class LocalDecomposer(BaseDecomposer):
    def __init__(self):
        self._available = self._check_gpu()
        if self._available:
            from diffusers import QwenImageLayeredPipeline
            self.pipeline = QwenImageLayeredPipeline.from_pretrained(
                "Qwen/Qwen-Image-Layered"
            )
            self.pipeline.to("cuda", torch.bfloat16)

    def _check_gpu(self) -> bool:
        """GPU 사용 가능 여부 확인"""
        try:
            import torch
            return torch.cuda.is_available()
        except:
            return False

    async def decompose(self, image_path, num_layers, resolution):
        if not self._available:
            raise RuntimeError("GPU not available")

        try:
            image = Image.open(image_path)
            layers = self.pipeline(
                image=image,
                layers=num_layers,
                resolution=resolution
            )
            return layers

        except Exception as e:
            self._available = False
            raise

    def is_available(self) -> bool:
        return self._available

    def get_priority(self) -> int:
        return 3  # 마지막 우선순위
```

## 폴백 오케스트레이터

### 자동 폴백 로직

```python
# services/decomposer_orchestrator.py
from typing import List, Type
from models.base_decomposer import BaseDecomposer
from models.vertex_ai_decomposer import VertexAIDecomposer
from models.hf_decomposer import HuggingFaceDecomposer
from models.local_decomposer import LocalDecomposer
from PIL import Image
import logging

logger = logging.getLogger(__name__)


class DecomposerOrchestrator:
    def __init__(self):
        # 모든 백엔드 등록
        self.decomposers: List[BaseDecomposer] = [
            VertexAIDecomposer(),
            HuggingFaceDecomposer(),
            LocalDecomposer()
        ]
        # 우선순위로 정렬
        self.decomposers.sort(key=lambda d: d.get_priority())

    async def decompose(
        self,
        image_path: str,
        num_layers: int,
        resolution: int
    ) -> List[Image.Image]:
        """폴백 체인을 따라 분해 시도"""
        errors = []

        for decomposer in self.decomposers:
            # 사용 불가능하면 스킵
            if not decomposer.is_available():
                logger.warning(f"{decomposer.__class__.__name__} is unavailable, skipping")
                continue

            try:
                logger.info(f"Trying {decomposer.__class__.__name__}...")
                layers = await decomposer.decompose(
                    image_path, num_layers, resolution
                )
                logger.info(f"✅ Success with {decomposer.__class__.__name__}")
                return layers

            except Exception as e:
                logger.error(f"❌ {decomposer.__class__.__name__} failed: {e}")
                errors.append({
                    "backend": decomposer.__class__.__name__,
                    "error": str(e)
                })
                continue

        # 모든 백엔드 실패
        raise RuntimeError(f"All backends failed: {errors}")

    def get_status(self) -> dict:
        """모든 백엔드 상태 조회"""
        return {
            decomposer.__class__.__name__: decomposer.is_available()
            for decomposer in self.decomposers
        }
```

### Worker 통합

```python
# worker.py 수정
from services.decomposer_orchestrator import DecomposerOrchestrator


async def process_job(job_id: str, job_data: dict):
    queue = JobQueue()
    orchestrator = DecomposerOrchestrator()  # 변경됨
    settings = get_settings()

    try:
        queue.update_job(
            job_id,
            status="processing",
            progress=10,
            message="처리 중..."
        )

        # 오케스트레이터를 통해 자동 폴백
        layers = await orchestrator.decompose(
            image_path=job_data["image_path"],
            num_layers=job_data["num_layers"],
            resolution=job_data["resolution"]
        )

        # 결과 저장
        # ...

        queue.update_job(
            job_id,
            status="completed",
            progress=100,
            message="완료!",
            layers=layer_info
        )

    except Exception as e:
        queue.update_job(
            job_id,
            status="failed",
            progress=0,
            message="모든 백엔드 실패",
            error=str(e)
        )
```

## 헬스 체크 엔드포인트

### API 추가

```python
# api/health.py
from fastapi import APIRouter
from services.decomposer_orchestrator import DecomposerOrchestrator

router = APIRouter()


@router.get("/health/backends")
async def check_backends():
    """백엔드 상태 확인"""
    orchestrator = DecomposerOrchestrator()
    status = orchestrator.get_status()

    overall_healthy = any(status.values())

    return {
        "healthy": overall_healthy,
        "backends": status
    }
```

### 모니터링

```python
# scripts/monitor_backends.py
import requests
import time
from datetime import datetime


def monitor():
    """백엔드 상태 모니터링"""
    while True:
        response = requests.get("http://localhost:8000/api/health/backends")
        data = response.json()

        print(f"\n=== {datetime.now().strftime('%H:%M:%S')} ===")
        print(f"Overall: {'✅ Healthy' if data['healthy'] else '❌ Unhealthy'}")

        for backend, available in data["backends"].items():
            status = "✅" if available else "❌"
            print(f"  {status} {backend}")

        time.sleep(60)  # 1분마다 체크


if __name__ == "__main__":
    monitor()
```

## 자동 복구 메커니즘

### Circuit Breaker 패턴

```python
# services/circuit_breaker.py
from datetime import datetime, timedelta
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"        # 정상
    OPEN = "open"            # 차단됨
    HALF_OPEN = "half_open"  # 복구 시도


class CircuitBreaker:
    def __init__(self, failure_threshold=3, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # 초
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Circuit Breaker를 통한 함수 호출"""
        # OPEN 상태: 타임아웃 경과 여부 체크
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = CircuitState.HALF_OPEN
            else:
                raise RuntimeError("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)

            # 성공 시
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0

            return result

        except Exception as e:
            # 실패 시
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

            raise
```

### Decomposer에 통합

```python
# models/vertex_ai_decomposer.py (수정)
from services.circuit_breaker import CircuitBreaker, CircuitState


class VertexAIDecomposer(BaseDecomposer):
    def __init__(self):
        self.endpoint = aiplatform.Endpoint(ENDPOINT_NAME)
        self.circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=60)
        self._available = True

    async def decompose(self, image_path, num_layers, resolution):
        try:
            # Circuit Breaker를 통해 호출
            return self.circuit_breaker.call(
                self._decompose_internal,
                image_path, num_layers, resolution
            )
        except Exception as e:
            self._available = False
            raise

    def _decompose_internal(self, image_path, num_layers, resolution):
        """실제 분해 로직"""
        # ... 기존 코드 ...

    def is_available(self) -> bool:
        # Circuit Breaker 상태도 고려
        return (
            self._available and
            self.circuit_breaker.state != CircuitState.OPEN
        )
```

## 비용 최적화: 동적 라우팅

### 비용 기반 라우팅

```python
# services/cost_aware_orchestrator.py
import logging

from services.decomposer_orchestrator import DecomposerOrchestrator

logger = logging.getLogger(__name__)


class CostAwareOrchestrator(DecomposerOrchestrator):
    def __init__(self):
        super().__init__()
        # 백엔드별 비용 (1회당)
        self.costs = {
            "VertexAIDecomposer": 0.00375,
            "HuggingFaceDecomposer": 0.06,
            "LocalDecomposer": 0.0
        }

    async def decompose(self, image_path, num_layers, resolution, budget=None):
        """예산을 고려한 분해"""
        # 예산 제한이 있으면 저렴한 백엔드 우선
        if budget:
            # 비용 순으로 정렬
            decomposers = sorted(
                self.decomposers,
                key=lambda d: self.costs.get(d.__class__.__name__, 999)
            )
        else:
            # 기본: 우선순위 순
            decomposers = self.decomposers

        # 폴백 체인 실행
        for decomposer in decomposers:
            if not decomposer.is_available():
                continue

            cost = self.costs.get(decomposer.__class__.__name__, 0)

            # 예산 초과하면 스킵
            if budget and cost > budget:
                logger.info(f"Skipping {decomposer.__class__.__name__} (cost ${cost} > budget ${budget})")
                continue

            try:
                layers = await decomposer.decompose(
                    image_path, num_layers, resolution
                )

                # 비용 기록
                logger.info(f"✅ Used {decomposer.__class__.__name__} (cost: ${cost})")
                return layers

            except Exception as e:
                logger.error(f"❌ {decomposer.__class__.__name__} failed: {e}")
                continue

        raise RuntimeError("No backend available within budget")
```

## 테스트

### 폴백 시나리오 테스트

```python
# tests/test_fallback.py
import pytest
from services.decomposer_orchestrator import DecomposerOrchestrator


@pytest.mark.asyncio
async def test_fallback_to_secondary():
    """Primary 실패 시 Secondary로 폴백"""
    orchestrator = DecomposerOrchestrator()

    # Vertex AI를 강제로 unavailable 설정
    orchestrator.decomposers[0]._available = False

    # 분해 시도 (HuggingFace로 폴백되어야 함)
    layers = await orchestrator.decompose(
        "test_image.jpg",
        num_layers=3,
        resolution=512
    )

    assert len(layers) == 3
```

## 다음 단계

v9에서는 **비용 분석 및 모니터링 대시보드**를 구축한다:

1. 실시간 비용 추적
2. 백엔드별 사용량 분석
3. 비용 최적화 제안
4. 알림 설정

하이브리드 아키텍처로 안정성을 확보했으니, 이제 운영 가시성을 높일 차례다.

---

**이전 글**: [성능 벤치마크 (7/10)](./update-qwen-image-layered-project-v7.md)
**다음 글**: [비용 모니터링 대시보드 (9/10)](./update-qwen-image-layered-project-v9.md)

**참고 자료**:
- [Circuit Breaker Pattern](https://martinfowler.com/bliki/CircuitBreaker.html)
- [Fallback Strategies in Microservices](https://microservices.io/patterns/reliability/circuit-breaker.html)