# SSL 증명서 자동화 프로젝트 (8/10): 승인 워크플로우와 거버넌스 ## 자동화 ≠ 무책임: 거버넌스의 필요성 완전 자동화는 편리하지만, 엔터프라이즈 환경에서는 **통제와 감사**가 필수다: 1. **누가** 이 도메인의 증명서를 요청할 권한이 있는가? 2. **왜** EV 증명서가 필요한가? (비용 $500 vs DV $50) 3. **언제** 갱신할 것인가? (자동 vs 수동 승인) 4. **어디에** 배포되는가? (프로덕션 vs 스테이징) 5. **비용**을 누구에게 청구할 것인가? ## 승인 워크플로우 설계 ### 시나리오별 승인 정책 ``` ┌─────────────────────┬──────────────┬───────────────────────────────┐ │ Certificate Type │ Action │ Approval Required │ ├─────────────────────┼──────────────┼───────────────────────────────┤ │ DV (자동갱신) │ Renewal │ ✗ (Pre-approved) │ │ DV (신규) │ Issuance │ ✓ (Team Lead) │ │ OV (신규) │ Issuance │ ✓✓ (Team Lead + Security) │ │ EV (신규) │ Issuance │ ✓✓✓ (TL + Security + Finance) │ │ 와일드카드 (신규) │ Issuance │ ✓✓ (Security + Architect) │ │ Production 배포 │ Deployment │ ✓ (Change Management) │ └─────────────────────┴──────────────┴───────────────────────────────┘ ``` ### 워크플로우 상태 머신 ``` ┌─────────────┐ │ DRAFT │ (초안) └──────┬──────┘ │ submit_for_approval() ▼ ┌─────────────┐ ┌───>│ PENDING │ (승인 대기) │ └──────┬──────┘ │ │ approve() / reject() │ ▼ reject │ ┌─────────────┐ │ │ APPROVED │ (승인됨) │ └──────┬──────┘ │ │ issue_certificate() │ ▼ │ ┌─────────────┐ └────│ ISSUED │ (발급됨) └──────┬──────┘ │ deploy() ▼ ┌─────────────┐ │ DEPLOYED │ (배포됨) └─────────────┘ ``` ## DB 스키마 확장: Approval 테이블 ```python # models/approval.py from sqlalchemy import Column, String, DateTime, Enum, ForeignKey, Text, JSON from sqlalchemy.orm import relationship import enum class ApprovalStatus(enum.Enum): PENDING = "pending" APPROVED = "approved" REJECTED = "rejected" CANCELLED = "cancelled" class ApprovalRequest(Base): """승인 요청""" __tablename__ = "approval_requests" id = Column(String(36), primary_key=True) # 연관 리소스 certificate_id = Column(String(36), ForeignKey("certificates.id"), nullable=False, index=True) # 요청 정보 request_type = Column(String(50), nullable=False) # "issuance", "renewal", "deployment", "revocation" requester_id = Column(String(255), nullable=False) # user_id 또는 email requester_justification = Column(Text) # 요청 사유 # 승인 정책 required_approvers = Column(JSON, nullable=False) # ["team_lead", "security_team", "finance"] approval_count_required = Column(Integer, default=1) # 최소 승인자 수 # 상태 status = Column(Enum(ApprovalStatus), default=ApprovalStatus.PENDING, index=True) # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) expires_at = Column(DateTime) # 승인 만료 기한 (7일 후) # Relationships certificate = relationship("Certificate", back_populates="approval_requests") approvals = relationship("Approval", back_populates="request", cascade="all, delete-orphan") class Approval(Base): """개별 승인/거부""" __tablename__ = "approvals" id = Column(Integer, primary_key=True, autoincrement=True) approval_request_id = Column(String(36), ForeignKey("approval_requests.id"), nullable=False, index=True) # 승인자 정보 approver_id = Column(String(255), nullable=False) approver_role = Column(String(50), nullable=False) # "team_lead", "security_team" # 결정 decision = Column(Enum(ApprovalStatus), nullable=False) # APPROVED or REJECTED comments = Column(Text) # Audit decided_at = Column(DateTime, default=datetime.utcnow) client_ip = Column(String(45)) # Relationships request = relationship("ApprovalRequest", back_populates="approvals") ``` ## RBAC (Role-Based Access Control) ```python # auth/rbac.py from enum import Enum from typing import List, Set class Role(Enum): VIEWER = "viewer" # 조회만 가능 REQUESTER = "requester" # 증명서 요청 가능 TEAM_LEAD = "team_lead" # 팀 내 승인 가능 SECURITY_ADMIN = "security_admin" # 보안 정책 승인 FINANCE = "finance" # 비용 승인 SUPER_ADMIN = "super_admin" # 모든 권한 class Permission(Enum): # Certificate CERT_VIEW = "cert:view" CERT_CREATE = "cert:create" CERT_APPROVE = "cert:approve" CERT_DEPLOY = "cert:deploy" CERT_REVOKE = "cert:revoke" # Deployment Target TARGET_VIEW = "target:view" TARGET_CREATE = "target:create" TARGET_DELETE = "target:delete" # Approval APPROVAL_VIEW = "approval:view" APPROVAL_DECIDE = "approval:decide" # 역할별 권한 매핑 ROLE_PERMISSIONS: dict[Role, Set[Permission]] = { Role.VIEWER: { Permission.CERT_VIEW, Permission.TARGET_VIEW, Permission.APPROVAL_VIEW }, Role.REQUESTER: { Permission.CERT_VIEW, Permission.CERT_CREATE, Permission.TARGET_VIEW, Permission.APPROVAL_VIEW }, Role.TEAM_LEAD: { Permission.CERT_VIEW, Permission.CERT_CREATE, Permission.CERT_APPROVE, Permission.APPROVAL_VIEW, Permission.APPROVAL_DECIDE }, Role.SECURITY_ADMIN: { Permission.CERT_VIEW, Permission.CERT_APPROVE, Permission.CERT_REVOKE, Permission.TARGET_VIEW, Permission.TARGET_CREATE, Permission.APPROVAL_DECIDE }, Role.SUPER_ADMIN: set(Permission) # 모든 권한 } class RBACService: def __init__(self, user_roles: List[Role]): self.user_roles = user_roles self.permissions = self._compute_permissions() def _compute_permissions(self) -> Set[Permission]: """사용자의 모든 역할에서 권한 합집합 계산""" perms = set() for role in self.user_roles: perms.update(ROLE_PERMISSIONS.get(role, set())) return perms def has_permission(self, permission: Permission) -> bool: """권한 체크""" return permission in self.permissions def require_permission(self, permission: Permission): """권한 없으면 예외 발생""" if not self.has_permission(permission): raise PermissionError(f"Permission denied: {permission.value}") # FastAPI Dependency from fastapi import Depends, HTTPException, Header def get_current_user_roles(authorization: str = Header(...)) -> List[Role]: """JWT 토큰에서 역할 추출""" import jwt try: token = authorization.split(" ")[1] # "Bearer " payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) roles_str = payload.get("roles", []) return [Role(r) for r in roles_str] except Exception as e: raise HTTPException(status_code=401, detail="Invalid authentication") def require_permission(permission: Permission): """FastAPI dependency: 권한 체크 데코레이터""" def dependency(roles: List[Role] = Depends(get_current_user_roles)): rbac = RBACService(roles) rbac.require_permission(permission) return rbac return dependency ``` ## 승인 워크플로우 구현 ```python # services/approval_service.py from datetime import datetime, timedelta from typing import List class ApprovalService: def __init__(self, db: Session): self.db = db def create_approval_request( self, certificate_id: str, request_type: str, requester_id: str, justification: str ) -> ApprovalRequest: """승인 요청 생성""" import uuid cert = self.db.query(Certificate).get(certificate_id) if not cert: raise ValueError("Certificate not found") # 승인 정책 결정 required_approvers = self._determine_required_approvers(cert, request_type) # 승인 요청 생성 request = ApprovalRequest( id=str(uuid.uuid4()), certificate_id=certificate_id, request_type=request_type, requester_id=requester_id, requester_justification=justification, required_approvers=required_approvers, approval_count_required=len(required_approvers), expires_at=datetime.utcnow() + timedelta(days=7) # 7일 후 만료 ) self.db.add(request) self.db.commit() # 승인자들에게 알림 self._notify_approvers(request, required_approvers) return request def _determine_required_approvers(self, cert: Certificate, request_type: str) -> List[str]: """증명서 타입과 요청 타입에 따른 승인자 결정""" approvers = [] # 기본: 팀 리드 승인 approvers.append("team_lead") # OV/EV는 보안팀 승인 필요 if cert.cert_type in [CertificateType.OV, CertificateType.EV]: approvers.append("security_team") # EV는 재무팀 승인 필요 (비용 $500) if cert.cert_type == CertificateType.EV: approvers.append("finance") # 와일드카드는 아키텍트 승인 필요 (보안 위험) if cert.is_wildcard: approvers.append("architect") # Production 배포는 변경 관리팀 if request_type == "deployment" and cert.tags.get("environment") == "production": approvers.append("change_management") return approvers def approve( self, request_id: str, approver_id: str, approver_role: str, comments: str = "" ): """승인 처리""" request = self.db.query(ApprovalRequest).get(request_id) if not request: raise ValueError("Approval request not found") if request.status != ApprovalStatus.PENDING: raise ValueError(f"Request already {request.status.value}") # 승인자 역할 검증 if approver_role not in request.required_approvers: raise PermissionError(f"Role {approver_role} not authorized to approve") # 이미 이 역할로 승인했는지 체크 existing = self.db.query(Approval).filter( Approval.approval_request_id == request_id, Approval.approver_role == approver_role ).first() if existing: raise ValueError(f"Already approved by {approver_role}") # 승인 기록 approval = Approval( approval_request_id=request_id, approver_id=approver_id, approver_role=approver_role, decision=ApprovalStatus.APPROVED, comments=comments ) self.db.add(approval) # 모든 필수 승인자가 승인했는지 체크 approved_roles = {a.approver_role for a in request.approvals if a.decision == ApprovalStatus.APPROVED} approved_roles.add(approver_role) # 현재 승인 포함 if set(request.required_approvers).issubset(approved_roles): # 모든 승인 완료 → 자동 발급 request.status = ApprovalStatus.APPROVED self._trigger_certificate_issuance(request.certificate_id) self.db.commit() def reject( self, request_id: str, approver_id: str, approver_role: str, reason: str ): """거부 처리""" request = self.db.query(ApprovalRequest).get(request_id) if request.status != ApprovalStatus.PENDING: raise ValueError(f"Request already {request.status.value}") # 거부 기록 approval = Approval( approval_request_id=request_id, approver_id=approver_id, approver_role=approver_role, decision=ApprovalStatus.REJECTED, comments=reason ) self.db.add(approval) # 1명이라도 거부하면 전체 거부 request.status = ApprovalStatus.REJECTED # 요청자에게 알림 self._notify_requester_rejection(request, reason) self.db.commit() def _trigger_certificate_issuance(self, certificate_id: str): """승인 완료 후 자동 발급""" from tasks.issuance import issue_certificate_async issue_certificate_async.delay(certificate_id) def _notify_approvers(self, request: ApprovalRequest, approvers: List[str]): """승인자들에게 알림 전송""" # 이메일, Slack 등 pass ``` ## API 엔드포인트: 승인 워크플로우 ```python # api/approval.py from fastapi import APIRouter, Depends, HTTPException router = APIRouter(prefix="/api/v1/approvals", tags=["approvals"]) @router.post("", status_code=201) async def create_approval_request( request: ApprovalRequestCreate, db: Session = Depends(get_db), rbac: RBACService = Depends(require_permission(Permission.CERT_CREATE)) ): """승인 요청 생성""" service = ApprovalService(db) approval_request = service.create_approval_request( certificate_id=request.certificate_id, request_type=request.request_type, requester_id=request.requester_id, justification=request.justification ) return approval_request @router.post("/{request_id}/approve") async def approve_request( request_id: str, approval: ApprovalDecision, db: Session = Depends(get_db), rbac: RBACService = Depends(require_permission(Permission.APPROVAL_DECIDE)) ): """승인""" service = ApprovalService(db) service.approve( request_id=request_id, approver_id=approval.approver_id, approver_role=approval.approver_role, comments=approval.comments ) return {"message": "Approved successfully"} @router.post("/{request_id}/reject") async def reject_request( request_id: str, rejection: ApprovalDecision, db: Session = Depends(get_db), rbac: RBACService = Depends(require_permission(Permission.APPROVAL_DECIDE)) ): """거부""" service = ApprovalService(db) service.reject( request_id=request_id, approver_id=rejection.approver_id, approver_role=rejection.approver_role, reason=rejection.comments ) return {"message": "Rejected"} @router.get("/my-pending") async def get_my_pending_approvals( db: Session = Depends(get_db), roles: List[Role] = Depends(get_current_user_roles) ): """내가 승인해야 할 요청 목록""" role_names = [r.value for r in roles] # required_approvers에 내 역할이 포함된 PENDING 요청 requests = db.query(ApprovalRequest).filter( ApprovalRequest.status == ApprovalStatus.PENDING, ApprovalRequest.required_approvers.contains(role_names) # PostgreSQL JSONB ).all() return requests ``` ## 비용 할당 (Chargeback) ```python # services/chargeback.py class ChargebackService: """증명서 비용을 cost center에 할당""" PRICING = { CertificateType.DV: 50, CertificateType.OV: 200, CertificateType.EV: 500 } def calculate_monthly_cost(self, cost_center: str, month: str) -> dict: """월별 비용 계산""" # "2025-12" → 해당 월에 발급된 증명서 start = datetime.strptime(month, "%Y-%m") end = start + timedelta(days=32) end = end.replace(day=1) # 다음 달 1일 certs = db.query(Certificate).filter( Certificate.cost_center == cost_center, Certificate.issued_at >= start, Certificate.issued_at < end ).all() total_cost = sum(self.PRICING[cert.cert_type] for cert in certs) return { "cost_center": cost_center, "month": month, "certificate_count": len(certs), "total_cost_usd": total_cost, "breakdown": { cert_type.name: { "count": sum(1 for c in certs if c.cert_type == cert_type), "cost": self.PRICING[cert_type] } for cert_type in CertificateType } } ``` ## 감사 로그 ```python # middleware/audit_log.py from fastapi import Request import logging audit_logger = logging.getLogger("audit") @app.middleware("http") async def audit_log_middleware(request: Request, call_next): """모든 API 요청 감사 로그""" import time start_time = time.time() # 요청 처리 response = await call_next(request) # 로그 기록 audit_logger.info({ "timestamp": datetime.utcnow().isoformat(), "method": request.method, "path": request.url.path, "user_id": request.state.user_id if hasattr(request.state, "user_id") else None, "client_ip": request.client.host, "status_code": response.status_code, "duration_ms": (time.time() - start_time) * 1000 }) return response ``` ## 다음 단계 v9에서는 PQC(Post-Quantum Cryptography) 대응을 설계한다: - PQC 알고리즘 개요 (MLKEM, MLDSA) - 하이브리드 증명서 전략 - 마이그레이션 로드맵 - 레거시 클라이언트 호환성 --- **시리즈 목차** 1. SSL 증명서 자동화 필요성과 프로젝트 개요 2. ACME 프로토콜 이해와 동작 원리 3. GlobalSign Atlas ACME 연동 실습 4. 도메인 검증 방식 구현 (HTTP-01, DNS-01) 5. ACME 클라이언트 선정과 커스텀 구현 6. 증명서 관리 DB 설계와 API 구현 7. 증명서 배포 자동화 파이프라인 구축 8. **[현재글] 승인 워크플로우와 거버넌스 구현** 9. PQC 대응 설계와 미래 확장성 10. 전체 시스템 통합과 프로덕션 배포 가이드