# Qwen-Image-Layered로 포스터 자동 레이어 분해 (5/10): FastAPI 백엔드 구축 ## 백엔드 아키텍처 설계 로컬에서 추론이 성공했다. 이제 이를 웹 서비스로 전환하자. **목표**: - 웹에서 이미지 업로드 - 백그라운드에서 AI 처리 - 실시간 진행 상황 전송 - 결과 다운로드 **핵심 컴포넌트**: 1. **FastAPI** - REST API 서버 2. **Redis** - 작업 큐 및 상태 저장 3. **Celery** (또는 간단한 Worker) - 백그라운드 작업 4. **WebSocket** - 실시간 업데이트 ## 프로젝트 구조 ``` backend/ ├── app/ │ ├── __init__.py │ ├── main.py # FastAPI 앱 │ ├── config.py # 환경 설정 │ │ │ ├── api/ # API 엔드포인트 │ │ ├── __init__.py │ │ ├── upload.py │ │ ├── decompose.py │ │ └── websocket.py │ │ │ ├── models/ # AI 모델 │ │ ├── __init__.py │ │ └── qwen_decomposer.py │ │ │ ├── services/ # 비즈니스 로직 │ │ ├── __init__.py │ │ ├── job_queue.py │ │ ├── storage.py │ │ └── processor.py │ │ │ └── schemas/ # Pydantic 모델 │ ├── __init__.py │ └── job.py │ ├── worker.py # 백그라운드 워커 ├── requirements.txt └── .env ``` ## 환경 설정 `.env`: ```bash # App APP_ENV=development HOST=0.0.0.0 PORT=8000 # Redis REDIS_HOST=localhost REDIS_PORT=6379 REDIS_DB=0 # Storage UPLOAD_DIR=./storage/uploads RESULTS_DIR=./storage/results MAX_FILE_SIZE=10485760 # 10MB ALLOWED_EXTENSIONS=jpg,jpeg,png,webp # Processing MAX_CONCURRENT_JOBS=2 JOB_TIMEOUT=600 CLEANUP_AFTER_HOURS=24 # Model MODEL_NAME=Qwen/Qwen-Image-Layered MODEL_CACHE_DIR=./models DEFAULT_NUM_LAYERS=5 DEFAULT_RESOLUTION=1024 ``` `config.py`: ```python from pydantic_settings import BaseSettings from functools import lru_cache class Settings(BaseSettings): # App app_env: str = "development" host: str = "0.0.0.0" port: int = 8000 # Redis redis_host: str = "localhost" redis_port: int = 6379 redis_db: int = 0 # Storage upload_dir: str = "./storage/uploads" results_dir: str = "./storage/results" max_file_size: int = 10485760 allowed_extensions: str = "jpg,jpeg,png,webp" # Processing max_concurrent_jobs: int = 2 job_timeout: int = 600 cleanup_after_hours: int = 24 # Model model_name: str = "Qwen/Qwen-Image-Layered" model_cache_dir: str = "./models" default_num_layers: int = 5 default_resolution: int = 1024 class Config: env_file = ".env" @lru_cache() def get_settings(): return Settings() ``` ## Pydantic 스키마 `schemas/job.py`: ```python from pydantic import BaseModel, Field from typing import List, Optional from datetime import datetime from enum import Enum class JobStatus(str, Enum): QUEUED = "queued" PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" class JobCreate(BaseModel): """작업 생성 요청""" num_layers: int = Field(default=5, ge=2, le=10) resolution: int = Field(default=1024, ge=512, le=2048) class LayerInfo(BaseModel): """레이어 정보""" index: int filename: str url: str size_kb: float description: Optional[str] = None class JobResponse(BaseModel): """작업 상태 응답""" job_id: str status: JobStatus progress: int = Field(ge=0, le=100) message: str created_at: datetime completed_at: Optional[datetime] = None layers: Optional[List[LayerInfo]] = None error: Optional[str] = None ``` ## Redis 작업 큐 `services/job_queue.py`: ```python import redis import json import uuid from datetime import datetime from typing import Dict, Optional from app.config import get_settings from app.schemas.job import JobStatus class JobQueue: def __init__(self): settings = get_settings() self.redis = redis.Redis( host=settings.redis_host, port=settings.redis_port, db=settings.redis_db, decode_responses=True ) def create_job( self, image_path: str, num_layers: int, resolution: int ) -> str: """작업 생성""" job_id = str(uuid.uuid4()) job_data = { "job_id": job_id, "image_path": image_path, "num_layers": num_layers, "resolution": resolution, "status": JobStatus.QUEUED, "progress": 0, "message": "대기 중...", "created_at": datetime.now().isoformat(), "completed_at": None, "layers": [], "error": None } # Redis에 저장 self.redis.set(f"job:{job_id}", json.dumps(job_data)) # 작업 큐에 추가 self.redis.lpush("job_queue", job_id) return job_id def get_job(self, job_id: str) -> Optional[Dict]: """작업 조회""" data = self.redis.get(f"job:{job_id}") return json.loads(data) if data else None def update_job( self, job_id: str, status: Optional[JobStatus] = None, progress: Optional[int] = None, message: Optional[str] = None, layers: Optional[list] = None, error: Optional[str] = None ): """작업 상태 업데이트""" job_data = self.get_job(job_id) if not job_data: return if status: job_data["status"] = status if progress is not None: job_data["progress"] = progress if message: job_data["message"] = message if layers: job_data["layers"] = layers if error: job_data["error"] = error if status == JobStatus.COMPLETED or status == JobStatus.FAILED: job_data["completed_at"] = datetime.now().isoformat() self.redis.set(f"job:{job_id}", json.dumps(job_data)) def dequeue_job(self) -> Optional[str]: """큐에서 작업 가져오기""" return self.redis.rpop("job_queue") def get_queue_length(self) -> int: """대기 중인 작업 수""" return self.redis.llen("job_queue") ``` ## FastAPI 엔드포인트 `main.py`: ```python from fastapi import FastAPI, UploadFile, File, HTTPException from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles import os from app.config import get_settings from app.api import upload, decompose, websocket settings = get_settings() app = FastAPI( title="Qwen Image Layered API", version="1.0.0" ) # CORS 설정 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 정적 파일 서빙 os.makedirs(settings.upload_dir, exist_ok=True) os.makedirs(settings.results_dir, exist_ok=True) app.mount("/uploads", StaticFiles(directory=settings.upload_dir), name="uploads") app.mount("/results", StaticFiles(directory=settings.results_dir), name="results") # 라우터 등록 app.include_router(upload.router, prefix="/api", tags=["upload"]) app.include_router(decompose.router, prefix="/api", tags=["decompose"]) app.include_router(websocket.router, prefix="/ws", tags=["websocket"]) @app.get("/") async def root(): return {"message": "Qwen Image Layered API"} @app.get("/health") async def health_check(): from app.services.job_queue import JobQueue queue = JobQueue() queue_length = queue.get_queue_length() return { "status": "healthy", "queue_length": queue_length } ``` ### 업로드 엔드포인트 `api/upload.py`: ```python from fastapi import APIRouter, UploadFile, File, HTTPException from app.config import get_settings import os import uuid from pathlib import Path router = APIRouter() settings = get_settings() ALLOWED_EXTENSIONS = set(settings.allowed_extensions.split(",")) def validate_file(file: UploadFile): """파일 검증""" # 확장자 확인 ext = file.filename.split(".")[-1].lower() if ext not in ALLOWED_EXTENSIONS: raise HTTPException( status_code=400, detail=f"허용되지 않은 파일 형식. 허용: {ALLOWED_EXTENSIONS}" ) # 파일 크기 확인 (실제로는 스트리밍 중 체크 필요) return True @router.post("/upload") async def upload_image(file: UploadFile = File(...)): """이미지 업로드""" validate_file(file) # 고유 파일명 생성 file_id = str(uuid.uuid4()) ext = file.filename.split(".")[-1] filename = f"{file_id}.{ext}" filepath = os.path.join(settings.upload_dir, filename) # 파일 저장 with open(filepath, "wb") as f: content = await file.read() # 크기 확인 if len(content) > settings.max_file_size: raise HTTPException( status_code=413, detail=f"파일이 너무 큽니다. 최대: {settings.max_file_size / 1024 / 1024}MB" ) f.write(content) return { "file_id": file_id, "filename": filename, "filepath": filepath, "size_bytes": len(content) } ``` ### 분해 엔드포인트 `api/decompose.py`: ```python from fastapi import APIRouter, HTTPException, Body from app.schemas.job import JobCreate, JobResponse from app.services.job_queue import JobQueue from app.config import get_settings import os router = APIRouter() settings = get_settings() @router.post("/decompose", response_model=JobResponse) async def decompose_image( file_id: str = Body(...), job_params: JobCreate = Body(...) ): """이미지 분해 작업 생성""" queue = JobQueue() # 파일 존재 확인 image_path = os.path.join( settings.upload_dir, f"{file_id}.jpg" # 업로드 시 저장한 확장자 ) if not os.path.exists(image_path): raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다") # 작업 생성 job_id = queue.create_job( image_path=image_path, num_layers=job_params.num_layers, resolution=job_params.resolution ) # 작업 정보 반환 job_data = queue.get_job(job_id) return JobResponse(**job_data) @router.get("/status/{job_id}", response_model=JobResponse) async def get_job_status(job_id: str): """작업 상태 조회""" queue = JobQueue() job_data = queue.get_job(job_id) if not job_data: raise HTTPException(status_code=404, detail="작업을 찾을 수 없습니다") return JobResponse(**job_data) @router.get("/download/{job_id}") async def download_results(job_id: str): """결과 다운로드 (ZIP)""" import zipfile from fastapi.responses import StreamingResponse import io queue = JobQueue() job_data = queue.get_job(job_id) if not job_data or job_data["status"] != "completed": raise HTTPException(status_code=400, detail="완료된 작업이 아닙니다") # ZIP 파일 생성 zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: for layer in job_data["layers"]: layer_path = os.path.join(settings.results_dir, job_id, layer["filename"]) zip_file.write(layer_path, layer["filename"]) zip_buffer.seek(0) return StreamingResponse( zip_buffer, media_type="application/zip", headers={"Content-Disposition": f"attachment; filename=layers_{job_id}.zip"} ) ``` ### WebSocket 엔드포인트 `api/websocket.py`: ```python from fastapi import APIRouter, WebSocket, WebSocketDisconnect from app.services.job_queue import JobQueue import asyncio import json router = APIRouter() @router.websocket("/status/{job_id}") async def websocket_status(websocket: WebSocket, job_id: str): """작업 상태 실시간 업데이트""" await websocket.accept() queue = JobQueue() try: while True: # 작업 상태 조회 job_data = queue.get_job(job_id) if not job_data: await websocket.send_json({"error": "작업을 찾을 수 없습니다"}) break # 클라이언트에 전송 await websocket.send_json({ "job_id": job_data["job_id"], "status": job_data["status"], "progress": job_data["progress"], "message": job_data["message"] }) # 완료 또는 실패 시 종료 if job_data["status"] in ["completed", "failed"]: break # 1초 대기 await asyncio.sleep(1) except WebSocketDisconnect: print(f"WebSocket disconnected: {job_id}") ``` ## 백그라운드 워커 `worker.py`: ```python #!/usr/bin/env python3 """백그라운드 작업 워커""" import asyncio import time from app.services.job_queue import JobQueue from app.models.qwen_decomposer import QwenDecomposer from app.config import get_settings import os async def process_job(job_id: str, job_data: dict): """작업 처리""" queue = JobQueue() decomposer = QwenDecomposer() settings = get_settings() try: # 상태 업데이트: 처리 시작 queue.update_job( job_id, status="processing", progress=5, message="모델 로딩 중..." ) # 이미지 분해 def progress_callback(progress: int, message: str): queue.update_job(job_id, progress=progress, message=message) layers = await decomposer.decompose( image_path=job_data["image_path"], num_layers=job_data["num_layers"], resolution=job_data["resolution"], progress_callback=progress_callback ) # 결과 저장 result_dir = os.path.join(settings.results_dir, job_id) os.makedirs(result_dir, exist_ok=True) layer_info = [] for i, layer in enumerate(layers): filename = f"layer_{i}.png" filepath = os.path.join(result_dir, filename) layer.save(filepath, format="PNG") size_kb = os.path.getsize(filepath) / 1024 layer_info.append({ "index": i, "filename": filename, "url": f"/results/{job_id}/{filename}", "size_kb": round(size_kb, 2), "description": None # v6에서 Gemini로 생성 }) # 완료 queue.update_job( job_id, status="completed", progress=100, message="완료!", layers=layer_info ) except Exception as e: # 실패 queue.update_job( job_id, status="failed", progress=0, message="처리 실패", error=str(e) ) async def worker_loop(): """워커 메인 루프""" queue = JobQueue() settings = get_settings() print(f"Worker started (max_concurrent_jobs: {settings.max_concurrent_jobs})") active_jobs = set() while True: # 동시 실행 제한 if len(active_jobs) >= settings.max_concurrent_jobs: await asyncio.sleep(1) continue # 큐에서 작업 가져오기 job_id = queue.dequeue_job() if job_id: job_data = queue.get_job(job_id) print(f"Processing job: {job_id}") # 작업 실행 (비동기) task = asyncio.create_task(process_job(job_id, job_data)) active_jobs.add(task) # 완료된 작업 정리 task.add_done_callback(lambda t: active_jobs.discard(t)) else: # 큐가 비었으면 대기 await asyncio.sleep(0.5) if __name__ == "__main__": asyncio.run(worker_loop()) ``` ## 실행 ### Redis 시작 ```bash # Docker로 Redis 실행 docker run -d --name redis -p 6379:6379 redis:7-alpine # 또는 로컬 설치 sudo apt install redis-server sudo systemctl start redis ``` ### API 서버 시작 ```bash # 개발 모드 uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 # 프로덕션 모드 uvicorn app.main:app --workers 4 --host 0.0.0.0 --port 8000 ``` ### 워커 시작 ```bash python worker.py ``` ## API 테스트 ```bash # 1. 이미지 업로드 curl -X POST http://localhost:8000/api/upload \ -F "file=@poster.jpg" # 응답: {"file_id": "uuid", ...} # 2. 분해 작업 생성 curl -X POST http://localhost:8000/api/decompose \ -H "Content-Type: application/json" \ -d '{ "file_id": "uuid", "num_layers": 5, "resolution": 1024 }' # 응답: {"job_id": "uuid", "status": "queued", ...} # 3. 상태 조회 curl http://localhost:8000/api/status/uuid # 4. 결과 다운로드 curl -O http://localhost:8000/api/download/uuid ``` ## 다음 단계 v6에서는 **Vertex AI 통합**을 다룬다: - Gemini Vision API로 레이어 설명 생성 - 최적 레이어 수 추천 - 레이어 품질 평가 백엔드가 완성되었으니, 이제 AI 보조 기능을 추가하자. --- **이전 글**: [로컬 환경 세팅과 첫 추론 (4/10)](./qwen-image-layered-v4.md) **다음 글**: [Vertex AI 통합 (6/10)](./qwen-image-layered-v6.md)