# SSL 증명서 자동화 프로젝트 (4/10): 도메인 검증 방식 완벽 구현 ## Challenge 선택 전략 ACME는 3가지 도메인 검증 방식을 제공하며, 각각 다른 사용 사례에 최적화되어 있다. ``` ┌────────────────────┬─────────────┬──────────────────┬─────────────────┐ │ Challenge Type │ 요구사항 │ 와일드카드 지원 │ 방화벽 친화적 │ ├────────────────────┼─────────────┼──────────────────┼─────────────────┤ │ HTTP-01 │ 80포트 개방 │ ✗ │ △ (80 필요) │ │ DNS-01 │ DNS API │ ✓ │ ✓ │ │ TLS-ALPN-01 │ 443포트 개방 │ ✗ │ ✓ │ └────────────────────┴─────────────┴──────────────────┴─────────────────┘ ``` **우리 조직의 선택**: - **퍼블릭 웹서버**: HTTP-01 (Nginx 자동 설정) - **내부 서비스/와일드카드**: DNS-01 (Route53 자동화) - **로드밸런서**: TLS-ALPN-01 (F5/HAProxy) ## HTTP-01 Challenge 완전 자동화 ### 동작 원리 재확인 ``` 1. ACME 서버가 랜덤 token 발급 2. 클라이언트가 key_authorization 계산 3. 파일 배치: /.well-known/acme-challenge/{token} 4. ACME 서버 검증: GET http://{domain}/.well-known/acme-challenge/{token} 5. 응답 body == key_authorization → 통과 ``` ### Nginx 자동 설정 구현 #### 전략 1: 전용 디렉토리 + 심볼릭 링크 ```python # challenges/http01.py import os import subprocess from pathlib import Path class HTTP01Challenge: def __init__(self, webroot="/var/www/acme-challenges"): self.webroot = Path(webroot) self.webroot.mkdir(parents=True, exist_ok=True) def setup_nginx(self): """Nginx에 ACME challenge 디렉토리 설정""" nginx_config = """ # /etc/nginx/snippets/acme-challenge.conf location ^~ /.well-known/acme-challenge/ { default_type "text/plain"; root /var/www/acme-challenges; try_files $uri =404; } """ config_path = Path("/etc/nginx/snippets/acme-challenge.conf") config_path.write_text(nginx_config) # 모든 server 블록에 include 추가 # (실제로는 Ansible/Terraform으로 관리) print("✓ Add 'include snippets/acme-challenge.conf;' to each server block") # Nginx 설정 테스트 후 reload subprocess.run(["nginx", "-t"], check=True) subprocess.run(["systemctl", "reload", "nginx"], check=True) def create_challenge_file(self, token, key_authorization): """Challenge 파일 생성""" challenge_path = self.webroot / ".well-known" / "acme-challenge" / token challenge_path.parent.mkdir(parents=True, exist_ok=True) challenge_path.write_text(key_authorization) challenge_path.chmod(0o644) print(f"✓ Challenge file created: {challenge_path}") return challenge_path def cleanup_challenge_file(self, token): """Challenge 파일 삭제""" challenge_path = self.webroot / ".well-known" / "acme-challenge" / token challenge_path.unlink(missing_ok=True) print(f"✓ Challenge file removed: {challenge_path}") def verify_http_reachable(self, domain, token): """검증 전 HTTP 접근성 테스트""" import requests url = f"http://{domain}/.well-known/acme-challenge/{token}" try: response = requests.get(url, timeout=10) return response.status_code == 200 except Exception as e: print(f"✗ HTTP verification failed: {e}") return False ``` #### 전략 2: Nginx 동적 설정 (HTTP API 사용) 멀티 테넌트 환경에서 파일시스템 접근 없이 처리하는 방법: ```python class HTTP01ChallengeNginxPlus: """Nginx Plus의 Key-Value Store 활용""" def __init__(self, nginx_api="http://localhost:8080/api"): self.nginx_api = nginx_api def setup_nginx_plus(self): """Nginx Plus 설정 (key-value zone 생성)""" nginx_config = """ # /etc/nginx/conf.d/acme-kv.conf keyval_zone zone=acme_challenges:1m; keyval $request_uri $acme_response zone=acme_challenges; server { listen 80; server_name _; location ^~ /.well-known/acme-challenge/ { default_type "text/plain"; return 200 $acme_response; } location /api { api write=on; allow 127.0.0.1; deny all; } } """ print("Configure Nginx Plus with the above config") def create_challenge_kv(self, token, key_authorization): """Nginx Plus API로 key-value 등록""" import requests # Key: URI path, Value: key_authorization key = f"/.well-known/acme-challenge/{token}" response = requests.post( f"{self.nginx_api}/keyvals/acme_challenges", json={key: key_authorization} ) if response.status_code == 201: print(f"✓ Challenge registered in Nginx KV store") else: raise Exception(f"Failed to register challenge: {response.text}") def cleanup_challenge_kv(self, token): """Key-value 삭제""" import requests key = f"/.well-known/acme-challenge/{token}" requests.delete(f"{self.nginx_api}/keyvals/acme_challenges/{key}") ``` ### Apache 통합 ```python class HTTP01ChallengeApache: def __init__(self, webroot="/var/www/html"): self.webroot = Path(webroot) def setup_apache(self): """Apache Alias 설정""" apache_config = """ # /etc/apache2/conf-available/acme-challenge.conf Alias /.well-known/acme-challenge/ /var/www/acme-challenges/.well-known/acme-challenge/ Options None AllowOverride None Require all granted ForceType text/plain """ config_path = Path("/etc/apache2/conf-available/acme-challenge.conf") config_path.write_text(apache_config) subprocess.run(["a2enconf", "acme-challenge"], check=True) subprocess.run(["systemctl", "reload", "apache2"], check=True) def create_challenge_file(self, token, key_authorization): """Challenge 파일 생성 (Nginx와 동일)""" # 구현 동일... pass ``` ## DNS-01 Challenge 완전 자동화 DNS-01은 **와일드카드 증명서**에 필수이며, 방화벽 뒤 서버에도 적용 가능하다. ### AWS Route53 통합 ```python # challenges/dns01_route53.py import boto3 import time import hashlib import base64 class DNS01ChallengeRoute53: def __init__(self): self.route53 = boto3.client('route53') self.hosted_zones = self._load_hosted_zones() def _load_hosted_zones(self): """Hosted Zone 캐싱""" zones = {} paginator = self.route53.get_paginator('list_hosted_zones') for page in paginator.paginate(): for zone in page['HostedZones']: # example.com. → example.com domain = zone['Name'].rstrip('.') zones[domain] = zone['Id'].split('/')[-1] return zones def _get_hosted_zone_id(self, domain): """도메인에 해당하는 Hosted Zone ID 찾기""" # www.example.com → example.com # api.sub.example.com → sub.example.com or example.com parts = domain.split('.') for i in range(len(parts)): candidate = '.'.join(parts[i:]) if candidate in self.hosted_zones: return self.hosted_zones[candidate] raise ValueError(f"No hosted zone found for {domain}") def create_txt_record(self, domain, token, key_authorization): """DNS TXT 레코드 생성""" # DNS challenge 값 계산 dns_value = base64.urlsafe_b64encode( hashlib.sha256(key_authorization.encode()).digest() ).rstrip(b'=').decode() # Hosted Zone 찾기 hosted_zone_id = self._get_hosted_zone_id(domain) # TXT 레코드 이름 record_name = f"_acme-challenge.{domain}" # Change batch response = self.route53.change_resource_record_sets( HostedZoneId=hosted_zone_id, ChangeBatch={ 'Comment': f'ACME challenge for {domain}', 'Changes': [{ 'Action': 'UPSERT', 'ResourceRecordSet': { 'Name': record_name, 'Type': 'TXT', 'TTL': 60, 'ResourceRecords': [{'Value': f'"{dns_value}"'}] } }] } ) change_id = response['ChangeInfo']['Id'] print(f"✓ DNS TXT record created: {record_name} = {dns_value}") return change_id, dns_value def wait_for_propagation(self, change_id): """DNS 변경 전파 대기 (Route53 API 사용)""" print("⏳ Waiting for DNS propagation...") waiter = self.route53.get_waiter('resource_record_sets_changed') waiter.wait( Id=change_id, WaiterConfig={'Delay': 10, 'MaxAttempts': 60} ) print("✓ DNS change propagated (Route53 confirmed)") def verify_dns_record(self, domain, expected_value): """실제 DNS 조회로 검증 (Public DNS 사용)""" import dns.resolver record_name = f"_acme-challenge.{domain}" # 여러 public DNS에서 확인 resolvers = ['8.8.8.8', '1.1.1.1', '208.67.222.222'] # Google, Cloudflare, OpenDNS for resolver_ip in resolvers: resolver = dns.resolver.Resolver() resolver.nameservers = [resolver_ip] try: answers = resolver.resolve(record_name, 'TXT') for rdata in answers: txt_value = rdata.strings[0].decode().strip('"') if txt_value == expected_value: print(f"✓ DNS verified via {resolver_ip}: {txt_value}") return True except Exception as e: print(f"⚠ DNS query failed via {resolver_ip}: {e}") return False def cleanup_txt_record(self, domain): """TXT 레코드 삭제""" hosted_zone_id = self._get_hosted_zone_id(domain) record_name = f"_acme-challenge.{domain}" # 기존 레코드 조회 response = self.route53.list_resource_record_sets( HostedZoneId=hosted_zone_id, StartRecordName=record_name, StartRecordType='TXT', MaxItems='1' ) if response['ResourceRecordSets']: record = response['ResourceRecordSets'][0] if record['Name'].rstrip('.') == record_name: self.route53.change_resource_record_sets( HostedZoneId=hosted_zone_id, ChangeBatch={ 'Changes': [{ 'Action': 'DELETE', 'ResourceRecordSet': record }] } ) print(f"✓ DNS TXT record deleted: {record_name}") ``` ### Cloudflare 통합 ```python class DNS01ChallengeCloudflare: def __init__(self, api_token): self.api_token = api_token self.base_url = "https://api.cloudflare.com/client/v4" def _get_zone_id(self, domain): """Zone ID 조회""" import requests # example.com 추출 parts = domain.split('.') root_domain = '.'.join(parts[-2:]) response = requests.get( f"{self.base_url}/zones", headers={ "Authorization": f"Bearer {self.api_token}", "Content-Type": "application/json" }, params={"name": root_domain} ) zones = response.json()['result'] if not zones: raise ValueError(f"Zone not found: {root_domain}") return zones[0]['id'] def create_txt_record(self, domain, dns_value): """Cloudflare API로 TXT 레코드 생성""" import requests zone_id = self._get_zone_id(domain) record_name = f"_acme-challenge.{domain}" response = requests.post( f"{self.base_url}/zones/{zone_id}/dns_records", headers={ "Authorization": f"Bearer {self.api_token}", "Content-Type": "application/json" }, json={ "type": "TXT", "name": record_name, "content": dns_value, "ttl": 120 # 최소 TTL } ) record_id = response.json()['result']['id'] print(f"✓ Cloudflare DNS TXT record created: {record_name}") return record_id def cleanup_txt_record(self, domain, record_id): """TXT 레코드 삭제""" import requests zone_id = self._get_zone_id(domain) requests.delete( f"{self.base_url}/zones/{zone_id}/dns_records/{record_id}", headers={"Authorization": f"Bearer {self.api_token}"} ) ``` ## TLS-ALPN-01 Challenge (고급) TLS-ALPN-01은 포트 443에서 TLS 핸드셰이크 시 ALPN 확장을 사용한다. ```python class TLSALPN01Challenge: """TLS-ALPN-01 Challenge (고급 사용자용)""" def create_self_signed_cert(self, domain, key_authorization): """ACME-TLS/1 프로토콜용 자체 서명 인증서 생성""" from cryptography import x509 from cryptography.x509.oid import ExtensionOID from cryptography.hazmat.primitives import hashes # acmeIdentifier extension acme_identifier = hashlib.sha256(key_authorization.encode()).digest() # 자체 서명 인증서 생성 private_key = rsa.generate_private_key(65537, 2048) cert = x509.CertificateBuilder().subject_name( x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domain)]) ).issuer_name( x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domain)]) ).public_key( private_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.utcnow() ).not_valid_after( datetime.utcnow() + timedelta(days=1) ).add_extension( x509.SubjectAlternativeName([x509.DNSName(domain)]), critical=False ).add_extension( # OID 1.3.6.1.5.5.7.1.31 (acmeIdentifier) x509.UnrecognizedExtension( oid=x509.ObjectIdentifier("1.3.6.1.5.5.7.1.31"), value=acme_identifier ), critical=True ).sign(private_key, hashes.SHA256()) return cert, private_key def configure_nginx_alpn(self, domain, cert, private_key): """Nginx에 TLS-ALPN-01용 임시 설정 적용""" # 실제로는 매우 복잡 - Nginx 동적 재설정 필요 # HAProxy가 더 적합 (runtime API 사용) pass ``` ## 멀티 도메인 병렬 검증 최적화 ```python import asyncio from concurrent.futures import ThreadPoolExecutor class ParallelChallengeHandler: def __init__(self, acme_client): self.acme_client = acme_client self.dns_handler = DNS01ChallengeRoute53() self.http_handler = HTTP01Challenge() async def handle_challenges_parallel(self, authorizations): """모든 도메인의 challenge를 병렬 처리""" tasks = [] for authz in authorizations: domain = authz.body.identifier.value # DNS-01 우선 (와일드카드 지원) for challenge in authz.body.challenges: if isinstance(challenge.chall, challenges.DNS01): tasks.append(self._handle_dns_challenge(authz, challenge, domain)) break # 병렬 실행 results = await asyncio.gather(*tasks, return_exceptions=True) for i, result in enumerate(results): if isinstance(result, Exception): print(f"✗ Challenge failed: {result}") raise result print(f"✓ All {len(tasks)} challenges validated") async def _handle_dns_challenge(self, authz, challenge, domain): """단일 DNS challenge 비동기 처리""" # 1. TXT 레코드 생성 validation = challenge.validation(self.acme_client.account_key) change_id, dns_value = self.dns_handler.create_txt_record( domain, challenge.token, validation ) # 2. DNS 전파 대기 (비동기) await asyncio.to_thread(self.dns_handler.wait_for_propagation, change_id) # 3. 검증 확인 if not self.dns_handler.verify_dns_record(domain, dns_value): raise Exception(f"DNS verification failed for {domain}") # 4. Challenge 응답 await asyncio.to_thread( self.acme_client.answer_challenge, challenge, challenge.response(self.acme_client.account_key) ) # 5. Validation 완료 대기 await asyncio.to_thread(self.acme_client.poll_authorization, authz) return domain ``` ## 다음 단계 v5에서는 ACME 클라이언트를 프로덕션 수준으로 구현한다: - 에러 핸들링과 재시도 로직 - Rate limit 대응 - 증명서 갱신 스케줄링 (cron vs 데몬) - 로깅과 모니터링 통합 --- **시리즈 목차** 1. SSL 증명서 자동화 필요성과 프로젝트 개요 2. ACME 프로토콜 이해와 동작 원리 3. GlobalSign Atlas ACME 연동 실습 4. **[현재글] 도메인 검증 방식 구현 (HTTP-01, DNS-01)** 5. ACME 클라이언트 선정과 커스텀 구현 6. 증명서 관리 DB 설계와 API 구현 7. 증명서 배포 자동화 파이프라인 구축 8. 승인 워크플로우와 거버넌스 구현 9. PQC 대응 설계와 미래 확장성 10. 전체 시스템 통합과 프로덕션 배포 가이드