# Qwen-Image-Layered Cloud Migration (5/10): Redesigning the API Endpoints

## Reviewing the Existing Architecture

The existing system (local GPU based):

```
[FastAPI] → [Worker] → [Local GPU] → [Qwen Pipeline]
     ↓
[Redis Queue]
```

**Core code** (`models/qwen_decomposer.py`):

```python
class QwenDecomposer:
    def __init__(self):
        self.pipeline = QwenImageLayeredPipeline.from_pretrained(
            "Qwen/Qwen-Image-Layered"
        )
        self.pipeline.to("cuda", torch.bfloat16)

    async def decompose(self, image_path, num_layers, resolution):
        image = Image.open(image_path)
        layers = self.pipeline(
            image=image,
            layers=num_layers,
            resolution=resolution
        )
        return layers
```

**Problems**:

- Cannot run without a GPU
- Limited concurrency (GPU memory)
- No way to scale

## The New Architecture

Vertex AI integration:

```
[FastAPI] → [Worker] → [Vertex AI Endpoint] → [Qwen (Cloud GPU)]
     ↓
[Redis Queue]
```

**Changes**:

- Local GPU → Vertex AI API calls
- Synchronous → asynchronous requests
- Direct memory access → HTTP/JSON

## Implementing the Vertex AI Client

### 1. Updating the Configuration Files

`.env`:

```bash
# Vertex AI
VERTEX_AI_PROJECT_ID=poster-decomposer-12345
VERTEX_AI_LOCATION=us-central1
VERTEX_AI_ENDPOINT_ID=1234567890123456789

# Service account key
GOOGLE_APPLICATION_CREDENTIALS=./vertex-ai-key.json

# Existing settings
REDIS_HOST=localhost
...
```

`config.py`:

```python
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # ... existing settings ...

    # Vertex AI
    vertex_ai_project_id: str
    vertex_ai_location: str
    vertex_ai_endpoint_id: str
    vertex_ai_timeout: int = 300  # 5 minutes

    class Config:
        env_file = ".env"
```

### 2. Vertex AI Client

`models/vertex_ai_client.py`:

```python
from google.cloud import aiplatform
from google.auth import default
import base64
from PIL import Image
import io
from typing import List
from app.config import get_settings


class VertexAIClient:
    def __init__(self):
        settings = get_settings()

        # Authentication
        credentials, project = default()

        # Initialize Vertex AI
        aiplatform.init(
            project=settings.vertex_ai_project_id,
            location=settings.vertex_ai_location,
            credentials=credentials
        )

        # Get the endpoint
        self.endpoint = aiplatform.Endpoint(
            endpoint_name=(
                f"projects/{settings.vertex_ai_project_id}"
                f"/locations/{settings.vertex_ai_location}"
                f"/endpoints/{settings.vertex_ai_endpoint_id}"
            )
        )

        self.timeout = settings.vertex_ai_timeout

    def decompose_image(
        self,
        image_path: str,
        num_layers: int = 5,
        resolution: int = 1024
    ) -> List[Image.Image]:
        """Decompose an image into layers."""
        # 1. Encode the image as Base64
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        image_b64 = base64.b64encode(image_bytes).decode()

        # 2. Call the Vertex AI endpoint
        response = self.endpoint.predict(
            instances=[{
                "image": image_b64,
                "layers": num_layers,
                "resolution": resolution
            }],
            timeout=self.timeout
        )

        # 3. Extract the layers from the response
        predictions = response.predictions[0]
        layers_b64 = predictions["layers"]

        # 4. Convert Base64 → PIL Image
        layers = []
        for layer_b64 in layers_b64:
            layer_bytes = base64.b64decode(layer_b64)
            layer_image = Image.open(io.BytesIO(layer_bytes))
            layers.append(layer_image)

        return layers

    async def decompose_image_async(
        self,
        image_path: str,
        num_layers: int = 5,
        resolution: int = 1024
    ) -> List[Image.Image]:
        """Async wrapper."""
        import asyncio

        # The Vertex AI client only exposes a synchronous API,
        # so run it in an executor to avoid blocking the event loop.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            self.decompose_image,
            image_path,
            num_layers,
            resolution
        )
```
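Before wiring the client into the worker, it can be worth sanity-checking it in isolation. A minimal sketch of such a check (the script name `scripts/smoke_test_client.py` and the sample image path are placeholders, not part of the project):

```python
# scripts/smoke_test_client.py — hypothetical one-off check of the Vertex AI client
import asyncio

from app.models.vertex_ai_client import VertexAIClient


async def main():
    client = VertexAIClient()

    # Placeholder input; swap in any local poster image
    layers = await client.decompose_image_async(
        image_path="samples/test_poster.jpg",
        num_layers=3,
        resolution=512,
    )

    print(f"Received {len(layers)} layers")
    for i, layer in enumerate(layers):
        print(f"  layer {i}: size={layer.size}, mode={layer.mode}")


if __name__ == "__main__":
    asyncio.run(main())
```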
### 3. Worker Update

`worker.py`:

```python
#!/usr/bin/env python3
import asyncio
from app.services.job_queue import JobQueue
from app.models.vertex_ai_client import VertexAIClient  # changed
from app.config import get_settings
import os


async def process_job(job_id: str, job_data: dict):
    """Process a single job."""
    queue = JobQueue()
    client = VertexAIClient()  # instead of QwenDecomposer
    settings = get_settings()

    try:
        # Update status
        queue.update_job(
            job_id,
            status="processing",
            progress=10,
            message="Requesting Vertex AI..."
        )

        # Decompose the image (Vertex AI call)
        def progress_callback(progress: int, message: str):
            queue.update_job(job_id, progress=progress, message=message)

        # Progress update
        progress_callback(30, "Processing on cloud GPU...")

        # Call Vertex AI
        layers = await client.decompose_image_async(
            image_path=job_data["image_path"],
            num_layers=job_data["num_layers"],
            resolution=job_data["resolution"]
        )

        progress_callback(80, "Saving layers...")

        # Save the results
        result_dir = os.path.join(settings.results_dir, job_id)
        os.makedirs(result_dir, exist_ok=True)

        layer_info = []
        for i, layer in enumerate(layers):
            filename = f"layer_{i}.png"
            filepath = os.path.join(result_dir, filename)
            layer.save(filepath, format="PNG")

            size_kb = os.path.getsize(filepath) / 1024
            layer_info.append({
                "index": i,
                "filename": filename,
                "url": f"/results/{job_id}/{filename}",
                "size_kb": round(size_kb, 2)
            })

        # Done
        queue.update_job(
            job_id,
            status="completed",
            progress=100,
            message="Done!",
            layers=layer_info
        )

    except Exception as e:
        queue.update_job(
            job_id,
            status="failed",
            progress=0,
            message="Processing failed",
            error=str(e)
        )

# worker_loop stays the same
```

## Hardening Error Handling

### Vertex AI-Specific Errors

```python
# models/vertex_ai_client.py
from google.api_core.exceptions import (
    GoogleAPIError,
    ResourceExhausted,
    DeadlineExceeded
)


class VertexAIClient:
    def decompose_image(self, image_path, num_layers, resolution):
        try:
            # ... existing code ...

        except ResourceExhausted as e:
            # Rate limit or quota exceeded
            raise ValueError(f"Vertex AI quota exceeded: {e}")

        except DeadlineExceeded as e:
            # Timeout
            raise TimeoutError(f"Vertex AI request timeout: {e}")

        except GoogleAPIError as e:
            # Other GCP errors
            raise RuntimeError(f"Vertex AI error: {e}")
```

### Retry Logic in the Worker

```python
from google.api_core.exceptions import ResourceExhausted


async def process_job_with_retry(job_id: str, job_data: dict, max_retries=3):
    """Process a job with retries."""
    queue = JobQueue()

    for attempt in range(max_retries):
        try:
            await process_job(job_id, job_data)
            return  # success, stop retrying

        except ResourceExhausted:
            # Rate limit → wait, then retry
            wait_time = 2 ** attempt  # exponential backoff
            await asyncio.sleep(wait_time)
            continue

        except TimeoutError:
            # Timeout → retry
            continue

        except Exception as e:
            # Any other error → fail immediately
            queue.update_job(
                job_id,
                status="failed",
                error=f"Unrecoverable error: {e}"
            )
            break
```

## Performance Optimization

### 1. Image Compression

Shrink the image sent to Vertex AI to cut network transfer time:

```python
def compress_image_for_api(image_path: str, max_size: int = 1024) -> str:
    """Compress an image for API transfer."""
    image = Image.open(image_path)

    # Resize while preserving the aspect ratio
    image.thumbnail((max_size, max_size), Image.LANCZOS)

    # JPEG has no alpha channel, so drop it before saving
    image = image.convert("RGB")

    # Compress as JPEG (quality 85)
    buffer = io.BytesIO()
    image.save(buffer, format="JPEG", quality=85)
    image_bytes = buffer.getvalue()

    return base64.b64encode(image_bytes).decode()
```
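To check how much this actually saves, a rough before/after comparison of the Base64 payload is enough. A small sketch, assuming `compress_image_for_api` ends up in `models/vertex_ai_client.py` (an assumption) and using a placeholder image path:

```python
# Hypothetical helper comparing raw vs. compressed payload sizes
import base64

from app.models.vertex_ai_client import compress_image_for_api  # assumed location


def payload_sizes(image_path: str) -> None:
    """Print the Base64 payload size before and after compression."""
    with open(image_path, "rb") as f:
        raw_b64 = base64.b64encode(f.read()).decode()

    compressed_b64 = compress_image_for_api(image_path, max_size=1024)

    print(f"raw payload:        {len(raw_b64) / 1024:.1f} KB")
    print(f"compressed payload: {len(compressed_b64) / 1024:.1f} KB")


if __name__ == "__main__":
    payload_sizes("samples/test_poster.jpg")  # placeholder path
```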
### 2. Parallel Processing

Handle multiple requests concurrently:

```python
# Revised worker_loop in worker.py
async def worker_loop():
    queue = JobQueue()
    settings = get_settings()

    # Tasks currently running
    active_tasks = []

    while True:
        # Check the maximum number of concurrent jobs
        if len(active_tasks) < settings.max_concurrent_jobs:
            job_id = queue.dequeue_job()

            if job_id:
                job_data = queue.get_job(job_id)

                # Create an async task
                task = asyncio.create_task(
                    process_job_with_retry(job_id, job_data)
                )
                active_tasks.append(task)

        # Clean up finished tasks
        done_tasks = [t for t in active_tasks if t.done()]
        for task in done_tasks:
            active_tasks.remove(task)

            # Surface exceptions
            try:
                task.result()
            except Exception as e:
                print(f"Task failed: {e}")

        await asyncio.sleep(0.5)
```

## Testing

### Unit Tests

```python
# tests/test_vertex_ai_client.py
import pytest
from app.models.vertex_ai_client import VertexAIClient
from PIL import Image
import tempfile


@pytest.fixture
def sample_image():
    # Create a test image
    img = Image.new("RGB", (512, 512), color="red")

    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
        img.save(f, format="JPEG")
    return f.name


def test_decompose_image(sample_image):
    client = VertexAIClient()

    layers = client.decompose_image(
        image_path=sample_image,
        num_layers=3,
        resolution=512
    )

    assert len(layers) == 3
    assert all(isinstance(layer, Image.Image) for layer in layers)
```

### Integration Test

```bash
# 1. Start the API server
uvicorn app.main:app --reload &

# 2. Start the worker
python worker.py &

# 3. Upload an image
curl -X POST http://localhost:8000/api/upload \
  -F "file=@test_poster.jpg"

# Response: {"file_id": "uuid-1"}

# 4. Create a decomposition job
curl -X POST http://localhost:8000/api/decompose \
  -H "Content-Type: application/json" \
  -d '{
    "file_id": "uuid-1",
    "num_layers": 5,
    "resolution": 1024
  }'

# Response: {"job_id": "uuid-2", "status": "queued"}

# 5. Monitor status (WebSocket)
wscat -c ws://localhost:8000/ws/status/uuid-2

# Output:
# {"status": "processing", "progress": 10, "message": "Requesting Vertex AI..."}
# {"status": "processing", "progress": 30, "message": "Processing on cloud GPU..."}
# {"status": "completed", "progress": 100, "message": "Done!"}
```
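The unit test above calls the real endpoint, so it needs credentials and pays for GPU time on every run. For CI, a mocked variant is usually more practical; a sketch under the assumption that the client module looks like the one shown earlier (the file name is hypothetical, and the fake response mirrors the `predict()` payload shape used above):

```python
# tests/test_vertex_ai_client_mocked.py — hypothetical credential-free test
import base64
import io
from unittest.mock import MagicMock, patch

from PIL import Image

from app.models.vertex_ai_client import VertexAIClient


def _fake_layer_b64(color: str) -> str:
    """Encode a tiny solid-color PNG as Base64, mimicking one endpoint layer."""
    buf = io.BytesIO()
    Image.new("RGBA", (64, 64), color=color).save(buf, format="PNG")
    return base64.b64encode(buf.getvalue()).decode()


@patch("app.models.vertex_ai_client.get_settings")
@patch("app.models.vertex_ai_client.default")
@patch("app.models.vertex_ai_client.aiplatform")
def test_decompose_image_mocked(mock_aiplatform, mock_default, mock_get_settings, tmp_path):
    # Fake settings and credentials so __init__ never touches GCP
    mock_get_settings.return_value = MagicMock(
        vertex_ai_project_id="test-project",
        vertex_ai_location="us-central1",
        vertex_ai_endpoint_id="123",
        vertex_ai_timeout=60,
    )
    mock_default.return_value = (MagicMock(), "test-project")

    # Fake endpoint response carrying three Base64 layers
    fake_response = MagicMock()
    fake_response.predictions = [
        {"layers": [_fake_layer_b64(c) for c in ("red", "green", "blue")]}
    ]
    mock_aiplatform.Endpoint.return_value.predict.return_value = fake_response

    # Input image on disk
    src = tmp_path / "poster.jpg"
    Image.new("RGB", (256, 256), color="white").save(src)

    layers = VertexAIClient().decompose_image(str(src), num_layers=3, resolution=512)

    assert len(layers) == 3
    assert all(isinstance(layer, Image.Image) for layer in layers)
```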
## Cost Tracking

### Adding Logging

```python
# models/vertex_ai_client.py
import time
import logging

logger = logging.getLogger(__name__)


class VertexAIClient:
    def decompose_image(self, image_path, num_layers, resolution):
        start_time = time.time()

        try:
            # ... API call ...

            end_time = time.time()
            duration = end_time - start_time

            # Estimate the cost
            cost = self._estimate_cost(duration)

            logger.info(
                f"Vertex AI inference completed in {duration:.2f}s, "
                f"estimated cost: ${cost:.4f}"
            )

            return layers

        except Exception as e:
            logger.error(f"Vertex AI inference failed: {e}")
            raise

    def _estimate_cost(self, duration_seconds: float) -> float:
        """Estimate cost (based on a T4 GPU)."""
        COST_PER_HOUR = 0.45
        hours = duration_seconds / 3600
        return COST_PER_HOUR * hours
```

### Daily Cost Aggregation

```python
# scripts/daily_cost_report.py
import redis
from datetime import datetime, timedelta

redis_client = redis.Redis(decode_responses=True)

# Key for today's date
today_key = f"cost:{datetime.now().strftime('%Y-%m-%d')}"


# Accumulate cost (called from the worker)
def track_cost(cost: float):
    redis_client.incrbyfloat(today_key, cost)
    redis_client.expire(today_key, 86400 * 30)  # keep for 30 days


# Daily report
def daily_report():
    today = datetime.now()
    last_7_days = [today - timedelta(days=i) for i in range(7)]

    print("=== Vertex AI Cost Report ===")
    for day in reversed(last_7_days):
        key = f"cost:{day.strftime('%Y-%m-%d')}"
        cost = redis_client.get(key) or 0
        print(f"{day.strftime('%Y-%m-%d')}: ${float(cost):.2f}")


if __name__ == "__main__":
    daily_report()
```

## Migration Checklist

Remove the old local GPU code:

```bash
# ❌ Files to delete
rm app/models/qwen_decomposer.py

# ✅ Newly added files
app/models/vertex_ai_client.py
scripts/daily_cost_report.py

# 📝 Modified files
app/config.py
worker.py
requirements.txt
```

Update `requirements.txt`:

```diff
- torch>=2.0
- diffusers>=0.30.0
+ google-cloud-aiplatform>=1.40.0
+ google-auth>=2.25.0
```

## Next Steps

v6 covers **cost optimization strategies**:

1. Tuning the autoscaling configuration
2. Using Spot instances
3. Caching strategies
4. Automated cost alerts

The cloud migration is complete; now it is time to improve operational efficiency.

---

**Previous post**: [Vertex AI Deployment in Practice (4/10)](./update-qwen-image-layered-project-v4.md)
**Next post**: [Cost Optimization Strategies (6/10)](./update-qwen-image-layered-project-v6.md)

**References**:
- [Vertex AI Python Client](https://cloud.google.com/python/docs/reference/aiplatform/latest)
- [Async I/O in Python](https://docs.python.org/3/library/asyncio.html)