# SSL 증명서 자동화 프로젝트 (7/10): 증명서 배포 자동화 파이프라인

## 배포 자동화의 핵심 과제

증명서를 발급받는 것은 전체 과정의 절반일 뿐이다. 진짜 도전은 **300개 서버에 안전하게 배포**하는 것이다:

1. **Downtime Zero**: 서비스 중단 없이 교체
2. **Atomic Operation**: 성공 또는 롤백 (중간 상태 없음)
3. **Audit Trail**: 누가, 언제, 어디에 배포했는지 추적
4. **Credential Management**: SSH 키, API 토큰 안전 보관

## 배포 대상별 전략

```
┌────────────────────┬──────────────────┬─────────────────────────────┐
│ Target Type        │ Deployment Method│ Reload Command              │
├────────────────────┼──────────────────┼─────────────────────────────┤
│ Nginx/Apache       │ Ansible/SSH      │ systemctl reload nginx      │
│ HAProxy            │ Ansible/SSH      │ systemctl reload haproxy    │
│ F5 BIG-IP          │ F5 iControl API  │ API call (no reload)        │
│ AWS ALB/CloudFront │ AWS SDK (boto3)  │ N/A (instant)               │
│ Kubernetes Ingress │ kubectl/K8s API  │ rolling restart             │
│ CDN (CloudFlare)   │ CloudFlare API   │ instant                     │
└────────────────────┴──────────────────┴─────────────────────────────┘
```

## Ansible 기반 배포: 리눅스 웹서버

### Ansible 플레이북 구조

```
ansible/
├── inventory/
│   ├── production/
│   │   └── hosts.yml
│   └── staging/
│       └── hosts.yml
├── playbooks/
│   ├── deploy_certificate.yml
│   ├── rollback_certificate.yml
│   └── verify_certificate.yml
├── roles/
│   └── certificate_deployment/
│       ├── tasks/
│       │   ├── main.yml
│       │   ├── backup.yml
│       │   ├── deploy.yml
│       │   └── reload.yml
│       └── templates/
│           └── nginx_ssl.conf.j2
└── ansible.cfg
```

### 메인 플레이북

```yaml
# playbooks/deploy_certificate.yml
---
- name: Deploy SSL Certificate
  hosts: "{{ target_group }}"
  become: yes
  serial: "{{ batch_size | default(5) }}"  # roll out 5 hosts per batch
  max_fail_percentage: 10                  # abort when more than 10% of hosts fail

  vars:
    cert_common_name: "{{ common_name }}"
    cert_path: "/etc/nginx/ssl/{{ common_name }}.crt"
    key_path: "/etc/nginx/ssl/{{ common_name }}.key"
    chain_path: "/etc/nginx/ssl/{{ common_name }}-chain.crt"
    backup_dir: "/etc/nginx/ssl/backup/{{ ansible_date_time.epoch }}"

  pre_tasks:
    - name: Create backup directory
      file:
        path: "{{ backup_dir }}"
        state: directory
        mode: '0700'

    - name: Backup existing certificates
      copy:
        src: "{{ item }}"
        dest: "{{ backup_dir }}/"
        remote_src: yes
      loop:
        - "{{ cert_path }}"
        - "{{ key_path }}"
        - "{{ chain_path }}"
      ignore_errors: yes  # the files may not exist on the first deployment

  tasks:
    # `rescue` is only valid inside a `block`, and `import_playbook` cannot be
    # used as a task, so deployment and restore steps live in one block here.
    - name: Deploy certificate with automatic rollback
      block:
        - name: Copy certificate
          copy:
            content: "{{ certificate_pem }}"
            dest: "{{ cert_path }}"
            mode: '0644'
            owner: root
            group: root
          register: cert_copy

        - name: Copy private key
          copy:
            content: "{{ private_key_pem }}"
            dest: "{{ key_path }}"
            mode: '0600'  # important: the private key is readable by root only
            owner: root
            group: root
          no_log: true    # keep the key out of --diff / -v output and logs
          register: key_copy

        - name: Copy certificate chain
          copy:
            content: "{{ chain_pem }}"
            dest: "{{ chain_path }}"
            mode: '0644'
            owner: root
            group: root

        # Compare public keys instead of RSA moduli so ECDSA keys also work.
        - name: Verify certificate and key match
          shell: |
            CERT_PUB=$(openssl x509 -noout -pubkey -in {{ cert_path }} | openssl sha256)
            KEY_PUB=$(openssl pkey -pubout -in {{ key_path }} | openssl sha256)
            [ "$CERT_PUB" = "$KEY_PUB" ]
          changed_when: false

        - name: Test Nginx configuration
          command: nginx -t
          changed_when: false
          register: nginx_test

        - name: Reload Nginx (graceful)
          systemd:
            name: nginx
            state: reloaded
          when: nginx_test is success

        - name: Wait for Nginx to be ready
          wait_for:
            port: 443
            delay: 2
            timeout: 30

        - name: Verify HTTPS endpoint
          uri:
            url: "https://{{ common_name }}"
            validate_certs: yes
            status_code: [200, 301, 302]
          delegate_to: localhost
          register: https_check
          until: https_check is success
          retries: 3
          delay: 5

      rescue:
        - name: Restore previous certificate files from backup
          copy:
            src: "{{ backup_dir }}/{{ item | basename }}"
            dest: "{{ item }}"
            remote_src: yes
          loop:
            - "{{ cert_path }}"
            - "{{ key_path }}"
            - "{{ chain_path }}"
          ignore_errors: yes  # no backup exists on the first deployment

        - name: Reload Nginx with restored certificate
          systemd:
            name: nginx
            state: reloaded

        # A rescued host counts as successful unless it fails explicitly;
        # failing here keeps max_fail_percentage meaningful.
        - name: Mark host as failed after rollback
          fail:
            msg: "Certificate deployment failed; restored backup from {{ backup_dir }}"

  post_tasks:
    - name: Record deployment timestamp
      lineinfile:
        path: "/etc/nginx/ssl/.deployment_log"
        line: "{{ ansible_date_time.iso8601 }} | {{ common_name }} | {{ ansible_hostname }}"
        create: yes
```

### Python에서 Ansible 실행

```python
# deployment/ansible_executor.py
import json
import os
import subprocess
import tempfile
from pathlib import Path


class AnsibleDeployment:
    """Run the certificate deployment playbook against a set of targets."""

    def __init__(self, playbook_path="ansible/playbooks", inventory_path=None):
        self.playbook_path = playbook_path
        self.inventory_path = inventory_path
        # Temporary SSH key files written by _get_ssh_key; removed after each run.
        self._temp_key_files = []

    def deploy_certificate(self, certificate, targets):
        """Deploy a certificate to multiple targets.

        Args:
            certificate: Certificate ORM object
            targets: List[DeploymentTarget]

        Returns:
            dict with "success", "stdout", "stderr" and "returncode".
        """
        try:
            # Build the dynamic inventory
            inventory = self._create_dynamic_inventory(targets)

            # Extra vars (carry the certificate material to the playbook)
            extra_vars = {
                "target_group": "webservers",  # the play uses hosts: "{{ target_group }}"
                "common_name": certificate.common_name,
                "certificate_pem": certificate.certificate_pem,
                "private_key_pem": self._decrypt_private_key(certificate),
                "chain_pem": certificate.certificate_chain_pem,
                "batch_size": 5  # deploy 5 hosts at a time
            }

            # Run Ansible
            return self._run_ansible_playbook(
                playbook="deploy_certificate.yml",
                inventory=inventory,
                extra_vars=extra_vars
            )
        finally:
            # Never leave SSH private keys behind on disk
            self._cleanup_ssh_keys()

    def _create_dynamic_inventory(self, targets):
        """Build the inventory passed to ansible-playbook via -i.

        The inventory is written to a static file, so host variables must sit
        directly under each host; "_meta.hostvars" is only understood in the
        output of inventory scripts.
        """
        hosts = {}

        for target in targets:
            hostname = target.hostname

            # Per-host variables
            hosts[hostname] = {
                "ansible_host": hostname,
                "ansible_port": target.port,
                "ansible_user": "deploy",
                "ansible_ssh_private_key_file": self._get_ssh_key(target),
                "cert_path": target.cert_path,
                "key_path": target.key_path,
                "chain_path": target.chain_path,
                "reload_command": target.reload_command
            }

        return {"webservers": {"hosts": hosts}}

    def _run_ansible_playbook(self, playbook, inventory, extra_vars):
        """Run an Ansible playbook and return its captured result."""
        # Write the temporary inventory file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(inventory, f)
            inventory_file = f.name

        # Write the extra vars file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(extra_vars, f)
            extra_vars_file = f.name

        try:
            # Invoke ansible-playbook
            cmd = [
                "ansible-playbook",
                f"{self.playbook_path}/{playbook}",
                "-i", inventory_file,
                "-e", f"@{extra_vars_file}",
                "--diff",
                "-v"
            ]

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=600  # 10 minute timeout
            )

            return {
                "success": result.returncode == 0,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "returncode": result.returncode
            }
        finally:
            # Remove the temporary files (they contain the private key)
            Path(inventory_file).unlink(missing_ok=True)
            Path(extra_vars_file).unlink(missing_ok=True)

    def _get_ssh_key(self, target):
        """Fetch the SSH key from Vault and return the path of a temp file holding it."""
        import hvac

        vault_client = hvac.Client(url="https://vault.example.com")
        vault_client.token = os.getenv("VAULT_TOKEN")

        secret = vault_client.secrets.kv.v2.read_secret_version(
            path=target.credentials_vault_path
        )

        # Store the SSH key in a temporary file
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.pem') as f:
            f.write(secret['data']['data']['private_key'])
            f.flush()
            os.chmod(f.name, 0o600)

        # Remember the file so deploy_certificate can delete it afterwards
        self._temp_key_files.append(f.name)
        return f.name

    def _cleanup_ssh_keys(self):
        """Delete every temporary SSH key file created for the current run."""
        while self._temp_key_files:
            Path(self._temp_key_files.pop()).unlink(missing_ok=True)

    def _decrypt_private_key(self, certificate):
        """Decrypt the encrypted private key."""
        # Use Vault or KMS
        # Simplified here
        return certificate.private_key_encrypted
```

## AWS 리소스 배포: ALB/CloudFront

```python
# deployment/aws_deployer.py
import boto3
from datetime import datetime


class AWSCertificateDeployer:
    # NOTE(review): _decrypt_private_key and _schedule_cert_deletion are not
    # shown in this snippet; they must be provided (e.g. the same Vault/KMS
    # helper used by AnsibleDeployment) before this class can run.

    def __init__(self):
        self.acm = boto3.client('acm')
        self.elbv2 = boto3.client('elbv2')
        self.cloudfront = boto3.client('cloudfront')

    def deploy_to_alb(self, certificate, alb_arn):
        """Deploy the certificate to an ALB and return the new ACM ARN."""
        # 1. Find the HTTPS listener first, so a missing listener does not
        #    leave an orphaned certificate behind in ACM
        listeners = self.elbv2.describe_listeners(
            LoadBalancerArn=alb_arn
        )['Listeners']

        https_listener = next(
            (l for l in listeners if l['Protocol'] == 'HTTPS'),
            None
        )

        if not https_listener:
            raise ValueError("No HTTPS listener found")

        old_certs = https_listener.get('Certificates') or []

        # 2. Import the certificate into ACM
        acm_cert_arn = self.acm.import_certificate(
            Certificate=certificate.certificate_pem.encode(),
            PrivateKey=self._decrypt_private_key(certificate).encode(),
            CertificateChain=certificate.certificate_chain_pem.encode()
        )['CertificateArn']

        # 3. Swap the certificate
        self.elbv2.modify_listener(
            ListenerArn=https_listener['ListenerArn'],
            Certificates=[{'CertificateArn': acm_cert_arn}]
        )

        # 4. Delete the previous certificate (optional)
        if old_certs:
            old_cert_arn = old_certs[0]['CertificateArn']
            self._schedule_cert_deletion(old_cert_arn, days=7)

        return acm_cert_arn

    def deploy_to_cloudfront(self, certificate, distribution_id):
        """Deploy the certificate to CloudFront and return the new ACM ARN."""
        # 1. ACM (us-east-1 region only)
        acm_us_east_1 = boto3.client('acm', region_name='us-east-1')

        acm_cert_arn = acm_us_east_1.import_certificate(
            Certificate=certificate.certificate_pem.encode(),
            PrivateKey=self._decrypt_private_key(certificate).encode(),
            CertificateChain=certificate.certificate_chain_pem.encode()
        )['CertificateArn']

        # 2. Update the CloudFront distribution
        distribution_config = self.cloudfront.get_distribution_config(
            Id=distribution_id
        )

        etag = distribution_config['ETag']
        config = distribution_config['DistributionConfig']

        # Replace the certificate ARN
        config['ViewerCertificate']['ACMCertificateArn'] = acm_cert_arn
        config['ViewerCertificate']['Certificate'] = acm_cert_arn

        # Deploy (takes several minutes)
        self.cloudfront.update_distribution(
            Id=distribution_id,
            DistributionConfig=config,
            IfMatch=etag
        )

        # 3. Wait for the deployment to finish
        waiter = self.cloudfront.get_waiter('distribution_deployed')
        waiter.wait(Id=distribution_id, WaiterConfig={'Delay': 30, 'MaxAttempts': 60})

        return acm_cert_arn
```

## Kubernetes Ingress 배포

```python
# deployment/k8s_deployer.py
import base64
from datetime import datetime

from kubernetes import client, config


class K8sCertificateDeployer:
    # NOTE(review): _decrypt_private_key is not shown in this snippet; it must
    # be provided (e.g. the same Vault/KMS helper used by AnsibleDeployment).

    def __init__(self, kubeconfig_path=None):
        if kubeconfig_path:
            config.load_kube_config(config_file=kubeconfig_path)
        else:
            config.load_incluster_config()

        self.v1 = client.CoreV1Api()

    def deploy_as_secret(self, certificate, namespace, secret_name):
        """Deploy the certificate as a TLS Secret."""
        # tls.crt must carry the leaf certificate followed by the intermediate
        # chain, otherwise clients cannot build the trust path
        pem_parts = [certificate.certificate_pem, certificate.certificate_chain_pem]
        full_chain_pem = "\n".join(p.strip() for p in pem_parts if p) + "\n"

        # Secret data
        secret_data = {
            "tls.crt": base64.b64encode(full_chain_pem.encode()).decode(),
            "tls.key": base64.b64encode(self._decrypt_private_key(certificate).encode()).decode()
        }

        secret = client.V1Secret(
            metadata=client.V1ObjectMeta(
                name=secret_name,
                namespace=namespace,
                annotations={
                    "cert-manager.io/common-name": certificate.common_name,
                    "acme/issued-at": certificate.issued_at.isoformat()
                }
            ),
            type="kubernetes.io/tls",
            data=secret_data
        )

        # Upsert
        try:
            self.v1.create_namespaced_secret(namespace, secret)
        except client.exceptions.ApiException as e:
            if e.status == 409:  # Already exists
                self.v1.replace_namespaced_secret(secret_name, namespace, secret)
            else:
                raise

        # Reload so the Ingress picks up the new Secret (pod restart)
        self._rollout_restart_ingress_controller(namespace)

    def _rollout_restart_ingress_controller(self, namespace):
        """Restart the ingress controller pods."""
        apps_v1 = client.AppsV1Api()

        # Restart the nginx-ingress-controller deployment
        deployments = apps_v1.list_namespaced_deployment(
            namespace=namespace,
            label_selector="app.kubernetes.io/name=ingress-nginx"
        )

        # Patch only the restart annotation: sending the whole object back
        # would drop the other pod-template annotations and can conflict on
        # resourceVersion
        restart_patch = {
            "spec": {
                "template": {
                    "metadata": {
                        "annotations": {
                            "kubectl.kubernetes.io/restartedAt": datetime.utcnow().isoformat()
                        }
                    }
                }
            }
        }

        for deployment in deployments.items:
            # Trigger the restart (annotation change)
            apps_v1.patch_namespaced_deployment(
                name=deployment.metadata.name,
                namespace=namespace,
                body=restart_patch
            )
```

## Celery Task: 배포 오케스트레이션

```python
# tasks/deployment.py
from datetime import datetime

from celery import chain, group

from tasks import app


@app.task(bind=True, max_retries=3)
def deploy_certificate_to_target(self, certificate_id, target_id):
    """Deploy to a single target."""
    from deployment.ansible_executor import AnsibleDeployment
    from deployment.aws_deployer import AWSCertificateDeployer

    cert = db.query(Certificate).get(certificate_id)
    target = db.query(DeploymentTarget).get(target_id)

    # Stays None when creating the record itself fails
    deployment = None

    try:
        # Create the deployment record
        deployment = CertificateDeployment(
            certificate_id=certificate_id,
            target_id=target_id,
            status="deploying"
        )
        db.add(deployment)
        db.commit()

        # Actual deployment
        if target.type == DeploymentTargetType.NGINX:
            deployer = AnsibleDeployment()
            result = deployer.deploy_certificate(cert, [target])
            # The executor reports failure through its result dict, not by raising
            if not result["success"]:
                output = result["stderr"] or result["stdout"]
                raise RuntimeError(
                    f"ansible-playbook failed (rc={result['returncode']}): {output[-2000:]}"
                )

        elif target.type == DeploymentTargetType.AWS_ALB:
            deployer = AWSCertificateDeployer()
            result = deployer.deploy_to_alb(cert, target.aws_resource_arn)

        # ... other target types

        # Success
        deployment.status = "success"
        deployment.deployed_at = datetime.utcnow()
        deployment.deployment_duration_seconds = (
            datetime.utcnow() - deployment.created_at
        ).total_seconds()
        db.commit()

    except Exception as e:
        # Record the failure
        if deployment is not None:
            deployment.status = "failed"
            deployment.error_message = str(e)
            db.commit()

        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))


@app.task
def deploy_certificate_to_all_targets(certificate_id):
    """Deploy to all targets in parallel."""
    # Look up the deployment targets
    deployments = db.query(CertificateDeployment).filter(
        CertificateDeployment.certificate_id == certificate_id
    ).all()

    target_ids = [d.target_id for d in deployments]

    # Parallel deployment (Celery group)
    job = group(
        deploy_certificate_to_target.s(certificate_id, target_id)
        for target_id in target_ids
    )

    result = job.apply_async()

    return {
        "job_id": result.id,
        "total_targets": len(target_ids)
    }
```

## 배포 검증

```python
# deployment/verifier.py
import requests
import ssl
import socket
from datetime import datetime

from cryptography import x509
from cryptography.hazmat.backends import default_backend


class DeploymentVerifier:
    def verify_https_endpoint(self, domain, expected_cert_cn):
        """Verify the certificate served by an HTTPS endpoint.

        Raises:
            ValueError: when the CN does not match or the certificate expired.
        """
        context = ssl.create_default_context()

        with socket.create_connection((domain, 443), timeout=10) as sock:
            with context.wrap_socket(sock, server_hostname=domain) as ssock:
                # Fetch the certificate in DER form
                der_cert = ssock.getpeercert(binary_form=True)

                # Parse
                cert = x509.load_der_x509_certificate(der_cert, default_backend())

                # Check the CN (a certificate may carry no CN at all)
                cn_attrs = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
                cn = cn_attrs[0].value if cn_attrs else None

                if cn != expected_cert_cn:
                    raise ValueError(f"CN mismatch: expected {expected_cert_cn}, got {cn}")

                # Check the validity period
                if cert.not_valid_after < datetime.utcnow():
                    raise ValueError("Certificate expired")

                return {
                    "cn": cn,
                    "not_after": cert.not_valid_after,
                    "issuer": cert.issuer.rfc4514_string(),
                    "serial": cert.serial_number
                }
```

## 다음 단계

v8에서는 승인 워크플로우와 거버넌스를 구현한다:

- 발급 요청 승인 프로세스
- RBAC (Role-Based Access Control)
- 감사 로그와 컴플라이언스
- 비용 할당 (chargeback)

---

**시리즈 목차**
1. SSL 증명서 자동화 필요성과 프로젝트 개요
2. ACME 프로토콜 이해와 동작 원리
3. GlobalSign Atlas ACME 연동 실습
4. 도메인 검증 방식 구현 (HTTP-01, DNS-01)
5. ACME 클라이언트 선정과 커스텀 구현
6. 증명서 관리 DB 설계와 API 구현
7. **[현재글] 증명서 배포 자동화 파이프라인 구축**
8. 승인 워크플로우와 거버넌스 구현
9. PQC 대응 설계와 미래 확장성
10. 전체 시스템 통합과 프로덕션 배포 가이드