# PlanitAI KPI 개발기 v11: 보안 및 인증 > 시리즈: PlanitAI KPI 개발 여정 (11/16) > 작성일: 2024년 12월 ## 개요 SaaS 서비스에서 보안은 선택이 아닌 필수입니다. 특히 KPI 데이터는 기업의 핵심 경영 정보이므로 철저한 보안이 요구됩니다. 이번 글에서는 PlanitAI KPI의 보안 아키텍처를 설계합니다. --- ## 1. 보안 아키텍처 개요 ### 1.1 보안 레이어 ``` ┌─────────────────────────────────────────────────────────────┐ │ Security Layers │ ├─────────────────────────────────────────────────────────────┤ │ ┌─────────────────────────────────────────────────────┐ │ │ │ L1: Network Security │ │ │ │ - HTTPS/TLS 1.3 │ │ │ │ - WAF (Web Application Firewall) │ │ │ │ - DDoS Protection │ │ │ └─────────────────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ L2: Authentication │ │ │ │ - JWT Bearer Token │ │ │ │ - OAuth 2.0 / OIDC │ │ │ │ - MFA (Multi-Factor Authentication) │ │ │ └─────────────────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ L3: Authorization │ │ │ │ - RBAC (Role-Based Access Control) │ │ │ │ - Resource-level Permissions │ │ │ │ - Organization Isolation │ │ │ └─────────────────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ L4: Data Security │ │ │ │ - Encryption at Rest (AES-256) │ │ │ │ - Encryption in Transit (TLS) │ │ │ │ - Data Masking │ │ │ └─────────────────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ L5: Audit & Monitoring │ │ │ │ - Access Logging │ │ │ │ - Anomaly Detection │ │ │ │ - Compliance Reporting │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘ ``` --- ## 2. 인증 (Authentication) ### 2.1 JWT 토큰 구조 ```python # src/auth/jwt.py from datetime import datetime, timedelta from typing import Optional import jwt from pydantic import BaseModel class TokenPayload(BaseModel): """JWT 페이로드""" sub: str # 사용자 ID org_id: str # 조직 ID email: str # 이메일 role: str # 역할 permissions: list[str] # 권한 목록 exp: datetime # 만료 시간 iat: datetime # 발급 시간 jti: str # 토큰 ID (revoke용) class JWTManager: """JWT 토큰 관리자""" def __init__( self, secret_key: str, algorithm: str = "HS256", access_token_expire_minutes: int = 30, refresh_token_expire_days: int = 7 ): self.secret_key = secret_key self.algorithm = algorithm self.access_expire = timedelta(minutes=access_token_expire_minutes) self.refresh_expire = timedelta(days=refresh_token_expire_days) def create_access_token( self, user_id: str, org_id: str, email: str, role: str, permissions: list[str] ) -> str: """액세스 토큰 생성""" now = datetime.utcnow() payload = { "sub": user_id, "org_id": org_id, "email": email, "role": role, "permissions": permissions, "exp": now + self.access_expire, "iat": now, "jti": self._generate_jti(), "type": "access" } return jwt.encode(payload, self.secret_key, algorithm=self.algorithm) def create_refresh_token(self, user_id: str) -> str: """리프레시 토큰 생성""" now = datetime.utcnow() payload = { "sub": user_id, "exp": now + self.refresh_expire, "iat": now, "jti": self._generate_jti(), "type": "refresh" } return jwt.encode(payload, self.secret_key, algorithm=self.algorithm) def verify_token(self, token: str) -> Optional[dict]: """토큰 검증""" try: payload = jwt.decode( token, self.secret_key, algorithms=[self.algorithm] ) # Revoked 토큰 체크 if self._is_token_revoked(payload.get("jti")): return None return payload except jwt.ExpiredSignatureError: return None except jwt.InvalidTokenError: return None def _generate_jti(self) -> str: """토큰 ID 생성""" import uuid return str(uuid.uuid4()) def _is_token_revoked(self, jti: str) -> bool: """토큰 무효화 여부 확인 (Redis에서 체크)""" # TODO: Redis 블랙리스트 체크 return False ``` ### 2.2 OAuth 2.0 / Google SSO 연동 ```python # src/auth/oauth.py from authlib.integrations.starlette_client import OAuth from starlette.config import Config class OAuthManager: """OAuth 인증 관리자""" def __init__(self): self.oauth = OAuth() # Google OAuth 설정 self.oauth.register( name='google', client_id=Config().get('GOOGLE_CLIENT_ID'), client_secret=Config().get('GOOGLE_CLIENT_SECRET'), server_metadata_url='https://accounts.google.com/.well-known/openid-configuration', client_kwargs={'scope': 'openid email profile'} ) async def get_google_login_url(self, redirect_uri: str) -> str: """Google 로그인 URL 생성""" return await self.oauth.google.authorize_redirect(redirect_uri) async def handle_google_callback(self, request) -> dict: """Google 콜백 처리""" token = await self.oauth.google.authorize_access_token(request) user_info = token.get('userinfo') return { "email": user_info.get("email"), "name": user_info.get("name"), "picture": user_info.get("picture"), "google_id": user_info.get("sub") } ``` ### 2.3 비밀번호 보안 ```python # src/auth/password.py import bcrypt from passlib.context import CryptContext class PasswordManager: """비밀번호 관리자""" def __init__(self): self.context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12 # 보안과 성능의 균형 ) def hash_password(self, password: str) -> str: """비밀번호 해시""" return self.context.hash(password) def verify_password(self, plain_password: str, hashed_password: str) -> bool: """비밀번호 검증""" return self.context.verify(plain_password, hashed_password) def validate_password_strength(self, password: str) -> tuple[bool, list[str]]: """비밀번호 강도 검증""" errors = [] if len(password) < 8: errors.append("Password must be at least 8 characters") if len(password) > 128: errors.append("Password must be at most 128 characters") if not any(c.isupper() for c in password): errors.append("Password must contain at least one uppercase letter") if not any(c.islower() for c in password): errors.append("Password must contain at least one lowercase letter") if not any(c.isdigit() for c in password): errors.append("Password must contain at least one digit") if not any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password): errors.append("Password must contain at least one special character") return len(errors) == 0, errors ``` --- ## 3. 권한 관리 (Authorization) ### 3.1 RBAC 모델 ```python # src/auth/rbac.py from enum import Enum from typing import Set from functools import wraps class Role(str, Enum): """사용자 역할""" OWNER = "owner" # 조직 소유자 ADMIN = "admin" # 관리자 EDITOR = "editor" # 편집자 VIEWER = "viewer" # 뷰어 class Permission(str, Enum): """권한 목록""" # KPI Tree TREE_CREATE = "tree:create" TREE_READ = "tree:read" TREE_UPDATE = "tree:update" TREE_DELETE = "tree:delete" # KPI Node NODE_CREATE = "node:create" NODE_READ = "node:read" NODE_UPDATE = "node:update" NODE_DELETE = "node:delete" # KPI Data DATA_CREATE = "data:create" DATA_READ = "data:read" DATA_UPDATE = "data:update" DATA_DELETE = "data:delete" DATA_IMPORT = "data:import" DATA_EXPORT = "data:export" # Analysis ANALYSIS_RUN = "analysis:run" ANALYSIS_READ = "analysis:read" # Organization ORG_MANAGE = "org:manage" ORG_INVITE = "org:invite" ORG_BILLING = "org:billing" # Admin ADMIN_USERS = "admin:users" ADMIN_SETTINGS = "admin:settings" # 역할별 권한 매핑 ROLE_PERMISSIONS: dict[Role, Set[Permission]] = { Role.OWNER: set(Permission), # 모든 권한 Role.ADMIN: { Permission.TREE_CREATE, Permission.TREE_READ, Permission.TREE_UPDATE, Permission.TREE_DELETE, Permission.NODE_CREATE, Permission.NODE_READ, Permission.NODE_UPDATE, Permission.NODE_DELETE, Permission.DATA_CREATE, Permission.DATA_READ, Permission.DATA_UPDATE, Permission.DATA_DELETE, Permission.DATA_IMPORT, Permission.DATA_EXPORT, Permission.ANALYSIS_RUN, Permission.ANALYSIS_READ, Permission.ORG_INVITE, Permission.ADMIN_USERS, Permission.ADMIN_SETTINGS, }, Role.EDITOR: { Permission.TREE_CREATE, Permission.TREE_READ, Permission.TREE_UPDATE, Permission.NODE_CREATE, Permission.NODE_READ, Permission.NODE_UPDATE, Permission.DATA_CREATE, Permission.DATA_READ, Permission.DATA_UPDATE, Permission.DATA_IMPORT, Permission.ANALYSIS_RUN, Permission.ANALYSIS_READ, }, Role.VIEWER: { Permission.TREE_READ, Permission.NODE_READ, Permission.DATA_READ, Permission.DATA_EXPORT, Permission.ANALYSIS_READ, }, } class RBACManager: """RBAC 권한 관리자""" def has_permission(self, role: Role, permission: Permission) -> bool: """권한 확인""" return permission in ROLE_PERMISSIONS.get(role, set()) def get_permissions(self, role: Role) -> Set[Permission]: """역할의 권한 목록""" return ROLE_PERMISSIONS.get(role, set()) def check_permissions( self, role: Role, required: list[Permission] ) -> tuple[bool, list[Permission]]: """권한 검사 (필요 권한 목록)""" user_perms = self.get_permissions(role) missing = [p for p in required if p not in user_perms] return len(missing) == 0, missing ``` ### 3.2 권한 데코레이터 ```python # src/auth/decorators.py from functools import wraps from fastapi import HTTPException, status, Depends from fastapi.security import HTTPBearer from .rbac import Permission, RBACManager from .jwt import JWTManager security = HTTPBearer() def require_permissions(*permissions: Permission): """권한 필요 데코레이터""" def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): # 현재 사용자 정보 가져오기 current_user = kwargs.get("current_user") if not current_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" ) # 권한 체크 rbac = RBACManager() has_all, missing = rbac.check_permissions( current_user.role, list(permissions) ) if not has_all: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing permissions: {[p.value for p in missing]}" ) return await func(*args, **kwargs) return wrapper return decorator # FastAPI Dependency async def get_current_user(token: str = Depends(security)): """현재 사용자 가져오기""" jwt_manager = JWTManager(secret_key="...") payload = jwt_manager.verify_token(token.credentials) if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token" ) return TokenPayload(**payload) ``` ### 3.3 리소스 레벨 권한 ```python # src/auth/resource_permission.py from sqlalchemy.orm import Session from src.models import KPITree, TreeMember class ResourcePermissionChecker: """리소스 레벨 권한 검사""" def __init__(self, db: Session): self.db = db def can_access_tree(self, user_id: str, tree_id: str) -> bool: """트리 접근 권한 확인""" # 트리 조회 tree = self.db.query(KPITree).filter(KPITree.id == tree_id).first() if not tree: return False # 조직 소속 확인 # (사용자가 트리 조직에 소속되어 있는지) # 개별 권한 확인 membership = self.db.query(TreeMember).filter( TreeMember.tree_id == tree_id, TreeMember.user_id == user_id ).first() return membership is not None def get_tree_role(self, user_id: str, tree_id: str) -> str: """트리에서의 역할 조회""" membership = self.db.query(TreeMember).filter( TreeMember.tree_id == tree_id, TreeMember.user_id == user_id ).first() if membership: return membership.role return None # API에서 사용 @router.get("/trees/{tree_id}") @require_permissions(Permission.TREE_READ) async def get_tree( tree_id: str, current_user: TokenPayload = Depends(get_current_user), db: Session = Depends(get_db) ): """트리 조회""" checker = ResourcePermissionChecker(db) if not checker.can_access_tree(current_user.sub, tree_id): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this tree" ) # 트리 조회 로직... ``` --- ## 4. 데이터 보안 ### 4.1 저장 데이터 암호화 ```python # src/security/encryption.py from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64 import os class DataEncryptor: """데이터 암호화 관리자""" def __init__(self, master_key: str): # 마스터 키에서 Fernet 키 유도 kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=b"planitai_kpi_salt", # 실제로는 환경변수로 iterations=100_000, ) key = base64.urlsafe_b64encode(kdf.derive(master_key.encode())) self.fernet = Fernet(key) def encrypt(self, data: str) -> str: """데이터 암호화""" return self.fernet.encrypt(data.encode()).decode() def decrypt(self, encrypted_data: str) -> str: """데이터 복호화""" return self.fernet.decrypt(encrypted_data.encode()).decode() class SensitiveFieldEncryptor: """민감 필드 암호화""" SENSITIVE_FIELDS = [ "api_key", "webhook_secret", "external_credentials" ] def __init__(self, encryptor: DataEncryptor): self.encryptor = encryptor def encrypt_model(self, model: dict) -> dict: """모델의 민감 필드 암호화""" result = model.copy() for field in self.SENSITIVE_FIELDS: if field in result and result[field]: result[field] = self.encryptor.encrypt(result[field]) result[f"_{field}_encrypted"] = True return result def decrypt_model(self, model: dict) -> dict: """모델의 민감 필드 복호화""" result = model.copy() for field in self.SENSITIVE_FIELDS: if result.get(f"_{field}_encrypted") and field in result: result[field] = self.encryptor.decrypt(result[field]) del result[f"_{field}_encrypted"] return result ``` ### 4.2 데이터 마스킹 ```python # src/security/masking.py import re from typing import Any class DataMasker: """데이터 마스킹""" MASKING_PATTERNS = { "email": r"(^[^@]{2})[^@]*(@.*)$", "phone": r"(\d{3})\d{4}(\d{4})", "name": r"^(.).*$", } def mask_email(self, email: str) -> str: """이메일 마스킹: ab***@domain.com""" if not email: return email match = re.match(self.MASKING_PATTERNS["email"], email) if match: return f"{match.group(1)}***{match.group(2)}" return email def mask_phone(self, phone: str) -> str: """전화번호 마스킹: 010-****-1234""" if not phone: return phone digits = re.sub(r"\D", "", phone) if len(digits) >= 10: return f"{digits[:3]}-****-{digits[-4:]}" return phone def mask_name(self, name: str) -> str: """이름 마스킹: 김**""" if not name: return name if len(name) == 1: return "*" elif len(name) == 2: return f"{name[0]}*" else: return f"{name[0]}{'*' * (len(name) - 1)}" def mask_kpi_value(self, value: float, role: str) -> Any: """KPI 값 마스킹 (권한에 따라)""" if role in ["owner", "admin", "editor"]: return value # Viewer는 대략적인 범위만 표시 if value >= 1_000_000_000: return "10억+" elif value >= 100_000_000: return "1억+" elif value >= 10_000_000: return "1천만+" else: return "***" ``` --- ## 5. API 보안 ### 5.1 Rate Limiting ```python # src/security/rate_limit.py from datetime import datetime from typing import Optional import redis from fastapi import Request, HTTPException class RateLimiter: """Rate Limiter""" def __init__(self, redis_client: redis.Redis): self.redis = redis_client # 엔드포인트별 제한 설정 self.limits = { "default": {"requests": 100, "window": 60}, # 분당 100회 "analysis": {"requests": 10, "window": 60}, # 분당 10회 (AI) "export": {"requests": 5, "window": 300}, # 5분당 5회 "auth": {"requests": 5, "window": 60}, # 분당 5회 (로그인) } async def check_rate_limit( self, key: str, endpoint_type: str = "default" ) -> tuple[bool, dict]: """Rate limit 체크""" limit_config = self.limits.get(endpoint_type, self.limits["default"]) max_requests = limit_config["requests"] window_seconds = limit_config["window"] redis_key = f"rate_limit:{endpoint_type}:{key}" # 현재 카운트 조회 current = self.redis.get(redis_key) if current is None: # 첫 요청 self.redis.setex(redis_key, window_seconds, 1) remaining = max_requests - 1 return True, { "limit": max_requests, "remaining": remaining, "reset": window_seconds } current_count = int(current) if current_count >= max_requests: # 제한 초과 ttl = self.redis.ttl(redis_key) return False, { "limit": max_requests, "remaining": 0, "reset": ttl } # 카운트 증가 self.redis.incr(redis_key) remaining = max_requests - current_count - 1 return True, { "limit": max_requests, "remaining": remaining, "reset": self.redis.ttl(redis_key) } # Middleware from starlette.middleware.base import BaseHTTPMiddleware class RateLimitMiddleware(BaseHTTPMiddleware): """Rate Limit 미들웨어""" def __init__(self, app, rate_limiter: RateLimiter): super().__init__(app) self.limiter = rate_limiter async def dispatch(self, request: Request, call_next): # 클라이언트 식별 (IP 또는 사용자 ID) client_id = self._get_client_id(request) # 엔드포인트 타입 결정 endpoint_type = self._get_endpoint_type(request.url.path) # Rate limit 체크 allowed, info = await self.limiter.check_rate_limit( client_id, endpoint_type ) if not allowed: raise HTTPException( status_code=429, detail="Rate limit exceeded", headers={ "X-RateLimit-Limit": str(info["limit"]), "X-RateLimit-Remaining": str(info["remaining"]), "X-RateLimit-Reset": str(info["reset"]), "Retry-After": str(info["reset"]) } ) # 응답에 Rate Limit 헤더 추가 response = await call_next(request) response.headers["X-RateLimit-Limit"] = str(info["limit"]) response.headers["X-RateLimit-Remaining"] = str(info["remaining"]) response.headers["X-RateLimit-Reset"] = str(info["reset"]) return response def _get_client_id(self, request: Request) -> str: """클라이언트 ID 추출""" # 인증된 사용자는 user_id 사용 if hasattr(request.state, "user"): return f"user:{request.state.user.sub}" # 미인증은 IP 사용 forwarded = request.headers.get("X-Forwarded-For") if forwarded: return f"ip:{forwarded.split(',')[0].strip()}" return f"ip:{request.client.host}" def _get_endpoint_type(self, path: str) -> str: """엔드포인트 타입 결정""" if "/analysis" in path: return "analysis" if "/export" in path: return "export" if "/auth" in path or "/login" in path: return "auth" return "default" ``` ### 5.2 입력 검증 ```python # src/security/validation.py from pydantic import BaseModel, validator, Field import re class CreateTreeRequest(BaseModel): """트리 생성 요청""" name: str = Field(..., min_length=1, max_length=100) description: str = Field(None, max_length=500) @validator("name") def validate_name(cls, v): # XSS 방지 if re.search(r"[<>\"'&]", v): raise ValueError("Name contains invalid characters") # 공백만 있는 경우 방지 if not v.strip(): raise ValueError("Name cannot be empty or whitespace only") return v.strip() class KPIFormulaRequest(BaseModel): """KPI 수식 요청""" formula: str = Field(..., max_length=500) @validator("formula") def validate_formula(cls, v): # 허용된 문자만 확인 allowed_pattern = r"^[\{\}\w\s\+\-\*\/\(\)\.\,]+$" if not re.match(allowed_pattern, v): raise ValueError("Formula contains invalid characters") # 위험한 키워드 차단 dangerous_keywords = [ "__", "import", "exec", "eval", "open", "system", "subprocess", "os.", "sys." ] lower_v = v.lower() for keyword in dangerous_keywords: if keyword in lower_v: raise ValueError(f"Formula contains forbidden keyword: {keyword}") return v class SQLInjectionValidator: """SQL Injection 검증""" DANGEROUS_PATTERNS = [ r"(\s|^)(OR|AND)\s+\d+\s*=\s*\d+", # OR 1=1 r"--", # SQL 주석 r";.*DROP", # DROP 시도 r"UNION\s+SELECT", # UNION 공격 r"'.*OR.*'", # OR 공격 ] @classmethod def is_safe(cls, value: str) -> bool: """안전한 입력인지 확인""" for pattern in cls.DANGEROUS_PATTERNS: if re.search(pattern, value, re.IGNORECASE): return False return True ``` --- ## 6. 감사 로깅 ### 6.1 Audit Log 시스템 ```python # src/security/audit.py from datetime import datetime from enum import Enum from typing import Optional import json from sqlalchemy import Column, String, DateTime, Text from sqlalchemy.orm import Session class AuditAction(str, Enum): """감사 액션 유형""" # 인증 LOGIN = "auth.login" LOGOUT = "auth.logout" LOGIN_FAILED = "auth.login_failed" PASSWORD_CHANGED = "auth.password_changed" # KPI Tree TREE_CREATED = "tree.created" TREE_UPDATED = "tree.updated" TREE_DELETED = "tree.deleted" TREE_SHARED = "tree.shared" # KPI Data DATA_IMPORTED = "data.imported" DATA_EXPORTED = "data.exported" DATA_UPDATED = "data.updated" # Analysis ANALYSIS_RUN = "analysis.run" # Admin USER_INVITED = "admin.user_invited" USER_ROLE_CHANGED = "admin.user_role_changed" USER_REMOVED = "admin.user_removed" class AuditLog(Base): """감사 로그 모델""" __tablename__ = "audit_logs" id = Column(String, primary_key=True) timestamp = Column(DateTime, default=datetime.utcnow) action = Column(String(50), nullable=False, index=True) user_id = Column(String, nullable=True, index=True) organization_id = Column(String, nullable=True, index=True) resource_type = Column(String(50), nullable=True) resource_id = Column(String, nullable=True) ip_address = Column(String(45), nullable=True) user_agent = Column(String(500), nullable=True) details = Column(Text, nullable=True) # JSON success = Column(Boolean, default=True) class AuditLogger: """감사 로거""" def __init__(self, db: Session): self.db = db async def log( self, action: AuditAction, user_id: Optional[str] = None, organization_id: Optional[str] = None, resource_type: Optional[str] = None, resource_id: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None, details: Optional[dict] = None, success: bool = True ): """감사 로그 기록""" import uuid log_entry = AuditLog( id=str(uuid.uuid4()), action=action.value, user_id=user_id, organization_id=organization_id, resource_type=resource_type, resource_id=resource_id, ip_address=ip_address, user_agent=user_agent, details=json.dumps(details) if details else None, success=success ) self.db.add(log_entry) await self.db.commit() async def get_logs( self, organization_id: str, start_date: datetime, end_date: datetime, action: Optional[AuditAction] = None, user_id: Optional[str] = None, limit: int = 100 ) -> list[AuditLog]: """감사 로그 조회""" query = self.db.query(AuditLog).filter( AuditLog.organization_id == organization_id, AuditLog.timestamp >= start_date, AuditLog.timestamp <= end_date ) if action: query = query.filter(AuditLog.action == action.value) if user_id: query = query.filter(AuditLog.user_id == user_id) return query.order_by(AuditLog.timestamp.desc()).limit(limit).all() ``` ### 6.2 API 감사 미들웨어 ```python # src/middleware/audit.py from starlette.middleware.base import BaseHTTPMiddleware from fastapi import Request from src.security.audit import AuditLogger, AuditAction class AuditMiddleware(BaseHTTPMiddleware): """감사 미들웨어""" # 감사 대상 엔드포인트 매핑 AUDIT_MAPPING = { ("POST", "/api/v1/trees"): AuditAction.TREE_CREATED, ("PUT", "/api/v1/trees/"): AuditAction.TREE_UPDATED, ("DELETE", "/api/v1/trees/"): AuditAction.TREE_DELETED, ("POST", "/api/v1/data/import"): AuditAction.DATA_IMPORTED, ("POST", "/api/v1/data/export"): AuditAction.DATA_EXPORTED, ("POST", "/api/v1/analysis"): AuditAction.ANALYSIS_RUN, } async def dispatch(self, request: Request, call_next): response = await call_next(request) # 감사 로그 기록 await self._log_if_needed(request, response) return response async def _log_if_needed(self, request: Request, response): """필요시 감사 로그 기록""" method = request.method path = request.url.path for (m, p), action in self.AUDIT_MAPPING.items(): if method == m and path.startswith(p): user = getattr(request.state, "user", None) await self.audit_logger.log( action=action, user_id=user.sub if user else None, organization_id=user.org_id if user else None, resource_type=self._extract_resource_type(path), resource_id=self._extract_resource_id(path), ip_address=request.client.host, user_agent=request.headers.get("user-agent"), success=200 <= response.status_code < 300 ) break def _extract_resource_type(self, path: str) -> str: """리소스 타입 추출""" parts = path.split("/") if "trees" in parts: return "tree" if "nodes" in parts: return "node" if "data" in parts: return "data" return "unknown" def _extract_resource_id(self, path: str) -> Optional[str]: """리소스 ID 추출""" import re # UUID 패턴 찾기 match = re.search( r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", path ) return match.group(0) if match else None ``` --- ## 7. 보안 체크리스트 ### 7.1 배포 전 체크리스트 | 항목 | 상태 | 담당 | |------|------|------| | HTTPS 설정 | [ ] | DevOps | | 환경변수 보안 | [ ] | DevOps | | DB 접근 제한 | [ ] | DevOps | | Rate Limiting | [ ] | Backend | | 입력 검증 | [ ] | Backend | | SQL Injection 방지 | [ ] | Backend | | XSS 방지 | [ ] | Frontend | | CSRF 토큰 | [ ] | Backend | | 보안 헤더 설정 | [ ] | DevOps | | 로그 민감정보 마스킹 | [ ] | Backend | ### 7.2 보안 헤더 설정 ```python # src/middleware/security_headers.py from starlette.middleware.base import BaseHTTPMiddleware class SecurityHeadersMiddleware(BaseHTTPMiddleware): """보안 헤더 미들웨어""" async def dispatch(self, request, call_next): response = await call_next(request) # 보안 헤더 추가 response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains" response.headers["Content-Security-Policy"] = ( "default-src 'self'; " "script-src 'self' 'unsafe-inline' 'unsafe-eval'; " "style-src 'self' 'unsafe-inline'; " "img-src 'self' data: https:; " "font-src 'self' data:; " "connect-src 'self' https://api.planitai.co.jp" ) response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" response.headers["Permissions-Policy"] = ( "camera=(), microphone=(), geolocation=()" ) return response ``` --- ## 8. まとめ ### 보안 설계 요약 | 보안 영역 | 구현 기술 | |----------|----------| | 인증 | JWT + OAuth 2.0 + bcrypt | | 권한 | RBAC + 리소스 레벨 권한 | | 데이터 | AES-256 암호화 + 마스킹 | | API | Rate Limiting + 입력 검증 | | 감사 | Audit Log + 미들웨어 | | 네트워크 | HTTPS + 보안 헤더 | ### 次回予告 v12에서는 **배포 전략**을 다룹니다: - CI/CD 파이프라인 - 컨테이너화 (Docker) - 인프라 구성 (AWS/GCP) --- *PlanitAI KPI - AI가 당신의 KPI를 계획하고 분석합니다*