# PlanitAI KPI - KPI 트리 엔진 설계 **시리즈**: PlanitAI KPI 개발기 v6/16 **작성일**: 2025-12-07 **작성자**: GemEgg Dev Team --- ## 1. KPI 트리 엔진 개요 ### 1.1 엔진의 역할 ``` ┌─────────────────────────────────────────────────────────────┐ │ KPI Tree Engine │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 입력 처리 출력 │ │ ════ ════ ════ │ │ │ │ ┌─────────┐ ┌─────────────────┐ ┌─────────┐ │ │ │KPI 정의 │─────▶│ Tree Builder │─────▶│트리 구조│ │ │ └─────────┘ └─────────────────┘ └─────────┘ │ │ │ │ ┌─────────┐ ┌─────────────────┐ ┌─────────┐ │ │ │실적 데이터│─────▶│ Formula Parser │─────▶│계산 결과│ │ │ └─────────┘ └─────────────────┘ └─────────┘ │ │ │ │ ┌─────────┐ ┌─────────────────┐ ┌─────────┐ │ │ │트리 구조 │─────▶│ Analyzer │─────▶│분석 결과│ │ │ └─────────┘ └─────────────────┘ └─────────┘ │ │ │ │ 주요 기능: │ │ 1. 트리 구축 (Tree Builder) │ │ 2. 수식 파싱/계산 (Formula Parser) │ │ 3. 검증 (Validator) │ │ 4. 분석 (Analyzer) │ │ 5. 시뮬레이션 (Simulator) │ │ │ └─────────────────────────────────────────────────────────────┘ ``` --- ## 2. 트리 빌더 (Tree Builder) ### 2.1 트리 구축 알고리즘 ```python from typing import Dict, List, Optional from collections import defaultdict class KPITreeBuilder: """KPI 트리 구축기""" def __init__(self): self.nodes: Dict[str, KPINode] = {} self.adjacency: Dict[str, List[str]] = defaultdict(list) self.root_id: Optional[str] = None def build_from_flat_list(self, nodes: List[KPINode]) -> KPITree: """ 플랫 리스트에서 트리 구축 입력: [ {"id": "kpi_001", "parent_id": None, ...}, {"id": "kpi_002", "parent_id": "kpi_001", ...}, ... ] """ # 1. 노드 맵 생성 for node in nodes: self.nodes[node.id] = node # 2. 부모-자식 관계 구축 for node in nodes: if node.parent_id: self.adjacency[node.parent_id].append(node.id) # 자식 목록 업데이트 if node.parent_id in self.nodes: self.nodes[node.parent_id].children_ids.append(node.id) else: self.root_id = node.id # 3. 레벨 계산 self._calculate_levels() # 4. 검증 self._validate_tree() return KPITree( id=f"tree_{self.root_id}", organization_id="", # 외부에서 설정 name="", fiscal_year="", root_node_id=self.root_id, nodes=self.nodes ) def _calculate_levels(self): """BFS로 레벨 계산""" if not self.root_id: return queue = [(self.root_id, 0)] visited = set() while queue: node_id, level = queue.pop(0) if node_id in visited: continue visited.add(node_id) self.nodes[node_id].level = level for child_id in self.adjacency[node_id]: queue.append((child_id, level + 1)) def _validate_tree(self): """트리 검증""" # 순환 참조 검사 if self._has_cycle(): raise ValueError("Circular reference detected in KPI tree") # 고아 노드 검사 orphans = self._find_orphans() if orphans: raise ValueError(f"Orphan nodes found: {orphans}") def _has_cycle(self) -> bool: """DFS로 순환 참조 검사""" visited = set() rec_stack = set() def dfs(node_id: str) -> bool: visited.add(node_id) rec_stack.add(node_id) for child_id in self.adjacency.get(node_id, []): if child_id not in visited: if dfs(child_id): return True elif child_id in rec_stack: return True rec_stack.remove(node_id) return False for node_id in self.nodes: if node_id not in visited: if dfs(node_id): return True return False def _find_orphans(self) -> List[str]: """루트에서 도달 불가능한 노드 찾기""" if not self.root_id: return list(self.nodes.keys()) reachable = set() queue = [self.root_id] while queue: node_id = queue.pop(0) if node_id in reachable: continue reachable.add(node_id) queue.extend(self.adjacency.get(node_id, [])) return [nid for nid in self.nodes if nid not in reachable] ``` ### 2.2 트리 시각화 ```python class KPITreeVisualizer: """KPI 트리 시각화""" def to_ascii(self, tree: KPITree) -> str: """ASCII 트리 출력""" if not tree.root_node_id: return "Empty tree" lines = [] self._build_ascii(tree, tree.root_node_id, "", True, lines) return "\n".join(lines) def _build_ascii( self, tree: KPITree, node_id: str, prefix: str, is_last: bool, lines: List[str] ): node = tree.get_node(node_id) if not node: return connector = "└── " if is_last else "├── " lines.append(f"{prefix}{connector}{node.name} ({node.unit_label})") children = tree.get_children(node_id) for i, child in enumerate(children): is_child_last = (i == len(children) - 1) new_prefix = prefix + (" " if is_last else "│ ") self._build_ascii(tree, child.id, new_prefix, is_child_last, lines) # 사용 예시 """ └── 売上高 (円) ├── 契約数 (件) │ ├── 成約率 (%) │ └── 商談数 (件) │ ├── 商談化率 (%) │ └── リード数 (件) └── 契約単価 (円) """ ``` --- ## 3. 수식 파서 (Formula Parser) ### 3.1 수식 문법 ``` ┌─────────────────────────────────────────────────────────────┐ │ Formula Grammar │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 지원 연산자: │ │ ├── * (곱셈) : 契約数 * 契約単価 │ │ ├── + (덧셈) : 新規売上 + 継続売上 │ │ ├── - (뺄셈) : 売上高 - 売上原価 │ │ ├── / (나눗셈) : 売上高 / 契約数 │ │ └── () (괄호) : (売上高 - 原価) / 売上高 * 100 │ │ │ │ 예시: │ │ ├── "kpi_002 * kpi_003" │ │ ├── "kpi_004 + kpi_005 + kpi_006" │ │ ├── "(kpi_002 - kpi_003) / kpi_002 * 100" │ │ └── "SUM(kpi_004, kpi_005, kpi_006)" │ │ │ │ 내장 함수: │ │ ├── SUM(...) : 합계 │ │ ├── AVG(...) : 평균 │ │ ├── MAX(...) : 최대값 │ │ ├── MIN(...) : 최소값 │ │ └── IF(cond, t, f): 조건부 │ │ │ └─────────────────────────────────────────────────────────────┘ ``` ### 3.2 수식 파서 구현 ```python import re from typing import Dict, Any, Callable import ast import operator class FormulaParser: """KPI 수식 파서""" OPERATORS = { ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul, ast.Div: operator.truediv, } FUNCTIONS = { 'SUM': sum, 'AVG': lambda x: sum(x) / len(x) if x else 0, 'MAX': max, 'MIN': min, } def __init__(self): self.variables: Dict[str, float] = {} def parse_and_evaluate( self, formula: str, variables: Dict[str, float] ) -> float: """ 수식 파싱 및 계산 Args: formula: "kpi_002 * kpi_003" variables: {"kpi_002": 100, "kpi_003": 50000} Returns: 5000000 """ self.variables = variables # 변수명을 값으로 치환 expression = self._substitute_variables(formula) # 함수 처리 expression = self._handle_functions(expression) # 안전한 eval return self._safe_eval(expression) def _substitute_variables(self, formula: str) -> str: """변수명을 값으로 치환""" # kpi_xxx 형태의 변수명 찾기 pattern = r'\b(kpi_\w+)\b' def replacer(match): var_name = match.group(1) if var_name in self.variables: return str(self.variables[var_name]) raise ValueError(f"Unknown variable: {var_name}") return re.sub(pattern, replacer, formula) def _handle_functions(self, expression: str) -> str: """내장 함수 처리""" # SUM(a, b, c) -> (a + b + c) for func_name, func in self.FUNCTIONS.items(): pattern = rf'{func_name}\(([^)]+)\)' match = re.search(pattern, expression) if match: args = match.group(1).split(',') args = [arg.strip() for arg in args] values = [float(arg) for arg in args] result = func(values) expression = re.sub(pattern, str(result), expression, count=1) return expression def _safe_eval(self, expression: str) -> float: """안전한 수식 계산 (AST 사용)""" try: tree = ast.parse(expression, mode='eval') return self._eval_node(tree.body) except Exception as e: raise ValueError(f"Invalid formula: {expression}, Error: {e}") def _eval_node(self, node: ast.AST) -> float: """AST 노드 평가""" if isinstance(node, ast.Constant): return float(node.value) elif isinstance(node, ast.Num): # Python 3.7 호환 return float(node.n) elif isinstance(node, ast.BinOp): left = self._eval_node(node.left) right = self._eval_node(node.right) op = self.OPERATORS.get(type(node.op)) if op: return op(left, right) raise ValueError(f"Unsupported operator: {type(node.op)}") elif isinstance(node, ast.UnaryOp): operand = self._eval_node(node.operand) if isinstance(node.op, ast.USub): return -operand elif isinstance(node.op, ast.UAdd): return operand raise ValueError(f"Unsupported node type: {type(node)}") def get_dependencies(self, formula: str) -> List[str]: """수식에서 의존 KPI 추출""" pattern = r'\b(kpi_\w+)\b' return list(set(re.findall(pattern, formula))) # 사용 예시 parser = FormulaParser() # 기본 계산 result = parser.parse_and_evaluate( formula="kpi_002 * kpi_003", variables={"kpi_002": 100, "kpi_003": 50000} ) print(result) # 5000000 # 복잡한 수식 result = parser.parse_and_evaluate( formula="(kpi_001 - kpi_002) / kpi_001 * 100", variables={"kpi_001": 2000000, "kpi_002": 1500000} ) print(result) # 25.0 (영업이익률 25%) ``` --- ## 4. 계산 엔진 (Calculation Engine) ### 4.1 트리 순회 계산 ```python class KPICalculationEngine: """KPI 계산 엔진""" def __init__(self, tree: KPITree): self.tree = tree self.parser = FormulaParser() self.cache: Dict[str, float] = {} def calculate_all( self, data: Dict[str, float], # node_id -> actual value period: str = "current" ) -> Dict[str, float]: """ 전체 트리 계산 리프 노드부터 상향식(Bottom-Up)으로 계산 """ self.cache = data.copy() # 1. 위상 정렬 (리프 -> 루트 순서) order = self._topological_sort() # 2. 순서대로 계산 for node_id in order: node = self.tree.get_node(node_id) if not node: continue # 리프 노드는 이미 데이터에 있음 if node_id in self.cache: continue # 수식이 있으면 계산 if node.formula: try: value = self.parser.parse_and_evaluate( node.formula, self.cache ) self.cache[node_id] = value except Exception as e: print(f"Error calculating {node_id}: {e}") self.cache[node_id] = 0 return self.cache def _topological_sort(self) -> List[str]: """위상 정렬 (리프 -> 루트)""" in_degree = {nid: 0 for nid in self.tree.nodes} # 부모 방향으로의 간선 카운트 for node in self.tree.nodes.values(): if node.parent_id: in_degree[node.parent_id] = in_degree.get(node.parent_id, 0) + 1 # 리프 노드부터 시작 queue = [nid for nid, deg in in_degree.items() if deg == 0] result = [] while queue: node_id = queue.pop(0) result.append(node_id) node = self.tree.get_node(node_id) if node and node.parent_id: in_degree[node.parent_id] -= 1 if in_degree[node.parent_id] == 0: queue.append(node.parent_id) return result def calculate_variance( self, plan: Dict[str, float], actual: Dict[str, float] ) -> Dict[str, Dict[str, float]]: """예실 차이 계산""" result = {} plan_calculated = self.calculate_all(plan, "plan") actual_calculated = self.calculate_all(actual, "actual") for node_id in self.tree.nodes: p = plan_calculated.get(node_id, 0) a = actual_calculated.get(node_id, 0) variance = a - p variance_rate = (variance / p * 100) if p != 0 else 0 achievement_rate = (a / p * 100) if p != 0 else 0 result[node_id] = { "plan": p, "actual": a, "variance": variance, "variance_rate": variance_rate, "achievement_rate": achievement_rate } return result ``` ### 4.2 병목 분석 ```python class KPIBottleneckAnalyzer: """KPI 병목 분석기""" def __init__(self, tree: KPITree): self.tree = tree def find_bottlenecks( self, variance_data: Dict[str, Dict[str, float]], threshold: float = -10.0 # 달성률 90% 미만 ) -> List[Dict[str, Any]]: """ 병목 KPI 찾기 달성률이 낮은 KPI 중에서 자식 노드보다 성과가 나쁜 노드를 병목으로 판단 """ bottlenecks = [] for node_id, data in variance_data.items(): node = self.tree.get_node(node_id) if not node: continue achievement = data.get("achievement_rate", 100) # 달성률 기준 미달 if achievement < (100 + threshold): # 자식들의 평균 달성률 계산 children = self.tree.get_children(node_id) if children: child_achievements = [ variance_data.get(c.id, {}).get("achievement_rate", 100) for c in children ] avg_child_achievement = sum(child_achievements) / len(child_achievements) # 자식보다 성과가 나쁘면 병목 if achievement < avg_child_achievement: bottlenecks.append({ "node_id": node_id, "node_name": node.name, "category": node.category.value, "achievement_rate": achievement, "variance_rate": data.get("variance_rate", 0), "impact_level": self._calculate_impact(node), "suggested_actions": self._suggest_actions(node, data) }) else: # 리프 노드면서 달성률 낮음 = 직접 병목 bottlenecks.append({ "node_id": node_id, "node_name": node.name, "category": node.category.value, "achievement_rate": achievement, "variance_rate": data.get("variance_rate", 0), "impact_level": self._calculate_impact(node), "suggested_actions": self._suggest_actions(node, data) }) # 영향도 순으로 정렬 return sorted(bottlenecks, key=lambda x: x["impact_level"], reverse=True) def _calculate_impact(self, node: KPINode) -> float: """KGI에 미치는 영향도 계산""" # 레벨이 낮을수록 (루트에 가까울수록) 영향도 높음 return 100 / (node.level + 1) def _suggest_actions( self, node: KPINode, data: Dict[str, float] ) -> List[str]: """개선 액션 제안""" actions = [] category = node.category achievement = data.get("achievement_rate", 100) if category == KPICategory.MARKETING: if "リード" in node.name: actions.append("広告予算の増額を検討") actions.append("新しいチャネルの開拓") elif "CVR" in node.name or "率" in node.name: actions.append("LPの改善") actions.append("ターゲティングの見直し") elif category == KPICategory.SALES: if "成約" in node.name: actions.append("営業トークの改善") actions.append("提案資料の強化") elif "商談" in node.name: actions.append("インサイドセールスの強化") actions.append("ナーチャリング施策の実施") elif category == KPICategory.HR: if "離職" in node.name or "退職" in node.name: actions.append("1on1ミーティングの強化") actions.append("報酬・福利厚生の見直し") if not actions: actions.append(f"{node.name}の詳細分析が必要") return actions ``` --- ## 5. 시뮬레이션 엔진 ### 5.1 What-If 분석 ```python class KPISimulator: """KPI 시뮬레이션 엔진""" def __init__(self, tree: KPITree, calculation_engine: KPICalculationEngine): self.tree = tree self.engine = calculation_engine def simulate_what_if( self, base_data: Dict[str, float], changes: Dict[str, float] # node_id -> new value ) -> Dict[str, Any]: """ What-If 분석 특정 KPI를 변경했을 때 KGI(매출 등)에 미치는 영향 계산 Args: base_data: 현재 데이터 changes: {"kpi_007": 450} # 리드수를 450으로 변경 Returns: { "before": {"kpi_001": 1800000, ...}, "after": {"kpi_001": 2025000, ...}, "impact": {"kpi_001": 225000, ...} } """ # 변경 전 계산 before = self.engine.calculate_all(base_data) # 변경 적용 modified_data = base_data.copy() modified_data.update(changes) # 변경 후 계산 after = self.engine.calculate_all(modified_data) # 영향도 계산 impact = {} for node_id in before: impact[node_id] = { "absolute": after.get(node_id, 0) - before.get(node_id, 0), "percentage": ( (after.get(node_id, 0) - before.get(node_id, 0)) / before.get(node_id, 1) * 100 ) if before.get(node_id, 0) != 0 else 0 } return { "before": before, "after": after, "impact": impact, "summary": self._generate_summary(before, after, changes) } def find_required_improvement( self, current_data: Dict[str, float], target_kgi: str, target_value: float, adjustable_kpis: List[str] ) -> Dict[str, float]: """ 목표 달성을 위한 필요 개선량 계산 Args: current_data: 현재 데이터 target_kgi: "kpi_001" (매출) target_value: 2000000 (목표 매출) adjustable_kpis: ["kpi_007"] (조정 가능한 KPI = 리드수) Returns: {"kpi_007": 420} # 리드수를 420으로 올려야 함 """ current_kgi = self.engine.calculate_all(current_data).get(target_kgi, 0) gap = target_value - current_kgi if gap <= 0: return {} # 이미 달성 # 각 KPI의 민감도 분석 sensitivities = {} for kpi_id in adjustable_kpis: current_value = current_data.get(kpi_id, 0) if current_value == 0: continue # 1% 변경 시 KGI 변화량 test_data = current_data.copy() test_data[kpi_id] = current_value * 1.01 new_kgi = self.engine.calculate_all(test_data).get(target_kgi, 0) sensitivity = (new_kgi - current_kgi) / (current_value * 0.01) sensitivities[kpi_id] = sensitivity # 민감도가 높은 KPI부터 개선 필요량 계산 required = {} remaining_gap = gap for kpi_id in sorted(sensitivities, key=sensitivities.get, reverse=True): if remaining_gap <= 0: break sensitivity = sensitivities[kpi_id] current_value = current_data.get(kpi_id, 0) # 필요 증가량 increase_needed = remaining_gap / sensitivity if sensitivity != 0 else 0 new_value = current_value + increase_needed required[kpi_id] = { "current": current_value, "required": new_value, "increase": increase_needed, "increase_rate": (increase_needed / current_value * 100) if current_value != 0 else 0 } remaining_gap = 0 # 단순화: 하나의 KPI로 달성 가정 return required def _generate_summary( self, before: Dict[str, float], after: Dict[str, float], changes: Dict[str, float] ) -> str: """시뮬레이션 결과 요약""" lines = [] lines.append("=== シミュレーション結果 ===") for kpi_id, new_value in changes.items(): node = self.tree.get_node(kpi_id) if node: old_value = before.get(kpi_id, 0) lines.append(f"変更: {node.name} {old_value} → {new_value}") # KGI 변화 if self.tree.root_node_id: root = self.tree.get_node(self.tree.root_node_id) if root: old_kgi = before.get(self.tree.root_node_id, 0) new_kgi = after.get(self.tree.root_node_id, 0) change = new_kgi - old_kgi rate = (change / old_kgi * 100) if old_kgi != 0 else 0 lines.append(f"結果: {root.name} {old_kgi:,.0f} → {new_kgi:,.0f} ({rate:+.1f}%)") return "\n".join(lines) ``` --- ## 6. 엔진 통합 ### 6.1 KPI Engine Facade ```python class KPIEngine: """KPI 엔진 파사드""" def __init__(self, tree: KPITree): self.tree = tree self.builder = KPITreeBuilder() self.calculator = KPICalculationEngine(tree) self.analyzer = KPIBottleneckAnalyzer(tree) self.simulator = KPISimulator(tree, self.calculator) self.visualizer = KPITreeVisualizer() def calculate( self, plan_data: Dict[str, float], actual_data: Dict[str, float] ) -> Dict[str, Any]: """전체 계산 실행""" return { "plan": self.calculator.calculate_all(plan_data), "actual": self.calculator.calculate_all(actual_data), "variance": self.calculator.calculate_variance(plan_data, actual_data) } def analyze(self, variance_data: Dict[str, Dict[str, float]]) -> Dict[str, Any]: """분석 실행""" bottlenecks = self.analyzer.find_bottlenecks(variance_data) return { "bottlenecks": bottlenecks, "tree_visualization": self.visualizer.to_ascii(self.tree) } def simulate( self, base_data: Dict[str, float], changes: Dict[str, float] ) -> Dict[str, Any]: """시뮬레이션 실행""" return self.simulator.simulate_what_if(base_data, changes) def validate(self) -> List[str]: """트리 검증""" errors = [] # 순환 참조 검사 if self.builder._has_cycle(): errors.append("Circular reference detected") # 수식 검증 for node in self.tree.nodes.values(): if node.formula: deps = FormulaParser().get_dependencies(node.formula) for dep in deps: if dep not in self.tree.nodes: errors.append(f"Unknown dependency '{dep}' in {node.id}") return errors ``` --- ## 7. 결론 ### KPI 트리 엔진 핵심 기능 | 컴포넌트 | 역할 | 핵심 알고리즘 | |----------|------|---------------| | TreeBuilder | 트리 구축 | BFS, 순환 검사 | | FormulaParser | 수식 계산 | AST 파싱, 안전한 eval | | CalculationEngine | 전체 계산 | 위상 정렬, 상향식 계산 | | BottleneckAnalyzer | 병목 분석 | 달성률 비교, 영향도 | | Simulator | What-If | 민감도 분석 | ### 설계 특징 1. **안전한 수식 계산**: AST 기반 파싱 (eval 보안 문제 회피) 2. **효율적인 계산**: 위상 정렬로 의존성 순서 계산 3. **확장 가능**: 새로운 함수/연산자 추가 용이 --- **다음 편: [v7] API 설계** *RESTful API 스펙, 엔드포인트 정의, 인증/권한 처리를 다룹니다.*