1. 电商搜索系统案例

1.1 系统架构设计

# 电商搜索系统架构
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Document, Text, Keyword, Integer, Float, Date, Boolean, Completion, Nested
from elasticsearch.helpers import bulk
import redis
import json
from datetime import datetime, timedelta
import logging
from typing import List, Dict, Optional

class EcommerceSearchSystem:
    """电商搜索系统"""
    
    def __init__(self, es_hosts, redis_host='localhost', redis_port=6379):
        self.es = Elasticsearch(es_hosts)
        self.redis = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.logger = logging.getLogger(__name__)
        
    def setup_indices(self):
        """设置索引"""
        # 产品索引
        product_mapping = {
            "settings": {
                "number_of_shards": 3,
                "number_of_replicas": 1,
                "analysis": {
                    "analyzer": {
                        "product_analyzer": {
                            "type": "custom",
                            "tokenizer": "standard",
                            "filter": ["lowercase", "stop", "snowball", "synonym"]
                        },
                        "search_analyzer": {
                            "type": "custom",
                            "tokenizer": "standard",
                            "filter": ["lowercase", "stop", "synonym"]
                        }
                    },
                    "filter": {
                        "synonym": {
                            "type": "synonym",
                            "synonyms": [
                                "手机,mobile,phone",
                                "电脑,computer,pc",
                                "笔记本,laptop,notebook"
                            ]
                        }
                    }
                }
            },
            "mappings": {
                "properties": {
                    "id": {"type": "keyword"},
                    "title": {
                        "type": "text",
                        "analyzer": "product_analyzer",
                        "search_analyzer": "search_analyzer",
                        "fields": {
                            "keyword": {"type": "keyword"},
                            "suggest": {"type": "completion"}
                        }
                    },
                    "description": {
                        "type": "text",
                        "analyzer": "product_analyzer"
                    },
                    "category": {
                        "type": "nested",
                        "properties": {
                            "id": {"type": "keyword"},
                            "name": {"type": "keyword"},
                            "level": {"type": "integer"}
                        }
                    },
                    "brand": {"type": "keyword"},
                    "price": {"type": "double"},
                    "original_price": {"type": "double"},
                    "discount": {"type": "float"},
                    "stock": {"type": "integer"},
                    "sales_count": {"type": "integer"},
                    "rating": {"type": "float"},
                    "review_count": {"type": "integer"},
                    "attributes": {
                        "type": "nested",
                        "properties": {
                            "name": {"type": "keyword"},
                            "value": {"type": "keyword"},
                            "unit": {"type": "keyword"}
                        }
                    },
                    "tags": {"type": "keyword"},
                    "images": {"type": "keyword"},
                    "status": {"type": "keyword"},
                    "created_at": {"type": "date"},
                    "updated_at": {"type": "date"},
                    "suggest": {
                        "type": "completion",
                        "analyzer": "simple",
                        "preserve_separators": True,
                        "preserve_position_increments": True,
                        "max_input_length": 50
                    },
                    "location": {"type": "geo_point"},
                    "popularity_score": {"type": "float"}
                }
            }
        }
        
        # 用户行为索引
        behavior_mapping = {
            "settings": {
                "number_of_shards": 5,
                "number_of_replicas": 1
            },
            "mappings": {
                "properties": {
                    "user_id": {"type": "keyword"},
                    "session_id": {"type": "keyword"},
                    "action": {"type": "keyword"},
                    "product_id": {"type": "keyword"},
                    "query": {"type": "text"},
                    "category": {"type": "keyword"},
                    "timestamp": {"type": "date"},
                    "ip": {"type": "ip"},
                    "user_agent": {"type": "text"},
                    "referer": {"type": "keyword"},
                    "duration": {"type": "integer"},
                    "position": {"type": "integer"}
                }
            }
        }
        
        # 创建索引
        self.es.indices.create(index='products', body=product_mapping, ignore=400)
        self.es.indices.create(index='user_behaviors', body=behavior_mapping, ignore=400)
        
        self.logger.info("索引创建完成")

class ProductSearchService:
    """产品搜索服务"""
    
    def __init__(self, search_system: EcommerceSearchSystem):
        self.es = search_system.es
        self.redis = search_system.redis
        self.logger = search_system.logger
        
    def search_products(self, query: str, filters: Dict = None, 
                       sort: str = 'relevance', page: int = 1, 
                       size: int = 20, user_id: str = None) -> Dict:
        """搜索产品"""
        try:
            # 构建可复现的缓存键:内置 hash() 在不同进程间不稳定,
            # 缓存键需要包含查询、过滤、排序与分页信息
            cache_key = "search:" + json.dumps(
                [query, filters, sort, page, size],
                sort_keys=True, ensure_ascii=False
            )

            # 命中缓存直接返回
            cached_result = self.redis.get(cache_key)
            if cached_result:
                return json.loads(cached_result)

            # 构建搜索查询
            search_body = self._build_search_query(query, filters, sort, page, size)
            
            # 执行搜索
            response = self.es.search(
                index='products',
                body=search_body
            )
            
            # 处理结果
            result = self._process_search_result(response)
            
            # 记录用户行为
            if user_id:
                self._log_search_behavior(user_id, query, filters, result['total'])
            
            # 缓存非空搜索结果 5 分钟
            if result['total'] > 0:
                self.redis.setex(cache_key, 300, json.dumps(result))
            
            return result
            
        except Exception as e:
            self.logger.error(f"搜索失败: {e}")
            return {'total': 0, 'products': [], 'aggregations': {}}
    
    def _build_search_query(self, query: str, filters: Dict, 
                           sort: str, page: int, size: int) -> Dict:
        """构建搜索查询"""
        search_body = {
            "query": {
                "bool": {
                    "must": [],
                    "filter": [
                        {"term": {"status": "active"}},
                        {"range": {"stock": {"gt": 0}}}
                    ],
                    "should": [],
                    "must_not": []
                }
            },
            "from": (page - 1) * size,
            "size": size,
            "aggs": self._build_aggregations(),
            "highlight": {
                "fields": {
                    "title": {},
                    "description": {}
                }
            }
        }
        
        # 添加文本查询
        if query:
            text_query = {
                "bool": {
                    "should": [
                        {
                            "multi_match": {
                                "query": query,
                                "fields": [
                                    "title^3",
                                    "description^1",
                                    "brand^2",
                                    "tags^1.5"
                                ],
                                "type": "best_fields",
                                "fuzziness": "AUTO"
                            }
                        },
                        {
                            "nested": {
                                "path": "attributes",
                                "query": {
                                    "multi_match": {
                                        "query": query,
                                        "fields": ["attributes.value"]
                                    }
                                }
                            }
                        }
                    ]
                }
            }
            search_body["query"]["bool"]["must"].append(text_query)
        else:
            search_body["query"]["bool"]["must"].append({"match_all": {}})
        
        # 添加过滤器
        if filters:
            self._add_filters(search_body, filters)
        
        # 添加排序
        search_body["sort"] = self._build_sort(sort, query)
        
        # 按热度与销量加权,提升热门商品的排序权重
        if query:
            search_body["query"]["bool"]["should"].extend([
                {
                    "function_score": {
                        "query": {"match_all": {}},
                        "functions": [
                            {
                                "field_value_factor": {
                                    "field": "popularity_score",
                                    "factor": 1.2,
                                    "modifier": "log1p",
                                    "missing": 1
                                }
                            },
                            {
                                "field_value_factor": {
                                    "field": "sales_count",
                                    "factor": 0.1,
                                    "modifier": "log1p",
                                    "missing": 1
                                }
                            }
                        ],
                        "score_mode": "multiply",
                        "boost_mode": "multiply"
                    }
                }
            ])
        
        return search_body
    
    def _add_filters(self, search_body: Dict, filters: Dict):
        """添加过滤器"""
        filter_queries = search_body["query"]["bool"]["filter"]
        
        # 分类过滤
        if filters.get('category'):
            filter_queries.append({
                "nested": {
                    "path": "category",
                    "query": {
                        "term": {"category.id": filters['category']}
                    }
                }
            })
        
        # 品牌过滤
        if filters.get('brand'):
            if isinstance(filters['brand'], list):
                filter_queries.append({
                    "terms": {"brand": filters['brand']}
                })
            else:
                filter_queries.append({
                    "term": {"brand": filters['brand']}
                })
        
        # 价格范围过滤(用 is not None 判断,避免 0 被当作未传)
        if filters.get('price_min') is not None or filters.get('price_max') is not None:
            price_range = {}
            if filters.get('price_min') is not None:
                price_range['gte'] = filters['price_min']
            if filters.get('price_max') is not None:
                price_range['lte'] = filters['price_max']
            filter_queries.append({
                "range": {"price": price_range}
            })
        
        # 评分过滤
        if filters.get('rating_min'):
            filter_queries.append({
                "range": {"rating": {"gte": filters['rating_min']}}
            })
        
        # 属性过滤
        if filters.get('attributes'):
            for attr_name, attr_value in filters['attributes'].items():
                filter_queries.append({
                    "nested": {
                        "path": "attributes",
                        "query": {
                            "bool": {
                                "must": [
                                    {"term": {"attributes.name": attr_name}},
                                    {"term": {"attributes.value": attr_value}}
                                ]
                            }
                        }
                    }
                })
        
        # 地理位置过滤
        if filters.get('location') and filters.get('distance'):
            filter_queries.append({
                "geo_distance": {
                    "distance": filters['distance'],
                    "location": filters['location']
                }
            })
    
    def _build_sort(self, sort: str, query: str = None) -> List[Dict]:
        """构建排序"""
        sort_options = {
            'relevance': [{"_score": {"order": "desc"}}],
            'price_asc': [{"price": {"order": "asc"}}],
            'price_desc': [{"price": {"order": "desc"}}],
            'rating': [{"rating": {"order": "desc"}}],
            'sales': [{"sales_count": {"order": "desc"}}],
            'newest': [{"created_at": {"order": "desc"}}],
            'popularity': [{"popularity_score": {"order": "desc"}}]
        }
        
        base_sort = sort_options.get(sort, sort_options['relevance'])
        
        # 如果有查询词,优先按相关性排序
        if query and sort != 'relevance':
            base_sort.insert(0, {"_score": {"order": "desc"}})
        
        return base_sort
    
    def _build_aggregations(self) -> Dict:
        """构建聚合"""
        return {
            "categories": {
                "nested": {"path": "category"},
                "aggs": {
                    "category_names": {
                        "terms": {
                            "field": "category.name",
                            "size": 20
                        }
                    }
                }
            },
            "brands": {
                "terms": {
                    "field": "brand",
                    "size": 20
                }
            },
            "price_ranges": {
                "range": {
                    "field": "price",
                    "ranges": [
                        {"to": 100},
                        {"from": 100, "to": 500},
                        {"from": 500, "to": 1000},
                        {"from": 1000, "to": 5000},
                        {"from": 5000}
                    ]
                }
            },
            "ratings": {
                "range": {
                    "field": "rating",
                    "ranges": [
                        {"from": 4.5},
                        {"from": 4.0, "to": 4.5},
                        {"from": 3.5, "to": 4.0},
                        {"from": 3.0, "to": 3.5},
                        {"to": 3.0}
                    ]
                }
            },
            "attributes": {
                "nested": {"path": "attributes"},
                "aggs": {
                    "attribute_names": {
                        "terms": {
                            "field": "attributes.name",
                            "size": 10
                        },
                        "aggs": {
                            "attribute_values": {
                                "terms": {
                                    "field": "attributes.value",
                                    "size": 20
                                }
                            }
                        }
                    }
                }
            }
        }
    
    def _process_search_result(self, response: Dict) -> Dict:
        """处理搜索结果"""
        hits = response['hits']['hits']
        total = response['hits']['total']['value']
        
        products = []
        for hit in hits:
            product = hit['_source']
            product['score'] = hit['_score']
            
            # 添加高亮
            if 'highlight' in hit:
                product['highlight'] = hit['highlight']
            
            products.append(product)
        
        # 处理聚合结果
        aggregations = {}
        if 'aggregations' in response:
            aggs = response['aggregations']
            
            # 处理分类聚合
            if 'categories' in aggs:
                categories = []
                for bucket in aggs['categories']['category_names']['buckets']:
                    categories.append({
                        'name': bucket['key'],
                        'count': bucket['doc_count']
                    })
                aggregations['categories'] = categories
            
            # 处理品牌聚合
            if 'brands' in aggs:
                brands = []
                for bucket in aggs['brands']['buckets']:
                    brands.append({
                        'name': bucket['key'],
                        'count': bucket['doc_count']
                    })
                aggregations['brands'] = brands
            
            # 处理价格范围聚合
            if 'price_ranges' in aggs:
                price_ranges = []
                for bucket in aggs['price_ranges']['buckets']:
                    price_range = {
                        'count': bucket['doc_count']
                    }
                    if 'from' in bucket:
                        price_range['from'] = bucket['from']
                    if 'to' in bucket:
                        price_range['to'] = bucket['to']
                    price_ranges.append(price_range)
                aggregations['price_ranges'] = price_ranges
            
            # 处理评分聚合
            if 'ratings' in aggs:
                ratings = []
                for bucket in aggs['ratings']['buckets']:
                    rating_range = {
                        'count': bucket['doc_count']
                    }
                    if 'from' in bucket:
                        rating_range['from'] = bucket['from']
                    if 'to' in bucket:
                        rating_range['to'] = bucket['to']
                    ratings.append(rating_range)
                aggregations['ratings'] = ratings
            
            # 处理属性聚合
            if 'attributes' in aggs:
                attributes = []
                for attr_bucket in aggs['attributes']['attribute_names']['buckets']:
                    attr_name = attr_bucket['key']
                    attr_values = []
                    for value_bucket in attr_bucket['attribute_values']['buckets']:
                        attr_values.append({
                            'value': value_bucket['key'],
                            'count': value_bucket['doc_count']
                        })
                    attributes.append({
                        'name': attr_name,
                        'values': attr_values
                    })
                aggregations['attributes'] = attributes
        
        return {
            'total': total,
            'products': products,
            'aggregations': aggregations
        }
    
    def _log_search_behavior(self, user_id: str, query: str, 
                           filters: Dict, total_results: int):
        """记录搜索行为"""
        behavior = {
            'user_id': user_id,
            'action': 'search',
            'query': query,
            'filters': json.dumps(filters) if filters else None,
            'results_count': total_results,
            'timestamp': datetime.now().isoformat()
        }
        
        try:
            self.es.index(
                index='user_behaviors',
                body=behavior
            )
        except Exception as e:
            self.logger.error(f"记录用户行为失败: {e}")
    
    def get_suggestions(self, query: str, size: int = 10) -> List[Dict]:
        """获取搜索建议"""
        try:
            # 检查缓存
            cache_key = f"suggest:{query}"
            cached_result = self.redis.get(cache_key)
            if cached_result:
                return json.loads(cached_result)
            
            # 构建建议查询
            suggest_body = {
                "suggest": {
                    "product_suggest": {
                        "prefix": query,
                        "completion": {
                            "field": "suggest",
                            "size": size,
                            "skip_duplicates": True
                        }
                    }
                }
            }
            
            response = self.es.search(
                index='products',
                body=suggest_body
            )
            
            suggestions = []
            if 'suggest' in response:
                for option in response['suggest']['product_suggest'][0]['options']:
                    suggestions.append({
                        'text': option['text'],
                        'score': option['_score']
                    })
            
            # 缓存结果
            self.redis.setex(cache_key, 600, json.dumps(suggestions))
            
            return suggestions
            
        except Exception as e:
            self.logger.error(f"获取建议失败: {e}")
            return []
    
    def get_recommendations(self, user_id: str, size: int = 10) -> List[Dict]:
        """获取个性化推荐"""
        try:
            # 获取用户行为历史
            user_behaviors = self._get_user_behaviors(user_id)
            
            # 基于用户行为构建推荐查询
            recommend_body = self._build_recommendation_query(user_behaviors, size)
            
            response = self.es.search(
                index='products',
                body=recommend_body
            )
            
            recommendations = []
            for hit in response['hits']['hits']:
                product = hit['_source']
                product['score'] = hit['_score']
                product['reason'] = self._get_recommendation_reason(product, user_behaviors)
                recommendations.append(product)
            
            return recommendations
            
        except Exception as e:
            self.logger.error(f"获取推荐失败: {e}")
            return []
    
    def _get_user_behaviors(self, user_id: str, days: int = 30) -> List[Dict]:
        """获取用户行为历史"""
        try:
            query_body = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"user_id": user_id}},
                            {
                                "range": {
                                    "timestamp": {
                                        "gte": (datetime.now() - timedelta(days=days)).isoformat()
                                    }
                                }
                            }
                        ]
                    }
                },
                "sort": [{"timestamp": {"order": "desc"}}],
                "size": 1000
            }
            
            response = self.es.search(
                index='user_behaviors',
                body=query_body
            )
            
            return [hit['_source'] for hit in response['hits']['hits']]
            
        except Exception as e:
            self.logger.error(f"获取用户行为失败: {e}")
            return []
    
    def _build_recommendation_query(self, user_behaviors: List[Dict], size: int) -> Dict:
        """构建推荐查询"""
        # 分析用户偏好
        preferred_categories = {}
        preferred_brands = {}
        viewed_products = set()
        
        for behavior in user_behaviors:
            if behavior.get('category'):
                preferred_categories[behavior['category']] = preferred_categories.get(behavior['category'], 0) + 1
            if behavior.get('brand'):
                preferred_brands[behavior['brand']] = preferred_brands.get(behavior['brand'], 0) + 1
            if behavior.get('product_id'):
                viewed_products.add(behavior['product_id'])
        
        # 构建推荐查询
        should_queries = []
        
        # 基于分类偏好
        for category, weight in sorted(preferred_categories.items(), key=lambda x: x[1], reverse=True)[:5]:
            should_queries.append({
                "nested": {
                    "path": "category",
                    "query": {
                        "term": {"category.name": category}
                    },
                    "boost": weight * 0.1
                }
            })
        
        # 基于品牌偏好
        for brand, weight in sorted(preferred_brands.items(), key=lambda x: x[1], reverse=True)[:5]:
            should_queries.append({
                "term": {
                    "brand": {
                        "value": brand,
                        "boost": weight * 0.1
                    }
                }
            })
        
        recommend_body = {
            "query": {
                "bool": {
                    "should": should_queries,
                    "filter": [
                        {"term": {"status": "active"}},
                        {"range": {"stock": {"gt": 0}}}
                    ],
                    "must_not": [
                        {"terms": {"id": list(viewed_products)}}
                    ]
                }
            },
            "sort": [
                {"_score": {"order": "desc"}},
                {"popularity_score": {"order": "desc"}}
            ],
            "size": size
        }
        
        return recommend_body
    
    def _get_recommendation_reason(self, product: Dict, user_behaviors: List[Dict]) -> str:
        """获取推荐理由"""
        reasons = []
        
        # 检查分类匹配
        user_categories = [b.get('category') for b in user_behaviors if b.get('category')]
        if product.get('category') and any(cat['name'] in user_categories for cat in product['category']):
            reasons.append("基于您的浏览分类")
        
        # 检查品牌匹配
        user_brands = [b.get('brand') for b in user_behaviors if b.get('brand')]
        if product.get('brand') in user_brands:
            reasons.append("基于您喜欢的品牌")
        
        # 检查热门程度
        if product.get('popularity_score', 0) > 0.8:
            reasons.append("热门商品")
        
        # 检查高评分
        if product.get('rating', 0) >= 4.5:
            reasons.append("高评分商品")
        
        return "、".join(reasons) if reasons else "为您推荐"

class SearchAnalyticsService:
    """搜索分析服务"""
    
    def __init__(self, search_system: EcommerceSearchSystem):
        self.es = search_system.es
        self.redis = search_system.redis
        self.logger = search_system.logger
    
    def get_search_analytics(self, start_date: str, end_date: str) -> Dict:
        """获取搜索分析数据"""
        try:
            analytics_body = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"action": "search"}},
                            {
                                "range": {
                                    "timestamp": {
                                        "gte": start_date,
                                        "lte": end_date
                                    }
                                }
                            }
                        ]
                    }
                },
                "aggs": {
                    "total_searches": {
                        "value_count": {"field": "query"}
                    },
                    "unique_users": {
                        "cardinality": {"field": "user_id"}
                    },
                    "top_queries": {
                        "terms": {
                            "field": "query.keyword",
                            "size": 20
                        }
                    },
                    "zero_result_queries": {
                        "filter": {"term": {"results_count": 0}},
                        "aggs": {
                            "queries": {
                                "terms": {
                                    "field": "query.keyword",
                                    "size": 10
                                }
                            }
                        }
                    },
                    "search_trends": {
                        "date_histogram": {
                            "field": "timestamp",
                            "calendar_interval": "1d"
                        }
                    },
                    "avg_results_count": {
                        "avg": {"field": "results_count"}
                    }
                },
                "size": 0
            }
            
            response = self.es.search(
                index='user_behaviors',
                body=analytics_body
            )
            
            aggs = response['aggregations']
            
            return {
                'total_searches': aggs['total_searches']['value'],
                'unique_users': aggs['unique_users']['value'],
                'top_queries': [
                    {'query': bucket['key'], 'count': bucket['doc_count']}
                    for bucket in aggs['top_queries']['buckets']
                ],
                'zero_result_queries': [
                    {'query': bucket['key'], 'count': bucket['doc_count']}
                    for bucket in aggs['zero_result_queries']['queries']['buckets']
                ],
                'search_trends': [
                    {
                        'date': bucket['key_as_string'],
                        'count': bucket['doc_count']
                    }
                    for bucket in aggs['search_trends']['buckets']
                ],
                'avg_results_count': aggs['avg_results_count']['value']
            }
            
        except Exception as e:
            self.logger.error(f"获取搜索分析失败: {e}")
            return {}
    
    def get_product_analytics(self, start_date: str, end_date: str) -> Dict:
        """获取产品分析数据"""
        try:
            analytics_body = {
                "query": {
                    "bool": {
                        "must": [
                            {"terms": {"action": ["view", "click", "purchase"]}},
                            {
                                "range": {
                                    "timestamp": {
                                        "gte": start_date,
                                        "lte": end_date
                                    }
                                }
                            }
                        ]
                    }
                },
                "aggs": {
                    "top_viewed_products": {
                        "filter": {"term": {"action": "view"}},
                        "aggs": {
                            "products": {
                                "terms": {
                                    "field": "product_id",
                                    "size": 20
                                }
                            }
                        }
                    },
                    "top_clicked_products": {
                        "filter": {"term": {"action": "click"}},
                        "aggs": {
                            "products": {
                                "terms": {
                                    "field": "product_id",
                                    "size": 20
                                }
                            }
                        }
                    },
                    "top_purchased_products": {
                        "filter": {"term": {"action": "purchase"}},
                        "aggs": {
                            "products": {
                                "terms": {
                                    "field": "product_id",
                                    "size": 20
                                }
                            }
                        }
                    },
                    "conversion_funnel": {
                        "filters": {
                            "filters": {
                                "views": {"term": {"action": "view"}},
                                "clicks": {"term": {"action": "click"}},
                                "purchases": {"term": {"action": "purchase"}}
                            }
                        }
                    }
                },
                "size": 0
            }
            
            response = self.es.search(
                index='user_behaviors',
                body=analytics_body
            )
            
            aggs = response['aggregations']
            
            return {
                'top_viewed_products': [
                    {'product_id': bucket['key'], 'views': bucket['doc_count']}
                    for bucket in aggs['top_viewed_products']['products']['buckets']
                ],
                'top_clicked_products': [
                    {'product_id': bucket['key'], 'clicks': bucket['doc_count']}
                    for bucket in aggs['top_clicked_products']['products']['buckets']
                ],
                'top_purchased_products': [
                    {'product_id': bucket['key'], 'purchases': bucket['doc_count']}
                    for bucket in aggs['top_purchased_products']['products']['buckets']
                ],
                'conversion_funnel': {
                    'views': aggs['conversion_funnel']['buckets']['views']['doc_count'],
                    'clicks': aggs['conversion_funnel']['buckets']['clicks']['doc_count'],
                    'purchases': aggs['conversion_funnel']['buckets']['purchases']['doc_count']
                }
            }
            
        except Exception as e:
            self.logger.error(f"获取产品分析失败: {e}")
            return {}

# 使用示例
if __name__ == "__main__":
    # 初始化系统
    search_system = EcommerceSearchSystem(
        es_hosts=['http://localhost:9200'],
        redis_host='localhost'
    )
    
    # 设置索引
    search_system.setup_indices()
    
    # 初始化服务
    search_service = ProductSearchService(search_system)
    analytics_service = SearchAnalyticsService(search_system)
    
    # 搜索产品
    results = search_service.search_products(
        query="手机",
        filters={
            'brand': ['Apple', 'Samsung'],
            'price_min': 1000,
            'price_max': 8000,
            'rating_min': 4.0
        },
        sort='popularity',
        page=1,
        size=20,
        user_id='user123'
    )
    
    print(f"搜索结果: {results['total']} 个产品")
    
    # 获取建议
    suggestions = search_service.get_suggestions("手机")
    print(f"搜索建议: {suggestions}")
    
    # 获取推荐
    recommendations = search_service.get_recommendations("user123")
    print(f"个性化推荐: {len(recommendations)} 个产品")
    
    # 获取分析数据
    analytics = analytics_service.get_search_analytics(
        start_date="2024-01-01",
        end_date="2024-01-31"
    )
    print(f"搜索分析: {analytics}")

1.2 性能优化策略

# 性能优化工具类
class SearchPerformanceOptimizer:
    """搜索性能优化器"""
    
    def __init__(self, search_system: EcommerceSearchSystem):
        self.es = search_system.es
        self.redis = search_system.redis
        self.logger = search_system.logger
    
    def optimize_index_settings(self, index_name: str):
        """优化索引设置"""
        try:
            # 更新索引设置
            settings = {
                "index": {
                    "refresh_interval": "30s",  # 降低刷新频率
                    "number_of_replicas": 1,
                    "translog": {
                        "flush_threshold_size": "1gb",
                        "sync_interval": "30s"
                    },
                    "merge": {
                        "policy": {
                            "max_merge_at_once": 5,
                            "segments_per_tier": 10
                        }
                    },
                    "codec": "best_compression",  # 启用压缩
                    "routing": {
                        "allocation": {
                            "total_shards_per_node": 3
                        }
                    }
                }
            }
            
            # codec、translog.sync_interval 等属于静态设置,只能在索引关闭状态下修改,
            # 因此先关闭索引、更新设置,再重新打开(期间索引短暂不可读写)
            self.es.indices.close(index=index_name)
            self.es.indices.put_settings(
                index=index_name,
                body=settings
            )
            self.es.indices.open(index=index_name)
            
            self.logger.info(f"索引 {index_name} 设置优化完成")
            
        except Exception as e:
            self.logger.error(f"优化索引设置失败: {e}")
    
    def setup_index_templates(self):
        """设置索引模板"""
        try:
            # 产品索引模板
            product_template = {
                "index_patterns": ["products*"],
                "settings": {
                    "number_of_shards": 3,
                    "number_of_replicas": 1,
                    "refresh_interval": "30s",
                    "codec": "best_compression",
                    "analysis": {
                        "analyzer": {
                            "product_analyzer": {
                                "type": "custom",
                                "tokenizer": "standard",
                                "filter": ["lowercase", "stop", "snowball"]
                            }
                        }
                    }
                },
                "mappings": {
                    "dynamic": "strict",
                    "properties": {
                        "id": {"type": "keyword"},
                        "title": {
                            "type": "text",
                            "analyzer": "product_analyzer",
                            "fields": {
                                "keyword": {"type": "keyword"},
                                "suggest": {"type": "completion"}
                            }
                        },
                        "price": {"type": "double"},
                        "category": {"type": "keyword"},
                        "brand": {"type": "keyword"},
                        "created_at": {"type": "date"}
                    }
                }
            }
            
            # 用户行为索引模板
            behavior_template = {
                "index_patterns": ["user_behaviors*"],
                "settings": {
                    "number_of_shards": 5,
                    "number_of_replicas": 0,  # 行为数据可以不要副本
                    "refresh_interval": "60s",
                    "codec": "best_compression"
                },
                "mappings": {
                    "dynamic": "strict",
                    "properties": {
                        "user_id": {"type": "keyword"},
                        "action": {"type": "keyword"},
                        "product_id": {"type": "keyword"},
                        "timestamp": {"type": "date"}
                    }
                }
            }
            
            self.es.indices.put_template(
                name="product_template",
                body=product_template
            )
            
            self.es.indices.put_template(
                name="behavior_template",
                body=behavior_template
            )
            
            self.logger.info("索引模板设置完成")
            
        except Exception as e:
            self.logger.error(f"设置索引模板失败: {e}")
    
    def setup_caching_strategy(self):
        """设置缓存策略"""
        try:
            # 启用分片请求缓存(index.requests.cache.enable 是动态设置,可在线修改);
            # 查询缓存 index.queries.cache.enabled 默认已开启且为静态设置,
            # 只能在建索引时指定或关闭索引后修改,这里不做动态更新
            cache_settings = {
                "index": {
                    "requests": {
                        "cache": {
                            "enable": True
                        }
                    }
                }
            }
            
            self.es.indices.put_settings(
                index="products",
                body=cache_settings
            )
            
            self.logger.info("缓存策略设置完成")
            
        except Exception as e:
            self.logger.error(f"设置缓存策略失败: {e}")
    
    def monitor_performance(self) -> Dict:
        """监控性能指标"""
        try:
            # 获取集群统计信息
            cluster_stats = self.es.cluster.stats()
            
            # 获取节点统计信息
            nodes_stats = self.es.nodes.stats()
            
            # 获取索引统计信息
            indices_stats = self.es.indices.stats()
            
            performance_metrics = {
                'cluster': {
                    'status': cluster_stats['status'],
                    'nodes_count': cluster_stats['nodes']['count']['total'],
                    'indices_count': cluster_stats['indices']['count'],
                    'docs_count': cluster_stats['indices']['docs']['count'],
                    'store_size': cluster_stats['indices']['store']['size_in_bytes']
                },
                'indices': {},
                'nodes': {}
            }
            
            # 处理索引统计信息
            for index_name, index_stats in indices_stats['indices'].items():
                performance_metrics['indices'][index_name] = {
                    'docs_count': index_stats['total']['docs']['count'],
                    'store_size': index_stats['total']['store']['size_in_bytes'],
                    'search_time': index_stats['total']['search']['time_in_millis'],
                    'search_count': index_stats['total']['search']['query_total'],
                    'indexing_time': index_stats['total']['indexing']['time_in_millis'],
                    'indexing_count': index_stats['total']['indexing']['index_total']
                }
            
            # 处理节点统计信息
            for node_id, node_stats in nodes_stats['nodes'].items():
                performance_metrics['nodes'][node_id] = {
                    'name': node_stats['name'],
                    'cpu_percent': node_stats['os']['cpu']['percent'],
                    'memory_used_percent': node_stats['jvm']['mem']['heap_used_percent'],
                    'disk_available': node_stats['fs']['total']['available_in_bytes'],
                    'gc_time': node_stats['jvm']['gc']['collectors']['young']['collection_time_in_millis']
                }
            
            return performance_metrics
            
        except Exception as e:
            self.logger.error(f"监控性能失败: {e}")
            return {}
    
    def optimize_search_queries(self, query_body: Dict) -> Dict:
        """优化搜索查询"""
        try:
            optimized_query = query_body.copy()
            
            # 分片请求缓存需通过 URL 参数开启,例如 es.search(..., request_cache=True),
            # 写进查询体会导致解析错误,因此这里不向 body 中加入 request_cache
            
            # 限制返回字段
            if '_source' not in optimized_query:
                optimized_query['_source'] = [
                    'id', 'title', 'price', 'brand', 'rating', 'images'
                ]
            
            # 添加超时设置
            optimized_query['timeout'] = '5s'
            
            # 优化聚合
            if 'aggs' in optimized_query:
                for agg_name, agg_body in optimized_query['aggs'].items():
                    if 'terms' in agg_body:
                        # 限制聚合大小
                        if 'size' not in agg_body['terms']:
                            agg_body['terms']['size'] = 10
                        
                        # 添加执行提示
                        agg_body['terms']['execution_hint'] = 'map'
            
            # 添加搜索类型优化
            if 'query' in optimized_query and 'bool' in optimized_query['query']:
                bool_query = optimized_query['query']['bool']
                
                # 将高选择性过滤器放在前面
                if 'filter' in bool_query:
                    filters = bool_query['filter']
                    # 按选择性排序过滤器
                    high_selectivity_filters = []
                    low_selectivity_filters = []
                    
                    for filter_clause in filters:
                        if 'term' in filter_clause or 'range' in filter_clause:
                            high_selectivity_filters.append(filter_clause)
                        else:
                            low_selectivity_filters.append(filter_clause)
                    
                    bool_query['filter'] = high_selectivity_filters + low_selectivity_filters
            
            return optimized_query
            
        except Exception as e:
            self.logger.error(f"优化查询失败: {e}")
            return query_body
    
    def setup_monitoring_alerts(self):
        """设置监控告警"""
        try:
            # 创建监控脚本
            monitoring_script = """
import requests
import json
from datetime import datetime

def check_cluster_health():
    try:
        response = requests.get('http://localhost:9200/_cluster/health')
        health = response.json()
        
        if health['status'] != 'green':
            send_alert(f"集群状态异常: {health['status']}")
        
        if health['number_of_nodes'] < 3:
            send_alert(f"节点数量不足: {health['number_of_nodes']}")
            
    except Exception as e:
        send_alert(f"健康检查失败: {e}")

def check_performance_metrics():
    try:
        response = requests.get('http://localhost:9200/_nodes/stats')
        stats = response.json()
        
        for node_id, node_stats in stats['nodes'].items():
            # 检查内存使用率
            memory_percent = node_stats['jvm']['mem']['heap_used_percent']
            if memory_percent > 85:
                send_alert(f"节点 {node_stats['name']} 内存使用率过高: {memory_percent}%")
            
            # 检查磁盘空间
            disk_available = node_stats['fs']['total']['available_in_bytes']
            if disk_available < 10 * 1024 * 1024 * 1024:  # 10GB
                send_alert(f"节点 {node_stats['name']} 磁盘空间不足: {disk_available / 1024 / 1024 / 1024:.2f}GB")
                
    except Exception as e:
        send_alert(f"性能检查失败: {e}")

def send_alert(message):
    # 发送告警通知(邮件、短信、钉钉等)
    print(f"[ALERT] {datetime.now()}: {message}")
    
    # 这里可以集成实际的告警系统
    # 例如:发送邮件、调用钉钉机器人、发送短信等

if __name__ == "__main__":
    check_cluster_health()
    check_performance_metrics()
"""
            
            # 保存监控脚本
            with open('elasticsearch_monitor.py', 'w', encoding='utf-8') as f:
                f.write(monitoring_script)
            
            self.logger.info("监控告警设置完成")
            
        except Exception as e:
            self.logger.error(f"设置监控告警失败: {e}")

# 使用示例
if __name__ == "__main__":
    search_system = EcommerceSearchSystem(['http://localhost:9200'])
    optimizer = SearchPerformanceOptimizer(search_system)
    
    # 优化索引设置
    optimizer.optimize_index_settings('products')
    
    # 设置索引模板
    optimizer.setup_index_templates()
    
    # 设置缓存策略
    optimizer.setup_caching_strategy()
    
    # 监控性能
    metrics = optimizer.monitor_performance()
    print(f"性能指标: {json.dumps(metrics, indent=2)}")
    
    # 设置监控告警
    optimizer.setup_monitoring_alerts()
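
除了上面的索引与缓存设置,批量导入数据时还有一个常用优化:导入前临时关闭自动刷新,导入完成后再恢复设置并手动刷新一次。下面是一个最小化草图,函数名 bulk_load_products 为示例假设,actions 为调用方事先构造好的 bulk 动作列表:

# 批量导入时临时关闭自动刷新(示例草图)
from elasticsearch.helpers import bulk

def bulk_load_products(es, actions, index='products'):
    """大批量写入前关闭 refresh,写入结束后恢复设置并手动刷新"""
    # 关闭自动刷新,减少导入期间频繁生成小段的开销
    es.indices.put_settings(index=index, body={"index": {"refresh_interval": "-1"}})
    try:
        success, errors = bulk(es, actions, chunk_size=1000, request_timeout=120)
    finally:
        # 恢复为前文采用的 30s 刷新间隔,并手动刷新一次使新数据可见
        es.indices.put_settings(index=index, body={"index": {"refresh_interval": "30s"}})
        es.indices.refresh(index=index)
    return success, errors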

2. 日志分析系统案例

2.1 系统架构

# 日志分析系统
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import json
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import geoip2.database
import user_agents
import hashlib

class LogAnalysisSystem:
    """日志分析系统"""
    
    def __init__(self, es_hosts: List[str]):
        self.es = Elasticsearch(es_hosts)
        self.geoip_reader = geoip2.database.Reader('GeoLite2-City.mmdb')
        
    def setup_log_indices(self):
        """设置日志索引"""
        # Web访问日志索引
        access_log_mapping = {
            "settings": {
                "number_of_shards": 5,
                "number_of_replicas": 1,
                "index": {
                    "lifecycle": {
                        "name": "access_logs_policy",
                        "rollover_alias": "access_logs"
                    }
                }
            },
            "mappings": {
                "properties": {
                    "timestamp": {"type": "date"},
                    "ip": {"type": "ip"},
                    "method": {"type": "keyword"},
                    "url": {
                        "type": "text",
                        "fields": {
                            "keyword": {"type": "keyword"}
                        }
                    },
                    "status_code": {"type": "integer"},
                    "response_size": {"type": "long"},
                    "response_time": {"type": "float"},
                    "user_agent": {
                        "type": "text",
                        "fields": {
                            "keyword": {"type": "keyword"}
                        }
                    },
                    "referer": {"type": "keyword"},
                    "user_id": {"type": "keyword"},
                    "session_id": {"type": "keyword"},
                    "geo": {
                        "properties": {
                            "country": {"type": "keyword"},
                            "region": {"type": "keyword"},
                            "city": {"type": "keyword"},
                            "location": {"type": "geo_point"}
                        }
                    },
                    "device": {
                        "properties": {
                            "family": {"type": "keyword"},
                            "brand": {"type": "keyword"},
                            "model": {"type": "keyword"}
                        }
                    },
                    "browser": {
                        "properties": {
                            "family": {"type": "keyword"},
                            "version": {"type": "keyword"}
                        }
                    },
                    "os": {
                        "properties": {
                            "family": {"type": "keyword"},
                            "version": {"type": "keyword"}
                        }
                    },
                    "is_bot": {"type": "boolean"},
                    "is_mobile": {"type": "boolean"},
                    "security": {
                        "properties": {
                            "threat_level": {"type": "keyword"},
                            "attack_type": {"type": "keyword"},
                            "is_suspicious": {"type": "boolean"}
                        }
                    }
                }
            }
        }
        
        # 应用日志索引
        app_log_mapping = {
            "settings": {
                "number_of_shards": 3,
                "number_of_replicas": 1
            },
            "mappings": {
                "properties": {
                    "timestamp": {"type": "date"},
                    "level": {"type": "keyword"},
                    "logger": {"type": "keyword"},
                    "message": {
                        "type": "text",
                        "analyzer": "standard"
                    },
                    "exception": {
                        "properties": {
                            "class": {"type": "keyword"},
                            "message": {"type": "text"},
                            "stack_trace": {"type": "text"}
                        }
                    },
                    "thread": {"type": "keyword"},
                    "host": {"type": "keyword"},
                    "service": {"type": "keyword"},
                    "version": {"type": "keyword"},
                    "environment": {"type": "keyword"},
                    "request_id": {"type": "keyword"},
                    "user_id": {"type": "keyword"},
                    "duration": {"type": "float"},
                    "tags": {"type": "keyword"}
                }
            }
        }
        
        # 创建索引
        self.es.indices.create(
            index='access_logs-000001',
            body=access_log_mapping,
            ignore=400
        )
        
        self.es.indices.create(
            index='app_logs',
            body=app_log_mapping,
            ignore=400
        )
        
        # 创建滚动写入别名(标记为写索引,供 ILM rollover 使用)
        self.es.indices.put_alias(
            index='access_logs-000001',
            name='access_logs',
            body={"is_write_index": True}
        )
        
    def setup_ilm_policies(self):
        """设置索引生命周期管理策略"""
        # 访问日志ILM策略
        access_logs_policy = {
            "policy": {
                "phases": {
                    "hot": {
                        "actions": {
                            "rollover": {
                                "max_size": "10GB",
                                "max_age": "1d"
                            }
                        }
                    },
                    "warm": {
                        "min_age": "7d",
                        "actions": {
                            "allocate": {
                                "number_of_replicas": 0
                            },
                            "forcemerge": {
                                "max_num_segments": 1
                            }
                        }
                    },
                    "cold": {
                        "min_age": "30d",
                        "actions": {
                            "allocate": {
                                "number_of_replicas": 0,
                                "require": {
                                    "box_type": "cold"
                                }
                            }
                        }
                    },
                    "delete": {
                        "min_age": "90d",
                        "actions": {
                            "delete": {}
                        }
                    }
                }
            }
        }
        
        # 应用日志ILM策略
        app_logs_policy = {
            "policy": {
                "phases": {
                    "hot": {
                        "actions": {
                            "rollover": {
                                "max_size": "5GB",
                                "max_age": "7d"
                            }
                        }
                    },
                    "warm": {
                        "min_age": "30d",
                        "actions": {
                            "allocate": {
                                "number_of_replicas": 0
                            }
                        }
                    },
                    "delete": {
                        "min_age": "180d",
                        "actions": {
                            "delete": {}
                        }
                    }
                }
            }
        }
        
        # 创建ILM策略
        self.es.ilm.put_lifecycle(
            policy='access_logs_policy',
            body=access_logs_policy
        )
        
        self.es.ilm.put_lifecycle(
            policy='app_logs_policy',
            body=app_logs_policy
        )
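
    def setup_rollover_template(self):
        """为滚动索引下发模板(示例草图,模板名为假设)"""
        # 说明:手工创建的 access_logs-000001 带有完整的 settings/mappings,
        # 但 ILM 滚动产生的新索引(access_logs-000002 等)不会自动继承,
        # 通常需要一个索引模板把分片数、ILM 策略和 rollover 别名一并下发
        template = {
            "index_patterns": ["access_logs-*"],
            "settings": {
                "number_of_shards": 5,
                "number_of_replicas": 1,
                "index.lifecycle.name": "access_logs_policy",
                "index.lifecycle.rollover_alias": "access_logs"
            }
        }
        self.es.indices.put_template(
            name="access_logs_template",
            body=template
        )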

class LogParser:
    """日志解析器"""
    
    def __init__(self, log_system: LogAnalysisSystem):
        self.es = log_system.es
        self.geoip_reader = log_system.geoip_reader
        
        # 常见的攻击模式
        self.attack_patterns = {
            'sql_injection': [
                r"('|(--)|(;)|(\|)|(\*))",
                r"\b(union|select|insert|delete|update|drop|create|alter)\b",
                r"(script|javascript|vbscript|onload|onerror)"
            ],
            'xss': [
                r"<script[^>]*>.*?</script>",
                r"javascript:",
                r"on\w+\s*="
            ],
            'path_traversal': [
                r"\.\./",
                r"%2e%2e%2f",
                r"\\\.\.\\"
            ],
            'command_injection': [
                r"(;|\||&|`|\$\(|\$\{)",
                r"\b(cat|ls|pwd|whoami|id|uname)\b"
            ]
        }
    
    def parse_access_log(self, log_line: str) -> Dict:
        """解析访问日志"""
        # Nginx 访问日志解析(扩展格式:combined 之后追加 X-Forwarded-For 与请求耗时)
        log_pattern = r'(\S+) - (\S+) \[(.*?)\] "(\S+) (.*?) (\S+)" (\d+) (\d+) "(.*?)" "(.*?)" "(.*?)" (\S+)'
        
        match = re.match(log_pattern, log_line)
        if not match:
            return None
        
        ip, user, timestamp_str, method, url, protocol, status_code, response_size, referer, user_agent, forwarded_for, response_time = match.groups()
        
        # 解析时间戳
        timestamp = datetime.strptime(timestamp_str, '%d/%b/%Y:%H:%M:%S %z')
        
        # 解析User-Agent
        ua = user_agents.parse(user_agent)
        
        # 获取地理位置信息
        geo_info = self._get_geo_info(ip)
        
        # 安全分析
        security_info = self._analyze_security(method, url, user_agent, ip)
        
        parsed_log = {
            'timestamp': timestamp.isoformat(),
            'ip': ip,
            'method': method,
            'url': url,
            'status_code': int(status_code),
            'response_size': int(response_size),
            'response_time': float(response_time),
            'user_agent': user_agent,
            'referer': referer if referer != '-' else None,
            'geo': geo_info,
            'device': {
                'family': ua.device.family,
                'brand': ua.device.brand,
                'model': ua.device.model
            },
            'browser': {
                'family': ua.browser.family,
                'version': ua.browser.version_string
            },
            'os': {
                'family': ua.os.family,
                'version': ua.os.version_string
            },
            'is_bot': ua.is_bot,
            'is_mobile': ua.is_mobile,
            'security': security_info
        }
        
        return parsed_log
    
    def _get_geo_info(self, ip: str) -> Dict:
        """获取地理位置信息"""
        try:
            response = self.geoip_reader.city(ip)
            return {
                'country': response.country.name,
                'region': response.subdivisions.most_specific.name,
                'city': response.city.name,
                'location': {
                    'lat': float(response.location.latitude),
                    'lon': float(response.location.longitude)
                }
            }
        except Exception:
            return {}
    
    def _analyze_security(self, method: str, url: str, user_agent: str, ip: str) -> Dict:
        """安全分析"""
        security_info = {
            'threat_level': 'low',
            'attack_type': None,
            'is_suspicious': False
        }
        
        # 检查攻击模式
        for attack_type, patterns in self.attack_patterns.items():
            for pattern in patterns:
                if re.search(pattern, url, re.IGNORECASE) or re.search(pattern, user_agent, re.IGNORECASE):
                    security_info['attack_type'] = attack_type
                    security_info['threat_level'] = 'high'
                    security_info['is_suspicious'] = True
                    break
            if security_info['is_suspicious']:
                break
        
        # 检查可疑行为
        if not security_info['is_suspicious']:
            # 检查对后台敏感路径的写操作
            if method in ['POST', 'PUT', 'DELETE'] and '/admin' in url:
                security_info['threat_level'] = 'medium'
                security_info['is_suspicious'] = True
            
            # 检查异常User-Agent
            if len(user_agent) < 10 or 'bot' in user_agent.lower():
                security_info['threat_level'] = 'medium'
                security_info['is_suspicious'] = True
        
        return security_info
    
    def parse_app_log(self, log_line: str) -> Dict:
        """解析应用日志"""
        try:
            # 假设应用日志是JSON格式
            log_data = json.loads(log_line)
            
            # 标准化字段
            parsed_log = {
                'timestamp': log_data.get('timestamp', datetime.now().isoformat()),
                'level': log_data.get('level', 'INFO'),
                'logger': log_data.get('logger'),
                'message': log_data.get('message'),
                'thread': log_data.get('thread'),
                'host': log_data.get('host'),
                'service': log_data.get('service'),
                'version': log_data.get('version'),
                'environment': log_data.get('environment'),
                'request_id': log_data.get('request_id'),
                'user_id': log_data.get('user_id'),
                'duration': log_data.get('duration'),
                'tags': log_data.get('tags', [])
            }
            
            # 解析异常信息
            if 'exception' in log_data:
                exception = log_data['exception']
                parsed_log['exception'] = {
                    'class': exception.get('class'),
                    'message': exception.get('message'),
                    'stack_trace': exception.get('stack_trace')
                }
            
            return parsed_log
            
        except json.JSONDecodeError:
            # 如果不是JSON格式,尝试正则解析
            return self._parse_text_log(log_line)
    
    def _parse_text_log(self, log_line: str) -> Dict:
        """解析文本格式日志"""
        # 简单的文本日志解析
        pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (\w+): (.*)'
        match = re.match(pattern, log_line)
        
        if match:
            timestamp_str, level, logger, message = match.groups()
            return {
                'timestamp': datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S').isoformat(),
                'level': level,
                'logger': logger,
                'message': message
            }
        
        return {
            'timestamp': datetime.now().isoformat(),
            'level': 'UNKNOWN',
            'message': log_line
        }
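
    def bulk_index_logs(self, parsed_logs: List[Dict], index_name: str = 'access_logs') -> int:
        """批量写入解析结果的示意实现(补充示例, 非原方案的一部分)。

        假设模块顶部已 from elasticsearch.helpers import bulk 并从 typing 导入 List;
        index_name 为假设的目标索引名。相比逐条调用 index(),
        bulk 能显著降低大批量日志写入时的请求开销。
        """
        actions = [
            {'_index': index_name, '_source': log}
            for log in parsed_logs
            if log  # 跳过解析失败返回None的记录
        ]
        if not actions:
            return 0
        success, _ = bulk(self.es, actions)
        return success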

class LogAnalyzer:
    """日志分析器"""
    
    def __init__(self, log_system: LogAnalysisSystem):
        self.es = log_system.es
    
    def analyze_traffic_patterns(self, start_date: str, end_date: str) -> Dict:
        """分析流量模式"""
        query_body = {
            "query": {
                "range": {
                    "timestamp": {
                        "gte": start_date,
                        "lte": end_date
                    }
                }
            },
            "aggs": {
                "traffic_over_time": {
                    "date_histogram": {
                        "field": "timestamp",
                        "calendar_interval": "1h"
                    }
                },
                "top_pages": {
                    "terms": {
                        "field": "url.keyword",
                        "size": 20
                    }
                },
                "status_codes": {
                    "terms": {
                        "field": "status_code",
                        "size": 10
                    }
                },
                "top_ips": {
                    "terms": {
                        "field": "ip",
                        "size": 20
                    }
                },
                "geographic_distribution": {
                    "terms": {
                        "field": "geo.country",
                        "size": 20
                    }
                },
                "device_types": {
                    "terms": {
                        "field": "device.family",
                        "size": 10
                    }
                },
                "browser_distribution": {
                    "terms": {
                        "field": "browser.family",
                        "size": 10
                    }
                },
                "response_time_stats": {
                    "stats": {
                        "field": "response_time"
                    }
                },
                "response_time_percentiles": {
                    "percentiles": {
                        "field": "response_time",
                        "percents": [50, 90, 95, 99]
                    }
                }
            },
            "size": 0
        }
        
        response = self.es.search(
            index='access_logs',
            body=query_body
        )
        
        return self._process_traffic_analysis(response)
    
    def _process_traffic_analysis(self, response: Dict) -> Dict:
        """处理流量分析结果"""
        aggs = response['aggregations']
        
        return {
            'total_requests': response['hits']['total']['value'],
            'traffic_timeline': [
                {
                    'timestamp': bucket['key_as_string'],
                    'requests': bucket['doc_count']
                }
                for bucket in aggs['traffic_over_time']['buckets']
            ],
            'top_pages': [
                {
                    'url': bucket['key'],
                    'requests': bucket['doc_count']
                }
                for bucket in aggs['top_pages']['buckets']
            ],
            'status_distribution': [
                {
                    'status_code': bucket['key'],
                    'count': bucket['doc_count']
                }
                for bucket in aggs['status_codes']['buckets']
            ],
            'top_ips': [
                {
                    'ip': bucket['key'],
                    'requests': bucket['doc_count']
                }
                for bucket in aggs['top_ips']['buckets']
            ],
            'geographic_distribution': [
                {
                    'country': bucket['key'],
                    'requests': bucket['doc_count']
                }
                for bucket in aggs['geographic_distribution']['buckets']
            ],
            'device_distribution': [
                {
                    'device': bucket['key'],
                    'requests': bucket['doc_count']
                }
                for bucket in aggs['device_types']['buckets']
            ],
            'browser_distribution': [
                {
                    'browser': bucket['key'],
                    'requests': bucket['doc_count']
                }
                for bucket in aggs['browser_distribution']['buckets']
            ],
            'response_time_stats': aggs['response_time_stats'],
            'response_time_percentiles': aggs['response_time_percentiles']['values']
        }
    
    def detect_security_threats(self, start_date: str, end_date: str) -> Dict:
        """检测安全威胁"""
        query_body = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "timestamp": {
                                    "gte": start_date,
                                    "lte": end_date
                                }
                            }
                        },
                        {
                            "term": {
                                "security.is_suspicious": True
                            }
                        }
                    ]
                }
            },
            "aggs": {
                "threat_levels": {
                    "terms": {
                        "field": "security.threat_level",
                        "size": 10
                    }
                },
                "attack_types": {
                    "terms": {
                        "field": "security.attack_type",
                        "size": 10
                    }
                },
                "suspicious_ips": {
                    "terms": {
                        "field": "ip",
                        "size": 20
                    }
                },
                "attack_timeline": {
                    "date_histogram": {
                        "field": "timestamp",
                        "calendar_interval": "1h"
                    }
                },
                "targeted_urls": {
                    "terms": {
                        "field": "url.keyword",
                        "size": 20
                    }
                }
            },
            "size": 100,
            "sort": [
                {"timestamp": {"order": "desc"}}
            ]
        }
        
        response = self.es.search(
            index='access_logs',
            body=query_body
        )
        
        return self._process_security_analysis(response)
    
    def _process_security_analysis(self, response: Dict) -> Dict:
        """处理安全分析结果"""
        aggs = response['aggregations']
        hits = response['hits']['hits']
        
        return {
            'total_threats': response['hits']['total']['value'],
            'recent_threats': [
                {
                    'timestamp': hit['_source']['timestamp'],
                    'ip': hit['_source']['ip'],
                    'url': hit['_source']['url'],
                    'attack_type': hit['_source']['security']['attack_type'],
                    'threat_level': hit['_source']['security']['threat_level']
                }
                for hit in hits
            ],
            'threat_distribution': [
                {
                    'level': bucket['key'],
                    'count': bucket['doc_count']
                }
                for bucket in aggs['threat_levels']['buckets']
            ],
            'attack_types': [
                {
                    'type': bucket['key'],
                    'count': bucket['doc_count']
                }
                for bucket in aggs['attack_types']['buckets']
            ],
            'suspicious_ips': [
                {
                    'ip': bucket['key'],
                    'attempts': bucket['doc_count']
                }
                for bucket in aggs['suspicious_ips']['buckets']
            ],
            'attack_timeline': [
                {
                    'timestamp': bucket['key_as_string'],
                    'attacks': bucket['doc_count']
                }
                for bucket in aggs['attack_timeline']['buckets']
            ],
            'targeted_urls': [
                {
                    'url': bucket['key'],
                    'attacks': bucket['doc_count']
                }
                for bucket in aggs['targeted_urls']['buckets']
            ]
        }
    
    def analyze_application_errors(self, start_date: str, end_date: str) -> Dict:
        """分析应用错误"""
        query_body = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "timestamp": {
                                    "gte": start_date,
                                    "lte": end_date
                                }
                            }
                        },
                        {
                            "terms": {
                                "level": ["ERROR", "FATAL"]
                            }
                        }
                    ]
                }
            },
            "aggs": {
                "error_levels": {
                    "terms": {
                        "field": "level",
                        "size": 10
                    }
                },
                "error_timeline": {
                    "date_histogram": {
                        "field": "timestamp",
                        "calendar_interval": "1h"
                    }
                },
                "top_loggers": {
                    "terms": {
                        "field": "logger",
                        "size": 20
                    }
                },
                "exception_types": {
                    "terms": {
                        "field": "exception.class",
                        "size": 20
                    }
                },
                "services_with_errors": {
                    "terms": {
                        "field": "service",
                        "size": 20
                    }
                },
                "hosts_with_errors": {
                    "terms": {
                        "field": "host",
                        "size": 20
                    }
                }
            },
            "size": 100,
            "sort": [
                {"timestamp": {"order": "desc"}}
            ]
        }
        
        response = self.es.search(
            index='app_logs',
            body=query_body
        )
        
        return self._process_error_analysis(response)
    
    def _process_error_analysis(self, response: Dict) -> Dict:
        """处理错误分析结果"""
        aggs = response['aggregations']
        hits = response['hits']['hits']
        
        return {
            'total_errors': response['hits']['total']['value'],
            'recent_errors': [
                {
                    'timestamp': hit['_source']['timestamp'],
                    'level': hit['_source']['level'],
                    'logger': hit['_source']['logger'],
                    'message': hit['_source']['message'],
                    'service': hit['_source'].get('service'),
                    'host': hit['_source'].get('host')
                }
                for hit in hits
            ],
            'error_distribution': [
                {
                    'level': bucket['key'],
                    'count': bucket['doc_count']
                }
                for bucket in aggs['error_levels']['buckets']
            ],
            'error_timeline': [
                {
                    'timestamp': bucket['key_as_string'],
                    'errors': bucket['doc_count']
                }
                for bucket in aggs['error_timeline']['buckets']
            ],
            'top_loggers': [
                {
                    'logger': bucket['key'],
                    'errors': bucket['doc_count']
                }
                for bucket in aggs['top_loggers']['buckets']
            ],
            'exception_types': [
                {
                    'exception': bucket['key'],
                    'count': bucket['doc_count']
                }
                for bucket in aggs['exception_types']['buckets']
            ],
            'services_with_errors': [
                {
                    'service': bucket['key'],
                    'errors': bucket['doc_count']
                }
                for bucket in aggs['services_with_errors']['buckets']
            ],
            'hosts_with_errors': [
                {
                    'host': bucket['key'],
                    'errors': bucket['doc_count']
                }
                for bucket in aggs['hosts_with_errors']['buckets']
            ]
        }

# 使用示例
if __name__ == "__main__":
    # 初始化日志分析系统
    log_system = LogAnalysisSystem(['localhost:9200'])
    log_system.setup_log_indices()
    log_system.setup_ilm_policies()
    
    # 初始化解析器和分析器
    parser = LogParser(log_system)
    analyzer = LogAnalyzer(log_system)
    
    # 解析访问日志
    access_log = '192.168.1.100 - - [25/Dec/2023:10:00:00 +0000] "GET /api/products?search=laptop HTTP/1.1" 200 1234 "https://example.com" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" "-" 0.123'
    parsed_access = parser.parse_access_log(access_log)
    print(f"解析的访问日志: {json.dumps(parsed_access, indent=2)}")
    
    # 分析流量模式
    traffic_analysis = analyzer.analyze_traffic_patterns(
        start_date="2024-01-01",
        end_date="2024-01-31"
    )
    print(f"流量分析: {json.dumps(traffic_analysis, indent=2)}")
    
    # 检测安全威胁
    security_analysis = analyzer.detect_security_threats(
        start_date="2024-01-01",
        end_date="2024-01-31"
    )
    print(f"安全分析: {json.dumps(security_analysis, indent=2)}")
    
    # 分析应用错误
    error_analysis = analyzer.analyze_application_errors(
        start_date="2024-01-01",
        end_date="2024-01-31"
    )
    print(f"错误分析: {json.dumps(error_analysis, indent=2)}")

3. 实时监控系统案例

3.1 系统架构

# 实时监控系统
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import json
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Callable
import psutil
import socket
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class RealTimeMonitoringSystem:
    """实时监控系统"""
    
    def __init__(self, es_hosts: List[str]):
        self.es = Elasticsearch(es_hosts)
        self.alert_rules = []
        self.alert_handlers = []
        self.monitoring_active = False
        
    def setup_monitoring_indices(self):
        """设置监控索引"""
        # 系统指标索引
        system_metrics_mapping = {
            "settings": {
                "number_of_shards": 3,
                "number_of_replicas": 1,
                "refresh_interval": "5s"
            },
            "mappings": {
                "properties": {
                    "timestamp": {"type": "date"},
                    "host": {"type": "keyword"},
                    "service": {"type": "keyword"},
                    "metric_type": {"type": "keyword"},
                    "cpu": {
                        "properties": {
                            "usage_percent": {"type": "float"},
                            "load_avg_1m": {"type": "float"},
                            "load_avg_5m": {"type": "float"},
                            "load_avg_15m": {"type": "float"}
                        }
                    },
                    "memory": {
                        "properties": {
                            "total": {"type": "long"},
                            "available": {"type": "long"},
                            "used": {"type": "long"},
                            "usage_percent": {"type": "float"}
                        }
                    },
                    "disk": {
                        "properties": {
                            "total": {"type": "long"},
                            "used": {"type": "long"},
                            "free": {"type": "long"},
                            "usage_percent": {"type": "float"}
                        }
                    },
                    "network": {
                        "properties": {
                            "bytes_sent": {"type": "long"},
                            "bytes_recv": {"type": "long"},
                            "packets_sent": {"type": "long"},
                            "packets_recv": {"type": "long"}
                        }
                    },
                    "elasticsearch": {
                        "properties": {
                            "cluster_status": {"type": "keyword"},
                            "active_shards": {"type": "integer"},
                            "relocating_shards": {"type": "integer"},
                            "initializing_shards": {"type": "integer"},
                            "unassigned_shards": {"type": "integer"},
                            "heap_used_percent": {"type": "float"},
                            "disk_used_percent": {"type": "float"}
                        }
                    }
                }
            }
        }
        
        # 告警记录索引
        alerts_mapping = {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 1
            },
            "mappings": {
                "properties": {
                    "timestamp": {"type": "date"},
                    "alert_id": {"type": "keyword"},
                    "rule_name": {"type": "keyword"},
                    "severity": {"type": "keyword"},
                    "status": {"type": "keyword"},
                    "host": {"type": "keyword"},
                    "service": {"type": "keyword"},
                    "metric": {"type": "keyword"},
                    "threshold": {"type": "float"},
                    "actual_value": {"type": "float"},
                    "message": {"type": "text"},
                    "resolved_at": {"type": "date"},
                    "acknowledged_by": {"type": "keyword"},
                    "acknowledged_at": {"type": "date"}
                }
            }
        }
        
        # 创建索引
        self.es.indices.create(
            index='system_metrics',
            body=system_metrics_mapping,
            ignore=400
        )
        
        self.es.indices.create(
            index='alerts',
            body=alerts_mapping,
            ignore=400
        )

class MetricsCollector:
    """指标收集器"""
    
    def __init__(self, monitoring_system: RealTimeMonitoringSystem):
        self.es = monitoring_system.es
        self.host = socket.gethostname()
        
    def collect_system_metrics(self) -> Dict:
        """收集系统指标"""
        # CPU指标
        cpu_percent = psutil.cpu_percent(interval=1)
        load_avg = psutil.getloadavg()
        
        # 内存指标
        memory = psutil.virtual_memory()
        
        # 磁盘指标
        disk = psutil.disk_usage('/')
        
        # 网络指标
        network = psutil.net_io_counters()
        
        metrics = {
            'timestamp': datetime.now().isoformat(),
            'host': self.host,
            'metric_type': 'system',
            'cpu': {
                'usage_percent': cpu_percent,
                'load_avg_1m': load_avg[0],
                'load_avg_5m': load_avg[1],
                'load_avg_15m': load_avg[2]
            },
            'memory': {
                'total': memory.total,
                'available': memory.available,
                'used': memory.used,
                'usage_percent': memory.percent
            },
            'disk': {
                'total': disk.total,
                'used': disk.used,
                'free': disk.free,
                'usage_percent': (disk.used / disk.total) * 100
            },
            'network': {
                'bytes_sent': network.bytes_sent,
                'bytes_recv': network.bytes_recv,
                'packets_sent': network.packets_sent,
                'packets_recv': network.packets_recv
            }
        }
        
        return metrics
    
    def collect_elasticsearch_metrics(self, es_host: str = 'localhost:9200') -> Dict:
        """收集Elasticsearch指标"""
        try:
            # 集群健康状态
            health_response = requests.get(f'http://{es_host}/_cluster/health')
            health = health_response.json()
            
            # 节点统计信息
            stats_response = requests.get(f'http://{es_host}/_nodes/stats')
            stats = stats_response.json()
            
            # 计算平均指标
            total_heap_used = 0
            total_heap_max = 0
            total_disk_used = 0
            total_disk_total = 0
            node_count = len(stats['nodes'])
            
            for node_stats in stats['nodes'].values():
                total_heap_used += node_stats['jvm']['mem']['heap_used_in_bytes']
                total_heap_max += node_stats['jvm']['mem']['heap_max_in_bytes']
                total_disk_used += node_stats['fs']['total']['total_in_bytes'] - node_stats['fs']['total']['available_in_bytes']
                total_disk_total += node_stats['fs']['total']['total_in_bytes']
            
            heap_used_percent = (total_heap_used / total_heap_max) * 100 if total_heap_max > 0 else 0
            disk_used_percent = (total_disk_used / total_disk_total) * 100 if total_disk_total > 0 else 0
            
            metrics = {
                'timestamp': datetime.now().isoformat(),
                'host': self.host,
                'service': 'elasticsearch',
                'metric_type': 'elasticsearch',
                'elasticsearch': {
                    'cluster_status': health['status'],
                    'active_shards': health['active_shards'],
                    'relocating_shards': health['relocating_shards'],
                    'initializing_shards': health['initializing_shards'],
                    'unassigned_shards': health['unassigned_shards'],
                    'heap_used_percent': heap_used_percent,
                    'disk_used_percent': disk_used_percent
                }
            }
            
            return metrics
            
        except Exception as e:
            print(f"收集Elasticsearch指标失败: {e}")
            return None
    
    def start_collection(self, interval: int = 30):
        """开始收集指标"""
        def collect_loop():
            while True:
                try:
                    # 收集系统指标
                    system_metrics = self.collect_system_metrics()
                    self.es.index(
                        index='system_metrics',
                        body=system_metrics
                    )
                    
                    # 收集Elasticsearch指标
                    es_metrics = self.collect_elasticsearch_metrics()
                    if es_metrics:
                        self.es.index(
                            index='system_metrics',
                            body=es_metrics
                        )
                    
                    time.sleep(interval)
                    
                except Exception as e:
                    print(f"指标收集失败: {e}")
                    time.sleep(interval)
        
        collection_thread = threading.Thread(target=collect_loop, daemon=True)
        collection_thread.start()
        print(f"指标收集已启动,间隔: {interval}秒")

class AlertRule:
    """告警规则"""
    
    def __init__(self, name: str, condition: str, threshold: float, 
                 severity: str = 'warning', duration: int = 300):
        self.name = name
        self.condition = condition  # 例如: "cpu.usage_percent > 80"
        self.threshold = threshold
        self.severity = severity  # critical, warning, info
        self.duration = duration  # 持续时间(秒)
        self.last_triggered = None
        
    def evaluate(self, metrics: Dict) -> bool:
        """评估告警条件"""
        try:
            # 简单的条件评估
            if 'cpu.usage_percent' in self.condition:
                value = metrics.get('cpu', {}).get('usage_percent', 0)
                if '>' in self.condition:
                    return value > self.threshold
                elif '<' in self.condition:
                    return value < self.threshold
            
            elif 'memory.usage_percent' in self.condition:
                value = metrics.get('memory', {}).get('usage_percent', 0)
                if '>' in self.condition:
                    return value > self.threshold
                elif '<' in self.condition:
                    return value < self.threshold
            
            elif 'disk.usage_percent' in self.condition:
                value = metrics.get('disk', {}).get('usage_percent', 0)
                if '>' in self.condition:
                    return value > self.threshold
                elif '<' in self.condition:
                    return value < self.threshold
            
            elif 'elasticsearch.heap_used_percent' in self.condition:
                value = metrics.get('elasticsearch', {}).get('heap_used_percent', 0)
                if '>' in self.condition:
                    return value > self.threshold
                elif '<' in self.condition:
                    return value < self.threshold
            
            return False
            
        except Exception as e:
            print(f"评估告警条件失败: {e}")
            return False
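
# 补充说明: 上面的 evaluate() 为每个指标重复编写了一组 if/elif 分支。
# 下面是一个假设性的简化思路(示意代码, 非原实现): 通过"点分路径"统一从
# 嵌套指标字典中取值, 规则只需保存指标路径和比较方向, 无需逐指标硬编码。
def resolve_metric_path(metrics: Dict, path: str) -> float:
    """按点分路径(如 'cpu.usage_percent')从嵌套字典中取值, 取不到时返回0。"""
    value = metrics
    for key in path.split('.'):
        if not isinstance(value, dict) or key not in value:
            return 0.0
        value = value[key]
    return float(value)

# 用法示例: resolve_metric_path(metrics, 'memory.usage_percent') > rule.threshold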

class AlertManager:
    """告警管理器"""
    
    def __init__(self, monitoring_system: RealTimeMonitoringSystem):
        self.es = monitoring_system.es
        self.alert_rules = []
        self.alert_handlers = []
        self.active_alerts = {}
        
    def add_rule(self, rule: AlertRule):
        """添加告警规则"""
        self.alert_rules.append(rule)
    
    def add_handler(self, handler: Callable):
        """添加告警处理器"""
        self.alert_handlers.append(handler)
    
    def evaluate_metrics(self, metrics: Dict):
        """评估指标并触发告警"""
        current_time = datetime.now()
        
        for rule in self.alert_rules:
            if rule.evaluate(metrics):
                alert_id = f"{rule.name}_{metrics.get('host', 'unknown')}"
                
                # 检查是否已经有活跃的告警
                if alert_id not in self.active_alerts:
                    # 创建新告警
                    alert = {
                        'alert_id': alert_id,
                        'rule_name': rule.name,
                        'severity': rule.severity,
                        'status': 'active',
                        'host': metrics.get('host'),
                        'service': metrics.get('service'),
                        'metric': rule.condition.split('.')[0],
                        'threshold': rule.threshold,
                        'actual_value': self._get_metric_value(metrics, rule.condition),
                        'message': f"{rule.name}: {rule.condition} (阈值: {rule.threshold})",
                        'timestamp': current_time.isoformat()
                    }
                    
                    self.active_alerts[alert_id] = alert
                    self._trigger_alert(alert)
                    
                    # 保存告警记录(以alert_id作为文档ID, 便于后续更新状态)
                    self.es.index(
                        index='alerts',
                        id=alert_id,
                        body=alert
                    )
            else:
                # 检查是否需要解决告警
                alert_id = f"{rule.name}_{metrics.get('host', 'unknown')}"
                if alert_id in self.active_alerts:
                    self._resolve_alert(alert_id)
    
    def _get_metric_value(self, metrics: Dict, condition: str) -> float:
        """获取指标值"""
        try:
            if 'cpu.usage_percent' in condition:
                return metrics.get('cpu', {}).get('usage_percent', 0)
            elif 'memory.usage_percent' in condition:
                return metrics.get('memory', {}).get('usage_percent', 0)
            elif 'disk.usage_percent' in condition:
                return metrics.get('disk', {}).get('usage_percent', 0)
            elif 'elasticsearch.heap_used_percent' in condition:
                return metrics.get('elasticsearch', {}).get('heap_used_percent', 0)
            return 0
        except Exception:
            return 0
    
    def _trigger_alert(self, alert: Dict):
        """触发告警"""
        print(f"[ALERT] {alert['severity'].upper()}: {alert['message']}")
        
        # 调用所有告警处理器
        for handler in self.alert_handlers:
            try:
                handler(alert)
            except Exception as e:
                print(f"告警处理器执行失败: {e}")
    
    def _resolve_alert(self, alert_id: str):
        """解决告警"""
        if alert_id in self.active_alerts:
            alert = self.active_alerts[alert_id]
            alert['status'] = 'resolved'
            alert['resolved_at'] = datetime.now().isoformat()
            
            # 更新告警记录
            self.es.update(
                index='alerts',
                id=alert_id,
                body={'doc': alert}
            )
            
            del self.active_alerts[alert_id]
            print(f"[RESOLVED] 告警已解决: {alert['rule_name']}")
    
    def get_active_alerts(self) -> List[Dict]:
        """获取活跃告警"""
        return list(self.active_alerts.values())
    
    def acknowledge_alert(self, alert_id: str, user: str):
        """确认告警"""
        if alert_id in self.active_alerts:
            alert = self.active_alerts[alert_id]
            alert['acknowledged_by'] = user
            alert['acknowledged_at'] = datetime.now().isoformat()
            
            # 更新告警记录
            self.es.update(
                index='alerts',
                id=alert_id,
                body={'doc': alert}
            )
            
            print(f"告警已确认: {alert_id} by {user}")

class AlertHandlers:
    """告警处理器集合"""
    
    @staticmethod
    def email_handler(smtp_server: str, smtp_port: int, username: str, 
                     password: str, recipients: List[str]):
        """邮件告警处理器"""
        def send_email(alert: Dict):
            try:
                msg = MIMEMultipart()
                msg['From'] = username
                msg['To'] = ', '.join(recipients)
                msg['Subject'] = f"[{alert['severity'].upper()}] {alert['rule_name']}"
                
                body = f"""
告警详情:
- 规则: {alert['rule_name']}
- 严重程度: {alert['severity']}
- 主机: {alert['host']}
- 服务: {alert.get('service', 'N/A')}
- 指标: {alert['metric']}
- 阈值: {alert['threshold']}
- 实际值: {alert['actual_value']}
- 时间: {alert['timestamp']}
- 消息: {alert['message']}
"""
                
                msg.attach(MIMEText(body, 'plain', 'utf-8'))
                
                server = smtplib.SMTP(smtp_server, smtp_port)
                server.starttls()
                server.login(username, password)
                server.send_message(msg)
                server.quit()
                
                print(f"邮件告警已发送: {alert['alert_id']}")
                
            except Exception as e:
                print(f"发送邮件告警失败: {e}")
        
        return send_email
    
    @staticmethod
    def webhook_handler(webhook_url: str):
        """Webhook告警处理器"""
        def send_webhook(alert: Dict):
            try:
                payload = {
                    'alert_id': alert['alert_id'],
                    'rule_name': alert['rule_name'],
                    'severity': alert['severity'],
                    'host': alert['host'],
                    'message': alert['message'],
                    'timestamp': alert['timestamp']
                }
                
                response = requests.post(
                    webhook_url,
                    json=payload,
                    timeout=10
                )
                
                if response.status_code == 200:
                    print(f"Webhook告警已发送: {alert['alert_id']}")
                else:
                    print(f"Webhook告警发送失败: {response.status_code}")
                    
            except Exception as e:
                print(f"发送Webhook告警失败: {e}")
        
        return send_webhook
    
    @staticmethod
    def log_handler(log_file: str):
        """日志告警处理器"""
        def write_log(alert: Dict):
            try:
                with open(log_file, 'a', encoding='utf-8') as f:
                    log_entry = f"{alert['timestamp']} [{alert['severity'].upper()}] {alert['message']}\n"
                    f.write(log_entry)
                
                print(f"告警已记录到日志: {alert['alert_id']}")
                
            except Exception as e:
                print(f"记录告警日志失败: {e}")
        
        return write_log

# 使用示例
if __name__ == "__main__":
    # 初始化监控系统
    monitoring_system = RealTimeMonitoringSystem(['localhost:9200'])
    monitoring_system.setup_monitoring_indices()
    
    # 初始化指标收集器
    collector = MetricsCollector(monitoring_system)
    
    # 初始化告警管理器
    alert_manager = AlertManager(monitoring_system)
    
    # 添加告警规则
    alert_manager.add_rule(AlertRule(
        name="高CPU使用率",
        condition="cpu.usage_percent > 80",
        threshold=80,
        severity="warning"
    ))
    
    alert_manager.add_rule(AlertRule(
        name="高内存使用率",
        condition="memory.usage_percent > 85",
        threshold=85,
        severity="critical"
    ))
    
    alert_manager.add_rule(AlertRule(
        name="磁盘空间不足",
        condition="disk.usage_percent > 90",
        threshold=90,
        severity="critical"
    ))
    
    alert_manager.add_rule(AlertRule(
        name="Elasticsearch堆内存过高",
        condition="elasticsearch.heap_used_percent > 85",
        threshold=85,
        severity="warning"
    ))
    
    # 添加告警处理器
    alert_manager.add_handler(
        AlertHandlers.email_handler(
            smtp_server="smtp.gmail.com",
            smtp_port=587,
            username="your-email@gmail.com",
            password="your-password",
            recipients=["admin@example.com"]
        )
    )
    
    alert_manager.add_handler(
        AlertHandlers.webhook_handler("https://hooks.slack.com/your-webhook-url")
    )
    
    alert_manager.add_handler(
        AlertHandlers.log_handler("/var/log/alerts.log")
    )
    
    # 启动指标收集
    collector.start_collection(interval=30)
    
    # 监控循环
    def monitoring_loop():
        while True:
            try:
                # 获取最新指标
                response = monitoring_system.es.search(
                    index='system_metrics',
                    body={
                        "query": {
                            "range": {
                                "timestamp": {
                                    "gte": "now-1m"
                                }
                            }
                        },
                        "sort": [{"timestamp": {"order": "desc"}}],
                        "size": 10
                    }
                )
                
                # 评估告警
                for hit in response['hits']['hits']:
                    metrics = hit['_source']
                    alert_manager.evaluate_metrics(metrics)
                
                time.sleep(60)  # 每分钟检查一次
                
            except Exception as e:
                print(f"监控循环失败: {e}")
                time.sleep(60)
    
    # 启动监控
    monitoring_thread = threading.Thread(target=monitoring_loop, daemon=True)
    monitoring_thread.start()
    
    print("实时监控系统已启动")
    print("按 Ctrl+C 退出")
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n监控系统已停止")

4. 最佳实践总结

4.1 架构设计原则

4.1.1 分层架构

# 分层架构示例
class ElasticsearchArchitecture:
    """
    Elasticsearch分层架构设计
    
    1. 数据接入层 (Data Ingestion Layer)
       - Logstash, Beats, 自定义采集器
       - 数据清洗、转换、路由
    
    2. 存储层 (Storage Layer)
       - Elasticsearch集群
       - 索引设计、分片策略
       - 数据生命周期管理
    
    3. 计算层 (Compute Layer)
       - 搜索查询
       - 聚合分析
       - 机器学习
    
    4. 应用层 (Application Layer)
       - 业务逻辑
       - API接口
       - 缓存策略
    
    5. 展示层 (Presentation Layer)
       - Kibana仪表板
       - 自定义前端
       - 报表系统
    """
    
    def __init__(self):
        self.layers = {
            'ingestion': self._setup_ingestion_layer(),
            'storage': self._setup_storage_layer(),
            'compute': self._setup_compute_layer(),
            'application': self._setup_application_layer(),
            'presentation': self._setup_presentation_layer()
        }
    
    def _setup_ingestion_layer(self):
        """设置数据接入层"""
        return {
            'logstash': {
                'pipelines': [
                    'access_logs_pipeline',
                    'app_logs_pipeline',
                    'metrics_pipeline'
                ],
                'filters': [
                    'grok', 'date', 'mutate', 'geoip'
                ]
            },
            'beats': {
                'filebeat': 'log_collection',
                'metricbeat': 'system_metrics',
                'heartbeat': 'uptime_monitoring'
            },
            'custom_collectors': [
                'api_metrics_collector',
                'database_metrics_collector'
            ]
        }
    
    def _setup_storage_layer(self):
        """设置存储层"""
        return {
            'cluster_config': {
                'master_nodes': 3,
                'data_nodes': 6,
                'coordinating_nodes': 2
            },
            'index_strategy': {
                'time_based': True,
                'rollover_policy': 'daily',
                'retention_policy': '90d'
            },
            'shard_strategy': {
                'primary_shards': 3,
                'replica_shards': 1,
                'shard_size_target': '30GB'
            }
        }
    
    def _setup_compute_layer(self):
        """设置计算层"""
        return {
            'search_optimization': {
                'query_cache': True,
                'request_cache': True,
                'field_data_cache': True
            },
            'aggregation_optimization': {
                'doc_values': True,
                'eager_global_ordinals': True,
                'composite_aggregations': True
            },
            'ml_features': {
                'anomaly_detection': True,
                'data_frame_analytics': True,
                'inference': True
            }
        }
    
    def _setup_application_layer(self):
        """设置应用层"""
        return {
            'api_design': {
                'restful_apis': True,
                'graphql_support': False,
                'rate_limiting': True,
                'authentication': 'jwt'
            },
            'caching_strategy': {
                'redis_cache': True,
                'application_cache': True,
                'cdn_cache': True
            },
            'error_handling': {
                'circuit_breaker': True,
                'retry_mechanism': True,
                'fallback_strategy': True
            }
        }
    
    def _setup_presentation_layer(self):
        """设置展示层"""
        return {
            'kibana_config': {
                'dashboards': [
                    'system_overview',
                    'application_metrics',
                    'security_monitoring'
                ],
                'visualizations': [
                    'time_series',
                    'heat_maps',
                    'geographic_maps'
                ]
            },
            'custom_ui': {
                'framework': 'react',
                'charts_library': 'd3.js',
                'real_time_updates': 'websockets'
            }
        }

# 使用示例
architecture = ElasticsearchArchitecture()
print("Elasticsearch架构层次:")
for layer, config in architecture.layers.items():
    print(f"- {layer}: {list(config.keys())}")

4.1.2 容量规划

class CapacityPlanner:
    """容量规划工具"""
    
    def __init__(self):
        self.growth_factors = {
            'data_volume': 1.5,  # 年增长率
            'query_load': 1.3,   # 查询负载增长率
            'user_growth': 1.4   # 用户增长率
        }
    
    def calculate_storage_requirements(self, current_data_gb: float, 
                                     retention_days: int, 
                                     daily_growth_gb: float) -> Dict:
        """计算存储需求"""
        # 当前数据量
        current_storage = current_data_gb
        
        # 保留期内的数据量
        retention_storage = daily_growth_gb * retention_days
        
        # 总存储需求(包括副本)
        total_primary = current_storage + retention_storage
        total_with_replicas = total_primary * 2  # 1个副本
        
        # 考虑增长因子
        projected_storage = total_with_replicas * self.growth_factors['data_volume']
        
        # 建议的存储容量(包含20%缓冲)
        recommended_storage = projected_storage * 1.2
        
        return {
            'current_storage_gb': current_storage,
            'retention_storage_gb': retention_storage,
            'total_primary_gb': total_primary,
            'total_with_replicas_gb': total_with_replicas,
            'projected_storage_gb': projected_storage,
            'recommended_storage_gb': recommended_storage,
            'storage_breakdown': {
                'data': total_primary,
                'replicas': total_primary,
                'growth_buffer': projected_storage - total_with_replicas,
                'safety_buffer': recommended_storage - projected_storage
            }
        }
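
    # 手算示例(与文末使用示例一致): current_data_gb=1000, retention_days=90, daily_growth_gb=50
    #   retention_storage   = 50 * 90     = 4500 GB
    #   total_primary       = 1000 + 4500 = 5500 GB
    #   含1副本             = 5500 * 2    = 11000 GB
    #   乘以1.5数据增长因子 = 11000 * 1.5 = 16500 GB
    #   加20%安全缓冲       = 16500 * 1.2 = 19800 GB ≈ 推荐存储容量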
    
    def calculate_compute_requirements(self, avg_queries_per_second: float,
                                     peak_multiplier: float = 3.0,
                                     target_response_time_ms: float = 100) -> Dict:
        """计算计算资源需求"""
        # 峰值查询负载
        peak_qps = avg_queries_per_second * peak_multiplier
        
        # 考虑增长因子
        projected_peak_qps = peak_qps * self.growth_factors['query_load']
        
        # 基于响应时间要求计算节点数
        # 假设每个节点可以处理50 QPS(100ms响应时间)
        queries_per_node = 50
        required_nodes = max(3, int(projected_peak_qps / queries_per_node) + 1)
        
        # CPU和内存需求
        cpu_cores_per_node = 8
        memory_gb_per_node = 32
        
        total_cpu_cores = required_nodes * cpu_cores_per_node
        total_memory_gb = required_nodes * memory_gb_per_node
        
        return {
            'avg_qps': avg_queries_per_second,
            'peak_qps': peak_qps,
            'projected_peak_qps': projected_peak_qps,
            'required_nodes': required_nodes,
            'total_cpu_cores': total_cpu_cores,
            'total_memory_gb': total_memory_gb,
            'per_node_specs': {
                'cpu_cores': cpu_cores_per_node,
                'memory_gb': memory_gb_per_node,
                'disk_type': 'SSD',
                'network': '10Gbps'
            }
        }
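
    # 手算示例(与文末使用示例一致): avg_qps=100, peak_multiplier=3.0
    #   peak_qps            = 100 * 3.0               = 300
    #   乘以1.3查询增长因子 = 300 * 1.3                = 390
    #   所需节点数          = max(3, int(390/50) + 1)  = 8
    #   资源合计            = 8 * 8 = 64 核CPU, 8 * 32 = 256 GB 内存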
    
    def generate_capacity_plan(self, requirements: Dict) -> Dict:
        """生成完整的容量规划"""
        storage_req = self.calculate_storage_requirements(
            requirements['current_data_gb'],
            requirements['retention_days'],
            requirements['daily_growth_gb']
        )
        
        compute_req = self.calculate_compute_requirements(
            requirements['avg_qps'],
            requirements.get('peak_multiplier', 3.0),
            requirements.get('target_response_time_ms', 100)
        )
        
        # 集群架构建议
        total_nodes = compute_req['required_nodes']
        master_nodes = 3
        data_nodes = max(3, total_nodes - master_nodes - 2)
        coordinating_nodes = 2
        
        return {
            'storage_requirements': storage_req,
            'compute_requirements': compute_req,
            'cluster_architecture': {
                'master_nodes': master_nodes,
                'data_nodes': data_nodes,
                'coordinating_nodes': coordinating_nodes,
                'total_nodes': master_nodes + data_nodes + coordinating_nodes
            },
            'cost_estimation': self._estimate_costs(storage_req, compute_req),
            'timeline': {
                'planning_phase': '2 weeks',
                'procurement_phase': '4 weeks',
                'deployment_phase': '3 weeks',
                'testing_phase': '2 weeks',
                'total_timeline': '11 weeks'
            }
        }
    
    def _estimate_costs(self, storage_req: Dict, compute_req: Dict) -> Dict:
        """估算成本"""
        # 存储成本(每GB每月)
        storage_cost_per_gb_month = 0.10
        monthly_storage_cost = storage_req['recommended_storage_gb'] * storage_cost_per_gb_month
        
        # 计算成本(每核心每月)
        compute_cost_per_core_month = 50
        monthly_compute_cost = compute_req['total_cpu_cores'] * compute_cost_per_core_month
        
        # 网络和其他成本
        monthly_network_cost = 500
        monthly_other_cost = 200
        
        total_monthly_cost = (
            monthly_storage_cost + 
            monthly_compute_cost + 
            monthly_network_cost + 
            monthly_other_cost
        )
        
        return {
            'monthly_storage_cost': monthly_storage_cost,
            'monthly_compute_cost': monthly_compute_cost,
            'monthly_network_cost': monthly_network_cost,
            'monthly_other_cost': monthly_other_cost,
            'total_monthly_cost': total_monthly_cost,
            'annual_cost': total_monthly_cost * 12
        }

# 使用示例
planner = CapacityPlanner()

# 定义需求
requirements = {
    'current_data_gb': 1000,
    'retention_days': 90,
    'daily_growth_gb': 50,
    'avg_qps': 100,
    'peak_multiplier': 3.0,
    'target_response_time_ms': 100
}

# 生成容量规划
capacity_plan = planner.generate_capacity_plan(requirements)

print("=== Elasticsearch 容量规划报告 ===")
print(f"存储需求: {capacity_plan['storage_requirements']['recommended_storage_gb']:.0f} GB")
print(f"计算需求: {capacity_plan['compute_requirements']['required_nodes']} 个节点")
print(f"集群架构: {capacity_plan['cluster_architecture']}")
print(f"月度成本: ${capacity_plan['cost_estimation']['total_monthly_cost']:.2f}")
print(f"年度成本: ${capacity_plan['cost_estimation']['annual_cost']:.2f}")

4.2 性能优化策略

4.2.1 索引优化

class IndexOptimizer:
    """索引优化器"""
    
    def __init__(self, es_client):
        self.es = es_client
    
    def optimize_index_settings(self, index_name: str, optimization_type: str = 'balanced'):
        """优化索引设置"""
        
        optimization_configs = {
            'write_heavy': {
                'refresh_interval': '30s',
                'number_of_replicas': 0,
                'translog': {
                    'flush_threshold_size': '1gb',
                    'sync_interval': '30s'
                },
                'merge': {
                    'policy': {
                        'max_merge_at_once': 30,
                        'segments_per_tier': 30
                    }
                }
            },
            'read_heavy': {
                'refresh_interval': '1s',
                'number_of_replicas': 2,
                # 注意: index.queries.cache.enabled 为静态设置, 通常需在建索引时指定
                'queries': {
                    'cache': {'enabled': True}
                },
                'requests': {
                    'cache': {'enable': True}
                },
                'merge': {
                    'policy': {
                        'max_merge_at_once': 5,
                        'segments_per_tier': 5
                    }
                }
            },
            'balanced': {
                'refresh_interval': '5s',
                'number_of_replicas': 1,
                'translog': {
                    'flush_threshold_size': '512mb',
                    'sync_interval': '5s'
                },
                'merge': {
                    'policy': {
                        'max_merge_at_once': 10,
                        'segments_per_tier': 10
                    }
                }
            }
        }
        
        config = optimization_configs.get(optimization_type, optimization_configs['balanced'])
        
        # 应用设置
        self.es.indices.put_settings(
            index=index_name,
            body=config
        )
        
        return config
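
    def force_merge_readonly_index(self, index_name: str, max_num_segments: int = 1):
        """读多写少场景的常见配套操作(示意代码, 属补充示例而非原方案)。

        对不再写入或极少写入的索引执行force merge, 合并段以降低查询开销;
        force merge代价较高, 不建议对仍在频繁写入的索引执行。
        """
        self.es.indices.forcemerge(
            index=index_name,
            max_num_segments=max_num_segments
        )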
    
    def optimize_mapping(self, index_name: str, field_analysis: Dict) -> Dict:
        """优化映射配置"""
        optimized_mapping = {
            'properties': {}
        }
        
        for field_name, field_info in field_analysis.items():
            field_type = field_info['type']
            cardinality = field_info.get('cardinality', 0)
            search_frequency = field_info.get('search_frequency', 0)
            
            if field_type == 'text':
                # 文本字段优化
                field_config = {
                    'type': 'text',
                    'analyzer': 'standard'
                }
                
                # 高搜索频率的字段添加keyword子字段
                if search_frequency > 0.1:
                    field_config['fields'] = {
                        'keyword': {
                            'type': 'keyword',
                            'ignore_above': 256
                        }
                    }
                
                # 低搜索频率的字段禁用索引
                if search_frequency < 0.01:
                    field_config['index'] = False
                    
            elif field_type == 'keyword':
                # 关键字段优化
                field_config = {
                    'type': 'keyword'
                }
                
                # 高基数字段优化
                if cardinality > 10000:
                    field_config['doc_values'] = False
                    field_config['index'] = search_frequency > 0.01
                
                # 低基数字段启用eager_global_ordinals
                if cardinality < 100 and search_frequency > 0.1:
                    field_config['eager_global_ordinals'] = True
                    
            elif field_type in ['long', 'integer', 'float', 'double']:
                # 数值字段优化
                field_config = {
                    'type': field_type
                }
                
                # 不需要搜索的数值字段
                if search_frequency < 0.01:
                    field_config['index'] = False
                    
            elif field_type == 'date':
                # 日期字段优化
                field_config = {
                    'type': 'date',
                    'format': 'strict_date_optional_time||epoch_millis'
                }
                
            else:
                # 默认配置
                field_config = {
                    'type': field_type
                }
            
            optimized_mapping['properties'][field_name] = field_config
        
        return optimized_mapping
    
    def analyze_index_performance(self, index_name: str) -> Dict:
        """分析索引性能"""
        # 获取索引统计信息
        stats = self.es.indices.stats(index=index_name)
        index_stats = stats['indices'][index_name]
        
        # 获取索引设置
        settings = self.es.indices.get_settings(index=index_name)
        index_settings = settings[index_name]['settings']['index']
        
        # 分析结果
        analysis = {
            'index_size': {
                'total_size_bytes': index_stats['total']['store']['size_in_bytes'],
                'docs_count': index_stats['total']['docs']['count'],
                'avg_doc_size_bytes': index_stats['total']['store']['size_in_bytes'] / max(1, index_stats['total']['docs']['count'])
            },
            'performance_metrics': {
                'search_time_ms': index_stats['total']['search']['query_time_in_millis'],
                'search_count': index_stats['total']['search']['query_total'],
                'avg_search_time_ms': index_stats['total']['search']['query_time_in_millis'] / max(1, index_stats['total']['search']['query_total']),
                'indexing_time_ms': index_stats['total']['indexing']['index_time_in_millis'],
                'indexing_count': index_stats['total']['indexing']['index_total'],
                'avg_indexing_time_ms': index_stats['total']['indexing']['index_time_in_millis'] / max(1, index_stats['total']['indexing']['index_total'])
            },
            'current_settings': {
                'refresh_interval': index_settings.get('refresh_interval', '1s'),
                'number_of_replicas': index_settings.get('number_of_replicas', '1'),
                'number_of_shards': index_settings.get('number_of_shards', '1')
            },
            'recommendations': self._generate_recommendations(index_stats, index_settings)
        }
        
        return analysis
    
    def _generate_recommendations(self, index_stats: Dict, index_settings: Dict) -> List[str]:
        """生成优化建议"""
        recommendations = []
        
        # 文档大小建议
        docs_count = index_stats['total']['docs']['count']
        total_size = index_stats['total']['store']['size_in_bytes']
        avg_doc_size = total_size / max(1, docs_count)
        
        if avg_doc_size > 100 * 1024:  # 100KB
            recommendations.append("文档平均大小较大,考虑拆分大字段或使用嵌套对象")
        
        # 搜索性能建议
        search_count = index_stats['total']['search']['query_total']
        search_time = index_stats['total']['search']['query_time_in_millis']
        avg_search_time = search_time / max(1, search_count)
        
        if avg_search_time > 100:  # 100ms
            recommendations.append("搜索响应时间较慢,考虑优化查询或增加副本")
        
        # 索引性能建议
        indexing_count = index_stats['total']['indexing']['index_total']
        indexing_time = index_stats['total']['indexing']['index_time_in_millis']
        avg_indexing_time = indexing_time / max(1, indexing_count)
        
        if avg_indexing_time > 10:  # 10ms
            recommendations.append("索引写入较慢,考虑调整refresh_interval或减少副本")
        
        # 分片大小建议
        shard_count = int(index_settings.get('number_of_shards', '1'))
        shard_size = total_size / shard_count
        
        if shard_size > 50 * 1024 * 1024 * 1024:  # 50GB
            recommendations.append("分片过大,考虑增加分片数量")
        elif shard_size < 1 * 1024 * 1024 * 1024:  # 1GB
            recommendations.append("分片过小,考虑减少分片数量")
        
        return recommendations

# 使用示例
from elasticsearch import Elasticsearch

es = Elasticsearch(['localhost:9200'])
optimizer = IndexOptimizer(es)

# 分析索引性能
performance_analysis = optimizer.analyze_index_performance('products')
print("索引性能分析:")
print(f"- 索引大小: {performance_analysis['index_size']['total_size_bytes'] / (1024**3):.2f} GB")
print(f"- 平均搜索时间: {performance_analysis['performance_metrics']['avg_search_time_ms']:.2f} ms")
print(f"- 平均索引时间: {performance_analysis['performance_metrics']['avg_indexing_time_ms']:.2f} ms")
print("优化建议:")
for recommendation in performance_analysis['recommendations']:
    print(f"- {recommendation}")

# 优化索引设置
optimized_settings = optimizer.optimize_index_settings('products', 'read_heavy')
print(f"\n已应用读取优化配置: {optimized_settings}")

4.2.2 查询优化

import copy

class QueryOptimizer:
    """查询优化器"""
    
    def __init__(self, es_client):
        self.es = es_client
    
    def optimize_query(self, query: Dict, optimization_hints: Dict = None) -> Dict:
        """优化查询"""
        # 深拷贝,避免在优化过程中修改调用方传入的原始查询
        optimized_query = copy.deepcopy(query)
        hints = optimization_hints or {}
        
        # 1. 优化bool查询
        if 'bool' in optimized_query.get('query', {}):
            optimized_query['query']['bool'] = self._optimize_bool_query(
                optimized_query['query']['bool'], hints
            )
        
        # 2. 查询缓存提示
        # 注意: request_cache是搜索请求的URL参数而不是查询体字段,
        # 执行前需将其取出并作为参数传给search(见后文使用示例)
        if hints.get('enable_cache', True):
            optimized_query['request_cache'] = True
        
        # 3. 优化聚合
        if 'aggs' in optimized_query:
            optimized_query['aggs'] = self._optimize_aggregations(
                optimized_query['aggs'], hints
            )
        
        # 4. 优化排序
        if 'sort' in optimized_query:
            optimized_query['sort'] = self._optimize_sort(
                optimized_query['sort'], hints
            )
        
        # 5. 优化字段选择
        if '_source' not in optimized_query and not hints.get('need_full_source', False):
            optimized_query['_source'] = False
        
        # 6. 添加超时设置
        if 'timeout' not in optimized_query:
            optimized_query['timeout'] = '30s'
        
        return optimized_query
    
    def _optimize_bool_query(self, bool_query: Dict, hints: Dict) -> Dict:
        """优化bool查询"""
        optimized_bool = bool_query.copy()
        
        # 将高选择性的过滤器放在前面
        if 'filter' in bool_query:
            filters = bool_query['filter']
            if isinstance(filters, list):
                # 根据选择性排序过滤器
                sorted_filters = self._sort_filters_by_selectivity(filters)
                optimized_bool['filter'] = sorted_filters
        
        # 将should查询转换为filter(如果可能)
        # 注意: should是可选的加分条件,filter是必须条件,转换会改变匹配语义,
        # 因此仅在显式开启convert_should_to_filter且业务允许时执行
        if 'should' in bool_query and hints.get('convert_should_to_filter', False):
            should_queries = bool_query['should']
            filter_queries = []
            remaining_should = []
            
            for should_query in should_queries:
                if self._can_convert_to_filter(should_query):
                    filter_queries.append(should_query)
                else:
                    remaining_should.append(should_query)
            
            if filter_queries:
                optimized_bool.setdefault('filter', []).extend(filter_queries)
            if remaining_should:
                optimized_bool['should'] = remaining_should
            elif 'should' in optimized_bool:
                del optimized_bool['should']
        
        return optimized_bool
    
    def _sort_filters_by_selectivity(self, filters: List[Dict]) -> List[Dict]:
        """根据选择性排序过滤器"""
        # 简单的启发式排序
        selectivity_order = {
            'term': 1,      # 最高选择性
            'terms': 2,
            'range': 3,
            'exists': 4,
            'prefix': 5,
            'wildcard': 6   # 最低选择性
        }
        
        def get_selectivity(filter_query):
            for query_type in filter_query.keys():
                return selectivity_order.get(query_type, 10)
            return 10
        
        return sorted(filters, key=get_selectivity)
    
    def _can_convert_to_filter(self, query: Dict) -> bool:
        """判断查询是否可以转换为过滤器"""
        # 精确匹配查询可以转换为过滤器
        convertible_types = ['term', 'terms', 'range', 'exists', 'prefix']
        return any(query_type in query for query_type in convertible_types)
    
    def _optimize_aggregations(self, aggs: Dict, hints: Dict) -> Dict:
        """优化聚合"""
        optimized_aggs = {}
        
        for agg_name, agg_config in aggs.items():
            optimized_config = agg_config.copy()
            
            # 优化terms聚合
            if 'terms' in agg_config:
                terms_config = agg_config['terms']
                
                # 添加执行提示
                if 'execution_hint' not in terms_config:
                    cardinality = hints.get(f'{agg_name}_cardinality', 1000)
                    if cardinality > 10000:
                        terms_config['execution_hint'] = 'map'
                    else:
                        terms_config['execution_hint'] = 'global_ordinals'
                
                # 限制聚合大小
                if 'size' not in terms_config:
                    terms_config['size'] = min(100, hints.get('max_agg_size', 100))
                
                optimized_config['terms'] = terms_config
            
            # 优化date_histogram聚合
            elif 'date_histogram' in agg_config:
                date_hist_config = agg_config['date_histogram']
                
                # 使用固定间隔而不是日历间隔(性能更好)
                if 'calendar_interval' in date_hist_config:
                    calendar_interval = date_hist_config['calendar_interval']
                    if calendar_interval in ['1h', '1d', '1w']:
                        del date_hist_config['calendar_interval']
                        interval_map = {'1h': '1h', '1d': '1d', '1w': '7d'}
                        date_hist_config['fixed_interval'] = interval_map[calendar_interval]
                
                optimized_config['date_histogram'] = date_hist_config
            
            # 递归优化子聚合
            if 'aggs' in agg_config:
                optimized_config['aggs'] = self._optimize_aggregations(
                    agg_config['aggs'], hints
                )
            
            optimized_aggs[agg_name] = optimized_config
        
        return optimized_aggs
    
    def _optimize_sort(self, sort: List, hints: Dict) -> List:
        """优化排序"""
        optimized_sort = []
        
        for sort_item in sort:
            if isinstance(sort_item, dict):
                for field, sort_config in sort_item.items():
                    optimized_item = {field: sort_config}
                    
                    # 为数值字段添加数值类型排序
                    if hints.get(f'{field}_type') == 'numeric':
                        if isinstance(sort_config, dict):
                            sort_config['numeric_type'] = 'double'
                        else:
                            optimized_item[field] = {
                                'order': sort_config,
                                'numeric_type': 'double'
                            }
                    
                    optimized_sort.append(optimized_item)
            else:
                optimized_sort.append(sort_item)
        
        return optimized_sort
    
    def analyze_slow_queries(self, index_pattern: str = '*') -> List[Dict]:
        """分析慢查询"""
        # 获取慢查询日志
        slow_queries = []
        
        try:
            # 这里应该从Elasticsearch的慢查询日志中读取
            # 为了演示,我们模拟一些慢查询
            slow_queries = [
                {
                    'query': {
                        'bool': {
                            'should': [
                                {'wildcard': {'title': '*search*'}},
                                {'wildcard': {'description': '*search*'}}
                            ]
                        }
                    },
                    'took_ms': 1500,
                    'index': 'products',
                    'issues': ['使用了低效的wildcard查询', '缺少minimum_should_match参数']
                },
                {
                    'query': {
                        'range': {
                            'timestamp': {
                                'gte': 'now-30d',
                                'lte': 'now'
                            }
                        }
                    },
                    'sort': [{'timestamp': 'desc'}],
                    'size': 10000,
                    'took_ms': 2000,
                    'index': 'logs',
                    'issues': ['返回结果过多', '深度分页性能问题']
                }
            ]
        except Exception as e:
            print(f"获取慢查询失败: {e}")
        
        return slow_queries
    
    def suggest_query_improvements(self, query: Dict) -> List[str]:
        """建议查询改进"""
        suggestions = []
        
        # 检查wildcard查询
        if self._contains_wildcard(query):
            suggestions.append("避免使用wildcard查询,考虑使用match查询或n-gram分析器")
        
        # 检查深度分页
        size = query.get('size', 10)
        from_param = query.get('from', 0)
        if from_param + size > 10000:
            suggestions.append("避免深度分页,使用search_after或scroll API")
        
        # 检查排序字段(仅针对可能是text类型的字段给出提示)
        if 'sort' in query:
            for sort_item in query['sort']:
                if isinstance(sort_item, dict):
                    for field in sort_item.keys():
                        if field == '_score' or field.endswith('.keyword'):
                            continue
                        suggestions.append(f"如果排序字段 {field} 是text类型,应使用其keyword子字段排序")
        
        # 检查聚合大小
        if 'aggs' in query:
            for agg_name, agg_config in query['aggs'].items():
                if 'terms' in agg_config:
                    size = agg_config['terms'].get('size', 10)
                    if size > 1000:
                        suggestions.append(f"聚合 {agg_name} 的size过大,考虑使用composite聚合")
        
        return suggestions
    
    def _contains_wildcard(self, obj) -> bool:
        """检查是否包含wildcard查询"""
        if isinstance(obj, dict):
            if 'wildcard' in obj:
                return True
            return any(self._contains_wildcard(v) for v in obj.values())
        elif isinstance(obj, list):
            return any(self._contains_wildcard(item) for item in obj)
        return False
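
上面 analyze_slow_queries 返回的是模拟数据;在真实集群中,需要先通过索引设置打开搜索慢日志,慢查询才会被记录,之后再从日志文件或集中式日志平台中收集分析。下面是一个开启搜索慢日志阈值的示意(阈值数值仅为示例):

# 示意: 为products索引开启搜索慢日志(阈值数值仅为示例)
slowlog_settings = {
    "index.search.slowlog.threshold.query.warn": "1s",
    "index.search.slowlog.threshold.query.info": "500ms",
    "index.search.slowlog.threshold.fetch.warn": "500ms"
}
es.indices.put_settings(index='products', body=slowlog_settings)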

# 使用示例
query_optimizer = QueryOptimizer(es)

# 原始查询
original_query = {
    "query": {
        "bool": {
            "should": [
                {"term": {"category": "electronics"}},
                {"range": {"price": {"gte": 100, "lte": 500}}}
            ],
            "filter": [
                {"exists": {"field": "description"}},
                {"term": {"status": "active"}}
            ]
        }
    },
    "aggs": {
        "categories": {
            "terms": {
                "field": "category",
                "size": 50
            }
        }
    },
    "size": 20
}

# 优化查询
optimization_hints = {
    'enable_cache': True,
    'convert_should_to_filter': True,
    'categories_cardinality': 100
}

optimized_query = query_optimizer.optimize_query(original_query, optimization_hints)
print("优化后的查询:")
print(json.dumps(optimized_query, indent=2))

# 获取查询改进建议
suggestions = query_optimizer.suggest_query_improvements(original_query)
print("\n查询改进建议:")
for suggestion in suggestions:
    print(f"- {suggestion}")

4.3 监控和告警

class ElasticsearchMonitor:
    """Elasticsearch监控系统"""
    
    def __init__(self, es_hosts: List[str]):
        self.es = Elasticsearch(es_hosts)
        self.metrics_history = []
        
    def collect_cluster_metrics(self) -> Dict:
        """收集集群指标"""
        try:
            # 集群健康状态
            health = self.es.cluster.health()
            
            # 集群统计信息
            stats = self.es.cluster.stats()
            
            # 节点信息
            nodes_info = self.es.nodes.info()
            nodes_stats = self.es.nodes.stats()
            
            # 索引统计
            indices_stats = self.es.indices.stats()
            
            metrics = {
                'timestamp': datetime.now().isoformat(),
                'cluster': {
                    'status': health['status'],
                    'number_of_nodes': health['number_of_nodes'],
                    'number_of_data_nodes': health['number_of_data_nodes'],
                    'active_primary_shards': health['active_primary_shards'],
                    'active_shards': health['active_shards'],
                    'relocating_shards': health['relocating_shards'],
                    'initializing_shards': health['initializing_shards'],
                    'unassigned_shards': health['unassigned_shards'],
                    'delayed_unassigned_shards': health['delayed_unassigned_shards'],
                    'number_of_pending_tasks': health['number_of_pending_tasks'],
                    'number_of_in_flight_fetch': health['number_of_in_flight_fetch'],
                    'task_max_waiting_in_queue_millis': health['task_max_waiting_in_queue_millis'],
                    'active_shards_percent_as_number': health['active_shards_percent_as_number']
                },
                'indices': {
                    'count': stats['indices']['count'],
                    'docs_count': stats['indices']['docs']['count'],
                    'docs_deleted': stats['indices']['docs']['deleted'],
                    'store_size_bytes': stats['indices']['store']['size_in_bytes'],
                    'segments_count': stats['indices']['segments']['count'],
                    'segments_memory_bytes': stats['indices']['segments']['memory_in_bytes']
                },
                'nodes': self._collect_nodes_metrics(nodes_stats),
                'performance': self._collect_performance_metrics(indices_stats)
            }
            
            self.metrics_history.append(metrics)
            return metrics
            
        except Exception as e:
            print(f"收集集群指标失败: {e}")
            return {}
    
    def _collect_nodes_metrics(self, nodes_stats: Dict) -> Dict:
        """收集节点指标"""
        nodes_metrics = {
            'total_nodes': len(nodes_stats['nodes']),
            'nodes_detail': []
        }
        
        total_heap_used = 0
        total_heap_max = 0
        total_cpu_percent = 0
        total_load_avg = 0
        node_count = 0
        
        for node_id, node_stats in nodes_stats['nodes'].items():
            node_metrics = {
                'node_id': node_id,
                'name': node_stats['name'],
                'heap_used_percent': node_stats['jvm']['mem']['heap_used_percent'],
                'heap_used_bytes': node_stats['jvm']['mem']['heap_used_in_bytes'],
                'heap_max_bytes': node_stats['jvm']['mem']['heap_max_in_bytes'],
                'cpu_percent': node_stats['os']['cpu']['percent'],
                'load_average': node_stats['os']['cpu'].get('load_average', {}).get('1m', 0),
                'disk_total_bytes': node_stats['fs']['total']['total_in_bytes'],
                'disk_available_bytes': node_stats['fs']['total']['available_in_bytes'],
                'disk_used_percent': ((node_stats['fs']['total']['total_in_bytes'] - 
                                     node_stats['fs']['total']['available_in_bytes']) / 
                                    node_stats['fs']['total']['total_in_bytes']) * 100
            }
            
            nodes_metrics['nodes_detail'].append(node_metrics)
            
            total_heap_used += node_stats['jvm']['mem']['heap_used_in_bytes']
            total_heap_max += node_stats['jvm']['mem']['heap_max_in_bytes']
            total_cpu_percent += node_stats['os']['cpu']['percent']
            total_load_avg += node_stats['os']['cpu'].get('load_average', {}).get('1m', 0)
            node_count += 1
        
        nodes_metrics['cluster_averages'] = {
            'avg_heap_used_percent': (total_heap_used / total_heap_max) * 100 if total_heap_max > 0 else 0,
            'avg_cpu_percent': total_cpu_percent / node_count if node_count > 0 else 0,
            'avg_load_average': total_load_avg / node_count if node_count > 0 else 0
        }
        
        return nodes_metrics
    
    def _collect_performance_metrics(self, indices_stats: Dict) -> Dict:
        """收集性能指标"""
        all_stats = indices_stats['_all']['total']
        
        return {
            'search': {
                'query_total': all_stats['search']['query_total'],
                'query_time_millis': all_stats['search']['query_time_in_millis'],
                'query_current': all_stats['search']['query_current'],
                'fetch_total': all_stats['search']['fetch_total'],
                'fetch_time_millis': all_stats['search']['fetch_time_in_millis'],
                'fetch_current': all_stats['search']['fetch_current']
            },
            'indexing': {
                'index_total': all_stats['indexing']['index_total'],
                'index_time_millis': all_stats['indexing']['index_time_in_millis'],
                'index_current': all_stats['indexing']['index_current'],
                'delete_total': all_stats['indexing']['delete_total'],
                'delete_time_millis': all_stats['indexing']['delete_time_in_millis'],
                'delete_current': all_stats['indexing']['delete_current']
            },
            'get': {
                'total': all_stats['get']['total'],
                'time_millis': all_stats['get']['time_in_millis'],
                'exists_total': all_stats['get']['exists_total'],
                'exists_time_millis': all_stats['get']['exists_time_in_millis'],
                'missing_total': all_stats['get']['missing_total'],
                'missing_time_millis': all_stats['get']['missing_time_in_millis'],
                'current': all_stats['get']['current']
            },
            'refresh': {
                'total': all_stats['refresh']['total'],
                'total_time_millis': all_stats['refresh']['total_time_in_millis']
            },
            'flush': {
                'total': all_stats['flush']['total'],
                'total_time_millis': all_stats['flush']['total_time_in_millis']
            },
            'merge': {
                'current': all_stats['merges']['current'],
                'current_docs': all_stats['merges']['current_docs'],
                'current_size_bytes': all_stats['merges']['current_size_in_bytes'],
                'total': all_stats['merges']['total'],
                'total_time_millis': all_stats['merges']['total_time_in_millis'],
                'total_docs': all_stats['merges']['total_docs'],
                'total_size_bytes': all_stats['merges']['total_size_in_bytes']
            }
        }
    
    def check_cluster_health(self, metrics: Dict = None) -> Dict:
        """检查集群健康状态"""
        if not metrics:
            metrics = self.collect_cluster_metrics()
        
        health_issues = []
        health_score = 100
        
        cluster = metrics.get('cluster', {})
        nodes = metrics.get('nodes', {})
        
        # 检查集群状态
        if cluster.get('status') == 'red':
            health_issues.append("集群状态为红色,存在不可用的主分片")
            health_score -= 50
        elif cluster.get('status') == 'yellow':
            health_issues.append("集群状态为黄色,存在不可用的副本分片")
            health_score -= 20
        
        # 检查未分配分片
        unassigned_shards = cluster.get('unassigned_shards', 0)
        if unassigned_shards > 0:
            health_issues.append(f"存在 {unassigned_shards} 个未分配的分片")
            health_score -= min(30, unassigned_shards * 5)
        
        # 检查节点资源使用
        cluster_averages = nodes.get('cluster_averages', {})
        
        avg_heap_used = cluster_averages.get('avg_heap_used_percent', 0)
        if avg_heap_used > 85:
            health_issues.append(f"平均堆内存使用率过高: {avg_heap_used:.1f}%")
            health_score -= 20
        elif avg_heap_used > 75:
            health_issues.append(f"平均堆内存使用率较高: {avg_heap_used:.1f}%")
            health_score -= 10
        
        avg_cpu = cluster_averages.get('avg_cpu_percent', 0)
        if avg_cpu > 80:
            health_issues.append(f"平均CPU使用率过高: {avg_cpu:.1f}%")
            health_score -= 15
        
        # 检查磁盘使用
        for node in nodes.get('nodes_detail', []):
            disk_used = node.get('disk_used_percent', 0)
            if disk_used > 90:
                health_issues.append(f"节点 {node['name']} 磁盘使用率过高: {disk_used:.1f}%")
                health_score -= 25
            elif disk_used > 80:
                health_issues.append(f"节点 {node['name']} 磁盘使用率较高: {disk_used:.1f}%")
                health_score -= 10
        
        # 检查待处理任务
        pending_tasks = cluster.get('number_of_pending_tasks', 0)
        if pending_tasks > 100:
            health_issues.append(f"待处理任务过多: {pending_tasks}")
            health_score -= 15
        
        return {
            'health_score': max(0, health_score),
            'status': self._get_health_status(health_score),
            'issues': health_issues,
            'recommendations': self._get_health_recommendations(health_issues)
        }
    
    def _get_health_status(self, score: int) -> str:
        """根据健康分数获取状态"""
        if score >= 90:
            return 'excellent'
        elif score >= 75:
            return 'good'
        elif score >= 60:
            return 'warning'
        elif score >= 40:
            return 'critical'
        else:
            return 'emergency'
    
    def _get_health_recommendations(self, issues: List[str]) -> List[str]:
        """根据问题生成建议"""
        recommendations = []
        
        for issue in issues:
            if '堆内存使用率' in issue:
                recommendations.append("考虑增加堆内存大小或优化查询以减少内存使用")
            elif 'CPU使用率' in issue:
                recommendations.append("考虑增加节点数量或优化查询性能")
            elif '磁盘使用率' in issue:
                recommendations.append("清理旧数据、增加存储容量或启用数据压缩")
            elif '未分配的分片' in issue:
                recommendations.append("检查节点可用性和分片分配设置")
            elif '待处理任务' in issue:
                recommendations.append("检查集群负载和主节点性能")
        
        return list(set(recommendations))  # 去重
    
    def generate_health_report(self) -> str:
        """生成健康报告"""
        metrics = self.collect_cluster_metrics()
        health_check = self.check_cluster_health(metrics)
        
        report = f"""
=== Elasticsearch 集群健康报告 ===
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

集群概览:
- 状态: {metrics['cluster']['status'].upper()}
- 节点数: {metrics['cluster']['number_of_nodes']}
- 数据节点数: {metrics['cluster']['number_of_data_nodes']}
- 活跃分片: {metrics['cluster']['active_shards']}
- 主分片: {metrics['cluster']['active_primary_shards']}
- 未分配分片: {metrics['cluster']['unassigned_shards']}

健康评分: {health_check['health_score']}/100 ({health_check['status'].upper()})

资源使用:
- 平均堆内存使用: {metrics['nodes']['cluster_averages']['avg_heap_used_percent']:.1f}%
- 平均CPU使用: {metrics['nodes']['cluster_averages']['avg_cpu_percent']:.1f}%
- 平均负载: {metrics['nodes']['cluster_averages']['avg_load_average']:.2f}

数据统计:
- 索引数量: {metrics['indices']['count']}
- 文档总数: {metrics['indices']['docs_count']:,}
- 存储大小: {metrics['indices']['store_size_bytes'] / (1024**3):.2f} GB
- 段数量: {metrics['indices']['segments_count']:,}

性能指标:
- 搜索查询总数: {metrics['performance']['search']['query_total']:,}
- 平均查询时间: {metrics['performance']['search']['query_time_millis'] / max(1, metrics['performance']['search']['query_total']):.2f} ms
- 索引操作总数: {metrics['performance']['indexing']['index_total']:,}
- 平均索引时间: {metrics['performance']['indexing']['index_time_millis'] / max(1, metrics['performance']['indexing']['index_total']):.2f} ms
"""
        
        if health_check['issues']:
            report += "\n发现的问题:\n"
            for issue in health_check['issues']:
                report += f"- {issue}\n"
        
        if health_check['recommendations']:
            report += "\n建议措施:\n"
            for recommendation in health_check['recommendations']:
                report += f"- {recommendation}\n"
        
        return report

# 使用示例
monitor = ElasticsearchMonitor(['localhost:9200'])

# 生成健康报告
health_report = monitor.generate_health_report()
print(health_report)

# 持续监控
import time

def continuous_monitoring():
    while True:
        try:
            metrics = monitor.collect_cluster_metrics()
            health_check = monitor.check_cluster_health(metrics)
            
            if health_check['health_score'] < 70:
                print(f"[WARNING] 集群健康分数: {health_check['health_score']}")
                for issue in health_check['issues']:
                    print(f"  - {issue}")
            
            time.sleep(60)  # 每分钟检查一次
            
        except Exception as e:
            print(f"监控失败: {e}")
            time.sleep(60)

# 启动持续监控(在实际使用中)
# continuous_monitoring()
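
上面的监控示例只是把告警打印到控制台;实际系统中通常会把低分事件推送到告警渠道。下面是一个通过Webhook发送告警的最小示意(webhook_url 为假设的占位地址):

# 最小示意: 当健康分数过低时向告警Webhook推送消息(URL为假设的占位地址)
import requests

def send_alert(health_check: Dict, webhook_url: str = 'http://alert.example.com/webhook'):
    payload = {
        'level': 'warning' if health_check['health_score'] >= 40 else 'critical',
        'score': health_check['health_score'],
        'issues': health_check['issues'],
        'time': datetime.now().isoformat()
    }
    try:
        requests.post(webhook_url, json=payload, timeout=5)
    except Exception as e:
        print(f"告警发送失败: {e}")

# 在continuous_monitoring检测到低分时,可在打印告警之后调用 send_alert(health_check)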

5. 章节总结

5.1 核心知识点

  1. 电商搜索系统

    • 多维度搜索和过滤
    • 个性化推荐
    • 性能优化策略
    • 用户行为分析
  2. 日志分析系统

    • 实时日志处理
    • 安全威胁检测
    • 性能监控
    • 异常检测
  3. 实时监控系统

    • 指标收集和存储
    • 告警规则和处理
    • 多渠道通知
    • 自动化运维
  4. 最佳实践

    • 分层架构设计
    • 容量规划方法
    • 性能优化技巧
    • 监控和告警策略

5.2 实践要点

  1. 架构设计

    • 合理的分层架构
    • 可扩展的设计模式
    • 高可用性保障
    • 性能优化考虑
  2. 数据建模

    • 合适的索引结构
    • 优化的映射配置
    • 有效的分片策略
    • 生命周期管理(ILM策略示例见本节列表之后)
  3. 查询优化

    • 高效的查询语句
    • 合理的聚合设计
    • 缓存策略应用
    • 性能监控分析
  4. 运维管理

    • 全面的监控体系
    • 及时的告警机制
    • 自动化运维工具
    • 容量规划管理
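
针对上面数据建模要点中的生命周期管理,下面给出一个简化的ILM策略示意(滚动与保留阈值只是假设的示例值,实际应结合数据量和查询模式确定):

# 示意: 一个简化的索引生命周期(ILM)策略,阈值仅为假设示例
lifecycle_policy = {
    "policy": {
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {"max_size": "50gb", "max_age": "7d"}
                }
            },
            "warm": {
                "min_age": "7d",
                "actions": {
                    "forcemerge": {"max_num_segments": 1},
                    "set_priority": {"priority": 50}
                }
            },
            "delete": {
                "min_age": "30d",
                "actions": {"delete": {}}
            }
        }
    }
}
es.ilm.put_lifecycle(policy='logs-policy', body=lifecycle_policy)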

5.3 练习题

  1. 系统设计题

    • 设计一个支持千万级商品的电商搜索系统
    • 设计一个处理TB级日志的分析系统
    • 设计一个实时监控告警系统
  2. 优化题

    • 优化一个响应时间超过1秒的复杂查询
    • 优化一个写入TPS不足的索引配置
    • 优化一个内存使用过高的聚合查询
  3. 故障排查题

    • 排查集群状态为红色的原因
    • 排查查询性能突然下降的问题
    • 排查索引写入失败的原因
  4. 容量规划题

    • 为一个日增长100GB的系统做容量规划(粗略估算思路见本节列表之后)
    • 为一个峰值1000QPS的系统做性能规划
    • 为一个多租户系统做资源分配规划
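
针对上面第一道容量规划题,一个粗略的估算思路如下(所有数字均为假设的示例值):

# 粗略估算(数字均为假设): 日增100GB、保留30天、1个副本
daily_gb = 100
retention_days = 30
replicas = 1
overhead = 1.15  # 段合并、translog等额外开销的粗略系数

raw_storage_gb = daily_gb * retention_days * (1 + replicas)   # 6000 GB
planned_storage_gb = raw_storage_gb * overhead                # 约 6900 GB
primary_shards = daily_gb * retention_days / 40               # 按单主分片约40GB估算,约75个

print(f"规划存储约 {planned_storage_gb:.0f} GB, 主分片数约 {primary_shards:.0f} 个(可按天滚动索引拆分)")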

通过本章的学习,你应该能够:

  • 设计和实现复杂的Elasticsearch应用系统
  • 掌握性能优化的方法和技巧
  • 建立完善的监控和告警体系
  • 进行有效的容量规划和架构设计
  • 解决实际生产环境中的各种问题

这些实际项目案例和最佳实践将帮助你在实际工作中更好地应用Elasticsearch技术,构建高性能、高可用的搜索和分析系统。