# SSL 증명서 자동화 프로젝트 (5/10): 프로덕션급 ACME 클라이언트 구현 ## ACME 클라이언트 선택: Build vs Buy ### 옵션 1: Certbot (EFF) **장점**: - 가장 성숙한 구현 (2016년부터) - 플러그인 생태계 (nginx, apache, dns-*) - 커뮤니티 지원 활발 **단점**: - Python 2/3 호환성 레거시 코드 - 엔터프라이즈 기능 부족 (멀티 테넌시, 워크플로우 통합) - Certbot hooks가 쉘 스크립트로 제한됨 ```bash # Certbot 사용 예시 certbot certonly \ --dns-route53 \ --server https://acme.globalsign.com/v2/directory \ --eab-kid $EAB_KID \ --eab-hmac-key $EAB_HMAC \ -d "*.example.com" \ --non-interactive ``` ### 옵션 2: acme.sh (Shell Script) **장점**: - 경량 (단일 쉘 스크립트) - DNS provider 80+ 지원 - cron 통합 간편 **단점**: - 쉘 스크립트 한계 (복잡한 로직 구현 어려움) - 에러 핸들링 부족 - 엔터프라이즈 감사 로그 없음 ### 옵션 3: 커스텀 구현 (우리의 선택) **이유**: 1. **기존 시스템 통합**: 증명서 관리 DB, 워크플로우 엔진과 긴밀한 연동 2. **커스텀 비즈니스 로직**: OV 증명서 조직 검증 대기, 승인 프로세스 3. **멀티 CA 지원**: GlobalSign 외에 DigiCert, Sectigo도 준비 4. **완전한 제어**: 로깅, 메트릭, 알림을 우리 표준에 맞춤 ## 프로덕션급 ACME 클라이언트 아키텍처 ``` ┌─────────────────────────────────────────────────────────────┐ │ ACME Client Service │ ├─────────────────────────────────────────────────────────────┤ │ ┌────────────────┐ ┌────────────────┐ ┌──────────────┐ │ │ │ Order Manager │ │ Challenge │ │ Certificate │ │ │ │ │ │ Orchestrator │ │ Lifecycle │ │ │ │ - Create order │ │ - HTTP-01 │ │ - Renew │ │ │ │ - Poll status │ │ - DNS-01 │ │ - Revoke │ │ │ └────────────────┘ └────────────────┘ └──────────────┘ │ ├─────────────────────────────────────────────────────────────┤ │ Core ACME Protocol │ │ ┌────────────┐ ┌──────────────┐ ┌──────────────────┐ │ │ │ JWS Signer │ │ Nonce Manager│ │ Account Manager │ │ │ └────────────┘ └──────────────┘ └──────────────────┘ │ ├─────────────────────────────────────────────────────────────┤ │ Storage & Integration │ │ ┌──────────┐ ┌────────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Postgres │ │ Redis │ │ S3 │ │ Vault │ │ │ │ (metadata)│ │ (job queue)│ │ (certs) │ │ (keys) │ │ │ └──────────┘ └────────────┘ └──────────┘ └──────────┘ │ └─────────────────────────────────────────────────────────────┘ ``` ## 핵심 컴포넌트 구현 ### 1. 에러 핸들링과 재시도 로직 ```python # core/retry.py import time import functools from typing import Type, Tuple import logging logger = logging.getLogger(__name__) class ACMERetryableError(Exception): """재시도 가능한 ACME 에러""" pass class ACMERateLimitError(ACMERetryableError): """Rate limit 초과""" def __init__(self, retry_after=None): self.retry_after = retry_after super().__init__(f"Rate limited, retry after {retry_after}s") def exponential_backoff_retry( max_attempts=5, initial_delay=1, max_delay=60, exponential_base=2, jitter=True, retryable_exceptions: Tuple[Type[Exception], ...] = (ACMERetryableError,) ): """지수 백오프 재시도 데코레이터""" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): attempt = 0 delay = initial_delay while attempt < max_attempts: try: return func(*args, **kwargs) except retryable_exceptions as e: attempt += 1 if attempt >= max_attempts: logger.error(f"Max retry attempts ({max_attempts}) reached") raise # Rate limit의 경우 Retry-After 헤더 우선 사용 if isinstance(e, ACMERateLimitError) and e.retry_after: sleep_time = e.retry_after else: # Exponential backoff sleep_time = min(delay * (exponential_base ** (attempt - 1)), max_delay) # Jitter 추가 (thundering herd 방지) if jitter: import random sleep_time = sleep_time * (0.5 + random.random()) logger.warning( f"Attempt {attempt}/{max_attempts} failed: {e}. " f"Retrying in {sleep_time:.2f}s..." ) time.sleep(sleep_time) raise Exception("Unreachable code") return wrapper return decorator # 사용 예시 class ACMEClient: @exponential_backoff_retry( max_attempts=5, initial_delay=2, max_delay=120, retryable_exceptions=(ACMERetryableError, requests.exceptions.Timeout) ) def _make_signed_request(self, url, payload): """JWS 서명된 요청 (자동 재시도)""" try: response = requests.post(url, json=payload, timeout=30) # Rate limit 체크 if response.status_code == 429: retry_after = int(response.headers.get('Retry-After', 60)) raise ACMERateLimitError(retry_after) # 서버 에러는 재시도 if 500 <= response.status_code < 600: raise ACMERetryableError(f"Server error: {response.status_code}") response.raise_for_status() return response except requests.exceptions.ConnectionError as e: raise ACMERetryableError(f"Connection failed: {e}") ``` ### 2. Rate Limit 대응 전략 GlobalSign Atlas의 실제 rate limits (추정치, 실제는 문서 확인 필요): ```python # core/rate_limiter.py import redis import time from datetime import datetime class RateLimiter: """Token bucket 알고리즘 기반 rate limiter""" def __init__(self, redis_client, limit_config): self.redis = redis_client self.config = limit_config def check_and_consume(self, resource_type, identifier): """ Rate limit 체크 및 토큰 소비 Args: resource_type: "new_account", "new_order", "challenge_validation" identifier: account_id 또는 domain """ limit = self.config.get(resource_type, {}) max_tokens = limit.get("max_requests", 100) refill_rate = limit.get("per_seconds", 3600) # 초당 토큰 추가율 key = f"rate_limit:{resource_type}:{identifier}" # Lua script로 원자적 연산 lua_script = """ local key = KEYS[1] local max_tokens = tonumber(ARGV[1]) local refill_rate = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local bucket = redis.call('HMGET', key, 'tokens', 'last_refill') local tokens = tonumber(bucket[1]) or max_tokens local last_refill = tonumber(bucket[2]) or now -- 토큰 리필 local elapsed = now - last_refill local refill = (elapsed / refill_rate) * max_tokens tokens = math.min(max_tokens, tokens + refill) if tokens >= 1 then tokens = tokens - 1 redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now) redis.call('EXPIRE', key, refill_rate * 2) return 1 -- 허용 else return 0 -- 거부 end """ result = self.redis.eval( lua_script, 1, key, max_tokens, refill_rate, time.time() ) if result == 0: # 다음 토큰까지 대기 시간 계산 wait_time = refill_rate / max_tokens raise ACMERateLimitError(retry_after=wait_time) return True # 사용 예시 rate_limiter = RateLimiter(redis_client, { "new_order": {"max_requests": 50, "per_seconds": 3600}, "challenge_validation": {"max_requests": 100, "per_seconds": 3600} }) # Order 생성 전 체크 rate_limiter.check_and_consume("new_order", account_id) order = acme_client.new_order(domains) ``` ### 3. 증명서 갱신 스케줄링 #### 옵션 A: Cron 기반 (간단) ```python # renewal_daemon.py import logging from datetime import datetime, timedelta from sqlalchemy.orm import Session logger = logging.getLogger(__name__) class CertificateRenewalScheduler: def __init__(self, db: Session, acme_client, renew_before_days=30): self.db = db self.acme_client = acme_client self.renew_before_days = renew_before_days def scan_and_renew(self): """만료 임박 증명서 갱신""" threshold = datetime.utcnow() + timedelta(days=self.renew_before_days) # 갱신 대상 조회 certificates = self.db.query(Certificate).filter( Certificate.not_after <= threshold, Certificate.status == "active", Certificate.auto_renew == True ).all() logger.info(f"Found {len(certificates)} certificates to renew") for cert in certificates: try: self._renew_certificate(cert) except Exception as e: logger.error(f"Failed to renew {cert.common_name}: {e}") self._send_alert(cert, e) def _renew_certificate(self, cert): """단일 증명서 갱신""" logger.info(f"Renewing certificate: {cert.common_name}") # 동일한 도메인으로 새 Order 생성 domains = [cert.common_name] + cert.sans new_cert, private_key = self.acme_client.issue_certificate( domains=domains, challenge_type=cert.challenge_type ) # DB 업데이트 cert.certificate_pem = new_cert cert.renewed_at = datetime.utcnow() cert.not_after = self._parse_expiry(new_cert) self.db.commit() # 배포 작업 큐에 추가 (v7에서 다룸) self._enqueue_deployment(cert.id) logger.info(f"✓ Certificate renewed: {cert.common_name}") # Cron: 매일 오전 3시 실행 # 0 3 * * * /usr/bin/python3 /opt/acme/renewal_daemon.py if __name__ == "__main__": scheduler = CertificateRenewalScheduler(db, acme_client) scheduler.scan_and_renew() ``` #### 옵션 B: Celery 기반 (고급) ```python # tasks/renewal.py from celery import Celery from celery.schedules import crontab app = Celery('acme_tasks', broker='redis://localhost:6379/0') app.conf.beat_schedule = { 'renew-certificates-daily': { 'task': 'tasks.renewal.scan_and_renew', 'schedule': crontab(hour=3, minute=0), # 매일 03:00 }, } @app.task(bind=True, max_retries=3) def renew_certificate(self, certificate_id): """비동기 증명서 갱신 작업""" try: cert = db.query(Certificate).get(certificate_id) acme_client = get_acme_client(cert.ca_provider) new_cert, key = acme_client.issue_certificate( domains=[cert.common_name] + cert.sans, challenge_type=cert.challenge_type ) # DB 업데이트 cert.certificate_pem = new_cert cert.renewed_at = datetime.utcnow() db.commit() # 배포 작업 체인 deploy_certificate.delay(certificate_id) except Exception as e: logger.error(f"Renewal failed for cert {certificate_id}: {e}") # Exponential backoff retry raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries)) @app.task def scan_and_renew(): """스캔 후 갱신 작업 분산""" threshold = datetime.utcnow() + timedelta(days=30) cert_ids = db.query(Certificate.id).filter( Certificate.not_after <= threshold, Certificate.auto_renew == True ).all() # 각 증명서를 개별 Celery task로 분산 for (cert_id,) in cert_ids: renew_certificate.delay(cert_id) # Celery worker 실행 # celery -A tasks.renewal worker --beat --loglevel=info ``` ### 4. 구조화된 로깅 ```python # core/logging.py import logging import json from pythonjsonlogger import jsonlogger class ACMEContextFilter(logging.Filter): """ACME 컨텍스트 정보 자동 추가""" def filter(self, record): # 현재 처리 중인 order/certificate 정보 추가 if hasattr(record, 'order_id'): record.order_id = record.order_id if hasattr(record, 'domain'): record.domain = record.domain return True def setup_logging(): """JSON 구조화 로깅 설정""" logger = logging.getLogger() logger.setLevel(logging.INFO) # JSON formatter formatter = jsonlogger.JsonFormatter( '%(timestamp)s %(level)s %(name)s %(message)s %(order_id)s %(domain)s' ) handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) logger.addFilter(ACMEContextFilter()) # 사용 예시 logger = logging.getLogger(__name__) def issue_certificate(domains): logger.info( "Starting certificate issuance", extra={"domains": domains, "order_id": order.id} ) # JSON 출력: # { # "timestamp": "2025-12-19T10:30:00Z", # "level": "INFO", # "name": "acme_client", # "message": "Starting certificate issuance", # "domains": ["www.example.com"], # "order_id": "ord_abc123" # } ``` ### 5. 메트릭과 모니터링 ```python # monitoring/metrics.py from prometheus_client import Counter, Histogram, Gauge, start_http_server # Metrics 정의 certificate_issuance_total = Counter( 'acme_certificate_issuance_total', 'Total certificate issuances', ['status', 'ca_provider'] ) certificate_issuance_duration = Histogram( 'acme_certificate_issuance_duration_seconds', 'Certificate issuance duration', ['ca_provider'] ) certificate_expiry_days = Gauge( 'acme_certificate_expiry_days', 'Days until certificate expiration', ['common_name', 'sans'] ) challenge_validation_failures = Counter( 'acme_challenge_validation_failures_total', 'Challenge validation failures', ['challenge_type', 'domain'] ) # 사용 예시 class MonitoredACMEClient: def issue_certificate(self, domains): with certificate_issuance_duration.labels(ca_provider="globalsign").time(): try: cert, key = self._issue_certificate_impl(domains) certificate_issuance_total.labels( status="success", ca_provider="globalsign" ).inc() return cert, key except Exception as e: certificate_issuance_total.labels( status="failure", ca_provider="globalsign" ).inc() raise def update_expiry_metrics(self): """주기적으로 만료일 메트릭 업데이트""" certificates = db.query(Certificate).filter( Certificate.status == "active" ).all() for cert in certificates: days_until_expiry = (cert.not_after - datetime.utcnow()).days certificate_expiry_days.labels( common_name=cert.common_name, sans=",".join(cert.sans) ).set(days_until_expiry) # Prometheus 서버 시작 (포트 9090) start_http_server(9090) ``` ### Grafana 대시보드 쿼리 예시 ```promql # 최근 24시간 증명서 발급 성공률 rate(acme_certificate_issuance_total{status="success"}[24h]) / rate(acme_certificate_issuance_total[24h]) # 30일 이내 만료 예정 증명서 수 count(acme_certificate_expiry_days < 30) # 평균 발급 소요 시간 histogram_quantile(0.99, rate(acme_certificate_issuance_duration_seconds_bucket[5m])) ``` ## 설정 관리 ```python # config.py from pydantic import BaseSettings, Field from typing import Dict, List class ACMEConfig(BaseSettings): """환경 변수 기반 설정 (12-Factor App)""" # CA Provider 설정 ca_provider: str = Field("globalsign", env="ACME_CA_PROVIDER") acme_directory_url: str = Field(..., env="ACME_DIRECTORY_URL") eab_kid: str = Field(..., env="ACME_EAB_KID") eab_hmac_key: str = Field(..., env="ACME_EAB_HMAC_KEY") # Database database_url: str = Field(..., env="DATABASE_URL") # Redis redis_url: str = Field("redis://localhost:6379/0", env="REDIS_URL") # 갱신 설정 renew_before_days: int = Field(30, env="ACME_RENEW_BEFORE_DAYS") auto_renew_enabled: bool = Field(True, env="ACME_AUTO_RENEW") # Rate limits (per hour) rate_limits: Dict[str, int] = { "new_account": 10, "new_order": 50, "challenge_validation": 100 } # Retry 설정 max_retry_attempts: int = 5 initial_retry_delay: int = 2 # 알림 alert_email: List[str] = Field([], env="ACME_ALERT_EMAILS") slack_webhook_url: str = Field("", env="SLACK_WEBHOOK_URL") class Config: env_file = ".env" env_file_encoding = "utf-8" # 사용 config = ACMEConfig() ``` ## 다음 단계 v6에서는 증명서 관리 DB와 RESTful API를 설계한다: - 증명서 메타데이터 스키마 - 발급 이력 추적 - 배포 대상 관리 - API 엔드포인트 구현 --- **시리즈 목차** 1. SSL 증명서 자동화 필요성과 프로젝트 개요 2. ACME 프로토콜 이해와 동작 원리 3. GlobalSign Atlas ACME 연동 실습 4. 도메인 검증 방식 구현 (HTTP-01, DNS-01) 5. **[현재글] ACME 클라이언트 선정과 커스텀 구현** 6. 증명서 관리 DB 설계와 API 구현 7. 증명서 배포 자동화 파이프라인 구축 8. 승인 워크플로우와 거버넌스 구현 9. PQC 대응 설계와 미래 확장성 10. 전체 시스템 통합과 프로덕션 배포 가이드