# PlanitAI KPI 개발기 v9: AI 분석 엔진 설계

> 시리즈: PlanitAI KPI 개발 여정 (9/16)
>
> 작성일: 2024년 12월

## 개요

PlanitAI KPI의 핵심 차별화 포인트는 **AI 기반 분석 엔진**입니다. 단순히 KPI 수치를 보여주는 것을 넘어, AI가 데이터를 분석하고 인사이트를 제공합니다.

이번 글에서는 Gemini 2.0을 활용한 AI 분석 엔진의 설계를 다룹니다.

---

## 1. AI 분석 엔진 아키텍처

### 1.1 전체 구조

```
┌─────────────────────────────────────────────────────────────┐
│                     AI Analysis Engine                      │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │   Context   │  │   Prompt    │  │      Response       │  │
│  │   Builder   │→ │   Engine    │→ │      Processor      │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
│         ↑                ↑                    ↓             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │    Data     │  │  Template   │  │       Insight       │  │
│  │   Fetcher   │  │   Library   │  │      Generator      │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
├─────────────────────────────────────────────────────────────┤
│                    Gemini 2.0 Flash API                     │
└─────────────────────────────────────────────────────────────┘
```

### 1.2 주요 컴포넌트

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional

import google.generativeai as genai


class AnalysisType(Enum):
    """Kinds of analysis the engine can run."""
    TREND = "trend"                    # trend analysis
    ANOMALY = "anomaly"                # anomaly detection
    CORRELATION = "correlation"        # correlation analysis
    FORECAST = "forecast"              # forecasting
    BOTTLENECK = "bottleneck"          # bottleneck analysis
    RECOMMENDATION = "recommendation"  # improvement recommendations
    # The cache TTL table and the response validators are keyed on
    # "executive_summary"; without this member no AnalysisType value matches.
    EXECUTIVE_SUMMARY = "executive_summary"


@dataclass
class AnalysisContext:
    """Inputs that scope one analysis request."""
    organization_id: str
    tree_id: str
    period_start: str
    period_end: str
    target_nodes: list[str]
    analysis_type: AnalysisType
    language: str = "ja"             # output language
    detail_level: str = "executive"  # one of: executive, manager, analyst


@dataclass
class AnalysisResult:
    """Result shape shared by every analysis."""
    summary: str
    insights: list[dict]
    # Items are dicts or plain strings depending on the analysis
    # (the trend and bottleneck prompts return lists of strings).
    recommendations: list[Any]
    confidence: float  # clamped to 0.0-1.0 after post-processing
    metadata: dict
```

---

## 2.
Context Builder - 데이터 수집

### 2.1 분석용 데이터 준비

```python
from collections import defaultdict


class AnalysisContextBuilder:
    """Gathers the data an analysis prompt needs for one AnalysisContext."""

    def __init__(self, db_session, tree_service: KPITreeService):
        self.db = db_session
        self.tree_service = tree_service

    async def build(self, context: AnalysisContext) -> dict:
        """Assemble the prompt context for ``context``.

        Returns a dict with ``tree_structure``, ``historical_data``,
        ``performance``, ``yoy_comparison``, ``benchmarks`` and ``metadata``.
        An empty ``context.target_nodes`` is treated as "every node in the tree".
        """
        # 1. KPI tree structure
        tree_structure = await self._get_tree_structure(context.tree_id)

        # 2. Time series for the requested period
        historical_data = await self._get_historical_data(
            context.tree_id, context.period_start, context.period_end
        )

        # 3. Actuals vs. targets. Without the fallback, an empty target list
        #    (the executive-summary endpoint always sends []) yields no
        #    performance data at all.
        target_nodes = context.target_nodes or [
            node["id"] for node in tree_structure.get("nodes", [])
        ]
        performance = await self._calculate_performance(target_nodes, historical_data)

        # 4. Year-over-year comparison (same period of the previous year)
        yoy_comparison = await self._get_yoy_comparison(
            context.tree_id, context.period_start, context.period_end
        )

        # 5. Industry benchmarks (when available)
        benchmarks = await self._get_benchmarks(context.organization_id)

        return {
            "tree_structure": tree_structure,
            "historical_data": historical_data,
            "performance": performance,
            "yoy_comparison": yoy_comparison,
            "benchmarks": benchmarks,
            "metadata": {
                "period": f"{context.period_start} ~ {context.period_end}",
                "node_count": len(tree_structure.get("nodes", [])),
                "data_points": len(historical_data),
            },
        }

    async def _get_tree_structure(self, tree_id: str) -> dict:
        """Serialize the KPI tree for the prompt.

        ``kgi`` holds the nested hierarchy exactly once. ``nodes`` is a flat
        list referencing children by id: embedding every node's full subtree
        there would repeat each descendant once per ancestor and blow up the
        prompt size on deep trees.
        """
        tree = await self.tree_service.get_tree(tree_id)

        def node_fields(node) -> dict:
            return {
                "id": node.id,
                "name": node.name,
                "type": node.node_type.value,
                "category": node.category.value,
                "unit": node.unit,
                "formula": node.formula,
            }

        def node_to_dict(node) -> dict:
            return {
                **node_fields(node),
                "children": [node_to_dict(c) for c in node.children],
            }

        return {
            "name": tree.name,
            "kgi": node_to_dict(tree.root_node),
            "nodes": [
                {**node_fields(n), "children_ids": [c.id for c in n.children]}
                for n in tree.all_nodes
            ],
        }

    async def _calculate_performance(
        self,
        target_nodes: list[str],
        historical_data: list[dict],
    ) -> list[dict]:
        """Compute achievement rate and trend for each target node.

        Nodes without data are skipped. A node's last record is taken as its
        latest value -- assumes ``historical_data`` is ordered oldest first
        (TODO confirm against ``_get_historical_data``).
        """
        # Group once instead of rescanning the whole history for every node.
        by_node = defaultdict(list)
        for record in historical_data:
            by_node[record["node_id"]].append(record)

        results = []
        for node_id in target_nodes:
            node_data = by_node.get(node_id)
            if not node_data:
                continue

            latest = node_data[-1]
            actual = latest.get("actual")
            target = latest.get("target")
            # No rate without a non-zero target and an actual value.
            achievement_rate = (
                actual / target * 100 if target and actual is not None else None
            )

            # Trend (recent vs. earlier average) needs at least three points.
            trend = self._calculate_trend(node_data) if len(node_data) >= 3 else None

            results.append({
                "node_id": node_id,
                "node_name": latest.get("node_name"),
                "actual": actual,
                "target": target,
                "achievement_rate": achievement_rate,
                "trend": trend,
                "history": node_data,
            })

        return results

    def _calculate_trend(self, data: list[dict]) -> str:
        """Classify a series as "improving", "declining" or "stable".

        Compares the mean of the most recent window (up to three points) with
        the mean of all earlier points; a change beyond +/-5% is significant.
        """
        values = [d["actual"] for d in data if d.get("actual") is not None]
        if len(values) < 2:
            return "stable"

        # Always leave at least one point in the baseline: a fixed 3-point
        # window on a 3-point series leaves it empty, so the result would
        # always be "stable".
        window = min(3, len(values) - 1)
        recent, older = values[-window:], values[:-window]
        recent_avg = sum(recent) / len(recent)
        older_avg = sum(older) / len(older)

        # abs() keeps the direction correct for negative baselines (e.g. losses).
        change_rate = (recent_avg - older_avg) / abs(older_avg) * 100 if older_avg else 0

        if change_rate > 5:
            return "improving"
        if change_rate < -5:
            return "declining"
        return "stable"
```

---

## 3. Prompt Engine - プロンプト設計

### 3.1 プロンプトテンプレートシステム

````python
class PromptTemplate:
    """A ``str.format`` template plus the variable names it requires."""

    def __init__(self, template: str, variables: list[str]):
        self.template = template
        self.variables = variables

    def render(self, **kwargs) -> str:
        """Render the template.

        Raises:
            ValueError: if any declared variable is missing from ``kwargs``.
        """
        missing = set(self.variables) - kwargs.keys()
        if missing:
            raise ValueError(f"Missing variables: {missing}")
        return self.template.format(**kwargs)


class PromptLibrary:
    """Prompt templates used by the analysis engine.

    Literal braces in the JSON examples are doubled (``{{`` / ``}}``) because
    the templates are rendered with ``str.format``.
    """

    # NOTE: the rules below fix the answer language to Japanese ("日本語で回答");
    # AnalysisContext.language does not affect this prompt.
    SYSTEM_PROMPT = """あなたは企業のKPI分析の専門家です。
以下の役割を担当します:
1. データ分析: KPIデータを分析し、傾向やパターンを特定
2. 問題発見: ボトルネックや異常値を検出
3. 改善提案: 具体的で実行可能な改善策を提案
4. 予測: 将来のパフォーマンスを予測

回答のルール:
- 数値は具体的に記載(例:前月比+15.3%)
- 日本語で回答
- 経営層にも分かりやすい表現を使用
- 提案は優先順位をつけて記載
"""

    TREND_ANALYSIS = PromptTemplate(
        template="""## KPIトレンド分析依頼

### 分析対象
- 期間: {period}
- 対象KPI: {target_kpis}

### KPIツリー構造
```json
{tree_structure}
```

### 実績データ
```json
{performance_data}
```

### 分析してほしいこと
1. 各KPIの推移傾向
2. 目標達成率の評価
3. 注目すべき変化ポイント
4. 相関関係のあるKPI

### 出力形式
以下のJSON形式で回答してください:
```json
{{
  "summary": "全体サマリー(2-3文)",
  "trends": [
    {{
      "kpi_name": "KPI名",
      "trend": "improving/stable/declining",
      "change_rate": "変化率",
      "insight": "インサイト"
    }}
  ],
  "notable_points": ["注目ポイント1", "注目ポイント2"],
  "correlations": [
    {{
      "kpi_a": "KPI A",
      "kpi_b": "KPI B",
      "relationship": "相関関係の説明"
    }}
  ]
}}
```
""",
        variables=["period", "target_kpis", "tree_structure", "performance_data"],
    )

    BOTTLENECK_ANALYSIS = PromptTemplate(
        template="""## ボトルネック分析依頼

### 状況
KGI({kgi_name})の目標達成率が{achievement_rate}%です。
原因となっているKPIを特定してください。

### KPIツリー構造
```json
{tree_structure}
```

### 各KPIの達成状況
```json
{performance_data}
```

### 分析してほしいこと
1. KGI未達の主要因となっているKPI
2. 各要因の影響度(インパクト)
3. 改善の優先順位
4. 具体的な改善アクション

### 出力形式
```json
{{
  "bottlenecks": [
    {{
      "kpi_name": "KPI名",
      "current_value": 現在値,
      "target_value": 目標値,
      "gap": 乖離,
      "impact_score": 1-10のインパクトスコア,
      "root_cause": "根本原因の推測",
      "actions": ["改善アクション1", "改善アクション2"]
    }}
  ],
  "priority_order": ["優先度高いKPI順"],
  "quick_wins": ["すぐに実行できる施策"],
  "strategic_initiatives": ["中長期的な取り組み"]
}}
```
""",
        variables=["kgi_name", "achievement_rate", "tree_structure", "performance_data"],
    )

    FORECAST_ANALYSIS = PromptTemplate(
        template="""## 予測分析依頼

### 予測対象
- KPI: {target_kpi}
- 予測期間: {forecast_period}

### 過去データ
```json
{historical_data}
```

### 外部要因(参考情報)
{external_factors}

### 分析してほしいこと
1. 今後の推移予測
2. 目標達成の見通し
3. リスク要因
4. 達成に向けた施策

### 出力形式
```json
{{
  "forecast": [
    {{"period": "期間", "predicted_value": 予測値, "confidence": 信頼度}}
  ],
  "target_achievement_probability": "目標達成確率(%)",
  "risks": [
    {{"factor": "リスク要因", "probability": "発生確率", "impact": "影響度"}}
  ],
  "recommendations": ["推奨施策"]
}}
```
""",
        variables=["target_kpi", "forecast_period", "historical_data", "external_factors"],
    )

    EXECUTIVE_SUMMARY = PromptTemplate(
        template="""## エグゼクティブサマリー作成依頼

### 報告対象期間
{period}

### KPI実績サマリー
```json
{performance_summary}
```

### 主要トピックス
{key_topics}

### 作成してほしいもの
経営層向けのエグゼクティブサマリーを作成してください。

要件:
- 3分で読める長さ
- 数字を明確に
- 課題と対策をセットで
- アクションアイテムを明確に

### 出力形式
```json
{{
  "headline": "今月の一言(20字以内)",
  "performance_overview": "全体パフォーマンス概要(3-4文)",
  "highlights": [
    {{"type": "positive/negative/neutral", "content": "ハイライト内容"}}
  ],
  "key_metrics": [
    {{"name": "指標名", "value": "値", "vs_target": "対目標", "vs_last_month": "対前月"}}
  ],
  "issues_and_actions": [
    {{"issue": "課題", "action": "対策", "owner": "担当", "deadline": "期限"}}
  ],
  "next_month_focus": ["来月の重点項目"]
}}
```
""",
        variables=["period", "performance_summary", "key_topics"],
    )
````

---

## 4.
AI Client - Gemini 連携

### 4.1 Gemini クライアント実装

````python
import json
import logging
import re
from typing import Optional

import google.generativeai as genai

logger = logging.getLogger(__name__)

# Captures the body of a ```json fenced block in model output.
_JSON_BLOCK_RE = re.compile(r"```json\s*([\s\S]*?)\s*```")


class GeminiClient:
    """Async Gemini API client that returns model output as a dict."""

    def __init__(self, api_key: str, model: str = "gemini-2.0-flash-exp"):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(model)

        # Generation settings
        self.generation_config = genai.GenerationConfig(
            temperature=0.3,  # low temperature for consistent analyses
            top_p=0.8,
            top_k=40,
            max_output_tokens=4096,
            # JSON mode: every prompt asks for JSON, so request a bare JSON
            # body instead of relying on the model to add a ```json fence.
            response_mime_type="application/json",
        )

        # Safety settings: blocking is disabled for all four harm categories.
        self.safety_settings = [
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
        ]

    async def analyze(
        self,
        system_prompt: str,
        user_prompt: str,
        response_schema: Optional[dict] = None,
    ) -> dict:
        """Send one prompt and return the parsed JSON object.

        Never raises. API failures come back as ``{"error": ..., "success": False}``
        and unparseable output as ``{"raw_response": ..., "parsed": False}``.
        ``response_schema`` is passed to ``_parse_response``, which does not
        use it yet.
        """
        full_prompt = f"{system_prompt}\n\n{user_prompt}"

        try:
            response = await self.model.generate_content_async(
                full_prompt,
                generation_config=self.generation_config,
                safety_settings=self.safety_settings,
            )
            # .text can raise (e.g. on a blocked response), so keep it in the try.
            text = response.text
        except Exception as e:  # API boundary: log and report in-band
            logger.exception("Gemini request failed")
            return {"error": str(e), "success": False}

        return self._parse_response(text, response_schema)

    def _parse_response(self, text: str, schema: Optional[dict]) -> dict:
        """Parse model output into a dict.

        Tries a fenced ```json block first, then the whole text. JSON that is
        not an object (e.g. a bare list) is wrapped as ``{"data": ...}`` so
        callers can always use ``dict.get``.
        """
        candidates = []
        fenced = _JSON_BLOCK_RE.search(text)
        if fenced:
            candidates.append(fenced.group(1))
        candidates.append(text)

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            return parsed if isinstance(parsed, dict) else {"data": parsed}

        # Nothing parsed: hand back the raw text.
        return {"raw_response": text, "parsed": False}


class AIAnalysisEngine:
    """Runs each analysis: build context -> render prompt -> call Gemini -> wrap."""

    def __init__(
        self,
        gemini_client: GeminiClient,
        context_builder: AnalysisContextBuilder,
        prompt_library: PromptLibrary,
    ):
        self.gemini = gemini_client
        self.context_builder = context_builder
        self.prompts = prompt_library

    async def _call_model(self, prompt: str) -> tuple[dict, Optional[str]]:
        """Run ``prompt`` with the shared system prompt.

        Returns ``(result, failure)``. ``failure`` is None on success, or a short
        reason when the client reported an API error or unparseable output
        (GeminiClient signals both in-band instead of raising).
        """
        result = await self.gemini.analyze(
            system_prompt=self.prompts.SYSTEM_PROMPT,
            user_prompt=prompt,
        )
        if result.get("success") is False:
            return result, f"api_error: {result.get('error')}"
        if result.get("parsed") is False:
            return result, "unparseable_response"
        return result, None

    @staticmethod
    def _metadata(analysis_type: str, data: dict, failure: Optional[str], **extra) -> dict:
        """Metadata common to every result.

        ``data_points`` is read by ``ResponseProcessor._adjust_confidence``;
        when it is absent that method assumes 0 and always applies its
        low-data penalty.
        """
        metadata = {
            "analysis_type": analysis_type,
            "data_points": data["metadata"]["data_points"],
            **extra,
        }
        if failure:
            metadata["error"] = failure
        return metadata

    async def analyze_trends(self, context: AnalysisContext) -> AnalysisResult:
        """Trend analysis for the context's period and target nodes."""
        data = await self.context_builder.build(context)

        prompt = self.prompts.TREND_ANALYSIS.render(
            period=data["metadata"]["period"],
            # ensure_ascii=False like the other fields, so non-ASCII KPI names
            # reach the model as text rather than \uXXXX escapes.
            target_kpis=json.dumps(
                [p["node_name"] for p in data["performance"]], ensure_ascii=False
            ),
            tree_structure=json.dumps(data["tree_structure"], ensure_ascii=False),
            performance_data=json.dumps(data["performance"], ensure_ascii=False),
        )
        result, failure = await self._call_model(prompt)

        return AnalysisResult(
            summary=result.get("summary") or "",
            insights=result.get("trends") or [],
            recommendations=result.get("notable_points") or [],
            # A failed call must not look like a confident, empty analysis.
            confidence=0.0 if failure else 0.85,  # TODO: real confidence model
            metadata=self._metadata("trend", data, failure, raw_response=result),
        )

    async def analyze_bottlenecks(self, context: AnalysisContext) -> AnalysisResult:
        """Bottleneck analysis: which KPIs hold the KGI back."""
        data = await self.context_builder.build(context)

        kgi = data["tree_structure"]["kgi"]
        kgi_performance = next(
            (p for p in data["performance"] if p["node_id"] == kgi["id"]), {}
        )
        rate = kgi_performance.get("achievement_rate")
        # The rate is unknown when the KGI is not among the analysed nodes or
        # has no target; say so instead of reporting 0% or "None".
        rate_text = "N/A" if rate is None else f"{rate:.1f}"

        prompt = self.prompts.BOTTLENECK_ANALYSIS.render(
            kgi_name=kgi["name"],
            achievement_rate=rate_text,
            tree_structure=json.dumps(data["tree_structure"], ensure_ascii=False),
            performance_data=json.dumps(data["performance"], ensure_ascii=False),
        )
        result, failure = await self._call_model(prompt)

        return AnalysisResult(
            summary=f"KGI達成率 {rate_text}% のボトルネック分析",
            insights=result.get("bottlenecks") or [],
            recommendations=(
                (result.get("quick_wins") or [])
                + (result.get("strategic_initiatives") or [])
            ),
            confidence=0.0 if failure else 0.80,
            metadata=self._metadata(
                "bottleneck", data, failure,
                priority_order=result.get("priority_order") or [],
            ),
        )

    async def generate_executive_summary(self, context: AnalysisContext) -> AnalysisResult:
        """Executive summary for management."""
        data = await self.context_builder.build(context)

        key_topics = self._extract_key_topics(data)
        prompt = self.prompts.EXECUTIVE_SUMMARY.render(
            period=data["metadata"]["period"],
            performance_summary=json.dumps(data["performance"], ensure_ascii=False),
            key_topics="\n".join(f"- {t}" for t in key_topics),
        )
        result, failure = await self._call_model(prompt)

        return AnalysisResult(
            summary=result.get("headline") or "",
            insights=result.get("highlights") or [],
            recommendations=result.get("issues_and_actions") or [],
            confidence=0.0 if failure else 0.90,
            metadata=self._metadata(
                "executive_summary", data, failure,
                key_metrics=result.get("key_metrics") or [],
                next_month_focus=result.get("next_month_focus") or [],
            ),
        )

    def _extract_key_topics(self, data: dict, limit: int = 5) -> list[str]:
        """Describe KPIs far from target (>= 120% or < 80%), at most ``limit``.

        Topics are ordered by distance from 100%, so the most notable come first.
        """
        notable = [
            p for p in data.get("performance", [])
            if p.get("achievement_rate") is not None
            and (p["achievement_rate"] >= 120 or p["achievement_rate"] < 80)
        ]
        notable.sort(key=lambda p: abs(p["achievement_rate"] - 100), reverse=True)

        topics = []
        for perf in notable[:limit]:
            rate = perf["achievement_rate"]
            if rate >= 120:
                topics.append(f"{perf['node_name']}が目標を大幅超過({rate:.1f}%)")
            else:
                topics.append(f"{perf['node_name']}が目標未達({rate:.1f}%)")
        return topics
````

---

## 5. Response Processor - 結果加工

### 5.1 分析結果の後処理

```python
class ResponseProcessor:
    """Validates AI results, adjusts confidence and normalizes output."""

    def __init__(self):
        self.validators = {
            "trend": self._validate_trend_response,
            "bottleneck": self._validate_bottleneck_response,
            "executive_summary": self._validate_summary_response,
        }

    def process(self, result: AnalysisResult) -> AnalysisResult:
        """Validate ``result``, recompute its confidence and normalize it."""
        analysis_type = result.metadata.get("analysis_type")

        # 1. Validation. Errors are only recorded here; the confidence penalty
        #    for them is applied once, in _adjust_confidence (applying it here
        #    as well halved the confidence twice).
        validator = self.validators.get(analysis_type)
        if validator:
            is_valid, errors = validator(result)
            if not is_valid:
                result.metadata["validation_errors"] = errors

        # 2. Confidence adjustment
        result.confidence = self._adjust_confidence(result)

        # 3. Output normalization
        return self._normalize_output(result)

    def _validate_trend_response(self, result: AnalysisResult) -> tuple[bool, list]:
        """Every dict insight with a ``trend`` must use an allowed value."""
        allowed = ("improving", "stable", "declining")
        errors = [
            f"Invalid trend value: {insight['trend']}"
            for insight in result.insights
            if isinstance(insight, dict)
            and "trend" in insight
            and insight["trend"] not in allowed
        ]
        return not errors, errors

    def _validate_bottleneck_response(self, result: AnalysisResult) -> tuple[bool, list]:
        """Every ``impact_score`` must be a number (not a bool) in 1-10."""
        errors = []
        for insight in result.insights:
            if not isinstance(insight, dict) or "impact_score" not in insight:
                continue
            score = insight["impact_score"]
            is_number = isinstance(score, (int, float)) and not isinstance(score, bool)
            if not is_number or not 1 <= score <= 10:
                errors.append(f"Invalid impact_score: {score}")
        return not errors, errors

    def _validate_summary_response(self, result: AnalysisResult) -> tuple[bool, list]:
        """The headline (stored as ``summary``) must stay short."""
        errors = []
        if len(result.summary) > 30:  # 20 characters requested + buffer
            errors.append("Headline too long")
        return not errors, errors

    def _adjust_confidence(self, result: AnalysisResult) -> float:
        """Scale confidence down for sparse data or validation errors; clamp to [0, 1]."""
        confidence = result.confidence

        # Fewer data points -> lower confidence
        data_points = result.metadata.get("data_points", 0)
        if data_points < 3:
            confidence *= 0.7
        elif data_points < 6:
            confidence *= 0.85

        # Validation errors
        if result.metadata.get("validation_errors"):
            confidence *= 0.5

        return min(max(confidence, 0.0), 1.0)

    def _normalize_output(self, result: AnalysisResult) -> AnalysisResult:
        """Order recommendations by ``impact_score``, highest first.

        Items without a score (including plain strings) count as 0; the sort
        is stable, so their relative order is preserved.
        """
        if result.recommendations:
            result.recommendations = sorted(
                result.recommendations,
                key=lambda x: x.get("impact_score", 0) if isinstance(x, dict) else 0,
                reverse=True,
            )
        return result
```

---

## 6.
キャッシング戦略

### 6.1 分析結果のキャッシュ

```python
import hashlib
import json
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

import redis

# Redis glob metacharacters; escaped so IDs match literally in SCAN patterns.
_GLOB_SPECIAL = re.compile(r"([*?\[\]\\])")


def _escape_glob(text: str) -> str:
    """Backslash-escape Redis glob metacharacters in ``text``."""
    return _GLOB_SPECIAL.sub(r"\\\1", text)


class AnalysisCacheManager:
    """Redis cache for AnalysisResult objects.

    Keys have the form ``analysis:{tree_id}:{analysis_type}:{hash}``. The
    tree-id prefix is what lets ``invalidate`` find every entry of one tree
    with a single pattern.

    NOTE(review): the methods are ``async`` but call the synchronous
    ``redis.Redis`` client, so each call blocks the event loop for one
    round-trip; consider ``redis.asyncio``.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_ttl = timedelta(hours=1)  # fallback TTL: 1 hour

        # TTL per AnalysisType.value
        self.ttl_by_type = {
            "trend": timedelta(hours=6),              # trends: 6 hours
            "bottleneck": timedelta(hours=2),         # bottlenecks: 2 hours
            "forecast": timedelta(hours=12),          # forecasts: 12 hours
            "executive_summary": timedelta(hours=1),  # summaries: 1 hour
        }

    @staticmethod
    def _tree_prefix(tree_id: str) -> str:
        """Key prefix shared by every cached analysis of ``tree_id``."""
        return f"analysis:{tree_id}:"

    def _generate_cache_key(self, context: AnalysisContext) -> str:
        """Deterministic key: tree prefix, analysis type, then a hash of the inputs."""
        key_data = {
            "org": context.organization_id,
            "tree": context.tree_id,
            "period": f"{context.period_start}_{context.period_end}",
            "nodes": sorted(context.target_nodes),
            "type": context.analysis_type.value,
            "lang": context.language,
        }
        key_string = json.dumps(key_data, sort_keys=True)
        hash_value = hashlib.sha256(key_string.encode()).hexdigest()[:16]
        return f"{self._tree_prefix(context.tree_id)}{context.analysis_type.value}:{hash_value}"

    async def get(self, context: AnalysisContext) -> Optional[AnalysisResult]:
        """Return the cached result for ``context``, or None on a miss.

        The stored ``cached_at`` timestamp is moved into ``metadata``: it is not
        an AnalysisResult field, and passing it to the constructor raised
        TypeError on every cache hit.
        """
        cached = self.redis.get(self._generate_cache_key(context))
        if not cached:
            return None

        data = json.loads(cached)
        cached_at = data.pop("cached_at", None)
        result = AnalysisResult(**data)
        if cached_at is not None:
            result.metadata = {**(result.metadata or {}), "cached_at": cached_at}
        return result

    async def set(self, context: AnalysisContext, result: AnalysisResult) -> None:
        """Store ``result`` with the TTL for its analysis type."""
        key = self._generate_cache_key(context)
        ttl = self.ttl_by_type.get(context.analysis_type.value, self.default_ttl)

        payload = {
            "summary": result.summary,
            "insights": result.insights,
            "recommendations": result.recommendations,
            "confidence": result.confidence,
            "metadata": result.metadata,
            "cached_at": datetime.now(timezone.utc).isoformat(),
        }
        self.redis.setex(key, ttl, json.dumps(payload, ensure_ascii=False))

    async def invalidate(self, tree_id: str) -> int:
        """Delete every cached analysis of ``tree_id``; return how many were removed.

        Uses SCAN instead of KEYS so a large keyspace does not block Redis.
        """
        pattern = f"{_escape_glob(self._tree_prefix(tree_id))}*"
        keys = list(self.redis.scan_iter(match=pattern))
        if keys:
            return self.redis.delete(*keys)
        return 0
```

---

## 7.
使用例

### 7.1 APIエンドポイントでの利用

```python
from typing import Awaitable, Callable

from fastapi import APIRouter, Depends, HTTPException

router = APIRouter(prefix="/api/v1/analysis", tags=["analysis"])


async def _run_with_cache(
    context: AnalysisContext,
    cache: AnalysisCacheManager,
    run: Callable[[AnalysisContext], Awaitable[AnalysisResult]],
) -> dict:
    """Serve ``context`` from the cache, or run the analysis and cache it.

    A result with neither insights nor recommendations (what a failed AI call
    produces) is returned but not cached, so the next request retries
    instead of serving the failure for the whole TTL.
    """
    cached = await cache.get(context)
    if cached:
        return {"result": cached, "from_cache": True}

    result = await run(context)
    if result.insights or result.recommendations:
        await cache.set(context, result)
    return {"result": result, "from_cache": False}


@router.post("/trends")
async def analyze_trends(
    request: TrendAnalysisRequest,
    engine: AIAnalysisEngine = Depends(get_analysis_engine),
    cache: AnalysisCacheManager = Depends(get_cache_manager),
):
    """トレンド分析API"""
    # Trend analysis endpoint (docstring kept: FastAPI publishes it as the
    # endpoint description).
    context = AnalysisContext(
        organization_id=request.organization_id,
        tree_id=request.tree_id,
        period_start=request.period_start,
        period_end=request.period_end,
        target_nodes=request.target_nodes or [],
        analysis_type=AnalysisType.TREND,
    )
    return await _run_with_cache(context, cache, engine.analyze_trends)


@router.post("/bottlenecks")
async def analyze_bottlenecks(
    request: BottleneckAnalysisRequest,
    engine: AIAnalysisEngine = Depends(get_analysis_engine),
    cache: AnalysisCacheManager = Depends(get_cache_manager),
):
    """ボトルネック分析API"""
    # Bottleneck analysis endpoint.
    context = AnalysisContext(
        organization_id=request.organization_id,
        tree_id=request.tree_id,
        period_start=request.period_start,
        period_end=request.period_end,
        target_nodes=request.target_nodes or [],
        analysis_type=AnalysisType.BOTTLENECK,
    )
    return await _run_with_cache(context, cache, engine.analyze_bottlenecks)


@router.post("/executive-summary")
async def generate_summary(
    request: ExecutiveSummaryRequest,
    engine: AIAnalysisEngine = Depends(get_analysis_engine),
):
    """エグゼクティブサマリー生成API"""
    # Executive summary endpoint (not cached).
    context = AnalysisContext(
        organization_id=request.organization_id,
        tree_id=request.tree_id,
        period_start=request.period_start,
        period_end=request.period_end,
        target_nodes=[],
        # NOTE(review): reuses RECOMMENDATION as the analysis type; a dedicated
        # type would keep summaries distinct (e.g. for per-type cache TTLs).
        analysis_type=AnalysisType.RECOMMENDATION,
    )
    result = await engine.generate_executive_summary(context)
    return {"result": result}
```

---

## 8. まとめ

### 設計のポイント

| 観点 | 設計判断 |
|------|---------|
| **モデル選択** | Gemini 2.0 Flash(コスト効率と速度のバランス) |
| **温度設定** | 0.3(分析の一貫性を重視) |
| **プロンプト構造** | テンプレート + JSON出力形式 |
| **キャッシング** | Redis + 分析タイプ別TTL |
| **エラー処理** | フォールバック + 信頼度調整 |

### 次回予告

v10では**テスト戦略**を設計します:

- ユニットテスト設計
- 統合テスト設計
- E2Eテスト設計
- AI応答のモック戦略

---

*PlanitAI KPI - AIがあなたのKPIを計画し、分析します*