学习目标

通过本章学习,您将掌握: - 聚合分析的基本概念和类型 - 桶聚合(Bucket Aggregations)的使用 - 指标聚合(Metric Aggregations)的应用 - 管道聚合(Pipeline Aggregations)的高级用法 - 矩阵聚合(Matrix Aggregations)的特殊场景 - 聚合的嵌套和组合使用 - 实际业务场景中的聚合分析案例

1. 聚合分析基础

1.1 聚合概念

Elasticsearch聚合分析是对数据进行分组、统计和计算的强大功能,类似于SQL中的GROUP BY和聚合函数。

1.2 聚合类型

  • 桶聚合(Bucket Aggregations):将文档分组到不同的桶中
  • 指标聚合(Metric Aggregations):对数据进行数学计算
  • 管道聚合(Pipeline Aggregations):对其他聚合的结果进行计算
  • 矩阵聚合(Matrix Aggregations):对多个字段进行矩阵运算

1.3 基本语法结构

GET /index_name/_search
{
  "size": 0,  # 不返回文档,只返回聚合结果
  "aggs": {
    "aggregation_name": {
      "aggregation_type": {
        # 聚合配置
      },
      "aggs": {
        # 子聚合
      }
    }
  }
}

2. 桶聚合(Bucket Aggregations)

2.1 Terms聚合

# 基本terms聚合
GET /products/_search
{
  "size": 0,
  "aggs": {
    "categories": {
      "terms": {
        "field": "category.keyword",
        "size": 10
      }
    }
  }
}

# 高级terms聚合
GET /products/_search
{
  "size": 0,
  "aggs": {
    "top_brands": {
      "terms": {
        "field": "brand.keyword",
        "size": 5,
        "order": {
          "avg_price": "desc"
        },
        "min_doc_count": 10,
        "include": ["Apple", "Samsung", "Google"],
        "exclude": ["Unknown"]
      },
      "aggs": {
        "avg_price": {
          "avg": {
            "field": "price"
          }
        }
      }
    }
  }
}

# 多字段terms聚合
GET /products/_search
{
  "size": 0,
  "aggs": {
    "brand_category": {
      "multi_terms": {
        "terms": [
          {
            "field": "brand.keyword"
          },
          {
            "field": "category.keyword"
          }
        ],
        "size": 10
      }
    }
  }
}

2.2 Range聚合

# 价格范围聚合
GET /products/_search
{
  "size": 0,
  "aggs": {
    "price_ranges": {
      "range": {
        "field": "price",
        "ranges": [
          {
            "key": "cheap",
            "to": 100
          },
          {
            "key": "moderate",
            "from": 100,
            "to": 500
          },
          {
            "key": "expensive",
            "from": 500
          }
        ]
      },
      "aggs": {
        "avg_rating": {
          "avg": {
            "field": "rating"
          }
        }
      }
    }
  }
}

# 日期范围聚合
GET /orders/_search
{
  "size": 0,
  "aggs": {
    "sales_over_time": {
      "date_range": {
        "field": "order_date",
        "format": "yyyy-MM-dd",
        "ranges": [
          {
            "key": "last_week",
            "from": "now-7d/d",
            "to": "now/d"
          },
          {
            "key": "last_month",
            "from": "now-1M/M",
            "to": "now/M"
          },
          {
            "key": "last_year",
            "from": "now-1y/y",
            "to": "now/y"
          }
        ]
      }
    }
  }
}

2.3 Histogram聚合

# 价格直方图
GET /products/_search
{
  "size": 0,
  "aggs": {
    "price_histogram": {
      "histogram": {
        "field": "price",
        "interval": 100,
        "min_doc_count": 1,
        "extended_bounds": {
          "min": 0,
          "max": 2000
        }
      }
    }
  }
}

# 日期直方图
GET /logs/_search
{
  "size": 0,
  "aggs": {
    "requests_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1h",
        "time_zone": "+08:00",
        "min_doc_count": 0,
        "extended_bounds": {
          "min": "now-24h",
          "max": "now"
        }
      },
      "aggs": {
        "error_rate": {
          "filter": {
            "term": {
              "level.keyword": "ERROR"
            }
          }
        },
        "avg_response_time": {
          "avg": {
            "field": "response_time"
          }
        }
      }
    }
  }
}

# 自动间隔日期直方图
GET /logs/_search
{
  "size": 0,
  "aggs": {
    "auto_date_histogram": {
      "auto_date_histogram": {
        "field": "@timestamp",
        "buckets": 20,
        "minimum_interval": "minute"
      }
    }
  }
}

2.4 Filter聚合

# 单个过滤器聚合
GET /products/_search
{
  "size": 0,
  "aggs": {
    "high_rated_products": {
      "filter": {
        "range": {
          "rating": {
            "gte": 4.0
          }
        }
      },
      "aggs": {
        "avg_price": {
          "avg": {
            "field": "price"
          }
        }
      }
    }
  }
}

# 多个过滤器聚合
GET /products/_search
{
  "size": 0,
  "aggs": {
    "product_segments": {
      "filters": {
        "filters": {
          "electronics": {
            "term": {
              "category.keyword": "Electronics"
            }
          },
          "books": {
            "term": {
              "category.keyword": "Books"
            }
          },
          "expensive": {
            "range": {
              "price": {
                "gte": 1000
              }
            }
          }
        }
      },
      "aggs": {
        "avg_rating": {
          "avg": {
            "field": "rating"
          }
        }
      }
    }
  }
}

2.5 Nested聚合

# 嵌套对象聚合
GET /products/_search
{
  "size": 0,
  "aggs": {
    "review_analysis": {
      "nested": {
        "path": "reviews"
      },
      "aggs": {
        "avg_rating": {
          "avg": {
            "field": "reviews.rating"
          }
        },
        "rating_distribution": {
          "histogram": {
            "field": "reviews.rating",
            "interval": 1,
            "min_doc_count": 0
          }
        },
        "top_reviewers": {
          "terms": {
            "field": "reviews.author.keyword",
            "size": 5
          }
        }
      }
    }
  }
}

2.6 地理位置聚合

# 地理距离聚合
GET /stores/_search
{
  "size": 0,
  "aggs": {
    "distance_from_center": {
      "geo_distance": {
        "field": "location",
        "origin": {
          "lat": 40.7128,
          "lon": -74.0060
        },
        "ranges": [
          {
            "key": "nearby",
            "to": 5000
          },
          {
            "key": "medium",
            "from": 5000,
            "to": 20000
          },
          {
            "key": "far",
            "from": 20000
          }
        ]
      }
    }
  }
}

# 地理网格聚合
GET /stores/_search
{
  "size": 0,
  "aggs": {
    "store_grid": {
      "geohash_grid": {
        "field": "location",
        "precision": 5
      }
    }
  }
}

# 地理边界聚合
GET /stores/_search
{
  "size": 0,
  "aggs": {
    "store_bounds": {
      "geo_bounds": {
        "field": "location"
      }
    }
  }
}

2.7 采样聚合

# 采样聚合(提高大数据集性能)
GET /large_dataset/_search
{
  "size": 0,
  "aggs": {
    "sample": {
      "sampler": {
        "shard_size": 1000
      },
      "aggs": {
        "top_categories": {
          "terms": {
            "field": "category.keyword",
            "size": 10
          }
        }
      }
    }
  }
}

# 多样性采样聚合
GET /products/_search
{
  "size": 0,
  "aggs": {
    "diverse_sample": {
      "diversified_sampler": {
        "shard_size": 1000,
        "field": "category.keyword"
      },
      "aggs": {
        "avg_price": {
          "avg": {
            "field": "price"
          }
        }
      }
    }
  }
}

3. 指标聚合(Metric Aggregations)

3.1 基础统计聚合

# 多种统计指标
GET /products/_search
{
  "size": 0,
  "aggs": {
    "price_stats": {
      "stats": {
        "field": "price"
      }
    },
    "price_extended_stats": {
      "extended_stats": {
        "field": "price",
        "sigma": 2
      }
    },
    "avg_price": {
      "avg": {
        "field": "price"
      }
    },
    "max_price": {
      "max": {
        "field": "price"
      }
    },
    "min_price": {
      "min": {
        "field": "price"
      }
    },
    "sum_price": {
      "sum": {
        "field": "price"
      }
    },
    "value_count": {
      "value_count": {
        "field": "price"
      }
    }
  }
}

3.2 百分位数聚合

# 百分位数分析
GET /response_times/_search
{
  "size": 0,
  "aggs": {
    "response_time_percentiles": {
      "percentiles": {
        "field": "response_time",
        "percents": [25, 50, 75, 90, 95, 99],
        "keyed": false
      }
    },
    "response_time_percentile_ranks": {
      "percentile_ranks": {
        "field": "response_time",
        "values": [100, 500, 1000]
      }
    }
  }
}

# T-Digest百分位数(更精确)
GET /response_times/_search
{
  "size": 0,
  "aggs": {
    "response_time_tdigest": {
      "tdigest_percentiles": {
        "field": "response_time",
        "percents": [50, 90, 95, 99],
        "compression": 200
      }
    }
  }
}

# HDR百分位数(高动态范围)
GET /response_times/_search
{
  "size": 0,
  "aggs": {
    "response_time_hdr": {
      "hdr_percentiles": {
        "field": "response_time",
        "percents": [50, 90, 95, 99],
        "number_of_significant_value_digits": 3
      }
    }
  }
}

3.3 基数聚合

# 唯一值计数
GET /logs/_search
{
  "size": 0,
  "aggs": {
    "unique_users": {
      "cardinality": {
        "field": "user_id.keyword",
        "precision_threshold": 10000
      }
    },
    "unique_ips": {
      "cardinality": {
        "field": "client_ip.keyword"
      }
    }
  }
}

3.4 脚本聚合

# 脚本指标聚合
GET /products/_search
{
  "size": 0,
  "aggs": {
    "profit_margin": {
      "avg": {
        "script": {
          "source": "(doc['price'].value - doc['cost'].value) / doc['price'].value * 100"
        }
      }
    },
    "weighted_rating": {
      "sum": {
        "script": {
          "source": "doc['rating'].value * doc['review_count'].value",
          "params": {
            "factor": 1.2
          }
        }
      }
    }
  }
}

3.5 Top Hits聚合

# 获取每个分组的顶部文档
GET /products/_search
{
  "size": 0,
  "aggs": {
    "categories": {
      "terms": {
        "field": "category.keyword",
        "size": 5
      },
      "aggs": {
        "top_products": {
          "top_hits": {
            "sort": [
              {
                "rating": {
                  "order": "desc"
                }
              }
            ],
            "_source": {
              "includes": ["title", "price", "rating"]
            },
            "size": 3
          }
        }
      }
    }
  }
}

4. 管道聚合(Pipeline Aggregations)

4.1 父级管道聚合

# 平均桶值
GET /sales/_search
{
  "size": 0,
  "aggs": {
    "monthly_sales": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "total_sales": {
          "sum": {
            "field": "amount"
          }
        }
      }
    },
    "avg_monthly_sales": {
      "avg_bucket": {
        "buckets_path": "monthly_sales>total_sales"
      }
    },
    "max_monthly_sales": {
      "max_bucket": {
        "buckets_path": "monthly_sales>total_sales"
      }
    },
    "min_monthly_sales": {
      "min_bucket": {
        "buckets_path": "monthly_sales>total_sales"
      }
    },
    "sum_monthly_sales": {
      "sum_bucket": {
        "buckets_path": "monthly_sales>total_sales"
      }
    }
  }
}

# 桶排序
GET /products/_search
{
  "size": 0,
  "aggs": {
    "categories": {
      "terms": {
        "field": "category.keyword",
        "size": 10
      },
      "aggs": {
        "avg_price": {
          "avg": {
            "field": "price"
          }
        }
      }
    },
    "categories_sorted_by_avg_price": {
      "bucket_sort": {
        "sort": [
          {
            "avg_price": {
              "order": "desc"
            }
          }
        ],
        "size": 5
      }
    }
  }
}

# 桶脚本
GET /sales/_search
{
  "size": 0,
  "aggs": {
    "monthly_sales": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "total_sales": {
          "sum": {
            "field": "amount"
          }
        },
        "sales_growth": {
          "bucket_script": {
            "buckets_path": {
              "current": "total_sales"
            },
            "script": "params.current * 1.1"
          }
        }
      }
    }
  }
}

4.2 同级管道聚合

# 累积和
GET /sales/_search
{
  "size": 0,
  "aggs": {
    "daily_sales": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "day"
      },
      "aggs": {
        "total_sales": {
          "sum": {
            "field": "amount"
          }
        },
        "cumulative_sales": {
          "cumulative_sum": {
            "buckets_path": "total_sales"
          }
        }
      }
    }
  }
}

# 移动平均
GET /sales/_search
{
  "size": 0,
  "aggs": {
    "daily_sales": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "day"
      },
      "aggs": {
        "total_sales": {
          "sum": {
            "field": "amount"
          }
        },
        "moving_avg": {
          "moving_avg": {
            "buckets_path": "total_sales",
            "window": 7,
            "model": "linear"
          }
        }
      }
    }
  }
}

# 导数(变化率)
GET /sales/_search
{
  "size": 0,
  "aggs": {
    "daily_sales": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "day"
      },
      "aggs": {
        "total_sales": {
          "sum": {
            "field": "amount"
          }
        },
        "sales_derivative": {
          "derivative": {
            "buckets_path": "total_sales"
          }
        }
      }
    }
  }
}

# 序列差分
GET /sales/_search
{
  "size": 0,
  "aggs": {
    "daily_sales": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "day"
      },
      "aggs": {
        "total_sales": {
          "sum": {
            "field": "amount"
          }
        },
        "sales_diff": {
          "serial_diff": {
            "buckets_path": "total_sales",
            "lag": 1
          }
        }
      }
    }
  }
}

5. 矩阵聚合(Matrix Aggregations)

# 矩阵统计
GET /products/_search
{
  "size": 0,
  "aggs": {
    "price_rating_stats": {
      "matrix_stats": {
        "fields": ["price", "rating", "review_count"]
      }
    }
  }
}

6. 复杂聚合组合

6.1 多层嵌套聚合

# 复杂的电商分析
GET /orders/_search
{
  "size": 0,
  "aggs": {
    "monthly_analysis": {
      "date_histogram": {
        "field": "order_date",
        "calendar_interval": "month"
      },
      "aggs": {
        "total_revenue": {
          "sum": {
            "field": "total_amount"
          }
        },
        "order_count": {
          "value_count": {
            "field": "order_id"
          }
        },
        "avg_order_value": {
          "avg": {
            "field": "total_amount"
          }
        },
        "customer_segments": {
          "terms": {
            "field": "customer_segment.keyword",
            "size": 5
          },
          "aggs": {
            "segment_revenue": {
              "sum": {
                "field": "total_amount"
              }
            },
            "avg_segment_order_value": {
              "avg": {
                "field": "total_amount"
              }
            },
            "top_products": {
              "terms": {
                "field": "product_category.keyword",
                "size": 3
              },
              "aggs": {
                "category_revenue": {
                  "sum": {
                    "field": "total_amount"
                  }
                }
              }
            }
          }
        },
        "payment_methods": {
          "terms": {
            "field": "payment_method.keyword"
          },
          "aggs": {
            "method_revenue": {
              "sum": {
                "field": "total_amount"
              }
            }
          }
        }
      }
    },
    "overall_stats": {
      "global": {},
      "aggs": {
        "total_customers": {
          "cardinality": {
            "field": "customer_id.keyword"
          }
        },
        "total_revenue": {
          "sum": {
            "field": "total_amount"
          }
        }
      }
    }
  }
}

6.2 条件聚合

# 基于条件的聚合分析
GET /products/_search
{
  "size": 0,
  "aggs": {
    "product_analysis": {
      "filters": {
        "filters": {
          "high_end": {
            "range": {
              "price": {
                "gte": 1000
              }
            }
          },
          "mid_range": {
            "range": {
              "price": {
                "gte": 100,
                "lt": 1000
              }
            }
          },
          "budget": {
            "range": {
              "price": {
                "lt": 100
              }
            }
          }
        }
      },
      "aggs": {
        "avg_rating": {
          "avg": {
            "field": "rating"
          }
        },
        "brand_distribution": {
          "terms": {
            "field": "brand.keyword",
            "size": 5
          }
        },
        "rating_percentiles": {
          "percentiles": {
            "field": "rating",
            "percents": [25, 50, 75, 90, 95]
          }
        }
      }
    }
  }
}

7. 实践案例

7.1 电商业务分析系统

#!/usr/bin/env python3
# ecommerce_analytics.py

from elasticsearch import Elasticsearch
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import json

class EcommerceAnalytics:
    def __init__(self, es_host="localhost:9200", username=None, password=None):
        if username and password:
            self.es = Elasticsearch(
                [es_host],
                http_auth=(username, password)
            )
        else:
            self.es = Elasticsearch([es_host])
    
    def sales_dashboard(self, 
                       start_date: str = None, 
                       end_date: str = None) -> Dict[str, Any]:
        """销售仪表板数据"""
        
        # 构建时间过滤器
        time_filter = []
        if start_date or end_date:
            time_range = {}
            if start_date:
                time_range["gte"] = start_date
            if end_date:
                time_range["lte"] = end_date
            time_filter.append({
                "range": {"order_date": time_range}
            })
        
        search_body = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": time_filter
                }
            } if time_filter else {"match_all": {}},
            "aggs": {
                # 总体指标
                "total_revenue": {
                    "sum": {"field": "total_amount"}
                },
                "total_orders": {
                    "value_count": {"field": "order_id"}
                },
                "unique_customers": {
                    "cardinality": {"field": "customer_id.keyword"}
                },
                "avg_order_value": {
                    "avg": {"field": "total_amount"}
                },
                
                # 时间趋势
                "daily_sales": {
                    "date_histogram": {
                        "field": "order_date",
                        "calendar_interval": "day"
                    },
                    "aggs": {
                        "revenue": {"sum": {"field": "total_amount"}},
                        "orders": {"value_count": {"field": "order_id"}},
                        "customers": {"cardinality": {"field": "customer_id.keyword"}}
                    }
                },
                
                # 产品分析
                "top_categories": {
                    "terms": {
                        "field": "product_category.keyword",
                        "size": 10,
                        "order": {"category_revenue": "desc"}
                    },
                    "aggs": {
                        "category_revenue": {"sum": {"field": "total_amount"}},
                        "category_orders": {"value_count": {"field": "order_id"}},
                        "avg_order_value": {"avg": {"field": "total_amount"}}
                    }
                },
                
                # 客户分析
                "customer_segments": {
                    "terms": {
                        "field": "customer_segment.keyword"
                    },
                    "aggs": {
                        "segment_revenue": {"sum": {"field": "total_amount"}},
                        "segment_customers": {"cardinality": {"field": "customer_id.keyword"}},
                        "avg_customer_value": {
                            "bucket_script": {
                                "buckets_path": {
                                    "revenue": "segment_revenue",
                                    "customers": "segment_customers"
                                },
                                "script": "params.revenue / params.customers"
                            }
                        }
                    }
                },
                
                # 地理分析
                "sales_by_region": {
                    "terms": {
                        "field": "shipping_region.keyword",
                        "size": 20
                    },
                    "aggs": {
                        "region_revenue": {"sum": {"field": "total_amount"}},
                        "region_orders": {"value_count": {"field": "order_id"}}
                    }
                },
                
                # 支付方式分析
                "payment_methods": {
                    "terms": {
                        "field": "payment_method.keyword"
                    },
                    "aggs": {
                        "method_revenue": {"sum": {"field": "total_amount"}},
                        "method_orders": {"value_count": {"field": "order_id"}}
                    }
                },
                
                # 订单价值分布
                "order_value_distribution": {
                    "histogram": {
                        "field": "total_amount",
                        "interval": 50,
                        "min_doc_count": 1
                    }
                },
                
                # 复购分析
                "repeat_customers": {
                    "scripted_metric": {
                        "init_script": "state.customers = [:]",
                        "map_script": """
                            String customerId = doc['customer_id.keyword'].value;
                            if (state.customers.containsKey(customerId)) {
                                state.customers[customerId]++;
                            } else {
                                state.customers[customerId] = 1;
                            }
                        """,
                        "combine_script": "return state.customers",
                        "reduce_script": """
                            Map combined = [:];
                            for (state in states) {
                                for (entry in state.entrySet()) {
                                    if (combined.containsKey(entry.getKey())) {
                                        combined[entry.getKey()] += entry.getValue();
                                    } else {
                                        combined[entry.getKey()] = entry.getValue();
                                    }
                                }
                            }
                            int repeatCustomers = 0;
                            for (entry in combined.entrySet()) {
                                if (entry.getValue() > 1) {
                                    repeatCustomers++;
                                }
                            }
                            return repeatCustomers;
                        """
                    }
                }
            }
        }
        
        try:
            response = self.es.search(
                index="orders",
                body=search_body
            )
            
            return {
                "success": True,
                "data": response["aggregations"],
                "took": response["took"]
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    def product_performance(self, category: str = None) -> Dict[str, Any]:
        """产品性能分析"""
        
        filters = []
        if category:
            filters.append({
                "term": {"product_category.keyword": category}
            })
        
        search_body = {
            "size": 0,
            "query": {
                "bool": {"filter": filters}
            } if filters else {"match_all": {}},
            "aggs": {
                "products": {
                    "terms": {
                        "field": "product_id.keyword",
                        "size": 100,
                        "order": {"revenue": "desc"}
                    },
                    "aggs": {
                        "revenue": {"sum": {"field": "total_amount"}},
                        "quantity_sold": {"sum": {"field": "quantity"}},
                        "orders": {"value_count": {"field": "order_id"}},
                        "avg_price": {"avg": {"field": "unit_price"}},
                        "first_sale": {"min": {"field": "order_date"}},
                        "last_sale": {"max": {"field": "order_date"}},
                        "product_details": {
                            "top_hits": {
                                "size": 1,
                                "_source": ["product_name", "product_category", "brand"]
                            }
                        }
                    }
                },
                
                "performance_metrics": {
                    "global": {},
                    "aggs": {
                        "total_products": {
                            "cardinality": {"field": "product_id.keyword"}
                        },
                        "revenue_percentiles": {
                            "percentiles": {
                                "field": "total_amount",
                                "percents": [25, 50, 75, 90, 95, 99]
                            }
                        }
                    }
                }
            }
        }
        
        try:
            response = self.es.search(
                index="order_items",
                body=search_body
            )
            
            return {
                "success": True,
                "data": response["aggregations"],
                "took": response["took"]
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    def customer_analysis(self) -> Dict[str, Any]:
        """客户分析"""
        
        search_body = {
            "size": 0,
            "aggs": {
                # 客户价值分析
                "customer_value": {
                    "terms": {
                        "field": "customer_id.keyword",
                        "size": 1000,
                        "order": {"total_spent": "desc"}
                    },
                    "aggs": {
                        "total_spent": {"sum": {"field": "total_amount"}},
                        "order_count": {"value_count": {"field": "order_id"}},
                        "avg_order_value": {"avg": {"field": "total_amount"}},
                        "first_order": {"min": {"field": "order_date"}},
                        "last_order": {"max": {"field": "order_date"}},
                        "customer_lifetime": {
                            "bucket_script": {
                                "buckets_path": {
                                    "first": "first_order",
                                    "last": "last_order"
                                },
                                "script": "(params.last - params.first) / (1000 * 60 * 60 * 24)"
                            }
                        }
                    }
                },
                
                # 客户分层
                "customer_segments_by_value": {
                    "range": {
                        "script": {
                            "source": """
                                def totalSpent = 0;
                                for (doc in params._source.orders) {
                                    totalSpent += doc.total_amount;
                                }
                                return totalSpent;
                            """
                        },
                        "ranges": [
                            {"key": "low_value", "to": 100},
                            {"key": "medium_value", "from": 100, "to": 500},
                            {"key": "high_value", "from": 500, "to": 2000},
                            {"key": "vip", "from": 2000}
                        ]
                    }
                },
                
                # 购买频率分析
                "purchase_frequency": {
                    "histogram": {
                        "script": {
                            "source": """
                                return doc['order_count'].value
                            """
                        },
                        "interval": 1,
                        "min_doc_count": 1
                    }
                },
                
                # 客户获取时间分析
                "customer_acquisition": {
                    "date_histogram": {
                        "script": {
                            "source": "doc['first_order_date'].value"
                        },
                        "calendar_interval": "month"
                    }
                },
                
                # 客户留存分析
                "customer_retention": {
                    "date_histogram": {
                        "field": "order_date",
                        "calendar_interval": "month"
                    },
                    "aggs": {
                        "new_customers": {
                            "cardinality": {
                                "field": "customer_id.keyword"
                            }
                        },
                        "returning_customers": {
                            "filter": {
                                "script": {
                                    "source": """
                                        // 检查是否为回头客的逻辑
                                        return true;
                                    """
                                }
                            }
                        }
                    }
                }
            }
        }
        
        try:
            response = self.es.search(
                index="orders",
                body=search_body
            )
            
            return {
                "success": True,
                "data": response["aggregations"],
                "took": response["took"]
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    def cohort_analysis(self, period: str = "month") -> Dict[str, Any]:
        """队列分析"""
        
        search_body = {
            "size": 0,
            "aggs": {
                "cohorts": {
                    "composite": {
                        "sources": [
                            {
                                "customer": {
                                    "terms": {"field": "customer_id.keyword"}
                                }
                            },
                            {
                                "order_period": {
                                    "date_histogram": {
                                        "field": "order_date",
                                        "calendar_interval": period
                                    }
                                }
                            }
                        ],
                        "size": 10000
                    },
                    "aggs": {
                        "first_order": {
                            "min": {"field": "order_date"}
                        },
                        "revenue": {
                            "sum": {"field": "total_amount"}
                        }
                    }
                }
            }
        }
        
        try:
            response = self.es.search(
                index="orders",
                body=search_body
            )
            
            return {
                "success": True,
                "data": response["aggregations"],
                "took": response["took"]
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

# 使用示例
if __name__ == "__main__":
    analytics = EcommerceAnalytics()
    
    # 销售仪表板
    dashboard = analytics.sales_dashboard(
        start_date="2024-01-01",
        end_date="2024-12-31"
    )
    
    if dashboard["success"]:
        data = dashboard["data"]
        print(f"总收入: ${data['total_revenue']['value']:,.2f}")
        print(f"总订单数: {data['total_orders']['value']:,}")
        print(f"独立客户数: {data['unique_customers']['value']:,}")
        print(f"平均订单价值: ${data['avg_order_value']['value']:.2f}")
        
        print("\n热门分类:")
        for bucket in data["top_categories"]["buckets"][:5]:
            print(f"- {bucket['key']}: ${bucket['category_revenue']['value']:,.2f}")
    
    # 产品性能分析
    product_perf = analytics.product_performance(category="Electronics")
    
    if product_perf["success"]:
        print("\n电子产品性能分析:")
        for bucket in product_perf["data"]["products"]["buckets"][:5]:
            product_name = bucket["product_details"]["hits"]["hits"][0]["_source"]["product_name"]
            revenue = bucket["revenue"]["value"]
            quantity = bucket["quantity_sold"]["value"]
            print(f"- {product_name}: ${revenue:,.2f} ({quantity} 件)")
    
    # 客户分析
    customer_analysis = analytics.customer_analysis()
    
    if customer_analysis["success"]:
        print("\n客户价值分析:")
        for bucket in customer_analysis["data"]["customer_value"]["buckets"][:5]:
            customer_id = bucket["key"]
            total_spent = bucket["total_spent"]["value"]
            order_count = bucket["order_count"]["value"]
            print(f"- 客户 {customer_id}: ${total_spent:,.2f} ({order_count} 订单)")

7.2 日志分析系统

#!/usr/bin/env python3
# log_analytics.py

from elasticsearch import Elasticsearch
from datetime import datetime, timedelta
from typing import Dict, List, Any

class LogAnalytics:
    def __init__(self, es_host="localhost:9200", username=None, password=None):
        if username and password:
            self.es = Elasticsearch(
                [es_host],
                http_auth=(username, password)
            )
        else:
            self.es = Elasticsearch([es_host])
    
    def system_health_dashboard(self, time_range: str = "1h") -> Dict[str, Any]:
        """系统健康状况仪表板"""
        
        search_body = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": f"now-{time_range}"
                                }
                            }
                        }
                    ]
                }
            },
            "aggs": {
                # 日志级别分布
                "log_levels": {
                    "terms": {
                        "field": "level.keyword"
                    }
                },
                
                # 服务状态
                "services": {
                    "terms": {
                        "field": "service.name.keyword",
                        "size": 20
                    },
                    "aggs": {
                        "error_rate": {
                            "filter": {
                                "term": {"level.keyword": "ERROR"}
                            }
                        },
                        "avg_response_time": {
                            "avg": {
                                "field": "http.response_time"
                            }
                        },
                        "request_count": {
                            "value_count": {
                                "field": "http.request_id"
                            }
                        }
                    }
                },
                
                # 主机状态
                "hosts": {
                    "terms": {
                        "field": "host.name.keyword",
                        "size": 50
                    },
                    "aggs": {
                        "cpu_usage": {
                            "avg": {
                                "field": "system.cpu.usage"
                            }
                        },
                        "memory_usage": {
                            "avg": {
                                "field": "system.memory.usage"
                            }
                        },
                        "disk_usage": {
                            "avg": {
                                "field": "system.disk.usage"
                            }
                        },
                        "error_count": {
                            "filter": {
                                "term": {"level.keyword": "ERROR"}
                            }
                        }
                    }
                },
                
                # 时间线分析
                "timeline": {
                    "date_histogram": {
                        "field": "@timestamp",
                        "calendar_interval": "5m"
                    },
                    "aggs": {
                        "total_logs": {
                            "value_count": {
                                "field": "@timestamp"
                            }
                        },
                        "error_logs": {
                            "filter": {
                                "term": {"level.keyword": "ERROR"}
                            }
                        },
                        "warning_logs": {
                            "filter": {
                                "term": {"level.keyword": "WARN"}
                            }
                        },
                        "avg_response_time": {
                            "avg": {
                                "field": "http.response_time"
                            }
                        }
                    }
                },
                
                # HTTP状态码分析
                "http_status": {
                    "terms": {
                        "field": "http.status_code"
                    }
                },
                
                # 响应时间分析
                "response_time_stats": {
                    "stats": {
                        "field": "http.response_time"
                    }
                },
                "response_time_percentiles": {
                    "percentiles": {
                        "field": "http.response_time",
                        "percents": [50, 90, 95, 99]
                    }
                },
                
                # 慢请求分析
                "slow_requests": {
                    "filter": {
                        "range": {
                            "http.response_time": {
                                "gte": 1000
                            }
                        }
                    },
                    "aggs": {
                        "slow_endpoints": {
                            "terms": {
                                "field": "http.url.keyword",
                                "size": 10
                            },
                            "aggs": {
                                "avg_response_time": {
                                    "avg": {
                                        "field": "http.response_time"
                                    }
                                }
                            }
                        }
                    }
                },
                
                # 错误分析
                "error_analysis": {
                    "filter": {
                        "term": {"level.keyword": "ERROR"}
                    },
                    "aggs": {
                        "error_types": {
                            "terms": {
                                "field": "error.type.keyword",
                                "size": 10
                            }
                        },
                        "error_messages": {
                            "terms": {
                                "field": "error.message.keyword",
                                "size": 10
                            }
                        },
                        "affected_services": {
                            "terms": {
                                "field": "service.name.keyword"
                            }
                        }
                    }
                },
                
                # 用户活动分析
                "user_activity": {
                    "filter": {
                        "exists": {
                            "field": "user.id"
                        }
                    },
                    "aggs": {
                        "unique_users": {
                            "cardinality": {
                                "field": "user.id.keyword"
                            }
                        },
                        "top_users": {
                            "terms": {
                                "field": "user.id.keyword",
                                "size": 10
                            },
                            "aggs": {
                                "request_count": {
                                    "value_count": {
                                        "field": "http.request_id"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        
        try:
            response = self.es.search(
                index="logs-*",
                body=search_body
            )
            
            return {
                "success": True,
                "data": response["aggregations"],
                "took": response["took"]
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    def security_analysis(self, time_range: str = "24h") -> Dict[str, Any]:
        """安全分析"""
        
        search_body = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": f"now-{time_range}"
                                }
                            }
                        }
                    ]
                }
            },
            "aggs": {
                # 可疑IP分析
                "suspicious_ips": {
                    "terms": {
                        "field": "client.ip.keyword",
                        "size": 100,
                        "order": {"request_count": "desc"}
                    },
                    "aggs": {
                        "request_count": {
                            "value_count": {
                                "field": "http.request_id"
                            }
                        },
                        "error_rate": {
                            "filter": {
                                "range": {
                                    "http.status_code": {
                                        "gte": 400
                                    }
                                }
                            }
                        },
                        "unique_endpoints": {
                            "cardinality": {
                                "field": "http.url.keyword"
                            }
                        },
                        "user_agents": {
                            "terms": {
                                "field": "http.user_agent.keyword",
                                "size": 5
                            }
                        }
                    }
                },
                
                # 失败登录分析
                "failed_logins": {
                    "filter": {
                        "bool": {
                            "must": [
                                {"term": {"event.action.keyword": "login"}},
                                {"term": {"event.outcome.keyword": "failure"}}
                            ]
                        }
                    },
                    "aggs": {
                        "by_ip": {
                            "terms": {
                                "field": "client.ip.keyword",
                                "size": 20
                            }
                        },
                        "by_user": {
                            "terms": {
                                "field": "user.name.keyword",
                                "size": 20
                            }
                        },
                        "timeline": {
                            "date_histogram": {
                                "field": "@timestamp",
                                "calendar_interval": "1h"
                            }
                        }
                    }
                },
                
                # 攻击模式分析
                "attack_patterns": {
                    "filters": {
                        "filters": {
                            "sql_injection": {
                                "regexp": {
                                    "http.url.keyword": ".*('|(\\x27)|(\\x2D\\x2D)|(%27)|(%2D%2D)).*"
                                }
                            },
                            "xss_attempts": {
                                "regexp": {
                                    "http.url.keyword": ".*(script|javascript|vbscript).*"
                                }
                            },
                            "path_traversal": {
                                "regexp": {
                                    "http.url.keyword": ".*(\\.\\./).*"
                                }
                            },
                            "brute_force": {
                                "bool": {
                                    "must": [
                                        {"range": {"http.status_code": {"gte": 400, "lt": 500}}},
                                        {"term": {"http.method.keyword": "POST"}}
                                    ]
                                }
                            }
                        }
                    }
                },
                
                # 地理位置分析
                "geo_analysis": {
                    "terms": {
                        "field": "client.geo.country_name.keyword",
                        "size": 20
                    },
                    "aggs": {
                        "request_count": {
                            "value_count": {
                                "field": "http.request_id"
                            }
                        },
                        "cities": {
                            "terms": {
                                "field": "client.geo.city_name.keyword",
                                "size": 10
                            }
                        }
                    }
                }
            }
        }
        
        try:
            response = self.es.search(
                index="logs-*",
                body=search_body
            )
            
            return {
                "success": True,
                "data": response["aggregations"],
                "took": response["took"]
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    def performance_analysis(self, service: str = None) -> Dict[str, Any]:
        """性能分析"""
        
        filters = []
        if service:
            filters.append({
                "term": {"service.name.keyword": service}
            })
        
        search_body = {
            "size": 0,
            "query": {
                "bool": {"filter": filters}
            } if filters else {"match_all": {}},
            "aggs": {
                # 响应时间分析
                "response_time_analysis": {
                    "histogram": {
                        "field": "http.response_time",
                        "interval": 100
                    }
                },
                
                # 端点性能分析
                "endpoint_performance": {
                    "terms": {
                        "field": "http.url.keyword",
                        "size": 50,
                        "order": {"avg_response_time": "desc"}
                    },
                    "aggs": {
                        "avg_response_time": {
                            "avg": {"field": "http.response_time"}
                        },
                        "max_response_time": {
                            "max": {"field": "http.response_time"}
                        },
                        "request_count": {
                            "value_count": {"field": "http.request_id"}
                        },
                        "error_rate": {
                            "filter": {
                                "range": {"http.status_code": {"gte": 400}}
                            }
                        },
                        "response_time_percentiles": {
                            "percentiles": {
                                "field": "http.response_time",
                                "percents": [50, 90, 95, 99]
                            }
                        }
                    }
                },
                
                # 数据库查询性能
                "database_performance": {
                    "filter": {
                        "exists": {"field": "database.query_time"}
                    },
                    "aggs": {
                        "avg_query_time": {
                            "avg": {"field": "database.query_time"}
                        },
                        "slow_queries": {
                            "filter": {
                                "range": {"database.query_time": {"gte": 1000}}
                            },
                            "aggs": {
                                "query_types": {
                                    "terms": {
                                        "field": "database.query_type.keyword"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        
        try:
            response = self.es.search(
                index="logs-*",
                body=search_body
            )
            
            return {
                "success": True,
                "data": response["aggregations"],
                "took": response["took"]
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

# 使用示例
if __name__ == "__main__":
    log_analytics = LogAnalytics()
    
    # 系统健康状况
    health = log_analytics.system_health_dashboard("1h")
    if health["success"]:
        data = health["data"]
        print("系统健康状况:")
        
        # 日志级别分布
        print("\n日志级别分布:")
        for bucket in data["log_levels"]["buckets"]:
            print(f"- {bucket['key']}: {bucket['doc_count']}")
        
        # 服务状态
        print("\n服务状态:")
        for bucket in data["services"]["buckets"][:5]:
            service = bucket["key"]
            error_count = bucket["error_rate"]["doc_count"]
            total_requests = bucket["request_count"]["value"]
            error_rate = (error_count / total_requests * 100) if total_requests > 0 else 0
            avg_response = bucket["avg_response_time"]["value"] or 0
            print(f"- {service}: 错误率 {error_rate:.2f}%, 平均响应时间 {avg_response:.2f}ms")
    
    # 安全分析
    security = log_analytics.security_analysis("24h")
    if security["success"]:
        data = security["data"]
        print("\n安全分析:")
        
        # 可疑IP
        print("\n可疑IP (请求量最高):")
        for bucket in data["suspicious_ips"]["buckets"][:5]:
            ip = bucket["key"]
            requests = bucket["request_count"]["value"]
            errors = bucket["error_rate"]["doc_count"]
            print(f"- {ip}: {requests} 请求, {errors} 错误")
        
        # 攻击模式
        print("\n检测到的攻击模式:")
        for pattern, bucket in data["attack_patterns"]["buckets"].items():
            if bucket["doc_count"] > 0:
                print(f"- {pattern}: {bucket['doc_count']} 次")

8. 聚合性能优化

8.1 性能优化技巧

# 使用采样减少计算量
GET /large_index/_search
{
  "size": 0,
  "aggs": {
    "sample": {
      "sampler": {
        "shard_size": 1000
      },
      "aggs": {
        "expensive_aggregation": {
          "terms": {
            "field": "category.keyword"
          }
        }
      }
    }
  }
}

# 使用composite聚合处理大量桶
GET /products/_search
{
  "size": 0,
  "aggs": {
    "product_combinations": {
      "composite": {
        "size": 1000,
        "sources": [
          {
            "category": {
              "terms": {"field": "category.keyword"}
            }
          },
          {
            "brand": {
              "terms": {"field": "brand.keyword"}
            }
          }
        ]
      },
      "aggs": {
        "avg_price": {
          "avg": {"field": "price"}
        }
      }
    }
  }
}

# 使用过滤器减少聚合范围
GET /logs/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-1h"
            }
          }
        },
        {
          "term": {
            "service.name.keyword": "web-service"
          }
        }
      ]
    }
  },
  "aggs": {
    "response_time_stats": {
      "stats": {
        "field": "http.response_time"
      }
    }
  }
}

8.2 聚合缓存

# 启用聚合缓存
PUT /products/_settings
{
  "index.requests.cache.enable": true
}

# 使用缓存的聚合查询
GET /products/_search?request_cache=true
{
  "size": 0,
  "aggs": {
    "categories": {
      "terms": {
        "field": "category.keyword"
      }
    }
  }
}

本章总结

本章详细介绍了Elasticsearch的聚合分析功能:

  1. 聚合基础:掌握了聚合的基本概念和语法结构
  2. 桶聚合:学习了terms、range、histogram、filter等桶聚合
  3. 指标聚合:了解了stats、percentiles、cardinality等指标聚合
  4. 管道聚合:掌握了对聚合结果进行二次计算的管道聚合
  5. 复杂聚合:学习了多层嵌套和组合聚合的使用
  6. 实践案例:通过电商分析和日志分析案例学习了实际应用
  7. 性能优化:了解了聚合性能优化的技巧和方法

下一章我们将学习Elasticsearch的性能优化和调优技巧。

练习题

  1. 实现一个销售报表聚合,包含月度趋势、产品分类分析和客户分层
  2. 编写一个日志监控聚合,统计错误率、响应时间分布和异常检测
  3. 创建一个用户行为分析聚合,包含访问路径、停留时间和转化漏斗
  4. 实现一个地理位置聚合分析,统计不同地区的业务指标