学习目标

通过本章学习,您将掌握:

  • Query DSL的基本语法和结构
  • 各种查询类型的使用方法
  • 复合查询和嵌套查询
  • 过滤器和聚合查询
  • 搜索结果的排序和分页
  • 高亮显示和搜索建议

1. Query DSL基础

1.1 查询结构

Elasticsearch使用JSON格式的Query DSL(Domain Specific Language)来构建查询:

GET /index_name/_search
{
  "query": {
    # 查询条件
  },
  "post_filter": {
    # 后置过滤条件(在聚合计算之后应用;顶层不支持"filter",普通过滤条件应写在bool查询的filter子句中)
  },
  "sort": [
    # 排序条件
  ],
  "from": 0,
  "size": 10,
  "_source": ["field1", "field2"],
  "highlight": {
    # 高亮设置
  },
  "aggs": {
    # 聚合查询
  }
}

1.2 查询上下文 vs 过滤上下文

  • 查询上下文:计算相关性评分,影响排序
  • 过滤上下文:只判断是否匹配,不计算评分,可缓存
# 查询上下文示例
GET /products/_search
{
  "query": {
    "match": {
      "title": "smartphone"
    }
  }
}

# 过滤上下文示例
GET /products/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "status": "published"
          }
        },
        {
          "range": {
            "price": {
              "gte": 100,
              "lte": 1000
            }
          }
        }
      ]
    }
  }
}

2. 基础查询类型

2.1 全文搜索查询

Match查询

# 基本match查询
GET /products/_search
{
  "query": {
    "match": {
      "title": "smartphone apple"
    }
  }
}

# 带参数的match查询
GET /products/_search
{
  "query": {
    "match": {
      "title": {
        "query": "smartphone apple",
        "operator": "and",
        "minimum_should_match": "75%",
        "fuzziness": "AUTO",
        "prefix_length": 2,
        "max_expansions": 50
      }
    }
  }
}

Match Phrase查询

# 短语查询
GET /products/_search
{
  "query": {
    "match_phrase": {
      "description": "high quality smartphone"
    }
  }
}

# 带slop的短语查询
GET /products/_search
{
  "query": {
    "match_phrase": {
      "description": {
        "query": "high quality smartphone",
        "slop": 2
      }
    }
  }
}

Match Phrase Prefix查询

# 短语前缀查询(自动补全)
GET /products/_search
{
  "query": {
    "match_phrase_prefix": {
      "title": {
        "query": "smart pho",
        "max_expansions": 10
      }
    }
  }
}

Multi Match查询

# 多字段查询
GET /products/_search
{
  "query": {
    "multi_match": {
      "query": "smartphone",
      "fields": ["title^2", "description", "brand"]
    }
  }
}

# 不同类型的multi_match
GET /products/_search
{
  "query": {
    "multi_match": {
      "query": "apple smartphone",
      "fields": ["title", "description"],
      "type": "best_fields",  # best_fields, most_fields, cross_fields, phrase, phrase_prefix
      "tie_breaker": 0.3,
      "minimum_should_match": "30%"
    }
  }
}

2.2 精确匹配查询

Term查询

# 精确匹配单个值
GET /products/_search
{
  "query": {
    "term": {
      "status": "published"
    }
  }
}

# 带boost的term查询
GET /products/_search
{
  "query": {
    "term": {
      "category": {
        "value": "electronics",
        "boost": 2.0
      }
    }
  }
}

Terms查询

# 匹配多个值
GET /products/_search
{
  "query": {
    "terms": {
      "status": ["published", "featured"]
    }
  }
}

# Terms查询与索引
GET /products/_search
{
  "query": {
    "terms": {
      "category": {
        "index": "categories",
        "id": "1",
        "path": "category_names"
      }
    }
  }
}

Range查询

# 范围查询
GET /products/_search
{
  "query": {
    "range": {
      "price": {
        "gte": 100,
        "lte": 1000,
        "boost": 2.0
      }
    }
  }
}

# 日期范围查询
GET /logs/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2024-01-01",
        "lte": "2024-12-31",
        "format": "yyyy-MM-dd",
        "time_zone": "+08:00"
      }
    }
  }
}

# 相对时间查询
GET /logs/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-1d/d",
        "lt": "now/d"
      }
    }
  }
}

Exists查询

# 字段存在查询
GET /products/_search
{
  "query": {
    "exists": {
      "field": "description"
    }
  }
}

Prefix查询

# 前缀查询
GET /products/_search
{
  "query": {
    "prefix": {
      "title": {
        "value": "smart",
        "boost": 2.0
      }
    }
  }
}

Wildcard查询

# 通配符查询
GET /products/_search
{
  "query": {
    "wildcard": {
      "title": {
        "value": "smart*phone",
        "boost": 2.0,
        "rewrite": "constant_score"
      }
    }
  }
}

Regexp查询

# 正则表达式查询
GET /products/_search
{
  "query": {
    "regexp": {
      "title": {
        "value": "smart.*phone",
        "flags": "INTERSECTION|COMPLEMENT|EMPTY",
        "max_determinized_states": 10000,
        "rewrite": "constant_score"
      }
    }
  }
}

2.3 模糊查询

Fuzzy查询

# 模糊查询
GET /products/_search
{
  "query": {
    "fuzzy": {
      "title": {
        "value": "smartphon",
        "fuzziness": "AUTO",
        "max_expansions": 50,
        "prefix_length": 2,
        "transpositions": true
      }
    }
  }
}

3. 复合查询

3.1 Bool查询

# 复杂的bool查询
GET /products/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "title": "smartphone"
          }
        }
      ],
      "must_not": [
        {
          "term": {
            "status": "discontinued"
          }
        }
      ],
      "should": [
        {
          "term": {
            "brand": "apple"
          }
        },
        {
          "term": {
            "brand": "samsung"
          }
        }
      ],
      "filter": [
        {
          "range": {
            "price": {
              "gte": 100,
              "lte": 2000
            }
          }
        },
        {
          "term": {
            "in_stock": true
          }
        }
      ],
      "minimum_should_match": 1,
      "boost": 1.2
    }
  }
}

3.2 Boosting查询

# 提升查询
GET /products/_search
{
  "query": {
    "boosting": {
      "positive": {
        "match": {
          "title": "smartphone"
        }
      },
      "negative": {
        "term": {
          "brand": "unknown"
        }
      },
      "negative_boost": 0.2
    }
  }
}

3.3 Constant Score查询

# 固定评分查询
GET /products/_search
{
  "query": {
    "constant_score": {
      "filter": {
        "term": {
          "category": "electronics"
        }
      },
      "boost": 1.2
    }
  }
}

3.4 Dis Max查询

# 分离最大查询
GET /products/_search
{
  "query": {
    "dis_max": {
      "queries": [
        {
          "match": {
            "title": "smartphone"
          }
        },
        {
          "match": {
            "description": "smartphone"
          }
        }
      ],
      "tie_breaker": 0.7
    }
  }
}

3.5 Function Score查询

# 函数评分查询
GET /products/_search
{
  "query": {
    "function_score": {
      "query": {
        "match": {
          "title": "smartphone"
        }
      },
      "functions": [
        {
          "filter": {
            "term": {
              "brand": "apple"
            }
          },
          "weight": 2
        },
        {
          "field_value_factor": {
            "field": "popularity",
            "factor": 1.2,
            "modifier": "sqrt",
            "missing": 1
          }
        },
        {
          "gauss": {
            "location": {
              "origin": "40.7128,-74.0060",
              "scale": "10km",
              "offset": "2km",
              "decay": 0.5
            }
          }
        },
        {
          "script_score": {
            "script": {
              "source": "Math.log(2 + doc['views'].value)"
            }
          }
        }
      ],
      "score_mode": "multiply",
      "boost_mode": "multiply",
      "max_boost": 42,
      "min_score": 0.5
    }
  }
}

4. 嵌套和父子查询

4.1 Nested查询

# 嵌套查询
GET /products/_search
{
  "query": {
    "nested": {
      "path": "reviews",
      "query": {
        "bool": {
          "must": [
            {
              "match": {
                "reviews.title": "excellent"
              }
            },
            {
              "range": {
                "reviews.rating": {
                  "gte": 4
                }
              }
            }
          ]
        }
      },
      "score_mode": "avg",
      "boost": 1.1
    }
  }
}

# 嵌套查询与内部命中
GET /products/_search
{
  "query": {
    "nested": {
      "path": "reviews",
      "query": {
        "match": {
          "reviews.content": "great product"
        }
      },
      "inner_hits": {
        "highlight": {
          "fields": {
            "reviews.content": {}
          }
        }
      }
    }
  }
}

4.2 Has Child查询

# 子文档查询
GET /posts/_search
{
  "query": {
    "has_child": {
      "type": "comment",
      "query": {
        "match": {
          "content": "excellent"
        }
      },
      "score_mode": "avg",
      "min_children": 2,
      "max_children": 10
    }
  }
}

4.3 Has Parent查询

# 父文档查询(父子文档通过join字段存放在同一索引中)
GET /posts/_search
{
  "query": {
    "has_parent": {
      "parent_type": "post",
      "query": {
        "match": {
          "title": "elasticsearch"
        }
      },
      "score": true
    }
  }
}

5. 地理位置查询

5.1 Geo Distance查询

# 地理距离查询
GET /locations/_search
{
  "query": {
    "geo_distance": {
      "distance": "10km",
      "location": {
        "lat": 40.7128,
        "lon": -74.0060
      }
    }
  }
}

# 带距离类型的查询
GET /locations/_search
{
  "query": {
    "geo_distance": {
      "distance": "10km",
      "distance_type": "arc",
      "location": "40.7128,-74.0060"
    }
  }
}

5.2 Geo Bounding Box查询

# 地理边界框查询
GET /locations/_search
{
  "query": {
    "geo_bounding_box": {
      "location": {
        "top_left": {
          "lat": 40.8,
          "lon": -74.1
        },
        "bottom_right": {
          "lat": 40.7,
          "lon": -74.0
        }
      }
    }
  }
}

5.3 Geo Shape查询

# 地理形状查询
GET /locations/_search
{
  "query": {
    "geo_shape": {
      "location": {
        "shape": {
          "type": "envelope",
          "coordinates": [[-74.1, 40.8], [-74.0, 40.7]]
        },
        "relation": "within"
      }
    }
  }
}

6. 特殊查询

6.1 More Like This查询

# 相似文档查询
GET /articles/_search
{
  "query": {
    "more_like_this": {
      "fields": ["title", "content"],
      "like": [
        {
          "_index": "articles",
          "_id": "1"
        },
        "artificial intelligence machine learning"
      ],
      "min_term_freq": 2,
      "max_query_terms": 12,
      "min_doc_freq": 5,
      "max_doc_freq": 1000,
      "minimum_should_match": "30%",
      "boost_terms": 1.2,
      "include": false
    }
  }
}

6.2 Script查询

# 脚本查询
GET /products/_search
{
  "query": {
    "script": {
      "script": {
        "source": "doc['price'].value * doc['discount'].value < params.threshold",
        "params": {
          "threshold": 100
        }
      }
    }
  }
}

6.3 Percolate查询

# 反向查询(查询匹配文档的查询)
GET /queries/_search
{
  "query": {
    "percolate": {
      "field": "query",
      "document": {
        "title": "smartphone apple iphone",
        "price": 999
      }
    }
  }
}

7. 搜索结果处理

7.1 排序

# 基本排序
GET /products/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "price": {
        "order": "desc"
      }
    },
    {
      "_score": {
        "order": "desc"
      }
    },
    "_id"
  ]
}

# 复杂排序
GET /products/_search
{
  "query": {
    "match": {
      "title": "smartphone"
    }
  },
  "sort": [
    {
      "_geo_distance": {
        "location": {
          "lat": 40.7128,
          "lon": -74.0060
        },
        "order": "asc",
        "unit": "km",
        "mode": "min",
        "distance_type": "arc"
      }
    },
    {
      "price": {
        "order": "asc",
        "missing": "_last"
      }
    }
  ]
}

# 脚本排序
GET /products/_search
{
  "query": {
    "match_all": {}
  },
  "sort": {
    "_script": {
      "type": "number",
      "script": {
        "source": "doc['price'].value * doc['rating'].value"
      },
      "order": "desc"
    }
  }
}

7.2 分页

# 基本分页
GET /products/_search
{
  "query": {
    "match_all": {}
  },
  "from": 20,
  "size": 10
}

# Search After分页(推荐用于深度分页)
GET /products/_search
{
  "query": {
    "match_all": {}
  },
  "size": 10,
  "sort": [
    {
      "price": "asc"
    },
    {
      "_id": "asc"
    }
  ],
  "search_after": [999, "product_123"]
}

# Scroll API(用于大量数据导出)
GET /products/_search?scroll=1m
{
  "query": {
    "match_all": {}
  },
  "size": 1000
}

# 继续scroll
GET /_search/scroll
{
  "scroll": "1m",
  "scroll_id": "scroll_id_here"
}

7.3 字段选择

# 选择返回字段
GET /products/_search
{
  "query": {
    "match": {
      "title": "smartphone"
    }
  },
  "_source": ["title", "price", "brand"]
}

# 排除字段
GET /products/_search
{
  "query": {
    "match_all": {}
  },
  "_source": {
    "excludes": ["description", "reviews"]
  }
}

# 包含和排除
GET /products/_search
{
  "query": {
    "match_all": {}
  },
  "_source": {
    "includes": ["title", "price", "spec.*"],
    "excludes": ["spec.internal_code"]
  }
}

7.4 高亮显示

# 基本高亮
GET /articles/_search
{
  "query": {
    "match": {
      "content": "elasticsearch"
    }
  },
  "highlight": {
    "fields": {
      "content": {}
    }
  }
}

# 高级高亮配置
GET /articles/_search
{
  "query": {
    "match": {
      "content": "elasticsearch search engine"
    }
  },
  "highlight": {
    "pre_tags": ["<mark>"],
    "post_tags": ["</mark>"],
    "fields": {
      "title": {
        "fragment_size": 150,
        "number_of_fragments": 3,
        "no_match_size": 150
      },
      "content": {
        "fragment_size": 150,
        "number_of_fragments": 3,
        "highlight_query": {
          "bool": {
            "must": {
              "match": {
                "content": {
                  "query": "elasticsearch search engine",
                  "minimum_should_match": "50%"
                }
              }
            }
          }
        }
      }
    }
  }
}

8. 搜索建议

8.1 Term Suggester

# 词条建议
GET /products/_search
{
  "suggest": {
    "my_suggestion": {
      "text": "smartphon",
      "term": {
        "field": "title",
        "size": 3,
        "sort": "score",
        "suggest_mode": "missing"
      }
    }
  }
}

8.2 Phrase Suggester

# 短语建议
GET /products/_search
{
  "suggest": {
    "my_phrase_suggestion": {
      "text": "smart phon apple",
      "phrase": {
        "field": "title",
        "size": 3,
        "gram_size": 3,
        "direct_generator": [
          {
            "field": "title",
            "suggest_mode": "always",
            "min_word_length": 1
          }
        ],
        "highlight": {
          "pre_tag": "<em>",
          "post_tag": "</em>"
        }
      }
    }
  }
}

8.3 Completion Suggester

# 自动补全建议
GET /products/_search
{
  "suggest": {
    "product_suggest": {
      "prefix": "smart",
      "completion": {
        "field": "suggest",
        "size": 5,
        "contexts": {
          "category": "electronics"
        }
      }
    }
  }
}

8.4 Context Suggester

# 上下文建议
GET /products/_search
{
  "suggest": {
    "product_suggest": {
      "prefix": "smart",
      "completion": {
        "field": "suggest",
        "size": 5,
        "contexts": {
          "category": ["electronics", "mobile"],
          "location": {
            "lat": 40.7128,
            "lon": -74.0060,
            "precision": "5km"
          }
        }
      }
    }
  }
}

9. 实践案例

9.1 电商搜索实现

#!/usr/bin/env python3
# ecommerce_search.py

from elasticsearch import Elasticsearch
from typing import Dict, List, Optional, Any
import json

class EcommerceSearch:
    """Search facade over the ``products`` index.

    Each public method builds an Elasticsearch request body, sends it through
    ``self.es.search`` and reshapes the response into plain dicts/lists.
    Exceptions raised by the search call are not propagated; every method
    returns an empty result instead (see the individual methods).
    """

    def __init__(self, es_host="localhost:9200", username=None, password=None):
        """Create the Elasticsearch client stored on ``self.es``.

        Args:
            es_host: Host entry handed to ``Elasticsearch`` as a one-element list.
            username: Basic-auth user; only used when ``password`` is truthy too.
            password: Basic-auth password; only used when ``username`` is truthy too.
        """
        if username and password:
            self.es = Elasticsearch(
                [es_host],
                http_auth=(username, password)
            )
        else:
            self.es = Elasticsearch([es_host])

    def search_products(self,
                       query: Optional[str] = None,
                       category: Optional[str] = None,
                       brand: Optional[str] = None,
                       price_min: Optional[float] = None,
                       price_max: Optional[float] = None,
                       in_stock: Optional[bool] = None,
                       sort_by: str = "relevance",
                       page: int = 1,
                       size: int = 20) -> Dict[str, Any]:
        """Run a faceted product search.

        Args:
            query: Free text matched against title, description, brand and
                category; a falsy value matches every document.
            category: Exact value for ``category.keyword``; skipped when falsy.
            brand: Exact value for ``brand.keyword``; skipped when falsy.
            price_min: Inclusive lower price bound; skipped when ``None``.
            price_max: Inclusive upper price bound; skipped when ``None``.
            in_stock: Exact value for ``in_stock``; skipped when ``None``.
            sort_by: One of ``"price_asc"``, ``"price_desc"``, ``"rating"``,
                ``"newest"``. Any other value adds no ``sort`` clause.
            page: 1-based page number; values below 1 are treated as page 1.
            size: Number of hits per page.

        Returns:
            A dict with ``total``, ``products``, ``aggregations`` and ``took``.
            If the search call raises, a dict with ``error`` (the exception
            text), ``total`` 0, and empty ``products`` / ``aggregations``.
        """
        must: List[Dict[str, Any]] = []
        filter_clauses: List[Dict[str, Any]] = []

        search_body: Dict[str, Any] = {
            "query": {
                "bool": {
                    "must": must,
                    "filter": filter_clauses
                }
            },
            # Clamp so that page <= 0 cannot produce a negative offset.
            "from": max(page - 1, 0) * size,
            "size": size,
            "highlight": {
                "fields": {
                    "title": {},
                    "description": {}
                }
            },
            "aggs": {
                "categories": {
                    "terms": {
                        "field": "category.keyword",
                        "size": 10
                    }
                },
                "brands": {
                    "terms": {
                        "field": "brand.keyword",
                        "size": 10
                    }
                },
                "price_ranges": {
                    "range": {
                        "field": "price",
                        "ranges": [
                            {"to": 100},
                            {"from": 100, "to": 500},
                            {"from": 500, "to": 1000},
                            {"from": 1000}
                        ]
                    }
                }
            }
        }

        # Text query
        if query:
            must.append({
                "multi_match": {
                    "query": query,
                    "fields": ["title^3", "description^2", "brand", "category"],
                    "type": "best_fields",
                    "fuzziness": "AUTO",
                    "minimum_should_match": "75%"
                }
            })
        else:
            must.append({"match_all": {}})

        # Filters
        if category:
            filter_clauses.append({"term": {"category.keyword": category}})

        if brand:
            filter_clauses.append({"term": {"brand.keyword": brand}})

        price_range = {}
        if price_min is not None:
            price_range["gte"] = price_min
        if price_max is not None:
            price_range["lte"] = price_max
        if price_range:
            filter_clauses.append({"range": {"price": price_range}})

        if in_stock is not None:
            filter_clauses.append({"term": {"in_stock": in_stock}})

        # Sorting; an unrecognised sort_by adds no "sort" key at all.
        sort_clauses = {
            "price_asc": {"price": "asc"},
            "price_desc": {"price": "desc"},
            "rating": {"rating.average": "desc"},
            "newest": {"created_at": "desc"},
        }
        if sort_by in sort_clauses:
            search_body["sort"] = [sort_clauses[sort_by]]

        try:
            response = self.es.search(
                index="products",
                body=search_body
            )

            return {
                "total": response["hits"]["total"]["value"],
                "products": [
                    {
                        "id": hit["_id"],
                        "score": hit["_score"],
                        "source": hit["_source"],
                        "highlight": hit.get("highlight", {})
                    }
                    for hit in response["hits"]["hits"]
                ],
                "aggregations": response.get("aggregations", {}),
                "took": response["took"]
            }

        except Exception as e:
            return {
                "error": str(e),
                "total": 0,
                "products": [],
                "aggregations": {}
            }

    def suggest_products(self, prefix: str, size: int = 5) -> List[str]:
        """Return completion suggestions for ``prefix``.

        Uses the completion suggester on the ``suggest`` field. On any
        exception the error is printed and an empty list is returned.
        """
        try:
            response = self.es.search(
                index="products",
                body={
                    "suggest": {
                        "product_suggest": {
                            "prefix": prefix,
                            "completion": {
                                "field": "suggest",
                                "size": size
                            }
                        }
                    }
                }
            )

            options = response["suggest"]["product_suggest"][0]["options"]
            return [option["text"] for option in options]

        except Exception as e:
            print(f"Suggestion error: {e}")
            return []

    def similar_products(self, product_id: str, size: int = 5) -> List[Dict]:
        """Return up to ``size`` products similar to ``product_id``.

        Runs a ``more_like_this`` query seeded with the given document. On any
        exception the error is printed and an empty list is returned.
        """
        try:
            response = self.es.search(
                index="products",
                body={
                    "query": {
                        "more_like_this": {
                            "fields": ["title", "description", "category", "brand"],
                            "like": [
                                {
                                    "_index": "products",
                                    "_id": product_id
                                }
                            ],
                            "min_term_freq": 2,
                            "max_query_terms": 12,
                            "minimum_should_match": "30%"
                        }
                    },
                    "size": size
                }
            )

            return [
                {
                    "id": hit["_id"],
                    "score": hit["_score"],
                    "source": hit["_source"]
                }
                for hit in response["hits"]["hits"]
            ]

        except Exception as e:
            print(f"Similar products error: {e}")
            return []

    def advanced_search(self, filters: Dict[str, Any]) -> Dict[str, Any]:
        """Run a search described by a ``filters`` dict.

        Recognised keys (all optional; falsy values are skipped):
            ``query``: free text for a ``multi_match``.
            ``categories`` / ``brands``: lists for ``terms`` filters.
            ``price_range``: dict with ``min`` and/or ``max`` (inclusive).
            ``min_rating``: lower bound on ``rating.average``.
            ``in_stock_only``: restrict to ``in_stock`` documents.
            ``exclude_brands``: list of brands to exclude.
            ``boost_new_products``: score-boost documents created in the
                last 30 days.
            ``sort``: sort clause passed through unchanged.
            ``size`` / ``from``: paging, defaulting to 20 and 0.

        Returns:
            A dict with ``total``, ``products`` and ``took``. If the search
            call raises, a dict with ``error``, ``total`` 0 and empty
            ``products``.
        """
        must: List[Dict[str, Any]] = []
        filter_clauses: List[Dict[str, Any]] = []
        should: List[Dict[str, Any]] = []
        must_not: List[Dict[str, Any]] = []

        search_body: Dict[str, Any] = {
            "query": {
                "bool": {
                    "must": must,
                    "filter": filter_clauses,
                    "should": should,
                    "must_not": must_not
                }
            },
            "size": filters.get("size", 20),
            "from": filters.get("from", 0)
        }

        # Text search. Without a text query fall back to match_all, as
        # search_products does: a bool query with "should" clauses but no
        # must/filter clause requires a "should" to match, which would turn
        # the new-product boost below into a hard filter.
        if filters.get("query"):
            must.append({
                "multi_match": {
                    "query": filters["query"],
                    "fields": ["title^3", "description^2", "brand", "category"]
                }
            })
        else:
            must.append({"match_all": {}})

        # Category filter
        if filters.get("categories"):
            filter_clauses.append({
                "terms": {"category.keyword": filters["categories"]}
            })

        # Brand filter
        if filters.get("brands"):
            filter_clauses.append({
                "terms": {"brand.keyword": filters["brands"]}
            })

        # Price range: either bound may be omitted (or None).
        requested_range = filters.get("price_range")
        if requested_range:
            price_bounds = {}
            if requested_range.get("min") is not None:
                price_bounds["gte"] = requested_range["min"]
            if requested_range.get("max") is not None:
                price_bounds["lte"] = requested_range["max"]
            if price_bounds:
                filter_clauses.append({"range": {"price": price_bounds}})

        # Rating filter (a min_rating of 0 is treated as "no minimum")
        if filters.get("min_rating"):
            filter_clauses.append({
                "range": {
                    "rating.average": {
                        "gte": filters["min_rating"]
                    }
                }
            })

        # Stock filter
        if filters.get("in_stock_only"):
            filter_clauses.append({"term": {"in_stock": True}})

        # Exclusions
        if filters.get("exclude_brands"):
            must_not.append({
                "terms": {"brand.keyword": filters["exclude_brands"]}
            })

        # Boosts
        if filters.get("boost_new_products"):
            should.append({
                "range": {
                    "created_at": {
                        "gte": "now-30d",
                        "boost": 2.0
                    }
                }
            })

        # Sorting
        if filters.get("sort"):
            search_body["sort"] = filters["sort"]

        try:
            response = self.es.search(
                index="products",
                body=search_body
            )

            return {
                "total": response["hits"]["total"]["value"],
                "products": [
                    {
                        "id": hit["_id"],
                        "score": hit["_score"],
                        "source": hit["_source"]
                    }
                    for hit in response["hits"]["hits"]
                ],
                "took": response["took"]
            }

        except Exception as e:
            return {
                "error": str(e),
                "total": 0,
                "products": []
            }

# Usage example
if __name__ == "__main__":
    shop = EcommerceSearch()

    # Basic search: cheapest electronics matching "smartphone" first
    found = shop.search_products(
        query="smartphone",
        category="electronics",
        price_min=100,
        price_max=1000,
        sort_by="price_asc",
        page=1,
        size=10,
    )

    print(f"Found {found['total']} products")
    for item in found['products']:
        doc = item['source']
        print(f"- {doc['title']} - ${doc['price']}")

    # Autocomplete
    completions = shop.suggest_products("smart")
    print(f"Suggestions: {completions}")

    # Similar products
    lookalikes = shop.similar_products("product_123")
    print(f"Similar products: {len(lookalikes)}")

9.2 日志分析搜索

#!/usr/bin/env python3
# log_search.py

from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from elasticsearch import Elasticsearch

class LogSearch:
    """Search and aggregation helpers over the ``logs-*`` indices.

    Each public method builds a request body, sends it through
    ``self.es.search`` and reshapes the response. Exceptions raised by the
    search call are not propagated; an ``error`` entry is returned instead,
    together with zeroed/empty result keys.
    """

    def __init__(self, es_host="localhost:9200", username=None, password=None):
        """Create the Elasticsearch client stored on ``self.es``.

        Args:
            es_host: Host entry handed to ``Elasticsearch`` as a one-element list.
            username: Basic-auth user; only used when ``password`` is truthy too.
            password: Basic-auth password; only used when ``username`` is truthy too.
        """
        if username and password:
            self.es = Elasticsearch(
                [es_host],
                http_auth=(username, password)
            )
        else:
            self.es = Elasticsearch([es_host])

    def search_logs(self,
                   query: Optional[str] = None,
                   level: Optional[str] = None,
                   service: Optional[str] = None,
                   host: Optional[str] = None,
                   start_time: Optional[str] = None,
                   end_time: Optional[str] = None,
                   size: int = 100) -> Dict[str, Any]:
        """Search log entries, newest first.

        Args:
            query: Free text matched against ``message``, ``error.message``
                and ``http.url``; a falsy value matches every document.
            level: Exact value for ``level.keyword``; skipped when falsy.
            service: Exact value for ``service.name.keyword``; skipped when falsy.
            host: Exact value for ``host.name.keyword``; skipped when falsy.
            start_time: Inclusive lower ``@timestamp`` bound; skipped when falsy.
            end_time: Inclusive upper ``@timestamp`` bound; skipped when falsy.
            size: Maximum number of hits to return.

        Returns:
            A dict with ``total``, ``logs`` and ``took``. If the search call
            raises, a dict with ``error``, ``total`` 0 and empty ``logs``.
        """
        must: List[Dict[str, Any]] = []
        filter_clauses: List[Dict[str, Any]] = []

        search_body = {
            "query": {
                "bool": {
                    "must": must,
                    "filter": filter_clauses
                }
            },
            "sort": [
                {"@timestamp": "desc"}
            ],
            "size": size,
            "highlight": {
                "fields": {
                    "message": {}
                }
            }
        }

        # Text query
        if query:
            must.append({
                "multi_match": {
                    "query": query,
                    "fields": ["message^2", "error.message", "http.url"],
                    "type": "best_fields"
                }
            })
        else:
            must.append({"match_all": {}})

        # Exact-match filters: log level, service, host (in that order)
        exact_filters = (
            ("level.keyword", level),
            ("service.name.keyword", service),
            ("host.name.keyword", host),
        )
        for field, value in exact_filters:
            if value:
                filter_clauses.append({"term": {field: value}})

        # Time range filter
        time_range = {}
        if start_time:
            time_range["gte"] = start_time
        if end_time:
            time_range["lte"] = end_time

        if time_range:
            filter_clauses.append({"range": {"@timestamp": time_range}})

        try:
            response = self.es.search(
                index="logs-*",
                body=search_body
            )

            return {
                "total": response["hits"]["total"]["value"],
                "logs": [
                    {
                        "id": hit["_id"],
                        "index": hit["_index"],
                        "source": hit["_source"],
                        "highlight": hit.get("highlight", {})
                    }
                    for hit in response["hits"]["hits"]
                ],
                "took": response["took"]
            }

        except Exception as e:
            return {
                "error": str(e),
                "total": 0,
                "logs": []
            }

    def error_analysis(self, time_range: str = "1h", interval: str = "5m") -> Dict[str, Any]:
        """Aggregate ``ERROR``-level logs from the last ``time_range``.

        Args:
            time_range: Look-back window appended to ``now-`` (e.g. ``"1h"``).
            interval: Bucket width of the ``timeline`` histogram, sent as a
                ``fixed_interval`` (e.g. ``"5m"``).

        Returns:
            A dict with ``total_errors``, ``aggregations`` (error types,
            services, hosts, timeline) and ``took``. If the search call
            raises, a dict with ``error``, ``total_errors`` 0 and empty
            ``aggregations``.
        """
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {"level.keyword": "ERROR"}
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": f"now-{time_range}"
                                }
                            }
                        }
                    ]
                }
            },
            "size": 0,
            "aggs": {
                "error_types": {
                    "terms": {
                        "field": "error.type.keyword",
                        "size": 10
                    }
                },
                "services": {
                    "terms": {
                        "field": "service.name.keyword",
                        "size": 10
                    }
                },
                "hosts": {
                    "terms": {
                        "field": "host.name.keyword",
                        "size": 10
                    }
                },
                "timeline": {
                    "date_histogram": {
                        "field": "@timestamp",
                        # calendar_interval only accepts single units (1m, 1h, ...);
                        # multiples such as "5m" must use fixed_interval.
                        "fixed_interval": interval
                    }
                }
            }
        }

        try:
            response = self.es.search(
                index="logs-*",
                body=search_body
            )

            return {
                "total_errors": response["hits"]["total"]["value"],
                "aggregations": response["aggregations"],
                "took": response["took"]
            }

        except Exception as e:
            # Keep the result keys present so callers can index them safely.
            return {
                "error": str(e),
                "total_errors": 0,
                "aggregations": {}
            }

    def performance_analysis(self, service: str, time_range: str = "1h",
                             slow_threshold: float = 1000) -> Dict[str, Any]:
        """Aggregate ``http.response_time`` for one service.

        Args:
            service: Exact value for ``service.name.keyword``.
            time_range: Look-back window appended to ``now-`` (e.g. ``"1h"``).
            slow_threshold: Requests whose ``http.response_time`` is at least
                this value are counted in the ``slow_requests`` bucket.

        Returns:
            A dict with ``total_requests``, ``aggregations`` (stats,
            percentiles, slow requests, status codes) and ``took``. If the
            search call raises, a dict with ``error``, ``total_requests`` 0
            and empty ``aggregations``.
        """
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {"service.name.keyword": service}
                        },
                        {
                            "exists": {"field": "http.response_time"}
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": f"now-{time_range}"
                                }
                            }
                        }
                    ]
                }
            },
            "size": 0,
            "aggs": {
                "response_time_stats": {
                    "stats": {
                        "field": "http.response_time"
                    }
                },
                "response_time_percentiles": {
                    "percentiles": {
                        "field": "http.response_time",
                        "percents": [50, 90, 95, 99]
                    }
                },
                "slow_requests": {
                    "filter": {
                        "range": {
                            "http.response_time": {
                                "gte": slow_threshold
                            }
                        }
                    }
                },
                "status_codes": {
                    "terms": {
                        "field": "http.status_code",
                        "size": 10
                    }
                }
            }
        }

        try:
            response = self.es.search(
                index="logs-*",
                body=search_body
            )

            return {
                "total_requests": response["hits"]["total"]["value"],
                "aggregations": response["aggregations"],
                "took": response["took"]
            }

        except Exception as e:
            # Keep the result keys present so callers can index them safely.
            return {
                "error": str(e),
                "total_requests": 0,
                "aggregations": {}
            }

# Usage example
if __name__ == "__main__":
    log_search = LogSearch()

    # Search error logs
    results = log_search.search_logs(
        query="database connection",
        level="ERROR",
        start_time="now-1h",
        size=50
    )

    print(f"Found {results['total']} error logs")

    # Error analysis. On failure error_analysis returns a dict carrying an
    # "error" entry, so check for it before reading the totals.
    error_analysis = log_search.error_analysis("1h")
    if "error" in error_analysis:
        print(f"Error analysis failed: {error_analysis['error']}")
    else:
        print(f"Total errors in last hour: {error_analysis['total_errors']}")

    # Performance analysis
    perf_analysis = log_search.performance_analysis("web-service", "1h")
    print(f"Performance analysis for web-service: {perf_analysis}")

本章总结

本章详细介绍了Elasticsearch的搜索查询语法:

  1. Query DSL基础:掌握了查询结构和上下文概念
  2. 基础查询类型:学习了全文搜索、精确匹配、模糊查询等
  3. 复合查询:了解了bool、boosting、function_score等复合查询
  4. 特殊查询:掌握了嵌套查询、地理位置查询、脚本查询等
  5. 结果处理:学习了排序、分页、高亮、建议等功能
  6. 实践案例:通过电商搜索和日志分析案例学习了实际应用

下一章我们将学习Elasticsearch的聚合分析功能,深入了解如何进行数据统计和分析。

练习题

  1. 实现一个多条件商品搜索功能,支持价格范围、品牌、分类等过滤
  2. 编写一个日志分析查询,统计不同服务的错误率
  3. 实现一个地理位置搜索,查找附近的商店或服务
  4. 创建一个自动补全搜索功能,支持拼写纠错和建议