学习目标
通过本章学习,您将掌握:
- 聚合分析的基本概念和类型
- 桶聚合(Bucket Aggregations)的使用
- 指标聚合(Metric Aggregations)的应用
- 管道聚合(Pipeline Aggregations)的高级用法
- 矩阵聚合(Matrix Aggregations)的特殊场景
- 聚合的嵌套和组合使用
- 实际业务场景中的聚合分析案例
1. 聚合分析基础
1.1 聚合概念
Elasticsearch聚合分析是对数据进行分组、统计和计算的强大功能,类似于SQL中的GROUP BY和聚合函数。
1.2 聚合类型
- 桶聚合(Bucket Aggregations):将文档分组到不同的桶中
- 指标聚合(Metric Aggregations):对数据进行数学计算
- 管道聚合(Pipeline Aggregations):对其他聚合的结果进行计算
- 矩阵聚合(Matrix Aggregations):对多个字段进行矩阵运算
1.3 基本语法结构
GET /index_name/_search
{
"size": 0, # 不返回文档,只返回聚合结果
"aggs": {
"aggregation_name": {
"aggregation_type": {
# 聚合配置
},
"aggs": {
# 子聚合
}
}
}
}
2. 桶聚合(Bucket Aggregations)
2.1 Terms聚合
# 基本terms聚合
GET /products/_search
{
"size": 0,
"aggs": {
"categories": {
"terms": {
"field": "category.keyword",
"size": 10
}
}
}
}
# 高级terms聚合
GET /products/_search
{
"size": 0,
"aggs": {
"top_brands": {
"terms": {
"field": "brand.keyword",
"size": 5,
"order": {
"avg_price": "desc"
},
"min_doc_count": 10,
"include": ["Apple", "Samsung", "Google"],
"exclude": ["Unknown"]
},
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
}
}
}
}
}
# 多字段terms聚合
GET /products/_search
{
"size": 0,
"aggs": {
"brand_category": {
"multi_terms": {
"terms": [
{
"field": "brand.keyword"
},
{
"field": "category.keyword"
}
],
"size": 10
}
}
}
}
2.2 Range聚合
# 价格范围聚合
GET /products/_search
{
"size": 0,
"aggs": {
"price_ranges": {
"range": {
"field": "price",
"ranges": [
{
"key": "cheap",
"to": 100
},
{
"key": "moderate",
"from": 100,
"to": 500
},
{
"key": "expensive",
"from": 500
}
]
},
"aggs": {
"avg_rating": {
"avg": {
"field": "rating"
}
}
}
}
}
}
# 日期范围聚合
GET /orders/_search
{
"size": 0,
"aggs": {
"sales_over_time": {
"date_range": {
"field": "order_date",
"format": "yyyy-MM-dd",
"ranges": [
{
"key": "last_week",
"from": "now-7d/d",
"to": "now/d"
},
{
"key": "last_month",
"from": "now-1M/M",
"to": "now/M"
},
{
"key": "last_year",
"from": "now-1y/y",
"to": "now/y"
}
]
}
}
}
}
2.3 Histogram聚合
# 价格直方图
# extended_bounds only takes effect when min_doc_count is 0: with a higher
# threshold the empty buckets it would add are filtered out again.
GET /products/_search
{
  "size": 0,
  "aggs": {
    "price_histogram": {
      "histogram": {
        "field": "price",
        "interval": 100,
        "min_doc_count": 0,
        "extended_bounds": {
          "min": 0,
          "max": 2000
        }
      }
    }
  }
}
# 日期直方图
GET /logs/_search
{
"size": 0,
"aggs": {
"requests_over_time": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1h",
"time_zone": "+08:00",
"min_doc_count": 0,
"extended_bounds": {
"min": "now-24h",
"max": "now"
}
},
"aggs": {
"error_rate": {
"filter": {
"term": {
"level.keyword": "ERROR"
}
}
},
"avg_response_time": {
"avg": {
"field": "response_time"
}
}
}
}
}
}
# 自动间隔日期直方图
GET /logs/_search
{
"size": 0,
"aggs": {
"auto_date_histogram": {
"auto_date_histogram": {
"field": "@timestamp",
"buckets": 20,
"minimum_interval": "minute"
}
}
}
}
2.4 Filter聚合
# 单个过滤器聚合
GET /products/_search
{
"size": 0,
"aggs": {
"high_rated_products": {
"filter": {
"range": {
"rating": {
"gte": 4.0
}
}
},
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
}
}
}
}
}
# 多个过滤器聚合
GET /products/_search
{
"size": 0,
"aggs": {
"product_segments": {
"filters": {
"filters": {
"electronics": {
"term": {
"category.keyword": "Electronics"
}
},
"books": {
"term": {
"category.keyword": "Books"
}
},
"expensive": {
"range": {
"price": {
"gte": 1000
}
}
}
}
},
"aggs": {
"avg_rating": {
"avg": {
"field": "rating"
}
}
}
}
}
}
2.5 Nested聚合
# 嵌套对象聚合
GET /products/_search
{
"size": 0,
"aggs": {
"review_analysis": {
"nested": {
"path": "reviews"
},
"aggs": {
"avg_rating": {
"avg": {
"field": "reviews.rating"
}
},
"rating_distribution": {
"histogram": {
"field": "reviews.rating",
"interval": 1,
"min_doc_count": 0
}
},
"top_reviewers": {
"terms": {
"field": "reviews.author.keyword",
"size": 5
}
}
}
}
}
}
2.6 地理位置聚合
# 地理距离聚合
GET /stores/_search
{
"size": 0,
"aggs": {
"distance_from_center": {
"geo_distance": {
"field": "location",
"origin": {
"lat": 40.7128,
"lon": -74.0060
},
"ranges": [
{
"key": "nearby",
"to": 5000
},
{
"key": "medium",
"from": 5000,
"to": 20000
},
{
"key": "far",
"from": 20000
}
]
}
}
}
}
# 地理网格聚合
GET /stores/_search
{
"size": 0,
"aggs": {
"store_grid": {
"geohash_grid": {
"field": "location",
"precision": 5
}
}
}
}
# 地理边界聚合
GET /stores/_search
{
"size": 0,
"aggs": {
"store_bounds": {
"geo_bounds": {
"field": "location"
}
}
}
}
2.7 采样聚合
# 采样聚合(提高大数据集性能)
GET /large_dataset/_search
{
"size": 0,
"aggs": {
"sample": {
"sampler": {
"shard_size": 1000
},
"aggs": {
"top_categories": {
"terms": {
"field": "category.keyword",
"size": 10
}
}
}
}
}
}
# 多样性采样聚合
GET /products/_search
{
"size": 0,
"aggs": {
"diverse_sample": {
"diversified_sampler": {
"shard_size": 1000,
"field": "category.keyword"
},
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
}
}
}
}
}
3. 指标聚合(Metric Aggregations)
3.1 基础统计聚合
# 多种统计指标
GET /products/_search
{
"size": 0,
"aggs": {
"price_stats": {
"stats": {
"field": "price"
}
},
"price_extended_stats": {
"extended_stats": {
"field": "price",
"sigma": 2
}
},
"avg_price": {
"avg": {
"field": "price"
}
},
"max_price": {
"max": {
"field": "price"
}
},
"min_price": {
"min": {
"field": "price"
}
},
"sum_price": {
"sum": {
"field": "price"
}
},
"value_count": {
"value_count": {
"field": "price"
}
}
}
}
3.2 百分位数聚合
# 百分位数分析
GET /response_times/_search
{
"size": 0,
"aggs": {
"response_time_percentiles": {
"percentiles": {
"field": "response_time",
"percents": [25, 50, 75, 90, 95, 99],
"keyed": false
}
},
"response_time_percentile_ranks": {
"percentile_ranks": {
"field": "response_time",
"values": [100, 500, 1000]
}
}
}
}
# T-Digest百分位数(更精确)
# T-Digest is selected through the "tdigest" option of the percentiles
# aggregation; a higher compression trades memory for accuracy.
GET /response_times/_search
{
  "size": 0,
  "aggs": {
    "response_time_tdigest": {
      "percentiles": {
        "field": "response_time",
        "percents": [50, 90, 95, 99],
        "tdigest": {
          "compression": 200
        }
      }
    }
  }
}
# HDR百分位数(高动态范围)
# HDR Histogram is selected through the "hdr" option of the percentiles
# aggregation.
GET /response_times/_search
{
  "size": 0,
  "aggs": {
    "response_time_hdr": {
      "percentiles": {
        "field": "response_time",
        "percents": [50, 90, 95, 99],
        "hdr": {
          "number_of_significant_value_digits": 3
        }
      }
    }
  }
}
3.3 基数聚合
# 唯一值计数
GET /logs/_search
{
"size": 0,
"aggs": {
"unique_users": {
"cardinality": {
"field": "user_id.keyword",
"precision_threshold": 10000
}
},
"unique_ips": {
"cardinality": {
"field": "client_ip.keyword"
}
}
}
}
3.4 脚本聚合
# 脚本指标聚合
GET /products/_search
{
"size": 0,
"aggs": {
"profit_margin": {
"avg": {
"script": {
"source": "(doc['price'].value - doc['cost'].value) / doc['price'].value * 100"
}
}
},
"weighted_rating": {
"sum": {
"script": {
"source": "doc['rating'].value * doc['review_count'].value",
"params": {
"factor": 1.2
}
}
}
}
}
}
3.5 Top Hits聚合
# 获取每个分组的顶部文档
GET /products/_search
{
"size": 0,
"aggs": {
"categories": {
"terms": {
"field": "category.keyword",
"size": 5
},
"aggs": {
"top_products": {
"top_hits": {
"sort": [
{
"rating": {
"order": "desc"
}
}
],
"_source": {
"includes": ["title", "price", "rating"]
},
"size": 3
}
}
}
}
}
}
4. 管道聚合(Pipeline Aggregations)
4.1 桶级管道聚合(同级:avg_bucket/max_bucket/min_bucket/sum_bucket;父级:bucket_sort、bucket_script)
# 平均桶值
GET /sales/_search
{
"size": 0,
"aggs": {
"monthly_sales": {
"date_histogram": {
"field": "date",
"calendar_interval": "month"
},
"aggs": {
"total_sales": {
"sum": {
"field": "amount"
}
}
}
},
"avg_monthly_sales": {
"avg_bucket": {
"buckets_path": "monthly_sales>total_sales"
}
},
"max_monthly_sales": {
"max_bucket": {
"buckets_path": "monthly_sales>total_sales"
}
},
"min_monthly_sales": {
"min_bucket": {
"buckets_path": "monthly_sales>total_sales"
}
},
"sum_monthly_sales": {
"sum_bucket": {
"buckets_path": "monthly_sales>total_sales"
}
}
}
}
# 桶排序
# bucket_sort is a parent pipeline aggregation: it must be nested inside the
# multi-bucket aggregation whose buckets it sorts and truncates.
GET /products/_search
{
  "size": 0,
  "aggs": {
    "categories": {
      "terms": {
        "field": "category.keyword",
        "size": 10
      },
      "aggs": {
        "avg_price": {
          "avg": {
            "field": "price"
          }
        },
        "categories_sorted_by_avg_price": {
          "bucket_sort": {
            "sort": [
              {
                "avg_price": {
                  "order": "desc"
                }
              }
            ],
            "size": 5
          }
        }
      }
    }
  }
}
# 桶脚本
GET /sales/_search
{
"size": 0,
"aggs": {
"monthly_sales": {
"date_histogram": {
"field": "date",
"calendar_interval": "month"
},
"aggs": {
"total_sales": {
"sum": {
"field": "amount"
}
},
"sales_growth": {
"bucket_script": {
"buckets_path": {
"current": "total_sales"
},
"script": "params.current * 1.1"
}
}
}
}
}
}
4.2 序列类管道聚合(均为父级管道聚合)
# 累积和
GET /sales/_search
{
"size": 0,
"aggs": {
"daily_sales": {
"date_histogram": {
"field": "date",
"calendar_interval": "day"
},
"aggs": {
"total_sales": {
"sum": {
"field": "amount"
}
},
"cumulative_sales": {
"cumulative_sum": {
"buckets_path": "total_sales"
}
}
}
}
}
}
# 移动平均
# The moving_avg aggregation was deprecated and removed in Elasticsearch 8.0;
# moving_fn with MovingFunctions.linearWeightedAvg replaces the "linear" model.
GET /sales/_search
{
  "size": 0,
  "aggs": {
    "daily_sales": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "day"
      },
      "aggs": {
        "total_sales": {
          "sum": {
            "field": "amount"
          }
        },
        "moving_avg": {
          "moving_fn": {
            "buckets_path": "total_sales",
            "window": 7,
            "script": "MovingFunctions.linearWeightedAvg(values)"
          }
        }
      }
    }
  }
}
# 导数(变化率)
GET /sales/_search
{
"size": 0,
"aggs": {
"daily_sales": {
"date_histogram": {
"field": "date",
"calendar_interval": "day"
},
"aggs": {
"total_sales": {
"sum": {
"field": "amount"
}
},
"sales_derivative": {
"derivative": {
"buckets_path": "total_sales"
}
}
}
}
}
}
# 序列差分
GET /sales/_search
{
"size": 0,
"aggs": {
"daily_sales": {
"date_histogram": {
"field": "date",
"calendar_interval": "day"
},
"aggs": {
"total_sales": {
"sum": {
"field": "amount"
}
},
"sales_diff": {
"serial_diff": {
"buckets_path": "total_sales",
"lag": 1
}
}
}
}
}
}
5. 矩阵聚合(Matrix Aggregations)
# 矩阵统计
GET /products/_search
{
"size": 0,
"aggs": {
"price_rating_stats": {
"matrix_stats": {
"fields": ["price", "rating", "review_count"]
}
}
}
}
6. 复杂聚合组合
6.1 多层嵌套聚合
# 复杂的电商分析
GET /orders/_search
{
"size": 0,
"aggs": {
"monthly_analysis": {
"date_histogram": {
"field": "order_date",
"calendar_interval": "month"
},
"aggs": {
"total_revenue": {
"sum": {
"field": "total_amount"
}
},
"order_count": {
"value_count": {
"field": "order_id"
}
},
"avg_order_value": {
"avg": {
"field": "total_amount"
}
},
"customer_segments": {
"terms": {
"field": "customer_segment.keyword",
"size": 5
},
"aggs": {
"segment_revenue": {
"sum": {
"field": "total_amount"
}
},
"avg_segment_order_value": {
"avg": {
"field": "total_amount"
}
},
"top_products": {
"terms": {
"field": "product_category.keyword",
"size": 3
},
"aggs": {
"category_revenue": {
"sum": {
"field": "total_amount"
}
}
}
}
}
},
"payment_methods": {
"terms": {
"field": "payment_method.keyword"
},
"aggs": {
"method_revenue": {
"sum": {
"field": "total_amount"
}
}
}
}
}
},
"overall_stats": {
"global": {},
"aggs": {
"total_customers": {
"cardinality": {
"field": "customer_id.keyword"
}
},
"total_revenue": {
"sum": {
"field": "total_amount"
}
}
}
}
}
}
6.2 条件聚合
# 基于条件的聚合分析
GET /products/_search
{
"size": 0,
"aggs": {
"product_analysis": {
"filters": {
"filters": {
"high_end": {
"range": {
"price": {
"gte": 1000
}
}
},
"mid_range": {
"range": {
"price": {
"gte": 100,
"lt": 1000
}
}
},
"budget": {
"range": {
"price": {
"lt": 100
}
}
}
}
},
"aggs": {
"avg_rating": {
"avg": {
"field": "rating"
}
},
"brand_distribution": {
"terms": {
"field": "brand.keyword",
"size": 5
}
},
"rating_percentiles": {
"percentiles": {
"field": "rating",
"percents": [25, 50, 75, 90, 95]
}
}
}
}
}
}
7. 实践案例
7.1 电商业务分析系统
#!/usr/bin/env python3
# ecommerce_analytics.py
from elasticsearch import Elasticsearch
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import json
class EcommerceAnalytics:
    """Aggregation-only analytics queries over e-commerce order indices.

    Every public method builds a ``size: 0`` search body, runs it through
    ``self.es.search`` and returns a result envelope:

    * ``{"success": True, "data": <aggregations>, "took": <took>}`` on success
    * ``{"success": False, "error": <message>}`` when anything raises
    """

    def __init__(self, es_host="localhost:9200", username=None, password=None):
        """Create the Elasticsearch client.

        Credentials are passed to the client only when both ``username`` and
        ``password`` are truthy.
        """
        if username and password:
            self.es = Elasticsearch([es_host], http_auth=(username, password))
        else:
            self.es = Elasticsearch([es_host])

    def _run_aggregation(self, index: str, search_body: Dict[str, Any]) -> Dict[str, Any]:
        """Execute ``search_body`` against ``index`` and wrap the outcome.

        Shared by all public methods so they report success and failure in
        exactly the same shape. Any exception (including a response without
        an ``aggregations`` key) is converted into the error envelope.
        """
        try:
            response = self.es.search(index=index, body=search_body)
            return {
                "success": True,
                "data": response["aggregations"],
                "took": response["took"],
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def sales_dashboard(self,
                        start_date: Optional[str] = None,
                        end_date: Optional[str] = None) -> Dict[str, Any]:
        """Sales dashboard aggregations over the ``orders`` index.

        Args:
            start_date: Inclusive lower bound applied to ``order_date``
                (``gte``); omitted when falsy.
            end_date: Inclusive upper bound applied to ``order_date``
                (``lte``); omitted when falsy.

        Returns:
            The result envelope described on the class.
        """
        # Build the optional order_date range filter.
        time_filter = []
        if start_date or end_date:
            time_range = {}
            if start_date:
                time_range["gte"] = start_date
            if end_date:
                time_range["lte"] = end_date
            time_filter.append({"range": {"order_date": time_range}})

        if time_filter:
            query = {"bool": {"filter": time_filter}}
        else:
            query = {"match_all": {}}

        search_body = {
            "size": 0,
            "query": query,
            "aggs": {
                # Overall metrics
                "total_revenue": {"sum": {"field": "total_amount"}},
                "total_orders": {"value_count": {"field": "order_id"}},
                "unique_customers": {"cardinality": {"field": "customer_id.keyword"}},
                "avg_order_value": {"avg": {"field": "total_amount"}},
                # Daily trend
                "daily_sales": {
                    "date_histogram": {
                        "field": "order_date",
                        "calendar_interval": "day"
                    },
                    "aggs": {
                        "revenue": {"sum": {"field": "total_amount"}},
                        "orders": {"value_count": {"field": "order_id"}},
                        "customers": {"cardinality": {"field": "customer_id.keyword"}}
                    }
                },
                # Product categories, ordered by revenue
                "top_categories": {
                    "terms": {
                        "field": "product_category.keyword",
                        "size": 10,
                        "order": {"category_revenue": "desc"}
                    },
                    "aggs": {
                        "category_revenue": {"sum": {"field": "total_amount"}},
                        "category_orders": {"value_count": {"field": "order_id"}},
                        "avg_order_value": {"avg": {"field": "total_amount"}}
                    }
                },
                # Customer segments, with revenue per distinct customer
                "customer_segments": {
                    "terms": {"field": "customer_segment.keyword"},
                    "aggs": {
                        "segment_revenue": {"sum": {"field": "total_amount"}},
                        "segment_customers": {"cardinality": {"field": "customer_id.keyword"}},
                        "avg_customer_value": {
                            "bucket_script": {
                                "buckets_path": {
                                    "revenue": "segment_revenue",
                                    "customers": "segment_customers"
                                },
                                "script": "params.revenue / params.customers"
                            }
                        }
                    }
                },
                # Shipping regions
                "sales_by_region": {
                    "terms": {"field": "shipping_region.keyword", "size": 20},
                    "aggs": {
                        "region_revenue": {"sum": {"field": "total_amount"}},
                        "region_orders": {"value_count": {"field": "order_id"}}
                    }
                },
                # Payment methods
                "payment_methods": {
                    "terms": {"field": "payment_method.keyword"},
                    "aggs": {
                        "method_revenue": {"sum": {"field": "total_amount"}},
                        "method_orders": {"value_count": {"field": "order_id"}}
                    }
                },
                # Order value distribution in buckets of 50
                "order_value_distribution": {
                    "histogram": {
                        "field": "total_amount",
                        "interval": 50,
                        "min_doc_count": 1
                    }
                },
                # Repeat customers: count orders per customer on every shard,
                # merge the per-shard maps, then count customers with > 1 order.
                "repeat_customers": {
                    "scripted_metric": {
                        "init_script": "state.customers = [:]",
                        "map_script": """
                            String customerId = doc['customer_id.keyword'].value;
                            if (state.customers.containsKey(customerId)) {
                                state.customers[customerId]++;
                            } else {
                                state.customers[customerId] = 1;
                            }
                        """,
                        "combine_script": "return state.customers",
                        "reduce_script": """
                            Map combined = [:];
                            for (state in states) {
                                for (entry in state.entrySet()) {
                                    if (combined.containsKey(entry.getKey())) {
                                        combined[entry.getKey()] += entry.getValue();
                                    } else {
                                        combined[entry.getKey()] = entry.getValue();
                                    }
                                }
                            }
                            int repeatCustomers = 0;
                            for (entry in combined.entrySet()) {
                                if (entry.getValue() > 1) {
                                    repeatCustomers++;
                                }
                            }
                            return repeatCustomers;
                        """
                    }
                }
            }
        }
        return self._run_aggregation("orders", search_body)

    def product_performance(self, category: Optional[str] = None) -> Dict[str, Any]:
        """Per-product performance aggregations over the ``order_items`` index.

        Args:
            category: When truthy, restricts the query to documents whose
                ``product_category.keyword`` equals this value.

        Returns:
            The result envelope described on the class.
        """
        filters = []
        if category:
            filters.append({"term": {"product_category.keyword": category}})

        if filters:
            query = {"bool": {"filter": filters}}
        else:
            query = {"match_all": {}}

        search_body = {
            "size": 0,
            "query": query,
            "aggs": {
                # Top 100 products by revenue
                "products": {
                    "terms": {
                        "field": "product_id.keyword",
                        "size": 100,
                        "order": {"revenue": "desc"}
                    },
                    "aggs": {
                        "revenue": {"sum": {"field": "total_amount"}},
                        "quantity_sold": {"sum": {"field": "quantity"}},
                        "orders": {"value_count": {"field": "order_id"}},
                        "avg_price": {"avg": {"field": "unit_price"}},
                        "first_sale": {"min": {"field": "order_date"}},
                        "last_sale": {"max": {"field": "order_date"}},
                        # One sample document supplies the descriptive fields.
                        "product_details": {
                            "top_hits": {
                                "size": 1,
                                "_source": ["product_name", "product_category", "brand"]
                            }
                        }
                    }
                },
                # "global" ignores the query, so these cover the whole index
                # even when a category filter is applied.
                "performance_metrics": {
                    "global": {},
                    "aggs": {
                        "total_products": {
                            "cardinality": {"field": "product_id.keyword"}
                        },
                        "revenue_percentiles": {
                            "percentiles": {
                                "field": "total_amount",
                                "percents": [25, 50, 75, 90, 95, 99]
                            }
                        }
                    }
                }
            }
        }
        return self._run_aggregation("order_items", search_body)

    def customer_analysis(self) -> Dict[str, Any]:
        """Customer-level aggregations over the ``orders`` index.

        Returns:
            The result envelope described on the class.
        """
        search_body = {
            "size": 0,
            "aggs": {
                # Top 1000 customers by total spend
                "customer_value": {
                    "terms": {
                        "field": "customer_id.keyword",
                        "size": 1000,
                        "order": {"total_spent": "desc"}
                    },
                    "aggs": {
                        "total_spent": {"sum": {"field": "total_amount"}},
                        "order_count": {"value_count": {"field": "order_id"}},
                        "avg_order_value": {"avg": {"field": "total_amount"}},
                        "first_order": {"min": {"field": "order_date"}},
                        "last_order": {"max": {"field": "order_date"}},
                        # Span between first and last order; the divisor
                        # converts milliseconds to days.
                        "customer_lifetime": {
                            "bucket_script": {
                                "buckets_path": {
                                    "first": "first_order",
                                    "last": "last_order"
                                },
                                "script": "(params.last - params.first) / (1000 * 60 * 60 * 24)"
                            }
                        }
                    }
                },
                # Customer tiers by value.
                # NOTE(review): the script reads an ``orders`` array from
                # _source, which presumes customer-shaped documents rather
                # than one document per order - confirm against the mapping.
                "customer_segments_by_value": {
                    "range": {
                        "script": {
                            "source": """
                                def totalSpent = 0;
                                for (doc in params._source.orders) {
                                    totalSpent += doc.total_amount;
                                }
                                return totalSpent;
                            """
                        },
                        "ranges": [
                            {"key": "low_value", "to": 100},
                            {"key": "medium_value", "from": 100, "to": 500},
                            {"key": "high_value", "from": 500, "to": 2000},
                            {"key": "vip", "from": 2000}
                        ]
                    }
                },
                # Purchase frequency.
                # NOTE(review): presumes an ``order_count`` field exists on
                # the indexed documents - verify.
                "purchase_frequency": {
                    "histogram": {
                        "script": {
                            "source": """
                                return doc['order_count'].value
                            """
                        },
                        "interval": 1,
                        "min_doc_count": 1
                    }
                },
                # Customer acquisition over time.
                # NOTE(review): presumes a ``first_order_date`` field - verify.
                "customer_acquisition": {
                    "date_histogram": {
                        "script": {
                            "source": "doc['first_order_date'].value"
                        },
                        "calendar_interval": "month"
                    }
                },
                # Monthly retention
                "customer_retention": {
                    "date_histogram": {
                        "field": "order_date",
                        "calendar_interval": "month"
                    },
                    "aggs": {
                        "new_customers": {
                            "cardinality": {"field": "customer_id.keyword"}
                        },
                        # Placeholder: the script always returns true, so this
                        # bucket currently matches every document.
                        "returning_customers": {
                            "filter": {
                                "script": {
                                    "source": """
                                        // 检查是否为回头客的逻辑
                                        return true;
                                    """
                                }
                            }
                        }
                    }
                }
            }
        }
        return self._run_aggregation("orders", search_body)

    def cohort_analysis(self, period: str = "month") -> Dict[str, Any]:
        """Composite (customer, order period) buckets over the ``orders`` index.

        Args:
            period: Value used as the ``calendar_interval`` of the
                ``order_period`` composite source.

        Returns:
            The result envelope described on the class. Only the first page
            of composite buckets (up to 10000) is requested; no ``after``
            key is sent.
        """
        search_body = {
            "size": 0,
            "aggs": {
                "cohorts": {
                    "composite": {
                        "sources": [
                            {"customer": {"terms": {"field": "customer_id.keyword"}}},
                            {
                                "order_period": {
                                    "date_histogram": {
                                        "field": "order_date",
                                        "calendar_interval": period
                                    }
                                }
                            }
                        ],
                        "size": 10000
                    },
                    "aggs": {
                        "first_order": {"min": {"field": "order_date"}},
                        "revenue": {"sum": {"field": "total_amount"}}
                    }
                }
            }
        }
        return self._run_aggregation("orders", search_body)
# 使用示例
if __name__ == "__main__":
    analytics = EcommerceAnalytics()

    # Sales dashboard
    dashboard = analytics.sales_dashboard(
        start_date="2024-01-01",
        end_date="2024-12-31"
    )
    if dashboard["success"]:
        data = dashboard["data"]
        # An avg over zero matching documents comes back as null (None),
        # which cannot be formatted with ":.2f"; fall back to 0.
        avg_order_value = data['avg_order_value']['value'] or 0
        print(f"总收入: ${data['total_revenue']['value']:,.2f}")
        print(f"总订单数: {data['total_orders']['value']:,}")
        print(f"独立客户数: {data['unique_customers']['value']:,}")
        print(f"平均订单价值: ${avg_order_value:.2f}")
        print("\n热门分类:")
        for bucket in data["top_categories"]["buckets"][:5]:
            print(f"- {bucket['key']}: ${bucket['category_revenue']['value']:,.2f}")
    else:
        # Surface the failure instead of silently printing nothing.
        print(f"销售仪表板查询失败: {dashboard['error']}")

    # Product performance
    product_perf = analytics.product_performance(category="Electronics")
    if product_perf["success"]:
        print("\n电子产品性能分析:")
        for bucket in product_perf["data"]["products"]["buckets"][:5]:
            product_name = bucket["product_details"]["hits"]["hits"][0]["_source"]["product_name"]
            revenue = bucket["revenue"]["value"]
            quantity = bucket["quantity_sold"]["value"]
            print(f"- {product_name}: ${revenue:,.2f} ({quantity} 件)")
    else:
        print(f"产品性能分析查询失败: {product_perf['error']}")

    # Customer analysis
    customer_analysis = analytics.customer_analysis()
    if customer_analysis["success"]:
        print("\n客户价值分析:")
        for bucket in customer_analysis["data"]["customer_value"]["buckets"][:5]:
            customer_id = bucket["key"]
            total_spent = bucket["total_spent"]["value"]
            order_count = bucket["order_count"]["value"]
            print(f"- 客户 {customer_id}: ${total_spent:,.2f} ({order_count} 订单)")
    else:
        print(f"客户分析查询失败: {customer_analysis['error']}")
7.2 日志分析系统
#!/usr/bin/env python3
# log_analytics.py
from elasticsearch import Elasticsearch
from datetime import datetime, timedelta
from typing import Dict, List, Any
class LogAnalytics:
    """Aggregation-only analytics queries over the ``logs-*`` indices.

    Every public method builds a ``size: 0`` search body, runs it through
    ``self.es.search`` and returns a result envelope:

    * ``{"success": True, "data": <aggregations>, "took": <took>}`` on success
    * ``{"success": False, "error": <message>}`` when anything raises
    """

    def __init__(self, es_host="localhost:9200", username=None, password=None):
        """Create the Elasticsearch client.

        Credentials are passed to the client only when both ``username`` and
        ``password`` are truthy.
        """
        if username and password:
            self.es = Elasticsearch([es_host], http_auth=(username, password))
        else:
            self.es = Elasticsearch([es_host])

    def _run_aggregation(self, search_body: Dict[str, Any]) -> Dict[str, Any]:
        """Execute ``search_body`` against ``logs-*`` and wrap the outcome.

        Shared by all public methods so they report success and failure in
        exactly the same shape. Any exception (including a response without
        an ``aggregations`` key) is converted into the error envelope.
        """
        try:
            response = self.es.search(index="logs-*", body=search_body)
            return {
                "success": True,
                "data": response["aggregations"],
                "took": response["took"],
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def system_health_dashboard(self, time_range: str = "1h") -> Dict[str, Any]:
        """System health dashboard aggregations.

        Args:
            time_range: Look-back window, interpolated into the date-math
                expression ``now-<time_range>`` on ``@timestamp``.

        Returns:
            The result envelope described on the class.
        """
        search_body = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"@timestamp": {"gte": f"now-{time_range}"}}}
                    ]
                }
            },
            "aggs": {
                # Log level distribution
                "log_levels": {
                    "terms": {"field": "level.keyword"}
                },
                # Per-service status
                "services": {
                    "terms": {"field": "service.name.keyword", "size": 20},
                    "aggs": {
                        "error_rate": {
                            "filter": {"term": {"level.keyword": "ERROR"}}
                        },
                        "avg_response_time": {
                            "avg": {"field": "http.response_time"}
                        },
                        "request_count": {
                            "value_count": {"field": "http.request_id"}
                        }
                    }
                },
                # Per-host status
                "hosts": {
                    "terms": {"field": "host.name.keyword", "size": 50},
                    "aggs": {
                        "cpu_usage": {"avg": {"field": "system.cpu.usage"}},
                        "memory_usage": {"avg": {"field": "system.memory.usage"}},
                        "disk_usage": {"avg": {"field": "system.disk.usage"}},
                        "error_count": {
                            "filter": {"term": {"level.keyword": "ERROR"}}
                        }
                    }
                },
                # Timeline. calendar_interval only accepts single calendar
                # units (1m, 1h, 1d, ...); a multiple such as 5m must be
                # expressed with fixed_interval.
                "timeline": {
                    "date_histogram": {
                        "field": "@timestamp",
                        "fixed_interval": "5m"
                    },
                    "aggs": {
                        "total_logs": {
                            "value_count": {"field": "@timestamp"}
                        },
                        "error_logs": {
                            "filter": {"term": {"level.keyword": "ERROR"}}
                        },
                        "warning_logs": {
                            "filter": {"term": {"level.keyword": "WARN"}}
                        },
                        "avg_response_time": {
                            "avg": {"field": "http.response_time"}
                        }
                    }
                },
                # HTTP status codes
                "http_status": {
                    "terms": {"field": "http.status_code"}
                },
                # Response time statistics
                "response_time_stats": {
                    "stats": {"field": "http.response_time"}
                },
                "response_time_percentiles": {
                    "percentiles": {
                        "field": "http.response_time",
                        "percents": [50, 90, 95, 99]
                    }
                },
                # Slow requests (response_time >= 1000), grouped by URL
                "slow_requests": {
                    "filter": {
                        "range": {"http.response_time": {"gte": 1000}}
                    },
                    "aggs": {
                        "slow_endpoints": {
                            "terms": {"field": "http.url.keyword", "size": 10},
                            "aggs": {
                                "avg_response_time": {
                                    "avg": {"field": "http.response_time"}
                                }
                            }
                        }
                    }
                },
                # Error breakdown
                "error_analysis": {
                    "filter": {"term": {"level.keyword": "ERROR"}},
                    "aggs": {
                        "error_types": {
                            "terms": {"field": "error.type.keyword", "size": 10}
                        },
                        "error_messages": {
                            "terms": {"field": "error.message.keyword", "size": 10}
                        },
                        "affected_services": {
                            "terms": {"field": "service.name.keyword"}
                        }
                    }
                },
                # User activity (only documents that carry a user.id)
                "user_activity": {
                    "filter": {"exists": {"field": "user.id"}},
                    "aggs": {
                        "unique_users": {
                            "cardinality": {"field": "user.id.keyword"}
                        },
                        "top_users": {
                            "terms": {"field": "user.id.keyword", "size": 10},
                            "aggs": {
                                "request_count": {
                                    "value_count": {"field": "http.request_id"}
                                }
                            }
                        }
                    }
                }
            }
        }
        return self._run_aggregation(search_body)

    def security_analysis(self, time_range: str = "24h") -> Dict[str, Any]:
        """Security-oriented aggregations.

        Args:
            time_range: Look-back window, interpolated into the date-math
                expression ``now-<time_range>`` on ``@timestamp``.

        Returns:
            The result envelope described on the class.
        """
        search_body = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"@timestamp": {"gte": f"now-{time_range}"}}}
                    ]
                }
            },
            "aggs": {
                # Client IPs ordered by request volume
                "suspicious_ips": {
                    "terms": {
                        "field": "client.ip.keyword",
                        "size": 100,
                        "order": {"request_count": "desc"}
                    },
                    "aggs": {
                        "request_count": {
                            "value_count": {"field": "http.request_id"}
                        },
                        "error_rate": {
                            "filter": {
                                "range": {"http.status_code": {"gte": 400}}
                            }
                        },
                        "unique_endpoints": {
                            "cardinality": {"field": "http.url.keyword"}
                        },
                        "user_agents": {
                            "terms": {"field": "http.user_agent.keyword", "size": 5}
                        }
                    }
                },
                # Failed logins
                "failed_logins": {
                    "filter": {
                        "bool": {
                            "must": [
                                {"term": {"event.action.keyword": "login"}},
                                {"term": {"event.outcome.keyword": "failure"}}
                            ]
                        }
                    },
                    "aggs": {
                        "by_ip": {
                            "terms": {"field": "client.ip.keyword", "size": 20}
                        },
                        "by_user": {
                            "terms": {"field": "user.name.keyword", "size": 20}
                        },
                        "timeline": {
                            "date_histogram": {
                                "field": "@timestamp",
                                "calendar_interval": "1h"
                            }
                        }
                    }
                },
                # Attack patterns (named filters; buckets come back keyed by name)
                "attack_patterns": {
                    "filters": {
                        "filters": {
                            # Lucene regexp syntax has no \xNN hex escapes
                            # (a backslash just escapes the next character),
                            # so the quote and "--" are written literally.
                            "sql_injection": {
                                "regexp": {
                                    "http.url.keyword": ".*('|--|%27|%2D%2D).*"
                                }
                            },
                            "xss_attempts": {
                                "regexp": {
                                    "http.url.keyword": ".*(script|javascript|vbscript).*"
                                }
                            },
                            "path_traversal": {
                                "regexp": {
                                    "http.url.keyword": ".*(\\.\\./).*"
                                }
                            },
                            "brute_force": {
                                "bool": {
                                    "must": [
                                        {"range": {"http.status_code": {"gte": 400, "lt": 500}}},
                                        {"term": {"http.method.keyword": "POST"}}
                                    ]
                                }
                            }
                        }
                    }
                },
                # Geographic breakdown
                "geo_analysis": {
                    "terms": {
                        "field": "client.geo.country_name.keyword",
                        "size": 20
                    },
                    "aggs": {
                        "request_count": {
                            "value_count": {"field": "http.request_id"}
                        },
                        "cities": {
                            "terms": {
                                "field": "client.geo.city_name.keyword",
                                "size": 10
                            }
                        }
                    }
                }
            }
        }
        return self._run_aggregation(search_body)

    def performance_analysis(self, service: str = None) -> Dict[str, Any]:
        """Performance aggregations, optionally restricted to one service.

        Args:
            service: When truthy, restricts the query to documents whose
                ``service.name.keyword`` equals this value.

        Returns:
            The result envelope described on the class.
        """
        filters = []
        if service:
            filters.append({"term": {"service.name.keyword": service}})

        if filters:
            query = {"bool": {"filter": filters}}
        else:
            query = {"match_all": {}}

        search_body = {
            "size": 0,
            "query": query,
            "aggs": {
                # Response time distribution in buckets of 100
                "response_time_analysis": {
                    "histogram": {
                        "field": "http.response_time",
                        "interval": 100
                    }
                },
                # Endpoints ordered by average response time
                "endpoint_performance": {
                    "terms": {
                        "field": "http.url.keyword",
                        "size": 50,
                        "order": {"avg_response_time": "desc"}
                    },
                    "aggs": {
                        "avg_response_time": {
                            "avg": {"field": "http.response_time"}
                        },
                        "max_response_time": {
                            "max": {"field": "http.response_time"}
                        },
                        "request_count": {
                            "value_count": {"field": "http.request_id"}
                        },
                        "error_rate": {
                            "filter": {
                                "range": {"http.status_code": {"gte": 400}}
                            }
                        },
                        "response_time_percentiles": {
                            "percentiles": {
                                "field": "http.response_time",
                                "percents": [50, 90, 95, 99]
                            }
                        }
                    }
                },
                # Database query performance (documents with database.query_time)
                "database_performance": {
                    "filter": {
                        "exists": {"field": "database.query_time"}
                    },
                    "aggs": {
                        "avg_query_time": {
                            "avg": {"field": "database.query_time"}
                        },
                        "slow_queries": {
                            "filter": {
                                "range": {"database.query_time": {"gte": 1000}}
                            },
                            "aggs": {
                                "query_types": {
                                    "terms": {
                                        "field": "database.query_type.keyword"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        return self._run_aggregation(search_body)
# 使用示例
if __name__ == "__main__":
    log_analytics = LogAnalytics()

    # System health
    health = log_analytics.system_health_dashboard("1h")
    if health["success"]:
        data = health["data"]
        print("系统健康状况:")

        # Log level distribution
        print("\n日志级别分布:")
        for bucket in data["log_levels"]["buckets"]:
            print(f"- {bucket['key']}: {bucket['doc_count']}")

        # Per-service status
        print("\n服务状态:")
        for bucket in data["services"]["buckets"][:5]:
            service = bucket["key"]
            error_count = bucket["error_rate"]["doc_count"]
            total_requests = bucket["request_count"]["value"]
            # Guard against services with no counted requests.
            error_rate = (error_count / total_requests * 100) if total_requests > 0 else 0
            # avg is null (None) when no document carries a response time.
            avg_response = bucket["avg_response_time"]["value"] or 0
            print(f"- {service}: 错误率 {error_rate:.2f}%, 平均响应时间 {avg_response:.2f}ms")
    else:
        # Surface the failure instead of silently printing nothing.
        print(f"系统健康状况查询失败: {health['error']}")

    # Security analysis
    security = log_analytics.security_analysis("24h")
    if security["success"]:
        data = security["data"]
        print("\n安全分析:")

        # Client IPs with the highest request volume
        print("\n可疑IP (请求量最高):")
        for bucket in data["suspicious_ips"]["buckets"][:5]:
            ip = bucket["key"]
            requests = bucket["request_count"]["value"]
            errors = bucket["error_rate"]["doc_count"]
            print(f"- {ip}: {requests} 请求, {errors} 错误")

        # Attack patterns: a named "filters" aggregation returns its buckets
        # as a mapping keyed by filter name.
        print("\n检测到的攻击模式:")
        for pattern, bucket in data["attack_patterns"]["buckets"].items():
            if bucket["doc_count"] > 0:
                print(f"- {pattern}: {bucket['doc_count']} 次")
    else:
        print(f"安全分析查询失败: {security['error']}")
8. 聚合性能优化
8.1 性能优化技巧
# 使用采样减少计算量
GET /large_index/_search
{
"size": 0,
"aggs": {
"sample": {
"sampler": {
"shard_size": 1000
},
"aggs": {
"expensive_aggregation": {
"terms": {
"field": "category.keyword"
}
}
}
}
}
}
# 使用composite聚合处理大量桶
GET /products/_search
{
"size": 0,
"aggs": {
"product_combinations": {
"composite": {
"size": 1000,
"sources": [
{
"category": {
"terms": {"field": "category.keyword"}
}
},
{
"brand": {
"terms": {"field": "brand.keyword"}
}
}
]
},
"aggs": {
"avg_price": {
"avg": {"field": "price"}
}
}
}
}
}
# 使用过滤器减少聚合范围
GET /logs/_search
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-1h"
}
}
},
{
"term": {
"service.name.keyword": "web-service"
}
}
]
}
},
"aggs": {
"response_time_stats": {
"stats": {
"field": "http.response_time"
}
}
}
}
8.2 聚合缓存
# 启用聚合缓存
PUT /products/_settings
{
"index.requests.cache.enable": true
}
# 使用缓存的聚合查询
GET /products/_search?request_cache=true
{
"size": 0,
"aggs": {
"categories": {
"terms": {
"field": "category.keyword"
}
}
}
}
本章总结
本章详细介绍了Elasticsearch的聚合分析功能:
- 聚合基础:掌握了聚合的基本概念和语法结构
- 桶聚合:学习了terms、range、histogram、filter等桶聚合
- 指标聚合:了解了stats、percentiles、cardinality等指标聚合
- 管道聚合:掌握了对聚合结果进行二次计算的管道聚合
- 复杂聚合:学习了多层嵌套和组合聚合的使用
- 实践案例:通过电商分析和日志分析案例学习了实际应用
- 性能优化:了解了聚合性能优化的技巧和方法
下一章我们将学习Elasticsearch的性能优化和调优技巧。
练习题
- 实现一个销售报表聚合,包含月度趋势、产品分类分析和客户分层
- 编写一个日志监控聚合,统计错误率、响应时间分布和异常检测
- 创建一个用户行为分析聚合,包含访问路径、停留时间和转化漏斗
- 实现一个地理位置聚合分析,统计不同地区的业务指标