# SSL 증명서 자동화 프로젝트 (6/10): 증명서 관리 DB와 RESTful API ## 데이터 모델 설계 철학 증명서 관리 시스템은 단순한 CRUD가 아니라 **라이프사이클 관리**가 핵심이다: 1. **발급 요청** → 승인 → 검증 → 발급 → 배포 → 갱신 → 폐기 2. **감사 추적**: 모든 변경사항과 의사결정 기록 3. **관계 추적**: 1개 증명서 → N개 배포 대상 ## PostgreSQL 스키마 설계 ### ERD 개요 ``` ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ │ certificates │──1:N──│ certificate_ │──N:1──│ deployment_ │ │ │ │ deployments │ │ targets │ │ - id (PK) │ │ │ │ │ │ - common_name │ │ - certificate_id │ │ - id (PK) │ │ - sans[] │ │ - target_id │ │ - type │ │ - status │ │ - deployed_at │ │ - hostname │ │ - not_after │ │ - status │ │ - credentials │ └─────────────────┘ └──────────────────┘ └─────────────────┘ │ │ 1:N ▼ ┌─────────────────┐ ┌──────────────────┐ │ certificate_ │ │ acme_accounts │ │ history │ │ │ │ │ │ - id (PK) │ │ - certificate_id│ │ - ca_provider │ │ - event_type │ │ - account_url │ │ - created_at │ │ - eab_kid │ │ - metadata │ │ - public_key_jwk │ └─────────────────┘ └──────────────────┘ ``` ### SQLAlchemy Models ```python # models/certificate.py from sqlalchemy import Column, String, DateTime, JSON, Enum, Integer, Boolean, ARRAY, ForeignKey, Text from sqlalchemy.orm import relationship from sqlalchemy.ext.declarative import declarative_base from datetime import datetime import enum Base = declarative_base() class CertificateStatus(enum.Enum): PENDING = "pending" # 발급 대기 VALIDATING = "validating" # 도메인 검증 중 ISSUING = "issuing" # 발급 중 ACTIVE = "active" # 활성 (배포됨) EXPIRING_SOON = "expiring_soon" # 만료 임박 EXPIRED = "expired" # 만료됨 REVOKED = "revoked" # 폐기됨 FAILED = "failed" # 발급 실패 class CertificateType(enum.Enum): DV = "dv" # Domain Validation OV = "ov" # Organization Validation EV = "ev" # Extended Validation class Certificate(Base): __tablename__ = "certificates" id = Column(String(36), primary_key=True) # UUID # 도메인 정보 common_name = Column(String(255), nullable=False, index=True) sans = Column(ARRAY(String(255)), nullable=False, default=[]) # Subject Alternative Names is_wildcard = Column(Boolean, default=False) # 증명서 타입 cert_type = Column(Enum(CertificateType), nullable=False, default=CertificateType.DV) # CA 정보 ca_provider = Column(String(50), nullable=False, default="globalsign") acme_account_id = Column(String(36), ForeignKey("acme_accounts.id")) # 증명서 내용 (암호화 저장 권장) certificate_pem = Column(Text) certificate_chain_pem = Column(Text) private_key_encrypted = Column(Text) # Vault에 저장하고 여기는 참조만 저장 권장 # 유효기간 not_before = Column(DateTime) not_after = Column(DateTime, index=True) # 상태 status = Column(Enum(CertificateStatus), nullable=False, default=CertificateStatus.PENDING, index=True) # 갱신 설정 auto_renew = Column(Boolean, default=True) renew_before_days = Column(Integer, default=30) # Challenge 정보 challenge_type = Column(String(20)) # "http-01", "dns-01" # ACME Order 정보 acme_order_url = Column(String(512)) acme_order_status = Column(String(50)) # 메타데이터 requested_by = Column(String(255)) # 요청자 (이메일 또는 user_id) approved_by = Column(String(255)) # 승인자 organization = Column(String(255)) # 조직/부서 cost_center = Column(String(100)) # 비용 센터 (chargeback용) tags = Column(JSON, default={}) # 자유 태그 {"environment": "production", "project": "api-gateway"} # Timestamp created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) issued_at = Column(DateTime) renewed_at = Column(DateTime) revoked_at = Column(DateTime) # Relationships history = relationship("CertificateHistory", back_populates="certificate", cascade="all, delete-orphan") deployments = relationship("CertificateDeployment", back_populates="certificate", cascade="all, delete-orphan") acme_account = relationship("ACMEAccount", back_populates="certificates") def __repr__(self): return f"" class CertificateHistory(Base): """증명서 변경 이력 (불변 로그)""" __tablename__ = "certificate_history" id = Column(Integer, primary_key=True, autoincrement=True) certificate_id = Column(String(36), ForeignKey("certificates.id"), nullable=False, index=True) event_type = Column(String(50), nullable=False) # "created", "issued", "deployed", "renewed", "revoked" event_status = Column(String(20)) # "success", "failed" # 변경 내용 old_status = Column(String(50)) new_status = Column(String(50)) # 상세 정보 details = Column(JSON) # 자유 형식 메타데이터 error_message = Column(Text) # 감사 performed_by = Column(String(255)) # 작업자 client_ip = Column(String(45)) # IPv6 지원 created_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True) certificate = relationship("Certificate", back_populates="history") class DeploymentTargetType(enum.Enum): NGINX = "nginx" APACHE = "apache" HAPROXY = "haproxy" F5_BIG_IP = "f5_big_ip" AWS_ALB = "aws_alb" AWS_CLOUDFRONT = "aws_cloudfront" KUBERNETES = "kubernetes" class DeploymentTarget(Base): """배포 대상 서버/장비""" __tablename__ = "deployment_targets" id = Column(String(36), primary_key=True) name = Column(String(255), nullable=False, unique=True) type = Column(Enum(DeploymentTargetType), nullable=False) # 접속 정보 hostname = Column(String(255), nullable=False) port = Column(Integer, default=22) # SSH port # 인증 정보 (Vault 참조) credentials_vault_path = Column(String(512)) # "secret/data/ssh/webserver-01" # 배포 설정 cert_path = Column(String(512)) # "/etc/nginx/ssl/example.com.crt" key_path = Column(String(512)) # "/etc/nginx/ssl/example.com.key" chain_path = Column(String(512)) # "/etc/nginx/ssl/example.com-chain.crt" reload_command = Column(String(512)) # "systemctl reload nginx" # 상태 enabled = Column(Boolean, default=True) last_deployment_at = Column(DateTime) last_deployment_status = Column(String(20)) # "success", "failed" # 메타데이터 tags = Column(JSON, default={}) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationships deployments = relationship("CertificateDeployment", back_populates="target") class CertificateDeployment(Base): """증명서 배포 이력 (N:N 중간 테이블)""" __tablename__ = "certificate_deployments" id = Column(Integer, primary_key=True, autoincrement=True) certificate_id = Column(String(36), ForeignKey("certificates.id"), nullable=False, index=True) target_id = Column(String(36), ForeignKey("deployment_targets.id"), nullable=False, index=True) status = Column(String(20), nullable=False) # "pending", "deploying", "success", "failed" deployed_at = Column(DateTime) deployment_duration_seconds = Column(Integer) error_message = Column(Text) rollback_performed = Column(Boolean, default=False) created_at = Column(DateTime, default=datetime.utcnow, index=True) # Relationships certificate = relationship("Certificate", back_populates="deployments") target = relationship("DeploymentTarget", back_populates="deployments") class ACMEAccount(Base): """ACME 계정 정보""" __tablename__ = "acme_accounts" id = Column(String(36), primary_key=True) ca_provider = Column(String(50), nullable=False, index=True) # "globalsign", "digicert", "letsencrypt" environment = Column(String(20), nullable=False) # "sandbox", "production" # ACME 정보 account_url = Column(String(512), unique=True) # "https://acme.globalsign.com/acme/account/abc123" eab_kid = Column(String(255)) eab_hmac_key_encrypted = Column(Text) # 암호화 저장 # 계정 키 (Vault 참조) private_key_vault_path = Column(String(512)) public_key_jwk = Column(JSON) # 상태 status = Column(String(20), default="active") # "active", "suspended", "deactivated" # 통계 total_orders = Column(Integer, default=0) failed_orders = Column(Integer, default=0) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationships certificates = relationship("Certificate", back_populates="acme_account") ``` ### 마이그레이션 (Alembic) ```python # alembic/versions/001_initial_schema.py from alembic import op import sqlalchemy as sa def upgrade(): # certificates 테이블 op.create_table( 'certificates', sa.Column('id', sa.String(36), primary_key=True), sa.Column('common_name', sa.String(255), nullable=False), sa.Column('sans', sa.ARRAY(sa.String(255)), nullable=False), # ... (위 모델과 동일) ) # 인덱스 op.create_index('idx_cert_cn', 'certificates', ['common_name']) op.create_index('idx_cert_not_after', 'certificates', ['not_after']) op.create_index('idx_cert_status', 'certificates', ['status']) # Full-text search for domains op.execute(""" CREATE INDEX idx_cert_domains_fts ON certificates USING GIN (to_tsvector('english', common_name || ' ' || array_to_string(sans, ' '))) """) def downgrade(): op.drop_table('certificates') ``` ## RESTful API 구현 (FastAPI) ### API 구조 ``` GET /api/v1/certificates # 목록 조회 (필터/페이징) POST /api/v1/certificates # 신규 발급 요청 GET /api/v1/certificates/{id} # 상세 조회 PATCH /api/v1/certificates/{id} # 부분 업데이트 DELETE /api/v1/certificates/{id} # 폐기 POST /api/v1/certificates/{id}/renew # 수동 갱신 POST /api/v1/certificates/{id}/revoke # 폐기 POST /api/v1/certificates/{id}/deploy # 수동 배포 GET /api/v1/certificates/{id}/history # 이력 조회 GET /api/v1/certificates/{id}/deployments # 배포 현황 GET /api/v1/deployment-targets # 배포 대상 목록 POST /api/v1/deployment-targets # 배포 대상 등록 ``` ### FastAPI 구현 ```python # api/main.py from fastapi import FastAPI, Depends, HTTPException, Query from sqlalchemy.orm import Session from typing import List, Optional from datetime import datetime, timedelta app = FastAPI(title="Certificate Management API", version="1.0.0") # Dependency def get_db(): db = SessionLocal() try: yield db finally: db.close() # Pydantic Schemas from pydantic import BaseModel, Field, validator class CertificateCreateRequest(BaseModel): common_name: str = Field(..., example="www.example.com") sans: List[str] = Field(default=[], example=["api.example.com", "admin.example.com"]) cert_type: str = Field("dv", regex="^(dv|ov|ev)$") challenge_type: str = Field("dns-01", regex="^(http-01|dns-01)$") auto_renew: bool = True renew_before_days: int = Field(30, ge=7, le=90) requested_by: str = Field(..., example="admin@example.com") organization: str = Field("", example="Engineering") tags: dict = Field(default={}) @validator('common_name') def validate_common_name(cls, v): if not v or len(v) > 253: raise ValueError("Invalid common name") return v.lower() @validator('sans') def validate_sans(cls, v): return [domain.lower() for domain in v] class CertificateResponse(BaseModel): id: str common_name: str sans: List[str] status: str not_before: Optional[datetime] not_after: Optional[datetime] days_until_expiry: Optional[int] auto_renew: bool created_at: datetime issued_at: Optional[datetime] class Config: orm_mode = True @staticmethod def from_orm(cert: Certificate): """ORM 객체를 Response로 변환""" days_until_expiry = None if cert.not_after: days_until_expiry = (cert.not_after - datetime.utcnow()).days return CertificateResponse( id=cert.id, common_name=cert.common_name, sans=cert.sans, status=cert.status.value, not_before=cert.not_before, not_after=cert.not_after, days_until_expiry=days_until_expiry, auto_renew=cert.auto_renew, created_at=cert.created_at, issued_at=cert.issued_at ) # API Endpoints @app.post("/api/v1/certificates", response_model=CertificateResponse, status_code=201) async def create_certificate( request: CertificateCreateRequest, db: Session = Depends(get_db) ): """신규 증명서 발급 요청""" import uuid from tasks.issuance import issue_certificate_async # 중복 체크 existing = db.query(Certificate).filter( Certificate.common_name == request.common_name, Certificate.status.in_([CertificateStatus.PENDING, CertificateStatus.ACTIVE]) ).first() if existing: raise HTTPException(status_code=409, detail="Certificate already exists") # Certificate 생성 cert = Certificate( id=str(uuid.uuid4()), common_name=request.common_name, sans=request.sans, is_wildcard=request.common_name.startswith("*."), cert_type=CertificateType[request.cert_type.upper()], challenge_type=request.challenge_type, auto_renew=request.auto_renew, renew_before_days=request.renew_before_days, requested_by=request.requested_by, organization=request.organization, tags=request.tags, status=CertificateStatus.PENDING ) db.add(cert) # 이력 기록 history = CertificateHistory( certificate_id=cert.id, event_type="created", event_status="success", new_status=CertificateStatus.PENDING.value, performed_by=request.requested_by ) db.add(history) db.commit() db.refresh(cert) # 비동기 발급 작업 시작 (Celery) issue_certificate_async.delay(cert.id) return CertificateResponse.from_orm(cert) @app.get("/api/v1/certificates", response_model=List[CertificateResponse]) async def list_certificates( status: Optional[str] = Query(None), expiring_in_days: Optional[int] = Query(None, ge=0, le=365), search: Optional[str] = Query(None), skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=1000), db: Session = Depends(get_db) ): """증명서 목록 조회 (필터링/페이징)""" query = db.query(Certificate) # 필터: 상태 if status: query = query.filter(Certificate.status == CertificateStatus[status.upper()]) # 필터: 만료 임박 if expiring_in_days is not None: threshold = datetime.utcnow() + timedelta(days=expiring_in_days) query = query.filter( Certificate.not_after <= threshold, Certificate.status == CertificateStatus.ACTIVE ) # 필터: 도메인 검색 (full-text search) if search: query = query.filter( sa.or_( Certificate.common_name.ilike(f"%{search}%"), Certificate.sans.any(search) # PostgreSQL array contains ) ) # 정렬 query = query.order_by(Certificate.created_at.desc()) # 페이징 certificates = query.offset(skip).limit(limit).all() return [CertificateResponse.from_orm(cert) for cert in certificates] @app.get("/api/v1/certificates/{cert_id}", response_model=CertificateResponse) async def get_certificate(cert_id: str, db: Session = Depends(get_db)): """증명서 상세 조회""" cert = db.query(Certificate).filter(Certificate.id == cert_id).first() if not cert: raise HTTPException(status_code=404, detail="Certificate not found") return CertificateResponse.from_orm(cert) @app.post("/api/v1/certificates/{cert_id}/renew", status_code=202) async def renew_certificate(cert_id: str, db: Session = Depends(get_db)): """수동 갱신 트리거""" from tasks.renewal import renew_certificate_async cert = db.query(Certificate).filter(Certificate.id == cert_id).first() if not cert: raise HTTPException(status_code=404, detail="Certificate not found") if cert.status != CertificateStatus.ACTIVE: raise HTTPException(status_code=400, detail="Only active certificates can be renewed") # 갱신 작업 큐에 추가 task = renew_certificate_async.delay(cert_id) return {"message": "Renewal job queued", "task_id": task.id} @app.get("/api/v1/certificates/{cert_id}/history") async def get_certificate_history(cert_id: str, db: Session = Depends(get_db)): """증명서 변경 이력""" history = db.query(CertificateHistory).filter( CertificateHistory.certificate_id == cert_id ).order_by(CertificateHistory.created_at.desc()).all() return history ``` ### 통합 테스트 ```python # tests/test_api.py from fastapi.testclient import TestClient from api.main import app client = TestClient(app) def test_create_certificate(): response = client.post("/api/v1/certificates", json={ "common_name": "test.example.com", "sans": ["www.test.example.com"], "cert_type": "dv", "challenge_type": "dns-01", "requested_by": "test@example.com" }) assert response.status_code == 201 data = response.json() assert data["common_name"] == "test.example.com" assert data["status"] == "pending" def test_list_certificates_expiring_soon(): response = client.get("/api/v1/certificates?expiring_in_days=30") assert response.status_code == 200 data = response.json() assert isinstance(data, list) ``` ## 다음 단계 v7에서는 증명서 배포 자동화 파이프라인을 구축한다: - Ansible 플레이북 통합 - SSH 키 관리 (Vault) - 블루-그린 배포 전략 - 롤백 메커니즘 --- **시리즈 목차** 1. SSL 증명서 자동화 필요성과 프로젝트 개요 2. ACME 프로토콜 이해와 동작 원리 3. GlobalSign Atlas ACME 연동 실습 4. 도메인 검증 방식 구현 (HTTP-01, DNS-01) 5. ACME 클라이언트 선정과 커스텀 구현 6. **[현재글] 증명서 관리 DB 설계와 API 구현** 7. 증명서 배포 자동화 파이프라인 구축 8. 승인 워크플로우와 거버넌스 구현 9. PQC 대응 설계와 미래 확장성 10. 전체 시스템 통합과 프로덕션 배포 가이드