1. ELK Stack集成

1.1 Logstash集成

基础配置

# logstash.conf
input {
  # Tail application log files; every line is decoded as one JSON document.
  file {
    path => "/var/log/application/*.log"
    start_position => "beginning"
    # Persist read offsets between restarts. Pointing sincedb at /dev/null
    # makes Logstash forget its position, so every restart re-reads all files
    # from the beginning and indexes every event again.
    sincedb_path => "/var/lib/logstash/sincedb_application"
    codec => "json"
    tags => ["application"]
  }

  # Events shipped by Beats agents.
  beats {
    port => 5044
  }

  # Syslog listener.
  # NOTE(review): 514 is a privileged port (< 1024); a Logstash process running
  # as an unprivileged user cannot bind it without extra capabilities or a
  # port redirect. Confirm how this is deployed.
  syslog {
    port => 514
    tags => ["syslog"]
  }

  # JSON events posted over HTTP.
  http {
    port => 8080
    codec => "json"
    tags => ["http_input"]
  }
}

filter {
  # Application logs (tagged "application" by the file input).
  if "application" in [tags] {
    # Use the event's own timestamp field as @timestamp.
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss.SSS" ]
      target => "@timestamp"
    }

    # Normalise the log level to upper case.
    mutate {
      uppercase => [ "level" ]
    }

    # Static enrichment fields.
    mutate {
      add_field => {
        "log_type" => "application"
        "environment" => "production"
      }
    }

    # Tag events whose message mentions an exception and extract its type.
    if [message] =~ /Exception/ {
      mutate {
        add_tag => [ "exception" ]
      }

      # Split "<java class>: <text>" into exception_class / exception_message.
      grok {
        match => { "message" => "%{JAVACLASS:exception_class}: %{GREEDYDATA:exception_message}" }
      }
    }
  }

  # Web access logs.
  if "nginx" in [tags] {
    # ECS compatibility is switched off on the three plugins below so that
    # they emit the legacy field names (clientip, agent, response, bytes,
    # geoip.location, user_agent.name/os/version) that the mutate step and the
    # index template rely on. With ECS naming enabled those fields are
    # produced under different names and the steps below would find nothing.
    grok {
      ecs_compatibility => "disabled"
      match => {
        "message" => "%{COMBINEDAPACHELOG}"
      }
    }

    # Parse the User-Agent header.
    useragent {
      ecs_compatibility => "disabled"
      source => "agent"
      target => "user_agent"
    }

    # Resolve the client IP to a geographic location.
    geoip {
      ecs_compatibility => "disabled"
      source => "clientip"
      target => "geoip"
    }

    # Convert the HTTP status code and the response size to integers.
    mutate {
      convert => {
        "response" => "integer"
        "bytes" => "integer"
      }
    }
  }

  # Generic filters applied to every event.
  # Drop top-level fields whose value is nil or empty.
  ruby {
    code => "
      event.to_hash.each { |k, v|
        if v.nil? || (v.respond_to?(:empty?) && v.empty?)
          event.remove(k)
        end
      }
    "
  }

  # Record when the event passed through this pipeline (UTC, ISO 8601).
  ruby {
    code => "event.set('processed_at', Time.now.utc.iso8601)"
  }
}

output {
  # Index every event into a daily Elasticsearch index.
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    # The index pattern ("logs-*") is declared inside the template file itself;
    # the plugin has no separate pattern option.
    # NOTE(review): on Elasticsearch 8 the plugin may install templates through
    # the composable template API, which expects settings/mappings nested under
    # a "template" key - confirm the template file matches the API in use.
    template_name => "logs"
    template => "/etc/logstash/templates/logs.json"

    # Authentication.
    user => "logstash_writer"
    password => "${LOGSTASH_PASSWORD}"

    # Throughput tuning is no longer done here: the former workers, flush_size
    # and idle_flush_time options are obsolete and make the pipeline fail to
    # load. Use pipeline.workers, pipeline.batch.size and pipeline.batch.delay
    # in logstash.yml / pipelines.yml instead.
  }

  # Keep a copy of events that failed grok parsing for later inspection.
  if "_grokparsefailure" in [tags] {
    file {
      path => "/var/log/logstash/grok_failures.log"
      codec => "json_lines"
    }
  }

  # Echo DEBUG-level events to stdout. The level is stored in the "level"
  # field (upper-cased by the filter stage).
  if [level] == "DEBUG" {
    stdout {
      codec => "rubydebug"
    }
  }
}

Logstash模板配置

{
  "index_patterns": ["logs-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "index.refresh_interval": "5s",
    "index.codec": "best_compression"
  },
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "level": {
        "type": "keyword"
      },
      "message": {
        "type": "text",
        "analyzer": "standard"
      },
      "service": {
        "type": "keyword"
      },
      "host": {
        "type": "keyword"
      },
      "clientip": {
        "type": "ip"
      },
      "response": {
        "type": "integer"
      },
      "bytes": {
        "type": "long"
      },
      "geoip": {
        "properties": {
          "location": {
            "type": "geo_point"
          },
          "country_name": {
            "type": "keyword"
          },
          "city_name": {
            "type": "keyword"
          }
        }
      },
      "user_agent": {
        "properties": {
          "name": {
            "type": "keyword"
          },
          "version": {
            "type": "keyword"
          },
          "os": {
            "type": "keyword"
          }
        }
      }
    }
  }
}

1.2 Kibana集成

Kibana配置

# kibana.yml
server.port: 5601
# NOTE(review): "0.0.0.0" listens on every network interface - confirm this
# host is not directly reachable from untrusted networks.
server.host: "0.0.0.0"
server.name: "kibana-server"

# Elasticsearch connection
elasticsearch.hosts: ["http://localhost:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "${KIBANA_PASSWORD}"

# Security
# Security features follow the Elasticsearch configuration; the former
# xpack.security.enabled switch is no longer accepted by Kibana 8 and an
# unknown key prevents start-up, so it is not set here.
# Replace the placeholder below with a random secret of at least 32 characters.
xpack.security.encryptionKey: "something_at_least_32_characters"
xpack.security.session.idleTimeout: "1h"
xpack.security.session.lifespan: "30d"

# Monitoring
# The xpack.monitoring.enabled switch was removed as well; the collection
# setting lives under the un-prefixed "monitoring." namespace.
monitoring.kibana.collection.enabled: true

# Logging: write JSON-formatted log lines to a file
logging.appenders:
  file:
    type: file
    fileName: /var/log/kibana/kibana.log
    layout:
      type: json

logging.root:
  level: info
  appenders: [file]

Kibana仪表板自动化

import requests
import json
from typing import Dict, List, Any, Optional

class KibanaManager:
    """Small helper around Kibana's saved-objects HTTP API.

    Creates index patterns, visualizations and dashboards through an
    authenticated ``requests`` session. All ``create_*`` methods report
    failures by printing a message and returning a falsy value instead of
    raising.
    """

    def __init__(self, kibana_url: str, username: str, password: str,
                 timeout: float = 10.0):
        """Prepare an authenticated session.

        Args:
            kibana_url: Base URL of Kibana; a trailing slash is stripped.
            username: Basic-auth user name.
            password: Basic-auth password.
            timeout: Seconds to wait for each HTTP request, so an unreachable
                Kibana cannot block the caller forever.
        """
        self.kibana_url = kibana_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.headers.update({
            'Content-Type': 'application/json',
            # Header expected by Kibana's HTTP API on write requests.
            'kbn-xsrf': 'true'
        })

    def _post_saved_object(self, object_type: str, payload: Dict[str, Any]):
        """POST ``payload`` to ``/api/saved_objects/<object_type>`` and return the response."""
        return self.session.post(
            f"{self.kibana_url}/api/saved_objects/{object_type}",
            json=payload,
            timeout=self.timeout,
        )

    def _create_titled_object(self, object_type: str, label: str,
                              config: Dict[str, Any]) -> Optional[str]:
        """Create a saved object that carries ``attributes.title``.

        Args:
            object_type: Saved-object type used in the URL.
            label: Human-readable object kind used in the printed messages.
            config: Request body; must contain ``attributes.title``.

        Returns:
            The ``id`` from Kibana's response, or ``None`` on any failure
            (non-2xx status, transport error, malformed response).
        """
        try:
            response = self._post_saved_object(object_type, config)
            if response.status_code in (200, 201):
                object_id = response.json()['id']
                print(f"{label} {config['attributes']['title']} 创建成功,ID: {object_id}")
                return object_id
            print(f"创建{label}失败: {response.text}")
            return None
        except Exception as e:
            print(f"创建{label}异常: {e}")
            return None

    def create_index_pattern(self, pattern_name: str, time_field: str = '@timestamp') -> bool:
        """Create an index pattern.

        Args:
            pattern_name: Index pattern title, e.g. ``"logs-*"``.
            time_field: Name of the time field used by the pattern.

        Returns:
            ``True`` when Kibana answers 200/201, ``False`` otherwise.
        """
        index_pattern_data = {
            "attributes": {
                "title": pattern_name,
                "timeFieldName": time_field
            }
        }

        try:
            response = self._post_saved_object("index-pattern", index_pattern_data)
            if response.status_code in (200, 201):
                print(f"索引模式 {pattern_name} 创建成功")
                return True
            print(f"创建索引模式失败: {response.text}")
            return False
        except Exception as e:
            print(f"创建索引模式异常: {e}")
            return False

    def create_visualization(self, viz_config: Dict[str, Any]) -> Optional[str]:
        """Create a visualization and return its id, or ``None`` on failure."""
        return self._create_titled_object("visualization", "可视化", viz_config)

    def create_dashboard(self, dashboard_config: Dict[str, Any]) -> Optional[str]:
        """Create a dashboard and return its id, or ``None`` on failure."""
        return self._create_titled_object("dashboard", "仪表板", dashboard_config)

    @staticmethod
    def _build_visualization(title: str, vis_type: str,
                             bucket_agg: Dict[str, Any],
                             query: Dict[str, Any]) -> Dict[str, Any]:
        """Build a visualization body with a count metric plus one bucket aggregation.

        NOTE(review): ``searchSourceJSON.index`` is set to the pattern title
        ``"logs-*"``; Kibana may expect the index pattern's saved-object id
        here - confirm against the target Kibana version.
        """
        return {
            "attributes": {
                "title": title,
                "type": vis_type,
                "visState": json.dumps({
                    "title": title,
                    "type": vis_type,
                    "aggs": [
                        {"id": "1", "type": "count", "schema": "metric", "params": {}},
                        bucket_agg,
                    ]
                }),
                "kibanaSavedObjectMeta": {
                    "searchSourceJSON": json.dumps({
                        "index": "logs-*",
                        "query": query
                    })
                }
            }
        }

    def setup_log_monitoring_dashboard(self) -> bool:
        """Create the ``logs-*`` index pattern, three visualizations and a dashboard.

        Returns:
            ``True`` only when the index pattern, all three visualizations and
            the dashboard were created. If any visualization fails, no
            dashboard is created and ``False`` is returned (previously an
            empty dashboard was created and reported as success).
        """
        if not self.create_index_pattern("logs-*"):
            return False

        # ``level`` and ``service`` are mapped directly as keyword fields in
        # the logs template, so they are aggregated on as-is: there is no
        # ``.keyword`` sub-field to target.
        visualization_specs = [
            # 1. Pie chart: distribution of log levels.
            self._build_visualization(
                "日志级别分布", "pie",
                {
                    "id": "2",
                    "type": "terms",
                    "schema": "segment",
                    "params": {"field": "level", "size": 10, "order": "desc", "orderBy": "1"}
                },
                {"match_all": {}},
            ),
            # 2. Histogram: event count over time.
            self._build_visualization(
                "日志时间线", "histogram",
                {
                    "id": "2",
                    "type": "date_histogram",
                    "schema": "segment",
                    "params": {"field": "@timestamp", "interval": "auto", "min_doc_count": 1}
                },
                {"match_all": {}},
            ),
            # 3. Table: ERROR events per service.
            self._build_visualization(
                "错误日志详情", "table",
                {
                    "id": "2",
                    "type": "terms",
                    "schema": "bucket",
                    "params": {"field": "service", "size": 10, "order": "desc", "orderBy": "1"}
                },
                {"term": {"level": "ERROR"}},
            ),
        ]

        visualizations: List[str] = []
        for spec in visualization_specs:
            viz_id = self.create_visualization(spec)
            if not viz_id:
                # The failure was already reported; a dashboard with missing
                # panels would be misleading, so stop here.
                return False
            visualizations.append(viz_id)

        # Two half-width panels on the first row, one full-width panel below.
        grid_layout = [
            {"x": 0, "y": 0, "w": 24, "h": 15},
            {"x": 24, "y": 0, "w": 24, "h": 15},
            {"x": 0, "y": 15, "w": 48, "h": 15},
        ]
        dashboard_config = {
            "attributes": {
                "title": "日志监控仪表板",
                "type": "dashboard",
                "panelsJSON": json.dumps([
                    {"id": viz_id, "type": "visualization", "gridData": grid}
                    for viz_id, grid in zip(visualizations, grid_layout)
                ])
            }
        }

        return self.create_dashboard(dashboard_config) is not None

# 使用示例
if __name__ == "__main__":
    # Connect to a local Kibana with placeholder credentials.
    manager = KibanaManager(
        kibana_url="http://localhost:5601",
        username="elastic",
        password="your_password",
    )

    # Build the log-monitoring dashboard and report the outcome.
    outcome = (
        "日志监控仪表板设置成功"
        if manager.setup_log_monitoring_dashboard()
        else "日志监控仪表板设置失败"
    )
    print(outcome)

1.3 Beats集成

Filebeat配置

# filebeat.yml
filebeat.inputs:
# Application logs; lines that do not start with a date (yyyy-MM-dd) are
# appended to the previous event, which keeps stack traces together.
# NOTE(review): with fields_under_root, "service" is written as a plain string
# at the event root; the stock Filebeat template may define "service" as an
# object (service.name, ...), which would cause a mapping conflict - confirm.
- type: log
  enabled: true
  paths:
    - /var/log/application/*.log
  fields:
    service: application
    environment: production
  fields_under_root: true
  multiline.pattern: '^\d{4}-\d{2}-\d{2}'
  multiline.negate: true
  multiline.match: after

# nginx access log
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    service: nginx
    log_type: access
  fields_under_root: true

# nginx error log; continuation lines are joined to the line that starts
# with a date (yyyy/MM/dd).
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  fields:
    service: nginx
    log_type: error
  fields_under_root: true
  multiline.pattern: '^\d{4}/\d{2}/\d{2}'
  multiline.negate: true
  multiline.match: after

# Processors
processors:
- add_host_metadata:
    when.not.contains.tags: forwarded
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
- drop_fields:
    fields: ["agent", "ecs", "host.architecture"]

# Index template
# A custom index name is only honoured when index lifecycle management is off
# and the template name/pattern are declared under setup.template.*; these
# options are not read from output.elasticsearch.
setup.ilm.enabled: false
setup.template.name: "filebeat"
setup.template.pattern: "filebeat-*"
setup.template.settings:
  index.number_of_shards: 1
  index.number_of_replicas: 1
  index.refresh_interval: 5s

# Output
output.elasticsearch:
  hosts: ["localhost:9200"]
  username: "filebeat_writer"
  password: "${FILEBEAT_PASSWORD}"
  index: "filebeat-%{+yyyy.MM.dd}"

# Filebeat's own logging
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644

# Monitoring
monitoring.enabled: true
monitoring.elasticsearch:
  hosts: ["localhost:9200"]
  username: "beats_system"
  password: "${BEATS_PASSWORD}"

Metricbeat配置

# metricbeat.yml
# Load additional module definitions from modules.d and re-scan every 10s.
metricbeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s

metricbeat.modules:
# Host-level system metrics
- module: system
  metricsets:
    - cpu
    - load
    - memory
    - network
    - process
    - process_summary
    - socket_summary
    - filesystem
    - fsstat
  enabled: true
  period: 10s
  processes: ['.*']

# Elasticsearch node / cluster / index metrics
- module: elasticsearch
  metricsets:
    - node
    - node_stats
    - cluster_stats
    - index
    - index_recovery
    - index_summary
    - shard
  enabled: true
  period: 10s
  hosts: ["localhost:9200"]
  username: "beats_system"
  password: "${BEATS_PASSWORD}"

# nginx stub_status page
- module: nginx
  metricsets: ["stubstatus"]
  enabled: true
  period: 10s
  hosts: ["http://localhost/nginx_status"]

# Docker container metrics via the local Docker socket
- module: docker
  metricsets:
    - container
    - cpu
    - diskio
    - healthcheck
    - info
    - memory
    - network
  enabled: true
  period: 10s
  hosts: ["unix:///var/run/docker.sock"]

# Index template
# Overriding the index name requires a matching template name/pattern and
# index lifecycle management switched off; otherwise Metricbeat refuses the
# configuration or ignores the custom index.
setup.ilm.enabled: false
setup.template.name: "metricbeat"
setup.template.pattern: "metricbeat-*"

# Output
output.elasticsearch:
  hosts: ["localhost:9200"]
  username: "metricbeat_writer"
  password: "${METRICBEAT_PASSWORD}"
  index: "metricbeat-%{+yyyy.MM.dd}"

# Processors
processors:
- add_host_metadata: ~
- add_docker_metadata: ~

# Monitoring
monitoring.enabled: true
monitoring.elasticsearch:
  hosts: ["localhost:9200"]
  username: "beats_system"
  password: "${BEATS_PASSWORD}"

2. 应用程序集成

2.1 Java应用集成

Spring Boot集成

// pom.xml依赖
/*
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.11.0</version>
</dependency>
*/

// ElasticsearchConfig.java
/**
 * Spring configuration that builds the Elasticsearch Java API client over HTTPS.
 *
 * <p>Connection settings are read from {@code elasticsearch.*} properties.
 * Basic authentication is applied only when both user name and password are
 * non-empty.
 */
@Configuration
@EnableElasticsearchRepositories
public class ElasticsearchConfig {

    @Value("${elasticsearch.host:localhost}")
    private String host;

    @Value("${elasticsearch.port:9200}")
    private int port;

    @Value("${elasticsearch.username:}")
    private String username;

    @Value("${elasticsearch.password:}")
    private String password;

    /**
     * When {@code false} (the default, matching the previous behaviour) every
     * server certificate and host name is accepted. Set to {@code true} in
     * production so the JVM's default trust store and host name verification
     * are used.
     */
    @Value("${elasticsearch.ssl.verify-certificates:false}")
    private boolean verifyCertificates;

    /**
     * Builds the {@link ElasticsearchClient} on top of a low-level {@link RestClient}.
     *
     * @return a client that talks to {@code https://host:port}
     * @throws RuntimeException if the permissive SSL context cannot be created
     */
    @Bean
    public ElasticsearchClient elasticsearchClient() {
        RestClientBuilder builder = RestClient.builder(
            new HttpHost(host, port, "https")
        );

        final CredentialsProvider credentialsProvider = buildCredentialsProvider();
        final SSLContext trustAllContext = verifyCertificates ? null : buildTrustAllSslContext();

        // The builder keeps a single HTTP client callback: registering a second
        // one replaces the first. Credentials and TLS settings are therefore
        // applied together in one callback so that neither is lost.
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            if (credentialsProvider != null) {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
            if (trustAllContext != null) {
                httpClientBuilder
                    .setSSLContext(trustAllContext)
                    .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
            }
            return httpClientBuilder;
        });

        RestClient restClient = builder.build();

        // Transport layer with Jackson-based JSON mapping.
        ElasticsearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper()
        );

        return new ElasticsearchClient(transport);
    }

    @Bean
    public ElasticsearchTemplate elasticsearchTemplate() {
        return new ElasticsearchTemplate(elasticsearchClient());
    }

    /** Returns a basic-auth provider, or {@code null} when no credentials are configured. */
    private CredentialsProvider buildCredentialsProvider() {
        if (username.isEmpty() || password.isEmpty()) {
            return null;
        }
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(
            AuthScope.ANY,
            new UsernamePasswordCredentials(username, password)
        );
        return provider;
    }

    /**
     * Builds an SSL context that accepts any certificate.
     *
     * <p>WARNING: this disables server authentication and must not be used in
     * production; enable {@code elasticsearch.ssl.verify-certificates} instead.
     */
    private SSLContext buildTrustAllSslContext() {
        try {
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, new TrustManager[] {
                new X509TrustManager() {
                    // The X509TrustManager contract asks for a non-null array.
                    public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
                    public void checkClientTrusted(X509Certificate[] certs, String authType) {}
                    public void checkServerTrusted(X509Certificate[] certs, String authType) {}
                }
            }, new SecureRandom());
            return sslContext;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

// 实体类
/**
 * Product document stored in the {@code products} index.
 */
@Document(indexName = "products")
public class Product {
    @Id
    private String id;

    @Field(type = FieldType.Text, analyzer = "standard")
    private String name;

    @Field(type = FieldType.Text, analyzer = "standard")
    private String description;

    @Field(type = FieldType.Double)
    private Double price;

    // Mapped as keyword: query this field as "category", there is no
    // "category.keyword" sub-field.
    @Field(type = FieldType.Keyword)
    private String category;

    @Field(type = FieldType.Integer)
    private Integer stock;

    // LocalDateTime carries no zone offset, so a zone-less format is used;
    // DateFormat.date_time requires an offset and cannot format this type.
    @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis)
    private LocalDateTime createdAt;

    @Field(type = FieldType.Boolean)
    private Boolean active;

    @Field(type = FieldType.Nested)
    private List<Tag> tags;

    /** No-argument constructor. */
    public Product() {}

    /**
     * Creates an active product with the creation time set to now and an
     * empty tag list.
     *
     * @param name        product name
     * @param description product description
     * @param price       product price
     * @param category    product category
     */
    public Product(String name, String description, Double price, String category) {
        this.name = name;
        this.description = description;
        this.price = price;
        this.category = category;
        this.createdAt = LocalDateTime.now();
        this.active = true;
        this.tags = new ArrayList<>();
    }

    // getters and setters ...
}

/**
 * Tag embedded in a {@code Product} as a nested object.
 *
 * <p>It is not an index root, so it carries no {@code @Document} annotation
 * (that annotation also requires an {@code indexName}).
 */
public class Tag {
    @Field(type = FieldType.Keyword)
    private String name;

    @Field(type = FieldType.Double)
    private Double weight;

    // constructors, getters and setters ...
}

// Repository接口
/**
 * Spring Data repository for {@link Product} documents.
 */
@Repository
public interface ProductRepository extends ElasticsearchRepository<Product, String> {

    // Queries derived from the method name.
    List<Product> findByName(String name);
    List<Product> findByCategory(String category);
    List<Product> findByPriceBetween(Double minPrice, Double maxPrice);
    List<Product> findByActiveTrue();

    /**
     * Matches {@code name} against the given text and restricts {@code price}
     * to the inclusive range {@code [minPrice, maxPrice]}.
     */
    @Query("{\"bool\": {\"must\": [{\"match\": {\"name\": \"?0\"}}, {\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}]}}")
    List<Product> findByNameAndPriceRange(String name, Double minPrice, Double maxPrice);

    /**
     * Category terms aggregation.
     *
     * <p>The aggregation targets {@code category}, which is mapped as a
     * keyword field and has no {@code .keyword} sub-field.
     *
     * <p>NOTE(review): {@code @Query} supplies the query clause of a search,
     * so an {@code aggs} body here is likely rejected by Elasticsearch -
     * confirm, and prefer {@code ProductService.getCategoryStatistics()},
     * which runs the aggregation through the client API.
     */
    @Query("{\"aggs\": {\"categories\": {\"terms\": {\"field\": \"category\"}}}}")
    SearchHits<Product> findCategoryAggregation();
}

// 服务类
/**
 * Product search, statistics and recommendation operations backed by
 * Elasticsearch.
 *
 * <p>The {@code category} field is mapped as a keyword, so every query and
 * aggregation targets {@code category} directly.
 */
@Service
public class ProductService {

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private ElasticsearchClient elasticsearchClient;

    /** Saves the product through the repository and returns the stored instance. */
    public Product saveProduct(Product product) {
        return productRepository.save(product);
    }

    /**
     * Searches active products.
     *
     * @param query    free text matched (fuzzily) against name and description; ignored when null or empty
     * @param category exact category filter; ignored when null or empty
     * @param minPrice inclusive lower price bound; ignored when null
     * @param maxPrice inclusive upper price bound; ignored when null
     * @return up to 100 matching products ordered by descending score
     * @throws RuntimeException if the search request fails
     */
    public List<Product> searchProducts(String query, String category, Double minPrice, Double maxPrice) {
        try {
            BoolQuery.Builder boolQuery = new BoolQuery.Builder();

            // Full-text search.
            if (query != null && !query.isEmpty()) {
                boolQuery.must(m -> m
                    .multiMatch(mm -> mm
                        .fields("name", "description")
                        .query(query)
                        .fuzziness("AUTO")
                    )
                );
            }

            // Category filter.
            if (category != null && !category.isEmpty()) {
                boolQuery.filter(f -> f
                    .term(t -> t
                        .field("category")
                        .value(category)
                    )
                );
            }

            // Price range filter.
            if (minPrice != null || maxPrice != null) {
                RangeQuery.Builder rangeQuery = new RangeQuery.Builder()
                    .field("price");

                if (minPrice != null) {
                    rangeQuery.gte(JsonData.of(minPrice));
                }
                if (maxPrice != null) {
                    rangeQuery.lte(JsonData.of(maxPrice));
                }

                boolQuery.filter(f -> f.range(rangeQuery.build()));
            }

            // Only active products are returned.
            boolQuery.filter(f -> f
                .term(t -> t
                    .field("active")
                    .value(true)
                )
            );

            SearchRequest searchRequest = SearchRequest.of(s -> s
                .index("products")
                .query(q -> q.bool(boolQuery.build()))
                .sort(sort -> sort
                    .field(f -> f
                        .field("_score")
                        .order(SortOrder.Desc)
                    )
                )
                .size(100)
            );

            SearchResponse<Product> response = elasticsearchClient.search(
                searchRequest, Product.class
            );

            return response.hits().hits().stream()
                .map(hit -> hit.source())
                .collect(Collectors.toList());

        } catch (Exception e) {
            throw new RuntimeException("搜索产品失败", e);
        }
    }

    /**
     * Counts products per category (top 50 categories).
     *
     * @return map from category name to document count; empty when the
     *         response carries no aggregations
     * @throws RuntimeException if the search request fails
     */
    public Map<String, Long> getCategoryStatistics() {
        try {
            SearchRequest searchRequest = SearchRequest.of(s -> s
                .index("products")
                .size(0)
                .aggregations("categories", a -> a
                    .terms(t -> t
                        .field("category")
                        .size(50)
                    )
                )
            );

            SearchResponse<Product> response = elasticsearchClient.search(
                searchRequest, Product.class
            );

            Map<String, Long> categoryStats = new HashMap<>();

            if (response.aggregations() != null) {
                StringTermsAggregate categories = response.aggregations()
                    .get("categories")
                    .sterms();

                for (StringTermsBucket bucket : categories.buckets().array()) {
                    categoryStats.put(bucket.key().stringValue(), bucket.docCount());
                }
            }

            return categoryStats;

        } catch (Exception e) {
            throw new RuntimeException("获取分类统计失败", e);
        }
    }

    /**
     * Recommends active products similar to the given one.
     *
     * <p>Candidates are scored by a boosted category match and a
     * more-like-this clause on name and description; the product itself is
     * excluded.
     *
     * @param productId id of the reference product
     * @param size      maximum number of recommendations
     * @return recommended products, or an empty list when the reference
     *         product does not exist or has no stored source
     * @throws RuntimeException if a request to Elasticsearch fails
     */
    public List<Product> getRecommendations(String productId, int size) {
        try {
            // Load the reference product first.
            GetResponse<Product> getResponse = elasticsearchClient.get(
                g -> g.index("products").id(productId),
                Product.class
            );

            if (!getResponse.found()) {
                return Collections.emptyList();
            }

            Product currentProduct = getResponse.source();
            if (currentProduct == null) {
                return Collections.emptyList();
            }

            BoolQuery.Builder similarity = new BoolQuery.Builder();

            // A match query cannot be built without a query value, so the
            // category clause is only added when the product has a category.
            String currentCategory = currentProduct.getCategory();
            if (currentCategory != null && !currentCategory.isEmpty()) {
                similarity.should(sh -> sh
                    .match(m -> m
                        .field("category")
                        .query(currentCategory)
                        .boost(2.0f)
                    )
                );
            }

            similarity
                .should(sh -> sh
                    .moreLikeThis(mlt -> mlt
                        .fields("name", "description")
                        .like(l -> l
                            .document(d -> d
                                .index("products")
                                .id(productId)
                            )
                        )
                        .minTermFreq(1)
                        .maxQueryTerms(12)
                    )
                )
                // Never recommend the product itself.
                .mustNot(mn -> mn
                    .term(t -> t
                        .field("_id")
                        .value(productId)
                    )
                )
                .filter(f -> f
                    .term(t -> t
                        .field("active")
                        .value(true)
                    )
                );

            SearchRequest searchRequest = SearchRequest.of(s -> s
                .index("products")
                .query(q -> q.bool(similarity.build()))
                .size(size)
            );

            SearchResponse<Product> response = elasticsearchClient.search(
                searchRequest, Product.class
            );

            return response.hits().hits().stream()
                .map(hit -> hit.source())
                .collect(Collectors.toList());

        } catch (Exception e) {
            throw new RuntimeException("获取推荐产品失败", e);
        }
    }
}

// 控制器
/**
 * REST endpoints under {@code /api/products}; every handler delegates to
 * {@link ProductService} and answers with HTTP 200.
 */
@RestController
@RequestMapping("/api/products")
public class ProductController {

    @Autowired
    private ProductService productService;

    /** Stores the posted product and returns the saved instance. */
    @PostMapping
    public ResponseEntity<Product> createProduct(@RequestBody Product product) {
        return ResponseEntity.ok(productService.saveProduct(product));
    }

    /** Searches products; every request parameter is optional. */
    @GetMapping("/search")
    public ResponseEntity<List<Product>> searchProducts(
            @RequestParam(required = false) String q,
            @RequestParam(required = false) String category,
            @RequestParam(required = false) Double minPrice,
            @RequestParam(required = false) Double maxPrice) {

        return ResponseEntity.ok(
            productService.searchProducts(q, category, minPrice, maxPrice)
        );
    }

    /** Returns the number of products per category. */
    @GetMapping("/categories/stats")
    public ResponseEntity<Map<String, Long>> getCategoryStatistics() {
        return ResponseEntity.ok(productService.getCategoryStatistics());
    }

    /** Returns up to {@code size} (default 5) recommendations for the product. */
    @GetMapping("/{id}/recommendations")
    public ResponseEntity<List<Product>> getRecommendations(
            @PathVariable String id,
            @RequestParam(defaultValue = "5") int size) {

        return ResponseEntity.ok(productService.getRecommendations(id, size));
    }
}

2.2 Python应用集成

Django集成

# settings.py
import os

# Connection settings for django-elasticsearch-dsl.
# Credentials and the certificate-verification switch can be overridden through
# environment variables so that real secrets need not be committed; the
# fallbacks are the original placeholder values.
ELASTICSEARCH_DSL = {
    'default': {
        'hosts': 'localhost:9200',
        'http_auth': (
            os.environ.get('ELASTICSEARCH_USERNAME', 'username'),
            os.environ.get('ELASTICSEARCH_PASSWORD', 'password'),
        ),
        'use_ssl': True,
        # WARNING: verification is off by default (as before), which accepts
        # any server certificate. Set ELASTICSEARCH_VERIFY_CERTS=true in
        # production.
        'verify_certs': os.environ.get(
            'ELASTICSEARCH_VERIFY_CERTS', 'false'
        ).lower() in ('1', 'true', 'yes'),
        'ssl_show_warn': False,
        'timeout': 30,
        'max_retries': 3,
        'retry_on_timeout': True
    },
}

INSTALLED_APPS = [
    # ... other applications
    'django_elasticsearch_dsl',
    'myapp',
]

# models.py
from django.db import models
from django.contrib.auth.models import User

class Article(models.Model):
    """Article written by a ``User``, with a category, optional tags,
    a published flag, timestamps and a view counter."""
    title = models.CharField(max_length=200)
    content = models.TextField()
    # Deleting the author deletes their articles (CASCADE).
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    category = models.CharField(max_length=50)
    # Tags are optional (blank=True).
    tags = models.ManyToManyField('Tag', blank=True)
    published = models.BooleanField(default=False)
    # Set once on creation / refreshed on every save.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    view_count = models.IntegerField(default=0)
    
    def __str__(self) -> str:
        """Return the article title."""
        return self.title

class Tag(models.Model):
    """Tag identified by a unique name of at most 50 characters."""
    name = models.CharField(max_length=50, unique=True)
    
    def __str__(self) -> str:
        """Return the tag name."""
        return self.name

class Comment(models.Model):
    """Comment written by a ``User`` on an ``Article``."""
    # Reverse accessor on Article is ``comments``; deleting the article
    # deletes its comments (CASCADE).
    article = models.ForeignKey(Article, on_delete=models.CASCADE, related_name='comments')
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    content = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)
    
    def __str__(self) -> str:
        """Return ``'Comment by <author username> on <article title>'``."""
        return f'Comment by {self.author.username} on {self.article.title}'

# documents.py
from django_elasticsearch_dsl import Document, Index, fields
from django_elasticsearch_dsl.registries import registry
from .models import Article, Tag, Comment

# 创建索引
# Index that holds article documents, with its shard, replica and refresh settings.
articles_index = Index('articles')
articles_index.settings(**{
    'number_of_shards': 1,
    'number_of_replicas': 1,
    'refresh_interval': '5s',
})

@registry.register_document
@articles_index.doc_type
class ArticleDocument(Document):
    """Elasticsearch document built from ``Article`` rows.

    Every mapped field is declared explicitly (``Django.fields`` is empty).
    The author is stored as an object, while tags and comments are stored as
    nested documents and are populated by the ``prepare_*`` methods below.
    """

    # Basic fields
    title = fields.TextField(
        analyzer='standard',
        fields={
            'keyword': fields.KeywordField(),
            'suggest': fields.CompletionField()
        }
    )

    content = fields.TextField(
        analyzer='standard'
    )

    category = fields.KeywordField()

    # Object field (author data embedded in the article document)
    author = fields.ObjectField(
        properties={
            'id': fields.IntegerField(),
            'username': fields.KeywordField(),
            'email': fields.KeywordField(),
            'first_name': fields.TextField(),
            'last_name': fields.TextField(),
        }
    )

    # Multi-valued nested field
    tags = fields.NestedField(
        properties={
            'id': fields.IntegerField(),
            'name': fields.KeywordField(),
        }
    )

    # Related objects stored as nested documents
    comments = fields.NestedField(
        properties={
            'id': fields.IntegerField(),
            'content': fields.TextField(),
            'author_username': fields.KeywordField(),
            'created_at': fields.DateField(),
        }
    )

    # Date fields
    created_at = fields.DateField()
    updated_at = fields.DateField()

    # Boolean field
    published = fields.BooleanField()

    # Numeric field
    view_count = fields.IntegerField()

    class Django:
        model = Article
        # No auto-mapped model fields: everything is declared above.
        fields = []
        related_models = [Tag, Comment]

    def get_queryset(self):
        """Return the queryset of articles to index.

        The author is joined, and tags, comments and each comment's author
        are prefetched. ``prepare_comments`` reads ``comment.author.username``,
        so prefetching only ``comments`` would cost one extra query per
        comment while indexing.
        """
        return (
            super()
            .get_queryset()
            .select_related('author')
            .prefetch_related('tags', 'comments__author')
        )

    def get_instances_from_related(self, related_instance):
        """Return the articles to re-index after a related model changed.

        A ``Tag`` maps to every article carrying it, a ``Comment`` maps to
        its single article, and any other instance maps to nothing.
        """
        if isinstance(related_instance, Tag):
            return related_instance.article_set.all()
        elif isinstance(related_instance, Comment):
            return [related_instance.article]
        return []

    def prepare_author(self, instance):
        """Build the ``author`` object from the article's author."""
        author = instance.author
        return {
            'id': author.id,
            'username': author.username,
            'email': author.email,
            'first_name': author.first_name,
            'last_name': author.last_name,
        }

    def prepare_tags(self, instance):
        """Build the nested ``tags`` list (id and name of each tag)."""
        return [
            {
                'id': tag.id,
                'name': tag.name,
            }
            for tag in instance.tags.all()
        ]

    def prepare_comments(self, instance):
        """Build the nested ``comments`` list for the article."""
        return [
            {
                'id': comment.id,
                'content': comment.content,
                'author_username': comment.author.username,
                'created_at': comment.created_at,
            }
            for comment in instance.comments.all()
        ]

# search.py
from elasticsearch_dsl import Q, A
from django_elasticsearch_dsl_drf.constants import LOOKUP_FILTER_RANGE, LOOKUP_QUERY_IN
from django_elasticsearch_dsl_drf.filter_backends import (
    FilteringFilterBackend,
    SearchFilterBackend,
    SuggesterFilterBackend,
    FunctionalSuggesterFilterBackend,
    OrderingFilterBackend,
    HighlightBackend,
    FacetedSearchFilterBackend,
)
from django_elasticsearch_dsl_drf.viewsets import DocumentViewSet
from django_elasticsearch_dsl_drf.pagination import PageNumberPagination
from rest_framework import serializers
from .documents import ArticleDocument

class ArticleDocumentSerializer(serializers.Serializer):
    """Read-only serializer for article search hits."""
    id = serializers.IntegerField(read_only=True)
    title = serializers.CharField(read_only=True)
    content = serializers.CharField(read_only=True)
    category = serializers.CharField(read_only=True)
    author = serializers.DictField(read_only=True)
    tags = serializers.ListField(read_only=True)
    comments = serializers.ListField(read_only=True)
    published = serializers.BooleanField(read_only=True)
    created_at = serializers.DateTimeField(read_only=True)
    updated_at = serializers.DateTimeField(read_only=True)
    view_count = serializers.IntegerField(read_only=True)
    
    # Search-related fields, read from the hit's ``meta`` object
    score = serializers.SerializerMethodField()
    highlight = serializers.SerializerMethodField()
    
    def get_score(self, obj):
        """Return ``obj.meta.score``, or ``None`` when the hit has no score."""
        return getattr(obj.meta, 'score', None)
    
    def get_highlight(self, obj):
        """Return ``obj.meta.highlight``, or an empty dict when absent."""
        return getattr(obj.meta, 'highlight', {})

class ArticleDocumentViewSet(DocumentViewSet):
    """Search endpoint for ``ArticleDocument``.

    Field paths below must match the document mapping. ``category``,
    ``author.username`` and ``tags.name`` are mapped as keyword fields
    without a ``.keyword`` sub-field, so they are referenced directly.
    Pointing at ``<field>.keyword`` would target an unmapped field and
    silently match nothing.
    """
    document = ArticleDocument
    serializer_class = ArticleDocumentSerializer
    pagination_class = PageNumberPagination

    filter_backends = [
        FilteringFilterBackend,
        SearchFilterBackend,
        SuggesterFilterBackend,
        FunctionalSuggesterFilterBackend,
        OrderingFilterBackend,
        HighlightBackend,
        FacetedSearchFilterBackend,
    ]

    # Search fields (root-level and object fields only)
    search_fields = {
        'title': {'boost': 2.0},
        'content': {'boost': 1.0},
        'author.username': {'boost': 1.5},
    }

    # ``tags`` is a nested field, so it has to be searched through a nested
    # query rather than a root-level match.
    search_nested_fields = {
        'tags': {
            'path': 'tags',
            'fields': ['name'],
        },
    }

    # Filter fields
    filter_fields = {
        'category': 'category',
        'published': 'published',
        'author_id': 'author.id',
        'created_at': {
            'field': 'created_at',
            'lookups': [LOOKUP_FILTER_RANGE],
        },
        'view_count': {
            'field': 'view_count',
            'lookups': [LOOKUP_FILTER_RANGE],
        },
        # NOTE(review): ``tags`` is nested; a root-level filter on
        # ``tags.name`` will presumably still need a nested filtering backend
        # to match -- confirm against the filtering library's documentation.
        'tags': {
            'field': 'tags.name',
            'lookups': [LOOKUP_QUERY_IN],
        },
    }

    # Ordering fields
    ordering_fields = {
        'created_at': 'created_at',
        'updated_at': 'updated_at',
        'view_count': 'view_count',
        'score': '_score',
    }

    ordering = ('-created_at',)

    # Highlight fields
    highlight_fields = {
        'title': {
            'enabled': True,
            'options': {
                'pre_tags': ['<mark>'],
                'post_tags': ['</mark>'],
            }
        },
        'content': {
            'enabled': True,
            'options': {
                'pre_tags': ['<mark>'],
                'post_tags': ['</mark>'],
                'fragment_size': 200,
                'number_of_fragments': 3,
            }
        },
    }

    # Suggester fields
    suggester_fields = {
        'title_suggest': {
            'field': 'title.suggest',
            'suggesters': [
                'completion',
            ],
        },
    }

    # Faceted search fields
    faceted_search_fields = {
        'category': {
            'field': 'category',
            'enabled': True,
        },
        # NOTE(review): nested field; a plain terms facet from the root will
        # presumably stay empty without a nested aggregation -- confirm.
        'tags': {
            'field': 'tags.name',
            'enabled': True,
        },
        'author': {
            'field': 'author.username',
            'enabled': True,
        },
        'published': {
            'field': 'published',
            'enabled': True,
        },
    }

    def get_queryset(self):
        """Return the base search, restricted to published articles for non-staff users."""
        queryset = super().get_queryset()

        # Staff users may see unpublished articles; everyone else may not.
        if not self.request.user.is_staff:
            queryset = queryset.filter('term', published=True)

        return queryset

# 高级搜索服务
class AdvancedSearchService:
    """Hand-written Elasticsearch queries over ``ArticleDocument``.

    All methods only consider published articles. Field paths follow the
    document mapping: ``category`` and ``author.username`` are keyword fields
    without a ``.keyword`` sub-field, and ``tags`` is a nested field that
    must be queried and aggregated through ``nested`` wrappers.
    """

    @staticmethod
    def semantic_search(query: str, size: int = 10):
        """Run a fuzzy multi-field search with a phrase-match boost.

        Args:
            query: Free-text search string.
            size: Maximum number of hits to return.

        Returns:
            The executed search response, with highlights for ``title`` and
            ``content``.
        """
        search = ArticleDocument.search()

        # Fuzzy match across the root-level and object fields.
        multi_match = Q(
            'multi_match',
            query=query,
            fields=[
                'title^3',
                'content^1',
                'author.username^1.5'
            ],
            type='best_fields',
            fuzziness='AUTO'
        )

        # Tags are nested documents, so they are only reachable through a
        # nested query; a root-level match on ``tags.name`` finds nothing.
        tag_match = Q(
            'nested',
            path='tags',
            query=Q(
                'match',
                **{'tags.name': {'query': query, 'boost': 2.0, 'fuzziness': 'AUTO'}}
            )
        )

        # Exact phrase matches rank higher.
        phrase_match = Q(
            'multi_match',
            query=query,
            fields=['title', 'content'],
            type='phrase',
            boost=2.0
        )

        # At least one of the clauses has to match.
        combined_query = Q(
            'bool',
            should=[multi_match, tag_match, phrase_match],
            minimum_should_match=1
        )

        search = search.query(combined_query)
        search = search.filter('term', published=True)
        search = search[:size]

        # Highlight matches in title and content.
        search = search.highlight(
            'title',
            'content',
            pre_tags=['<mark>'],
            post_tags=['</mark>'],
            fragment_size=200,
            number_of_fragments=3
        )

        return search.execute()

    @staticmethod
    def trending_articles(days: int = 7, size: int = 10):
        """Return the most viewed published articles created in the last ``days`` days.

        Args:
            days: Size of the look-back window in days.
            size: Maximum number of hits to return.

        Returns:
            The executed search response, sorted by ``view_count`` and then
            ``created_at``, both descending.
        """
        from datetime import datetime, timedelta, timezone

        search = ArticleDocument.search()

        # Use an aware UTC timestamp: a naive local time would be sent
        # without an offset and the window would shift by the local UTC offset.
        window_start = datetime.now(timezone.utc) - timedelta(days=days)
        search = search.filter(
            'range',
            created_at={'gte': window_start}
        )

        # Most viewed first, newest first among ties.
        search = search.sort('-view_count', '-created_at')
        search = search.filter('term', published=True)
        search = search[:size]

        return search.execute()

    @staticmethod
    def related_articles(article_id: int, size: int = 5):
        """Return published articles similar to the given one.

        Args:
            article_id: Id of the reference article.
            size: Maximum number of hits to return.

        Returns:
            The executed search response, or an empty list when the
            reference article cannot be loaded from the index.
        """
        # Make sure the reference article exists in the index. Failures keep
        # the best-effort "empty result" behaviour, but KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        try:
            ArticleDocument.get(id=article_id)
        except Exception:
            return []

        search = ArticleDocument.search()

        # More Like This query.
        # NOTE(review): ``tags.name`` lives inside a nested field; it may not
        # contribute terms from the root document -- confirm.
        mlt_query = Q(
            'more_like_this',
            fields=['title', 'content', 'tags.name'],
            like=[
                {
                    '_index': 'articles',
                    '_id': article_id
                }
            ],
            min_term_freq=1,
            max_query_terms=12,
            min_doc_freq=1
        )

        # Exclude the reference article itself.
        search = search.query(mlt_query)
        search = search.exclude('term', _id=article_id)
        search = search.filter('term', published=True)
        search = search[:size]

        return search.execute()

    @staticmethod
    def category_statistics():
        """Aggregate published articles by category, tag, author and month.

        Returns:
            A dict with the keys ``categories``, ``tags``, ``authors`` and
            ``monthly_posts``, each holding a list of name/count (or
            date/count) dicts.
        """
        search = ArticleDocument.search()
        search = search.filter('term', published=True)

        # Categories (keyword field, no ``.keyword`` sub-field).
        search.aggs.bucket(
            'categories',
            'terms',
            field='category',
            size=50
        )

        # Tags are nested documents: step into them before bucketing.
        search.aggs.bucket(
            'tags',
            'nested',
            path='tags'
        ).bucket(
            'names',
            'terms',
            field='tags.name',
            size=20
        )

        # Authors (keyword field inside the author object).
        search.aggs.bucket(
            'authors',
            'terms',
            field='author.username',
            size=10
        )

        # Posts per calendar month.
        search.aggs.bucket(
            'monthly_posts',
            'date_histogram',
            field='created_at',
            calendar_interval='month'
        )

        search = search[:0]  # Return aggregations only, no documents.

        response = search.execute()
        aggregations = response.aggregations

        return {
            'categories': [
                {'name': bucket.key, 'count': bucket.doc_count}
                for bucket in aggregations.categories.buckets
            ],
            'tags': [
                {'name': bucket.key, 'count': bucket.doc_count}
                for bucket in aggregations.tags.names.buckets
            ],
            'authors': [
                {'username': bucket.key, 'count': bucket.doc_count}
                for bucket in aggregations.authors.buckets
            ],
            'monthly_posts': [
                {
                    'date': bucket.key_as_string,
                    'count': bucket.doc_count
                }
                for bucket in aggregations.monthly_posts.buckets
            ]
        }

# views.py
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from .search import AdvancedSearchService
import json

@require_http_methods(["GET"])
def semantic_search(request):
    """Semantic search API.

    Query parameters:
        q: Search text (required).
        size: Maximum number of hits, a positive integer (default 10).

    Returns:
        A JSON response with ``total`` and ``articles``; status 400 for a
        missing query or an invalid ``size``, status 500 when the search fails.
    """
    query = request.GET.get('q', '')

    # Reject malformed input with a 400 instead of letting int() raise
    # outside the try block below.
    try:
        size = int(request.GET.get('size', 10))
    except (TypeError, ValueError):
        size = 0
    if size < 1:
        return JsonResponse({'error': 'size参数必须是正整数'}, status=400)

    if not query:
        return JsonResponse({'error': '查询参数不能为空'}, status=400)

    try:
        results = AdvancedSearchService.semantic_search(query, size)

        articles = []
        for hit in results:
            content = hit.content
            author = hit.author
            article = {
                'id': hit.meta.id,
                'title': hit.title,
                # Preview: first 200 characters followed by an ellipsis.
                'content': content[:200] + '...' if len(content) > 200 else content,
                'category': hit.category,
                # Object fields come back as wrapper objects that the JSON
                # encoder cannot serialise; convert them to plain dicts.
                'author': author.to_dict() if hasattr(author, 'to_dict') else author,
                'created_at': hit.created_at,
                'view_count': hit.view_count,
                'score': hit.meta.score,
            }

            # Attach highlight fragments when the hit has any.
            if hasattr(hit.meta, 'highlight'):
                highlight = hit.meta.highlight
                article['highlight'] = (
                    highlight.to_dict() if hasattr(highlight, 'to_dict') else highlight
                )

            articles.append(article)

        return JsonResponse({
            'total': results.hits.total.value,
            'articles': articles
        })

    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)

@require_http_methods(["GET"])
def trending_articles(request):
    """Trending articles API.

    Query parameters:
        days: Look-back window in days, a positive integer (default 7).
        size: Maximum number of hits, a positive integer (default 10).

    Returns:
        A JSON response with ``articles``; status 400 for invalid
        parameters, status 500 when the search fails.
    """
    # Reject malformed input with a 400 instead of an unhandled ValueError.
    try:
        days = int(request.GET.get('days', 7))
        size = int(request.GET.get('size', 10))
    except (TypeError, ValueError):
        days = size = 0
    if days < 1 or size < 1:
        return JsonResponse({'error': 'days和size参数必须是正整数'}, status=400)

    try:
        results = AdvancedSearchService.trending_articles(days, size)

        articles = []
        for hit in results:
            author = hit.author
            articles.append({
                'id': hit.meta.id,
                'title': hit.title,
                'category': hit.category,
                # Convert the wrapped object field into a plain dict so the
                # JSON encoder can serialise it.
                'author': author.to_dict() if hasattr(author, 'to_dict') else author,
                'created_at': hit.created_at,
                'view_count': hit.view_count,
            })

        return JsonResponse({'articles': articles})

    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)

@require_http_methods(["GET"])
def related_articles(request, article_id):
    """Related articles API.

    Args:
        request: The incoming GET request; ``size`` (positive integer,
            default 5) limits the number of hits.
        article_id: Id of the reference article, taken from the URL.

    Returns:
        A JSON response with ``articles``; status 400 for an invalid
        ``size``, status 500 when the search fails.
    """
    # Reject malformed input with a 400 instead of an unhandled ValueError.
    try:
        size = int(request.GET.get('size', 5))
    except (TypeError, ValueError):
        size = 0
    if size < 1:
        return JsonResponse({'error': 'size参数必须是正整数'}, status=400)

    try:
        results = AdvancedSearchService.related_articles(article_id, size)

        articles = []
        for hit in results:
            author = hit.author
            articles.append({
                'id': hit.meta.id,
                'title': hit.title,
                'category': hit.category,
                # Convert the wrapped object field into a plain dict so the
                # JSON encoder can serialise it.
                'author': author.to_dict() if hasattr(author, 'to_dict') else author,
                'created_at': hit.created_at,
            })

        return JsonResponse({'articles': articles})

    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)

@require_http_methods(["GET"])
def category_statistics(request):
    """Category statistics API: return the aggregated statistics as JSON.

    Any exception raised while computing or serialising the statistics is
    reported as a JSON error with status 500.
    """
    try:
        # Both the aggregation and the serialisation stay inside the try so
        # that either failure produces the JSON error below.
        response = JsonResponse(AdvancedSearchService.category_statistics())
    except Exception as exc:
        response = JsonResponse({'error': str(exc)}, status=500)
    return response

# urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from . import views
from .search import ArticleDocumentViewSet

# Router-generated routes for the document viewset, registered under
# 'search/articles'.
router = DefaultRouter()
router.register(r'search/articles', ArticleDocumentViewSet, basename='article-search')

# Everything is mounted under 'api/': the router URLs plus the four
# function-based search views.
urlpatterns = [
    path('api/', include(router.urls)),
    path('api/semantic-search/', views.semantic_search, name='semantic-search'),
    path('api/trending/', views.trending_articles, name='trending-articles'),
    path('api/articles/<int:article_id>/related/', views.related_articles, name='related-articles'),
    path('api/statistics/', views.category_statistics, name='category-statistics'),
]

2.3 Node.js集成

Express.js集成

// package.json
/*
{
  "dependencies": {
    "@elastic/elasticsearch": "^8.11.0",
    "express": "^4.18.0",
    "body-parser": "^1.20.0",
    "cors": "^2.8.5",
    "helmet": "^7.0.0",
    "morgan": "^1.10.0",
    "dotenv": "^16.0.0"
  }
}
*/

// config/elasticsearch.js
const { Client } = require('@elastic/elasticsearch');
require('dotenv').config();

/**
 * Thin wrapper that builds one Elasticsearch client from environment
 * variables, falling back to local defaults when a variable is unset.
 */
class ElasticsearchClient {
    constructor() {
        const env = process.env;

        const clientOptions = {
            node: env.ELASTICSEARCH_URL || 'https://localhost:9200',
            auth: {
                username: env.ELASTICSEARCH_USERNAME || 'elastic',
                password: env.ELASTICSEARCH_PASSWORD || 'password'
            },
            tls: {
                // Should be set to true in production.
                rejectUnauthorized: false
            },
            requestTimeout: 30000,
            maxRetries: 3,
            resurrectStrategy: 'ping'
        };

        this.client = new Client(clientOptions);
    }

    /**
     * Ping the cluster and log the outcome.
     *
     * @returns {Promise<boolean>} true when the ping succeeds, false otherwise.
     */
    async testConnection() {
        let reachable;
        try {
            await this.client.ping();
            console.log('Elasticsearch连接成功');
            reachable = true;
        } catch (error) {
            console.error('Elasticsearch连接失败:', error);
            reachable = false;
        }
        return reachable;
    }

    /**
     * @returns {object} The underlying client instance.
     */
    getClient() {
        return this.client;
    }
}

module.exports = new ElasticsearchClient();

// models/Product.js
/**
 * In-memory representation of a product and its Elasticsearch document form.
 */
class Product {
    /**
     * @param {object} data Raw product fields. Missing optional fields get
     *   defaults: empty tags, zero stock/rating/reviews_count, current time
     *   for the timestamps, and `active` = true.
     */
    constructor(data) {
        this.id = data.id;
        this.name = data.name;
        this.description = data.description;
        this.price = data.price;
        this.category = data.category;
        this.brand = data.brand;
        this.tags = data.tags || [];
        this.stock = data.stock || 0;
        this.rating = data.rating || 0;
        this.reviews_count = data.reviews_count || 0;
        this.created_at = data.created_at || new Date();
        this.updated_at = data.updated_at || new Date();
        // Only an explicit value overrides the default; `false` must survive.
        this.active = data.active !== undefined ? data.active : true;
    }

    /**
     * Build the document body to index, including the completion `suggest`
     * entry.
     *
     * @returns {object} Plain object ready to be sent to Elasticsearch.
     */
    toElasticsearchDoc() {
        // Completion inputs must be real strings: drop missing values
        // (for example an undefined brand), which would serialise as null.
        const suggestInputs = [this.name, this.brand, ...this.tags].filter(
            (value) => value !== undefined && value !== null && value !== ''
        );

        // Completion weights must be non-negative integers, but
        // `rating * 10` can be fractional (4.25 -> 42.5), so round and clamp.
        const rawWeight = this.rating * 10 + this.reviews_count;
        const suggestWeight = Number.isFinite(rawWeight)
            ? Math.max(0, Math.round(rawWeight))
            : 0;

        return {
            id: this.id,
            name: this.name,
            description: this.description,
            price: this.price,
            category: this.category,
            brand: this.brand,
            tags: this.tags,
            stock: this.stock,
            rating: this.rating,
            reviews_count: this.reviews_count,
            created_at: this.created_at,
            updated_at: this.updated_at,
            active: this.active,
            suggest: {
                input: suggestInputs,
                weight: suggestWeight
            }
        };
    }

    /**
     * Build a Product from a search hit (uses `_source`) or from a bare
     * source object.
     *
     * @param {object} doc Hit or source object.
     * @returns {Product}
     */
    static fromElasticsearchDoc(doc) {
        return new Product(doc._source || doc);
    }
}

module.exports = Product;

// services/ProductService.js
const elasticsearchClient = require('../config/elasticsearch');
const Product = require('../models/Product');

class ProductService {
    /**
     * Take the client from the shared elasticsearch config module and fix
     * the index name used by every method of this service.
     */
    constructor() {
        this.client = elasticsearchClient.getClient();
        this.index = 'products';
    }
    
    async createIndex() {
        try {
            const exists = await this.client.indices.exists({ index: this.index });
            
            if (!exists) {
                await this.client.indices.create({
                    index: this.index,
                    body: {
                        settings: {
                            number_of_shards: 1,
                            number_of_replicas: 1,
                            analysis: {
                                analyzer: {
                                    product_analyzer: {
                                        type: 'custom',
                                        tokenizer: 'standard',
                                        filter: ['lowercase', 'stop', 'snowball']
                                    }
                                }
                            }
                        },
                        mappings: {
                            properties: {
                                name: {
                                    type: 'text',
                                    analyzer: 'product_analyzer',
                                    fields: {
                                        keyword: { type: 'keyword' },
                                        suggest: { type: 'completion' }
                                    }
                                },
                                description: {
                                    type: 'text',
                                    analyzer: 'product_analyzer'
                                },
                                price: { type: 'double' },
                                category: { type: 'keyword' },
                                brand: { type: 'keyword' },
                                tags: { type: 'keyword' },
                                stock: { type: 'integer' },
                                rating: { type: 'float' },
                                reviews_count: { type: 'integer' },
                                created_at: { type: 'date' },
                                updated_at: { type: 'date' },
                                active: { type: 'boolean' },
                                suggest: {
                                    type: 'completion',
                                    analyzer: 'simple',
                                    preserve_separators: true,
                                    preserve_position_increments: true,
                                    max_input_length: 50
                                }
                            }
                        }
                    }
                });
                
                console.log(`索引 ${this.index} 创建成功`);
            }
        } catch (error) {
            console.error('创建索引失败:', error);
            throw error;
        }
    }
    
    async indexProduct(product) {
        try {
            const response = await this.client.index({
                index: this.index,
                id: product.id,
                body: product.toElasticsearchDoc()
            });
            
            return response;
        } catch (error) {
            console.error('索引产品失败:', error);
            throw error;
        }
    }
    
    async bulkIndexProducts(products) {
        try {
            const body = products.flatMap(product => [
                { index: { _index: this.index, _id: product.id } },
                product.toElasticsearchDoc()
            ]);
            
            const response = await this.client.bulk({ body });
            
            if (response.errors) {
                const erroredDocuments = [];
                response.items.forEach((action, i) => {
                    const operation = Object.keys(action)[0];
                    if (action[operation].error) {
                        erroredDocuments.push({
                            status: action[operation].status,
                            error: action[operation].error,
                            operation: body[i * 2],
                            document: body[i * 2 + 1]
                        });
                    }
                });
                
                console.error('批量索引部分失败:', erroredDocuments);
            }
            
            return response;
        } catch (error) {
            console.error('批量索引产品失败:', error);
            throw error;
        }
    }
    
    async searchProducts(query, filters = {}, options = {}) {
        try {
            const {
                category,
                brand,
                minPrice,
                maxPrice,
                minRating,
                tags,
                inStock
            } = filters;
            
            const {
                from = 0,
                size = 20,
                sortBy = 'relevance',
                sortOrder = 'desc'
            } = options;
            
            // 构建查询
            const must = [];
            const filter = [];
            
            // 文本搜索
            if (query) {
                must.push({
                    multi_match: {
                        query: query,
                        fields: [
                            'name^3',
                            'description^1',
                            'brand^2',
                            'tags^1.5'
                        ],
                        type: 'best_fields',
                        fuzziness: 'AUTO'
                    }
                });
            } else {
                must.push({ match_all: {} });
            }
            
            // 过滤条件
            filter.push({ term: { active: true } });
            
            if (category) {
                filter.push({ term: { category } });
            }
            
            if (brand) {
                filter.push({ term: { brand } });
            }
            
            if (minPrice !== undefined || maxPrice !== undefined) {
                const priceRange = {};
                if (minPrice !== undefined) priceRange.gte = minPrice;
                if (maxPrice !== undefined) priceRange.lte = maxPrice;
                filter.push({ range: { price: priceRange } });
            }
            
            if (minRating !== undefined) {
                filter.push({ range: { rating: { gte: minRating } } });
            }
            
            if (tags && tags.length > 0) {
                filter.push({ terms: { tags } });
            }
            
            if (inStock) {
                filter.push({ range: { stock: { gt: 0 } } });
            }
            
            // 排序
            let sort = [];
            switch (sortBy) {
                case 'price_asc':
                    sort = [{ price: { order: 'asc' } }];
                    break;
                case 'price_desc':
                    sort = [{ price: { order: 'desc' } }];
                    break;
                case 'rating':
                    sort = [{ rating: { order: 'desc' } }];
                    break;
                case 'newest':
                    sort = [{ created_at: { order: 'desc' } }];
                    break;
                case 'popularity':
                    sort = [{ reviews_count: { order: 'desc' } }];
                    break;
                default:
                    sort = ['_score'];
            }
            
            const searchBody = {
                query: {
                    bool: {
                        must,
                        filter
                    }
                },
                sort,
                from,
                size,
                highlight: {
                    fields: {
                        name: {},
                        description: {
                            fragment_size: 150,
                            number_of_fragments: 3
                        }
                    },
                    pre_tags: ['<mark>'],
                    post_tags: ['</mark>']
                },
                aggs: {
                    categories: {
                        terms: {
                            field: 'category',
                            size: 20
                        }
                    },
                    brands: {
                        terms: {
                            field: 'brand',
                            size: 20
                        }
                    },
                    price_ranges: {
                        range: {
                            field: 'price',
                            ranges: [
                                { to: 50 },
                                { from: 50, to: 100 },
                                { from: 100, to: 200 },
                                { from: 200, to: 500 },
                                { from: 500 }
                            ]
                        }
                    },
                    avg_price: {
                        avg: { field: 'price' }
                    }
                }
            };
            
            const response = await this.client.search({
                index: this.index,
                body: searchBody
            });
            
            return {
                total: response.hits.total.value,
                products: response.hits.hits.map(hit => ({
                    ...Product.fromElasticsearchDoc(hit),
                    score: hit._score,
                    highlight: hit.highlight
                })),
                aggregations: response.aggregations
            };
            
        } catch (error) {
            console.error('搜索产品失败:', error);
            throw error;
        }
    }
    
    async getProductSuggestions(query, size = 10) {
        try {
            const response = await this.client.search({
                index: this.index,
                body: {
                    suggest: {
                        product_suggest: {
                            prefix: query,
                            completion: {
                                field: 'suggest',
                                size: size,
                                skip_duplicates: true
                            }
                        }
                    }
                }
            });
            
            return response.suggest.product_suggest[0].options.map(option => ({
                text: option.text,
                score: option._score,
                product: Product.fromElasticsearchDoc(option._source)
            }));
            
        } catch (error) {
            console.error('获取产品建议失败:', error);
            throw error;
        }
    }
    
    async getRecommendations(productId, size = 5) {
        try {
            // 首先获取产品信息
            const productResponse = await this.client.get({
                index: this.index,
                id: productId
            });
            
            const product = Product.fromElasticsearchDoc(productResponse);
            
            // 基于相似性推荐
            const response = await this.client.search({
                index: this.index,
                body: {
                    query: {
                        bool: {
                            should: [
                                {
                                    term: {
                                        category: {
                                            value: product.category,
                                            boost: 2.0
                                        }
                                    }
                                },
                                {
                                    term: {
                                        brand: {
                                            value: product.brand,
                                            boost: 1.5
                                        }
                                    }
                                },
                                {
                                    terms: {
                                        tags: product.tags,
                                        boost: 1.2
                                    }
                                },
                                {
                                    more_like_this: {
                                        fields: ['name', 'description'],
                                        like: [
                                            {
                                                _index: this.index,
                                                _id: productId
                                            }
                                        ],
                                        min_term_freq: 1,
                                        max_query_terms: 12
                                    }
                                }
                            ],
                            must_not: [
                                { term: { _id: productId } }
                            ],
                            filter: [
                                { term: { active: true } }
                            ]
                        }
                    },
                    size
                }
            });
            
            return response.hits.hits.map(hit => ({
                ...Product.fromElasticsearchDoc(hit),
                score: hit._score
            }));
            
        } catch (error) {
            console.error('获取推荐产品失败:', error);
            throw error;
        }
    }
}

module.exports = ProductService;

// routes/products.js
const express = require('express');
const router = express.Router();
const ProductService = require('../services/ProductService');
const Product = require('../models/Product');

const productService = new ProductService();

// Search products.
// Query-string parameters: q, category, brand, min_price, max_price,
// min_rating, tags (comma-separated string or repeated parameter),
// in_stock ('true' enables the filter), page (>= 1), size (>= 0), sort_by.
// Responds 400 when page/size are not usable integers, 500 when the search
// itself fails.
router.get('/search', async (req, res) => {
    try {
        const {
            q: query,
            category,
            brand,
            min_price: minPrice,
            max_price: maxPrice,
            min_rating: minRating,
            tags,
            in_stock: inStock,
            page = 1,
            size = 20,
            sort_by: sortBy = 'relevance'
        } = req.query;

        // Query-string values are strings. Parse pagination once with an
        // explicit radix and reject anything that would yield a NaN or
        // negative `from` offset (the comparisons are false for NaN).
        const pageNumber = Number.parseInt(page, 10);
        const pageSize = Number.parseInt(size, 10);
        if (!(pageNumber >= 1) || !(pageSize >= 0)) {
            return res.status(400).json({
                success: false,
                error: '分页参数无效'
            });
        }

        // Numeric filters that cannot be parsed are dropped instead of being
        // forwarded to the service as NaN.
        const toNumber = (value) => {
            const parsed = parseFloat(value);
            return Number.isNaN(parsed) ? undefined : parsed;
        };

        // `?tags=a&tags=b` is delivered as an array, `?tags=a,b` as a string;
        // any other shape (e.g. a nested object) is ignored.
        let tagList;
        if (Array.isArray(tags)) {
            tagList = tags;
        } else if (typeof tags === 'string' && tags) {
            tagList = tags.split(',');
        }

        const filters = {
            category,
            brand,
            minPrice: toNumber(minPrice),
            maxPrice: toNumber(maxPrice),
            minRating: toNumber(minRating),
            tags: tagList,
            inStock: inStock === 'true'
        };

        const options = {
            from: (pageNumber - 1) * pageSize,
            size: pageSize,
            sortBy
        };

        const result = await productService.searchProducts(query, filters, options);

        res.json({
            success: true,
            data: {
                total: result.total,
                page: pageNumber,
                size: pageSize,
                products: result.products,
                aggregations: result.aggregations
            }
        });

    } catch (error) {
        console.error('搜索产品错误:', error);
        res.status(500).json({
            success: false,
            error: '搜索产品失败'
        });
    }
});

// Product suggestions for a partial query.
// Requires `q`; `size` defaults to 10 when absent.
router.get('/suggest', async (req, res) => {
    try {
        const keyword = req.query.q;
        const rawLimit = req.query.size === undefined ? 10 : req.query.size;

        // An empty or missing query is a client error, not a search.
        if (!keyword) {
            res.status(400).json({ success: false, error: '查询参数不能为空' });
            return;
        }

        const data = await productService.getProductSuggestions(keyword, parseInt(rawLimit));
        res.json({ success: true, data });
    } catch (err) {
        console.error('获取产品建议错误:', err);
        res.status(500).json({ success: false, error: '获取产品建议失败' });
    }
});

// Recommended products for the product identified by `:id`.
// `size` defaults to 5 when absent.
router.get('/:id/recommendations', async (req, res) => {
    try {
        const productId = req.params.id;
        const rawLimit = req.query.size === undefined ? 5 : req.query.size;

        const data = await productService.getRecommendations(productId, parseInt(rawLimit));
        res.json({ success: true, data });
    } catch (err) {
        console.error('获取推荐产品错误:', err);
        res.status(500).json({ success: false, error: '获取推荐产品失败' });
    }
});

// Create a product: build a Product from the request body, index it, and
// answer 201 with the id reported by the indexing call plus the product.
router.post('/', async (req, res) => {
    try {
        const product = new Product(req.body);
        const { _id: id } = await productService.indexProduct(product);

        res.status(201).json({ success: true, data: { id, product } });
    } catch (err) {
        console.error('创建产品错误:', err);
        res.status(500).json({ success: false, error: '创建产品失败' });
    }
});

// Bulk-create products from `req.body.products`, which must be an array.
router.post('/bulk', async (req, res) => {
    try {
        const payload = req.body.products;

        if (!Array.isArray(payload)) {
            res.status(400).json({ success: false, error: '产品数据必须是数组格式' });
            return;
        }

        const instances = payload.map((item) => new Product(item));
        const { items, errors } = await productService.bulkIndexProducts(instances);

        // `indexed` is the number of items in the bulk response; `errors` is
        // passed through from the service result unchanged.
        res.json({ success: true, data: { indexed: items.length, errors } });
    } catch (err) {
        console.error('批量创建产品错误:', err);
        res.status(500).json({ success: false, error: '批量创建产品失败' });
    }
});

module.exports = router;

// app.js
const express = require('express');
const cors = require('cors');
const helmet = require('helmet');
const morgan = require('morgan');
const bodyParser = require('body-parser');
require('dotenv').config();

const elasticsearchClient = require('./config/elasticsearch');
const ProductService = require('./services/ProductService');
const productRoutes = require('./routes/products');

const app = express();
const PORT = process.env.PORT || 3000;

// 中间件
app.use(helmet());
app.use(cors());
app.use(morgan('combined'));
app.use(bodyParser.json({ limit: '10mb' }));
app.use(bodyParser.urlencoded({ extended: true }));

// 路由
app.use('/api/products', productRoutes);

// Health check: reports whether the Elasticsearch connection test succeeds.
app.get('/health', async (req, res) => {
    const now = () => new Date().toISOString();

    try {
        const reachable = await elasticsearchClient.testConnection();
        const elasticsearch = reachable ? 'connected' : 'disconnected';

        res.json({ status: 'ok', timestamp: now(), services: { elasticsearch } });
    } catch (err) {
        res.status(500).json({ status: 'error', timestamp: now(), error: err.message });
    }
});

// Error-handling middleware. The four-parameter signature must be kept even
// though `next` is unused: Express identifies error handlers by arity.
app.use(function handleError(err, req, res, next) {
    console.error('未处理的错误:', err);
    res.status(500).json({ success: false, error: '服务器内部错误' });
});

// Fallback for requests no earlier route or middleware answered.
app.use(function handleNotFound(req, res) {
    res.status(404).json({ success: false, error: '接口不存在' });
});

// Boot sequence: verify the Elasticsearch connection, create the product
// index, then start listening. Any failure terminates the process with
// exit code 1.
async function startServer() {
    try {
        // Refuse to start when Elasticsearch is unreachable.
        if (!(await elasticsearchClient.testConnection())) {
            console.error('无法连接到Elasticsearch,服务器启动失败');
            process.exit(1);
        }

        // Create the index before accepting traffic.
        await new ProductService().createIndex();

        app.listen(PORT, () => {
            const banner = [
                `服务器运行在端口 ${PORT}`,
                `健康检查: http://localhost:${PORT}/health`,
                `API文档: http://localhost:${PORT}/api/products/search`
            ];
            banner.forEach((line) => console.log(line));
        });

    } catch (err) {
        console.error('启动服务器失败:', err);
        process.exit(1);
    }
}

startServer();

module.exports = app;

3.3 Python集成

3.3.1 Django集成

# settings.py
from elasticsearch import Elasticsearch
from elasticsearch_dsl import connections

# Elasticsearch connection settings, keyed by connection alias.
# NOTE(review): the credentials below are hard-coded sample values and
# 'verify_certs': False turns off TLS certificate verification -- both look
# like local-development settings; confirm before using outside a sandbox.
ELASTICSEARCH_DSL = {
    'default': {
        'hosts': 'localhost:9200',
        'http_auth': ('elastic', 'password'),
        'use_ssl': True,
        'verify_certs': False,
        'ssl_show_warn': False,
        'timeout': 30,
        'max_retries': 3,
        'retry_on_timeout': True
    },
}

# Register every alias defined above with elasticsearch_dsl at import time.
connections.configure(**ELASTICSEARCH_DSL)

# models.py
from django.db import models
from elasticsearch_dsl import Document, Text, Keyword, Integer, Float, Date, Boolean, Completion
from elasticsearch_dsl.connections import connections

class Product(models.Model):
    """Relational product record that can be serialized for Elasticsearch."""

    name = models.CharField(max_length=200)
    description = models.TextField()
    price = models.DecimalField(max_digits=10, decimal_places=2)
    category = models.CharField(max_length=100)
    brand = models.CharField(max_length=100)
    stock = models.IntegerField(default=0)
    rating = models.FloatField(default=0.0)
    reviews_count = models.IntegerField(default=0)
    active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def __str__(self):
        return self.name

    def to_elasticsearch(self):
        """Return this product as a plain dict ready to be indexed.

        The ``suggest`` entry feeds the completion field: name and brand are
        the inputs, weighted by ``rating * 10 + reviews_count``.
        """
        suggest_payload = {
            'input': [self.name, self.brand],
            'weight': int(self.rating * 10 + self.reviews_count),
        }
        return dict(
            id=self.id,
            name=self.name,
            description=self.description,
            # Decimal is converted to float for the document body.
            price=float(self.price),
            category=self.category,
            brand=self.brand,
            stock=self.stock,
            rating=self.rating,
            reviews_count=self.reviews_count,
            active=self.active,
            created_at=self.created_at,
            updated_at=self.updated_at,
            suggest=suggest_payload,
        )

class ProductDocument(Document):
    """Elasticsearch document mapping for products (index ``products``)."""

    id = Integer()
    # Full-text field with an exact-match and a completion sub-field.
    name = Text(
        analyzer='standard',
        fields=dict(keyword=Keyword(), suggest=Completion()),
    )
    description = Text(analyzer='standard')
    price = Float()
    category = Keyword()
    brand = Keyword()
    stock = Integer()
    rating = Float()
    reviews_count = Integer()
    active = Boolean()
    created_at = Date()
    updated_at = Date()
    suggest = Completion()

    class Index:
        name = 'products'
        settings = dict(number_of_shards=1, number_of_replicas=1)

    def save(self, **kwargs):
        """Persist the document; all keyword arguments go to ``Document.save``."""
        return super().save(**kwargs)

    @classmethod
    def from_django_model(cls, product):
        """Build a document from a Django ``Product``, reusing its primary key as the document id."""
        return cls(meta={'id': product.id}, **product.to_elasticsearch())

# services.py
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from django.conf import settings
from .models import Product, ProductDocument
import logging

logger = logging.getLogger(__name__)

class ElasticsearchService:
    """Index, search and suggest operations for the ``products`` index.

    Every method logs a failure and re-raises it, so callers decide how to
    react.
    """

    def __init__(self):
        # The client is built from the 'default' entry of settings.ELASTICSEARCH_DSL.
        self.es = Elasticsearch(**settings.ELASTICSEARCH_DSL['default'])
        self.index = 'products'

    def create_index(self):
        """Create the index and mapping declared by ``ProductDocument``."""
        try:
            ProductDocument.init(using=self.es)
            logger.info(f"索引 {self.index} 创建成功")
        except Exception as e:
            logger.error(f"创建索引失败: {e}")
            raise

    def index_product(self, product):
        """Index a single Django ``Product``."""
        try:
            doc = ProductDocument.from_django_model(product)
            doc.save(using=self.es)
            logger.info(f"产品 {product.id} 索引成功")
        except Exception as e:
            logger.error(f"索引产品失败: {e}")
            raise

    def bulk_index_products(self, products):
        """Index many products with one bulk helper call.

        Args:
            products: iterable of objects exposing ``id`` and
                ``to_elasticsearch()``.

        Returns:
            ``(success, failed)``: the number of documents indexed and the
            list of per-document error items.

        Raises:
            Exception: transport-level failures are logged and re-raised.
        """
        try:
            actions = [
                {
                    '_index': self.index,
                    '_id': product.id,
                    '_source': product.to_elasticsearch(),
                }
                for product in products
            ]

            # By default the helper raises BulkIndexError as soon as a chunk
            # contains a failed document, so the `failed` list it returns is
            # always empty. raise_on_error=False makes it collect the failed
            # items instead, which is what the log line and callers report.
            success, failed = bulk(self.es, actions, raise_on_error=False)
            logger.info(f"批量索引完成: 成功 {success}, 失败 {len(failed)}")
            return success, failed

        except Exception as e:
            logger.error(f"批量索引失败: {e}")
            raise

    def search_products(self, query=None, filters=None, page=1, size=20):
        """Search products with optional full-text query and filters.

        Args:
            query: text matched against name, description and brand.
            filters: optional dict with any of ``category``, ``brand``,
                ``min_price``, ``max_price``, ``in_stock``.
            page: 1-based page number.
            size: page size.

        Returns:
            dict with ``total``, ``products`` and ``aggregations``.
        """
        try:
            search = ProductDocument.search(using=self.es)

            # Full-text query; name and brand are boosted over description.
            if query:
                search = search.query(
                    'multi_match',
                    query=query,
                    fields=['name^3', 'description^1', 'brand^2'],
                    fuzziness='AUTO'
                )

            if filters:
                if filters.get('category'):
                    search = search.filter('term', category=filters['category'])

                if filters.get('brand'):
                    search = search.filter('term', brand=filters['brand'])

                # Compare against None rather than truthiness: a bound of 0
                # (e.g. max_price=0) is a real bound and must not be dropped.
                price_range = {}
                if filters.get('min_price') is not None:
                    price_range['gte'] = filters['min_price']
                if filters.get('max_price') is not None:
                    price_range['lte'] = filters['max_price']
                if price_range:
                    search = search.filter('range', price=price_range)

                if filters.get('in_stock'):
                    search = search.filter('range', stock={'gt': 0})

            # Pagination via slicing.
            start = (page - 1) * size
            search = search[start:start + size]

            # Facet counts; aggregations are attached to `search` in place.
            search.aggs.bucket('categories', 'terms', field='category')
            search.aggs.bucket('brands', 'terms', field='brand')

            response = search.execute()

            return {
                'total': response.hits.total.value,
                'products': [hit.to_dict() for hit in response],
                'aggregations': response.aggregations.to_dict()
            }

        except Exception as e:
            logger.error(f"搜索产品失败: {e}")
            raise

    def get_suggestions(self, query, size=10):
        """Return completion suggestions for ``query``.

        Each entry holds the suggested ``text``, its ``score`` and the source
        ``product`` as a dict.
        """
        try:
            search = ProductDocument.search(using=self.es)
            search = search.suggest(
                'product_suggest',
                query,
                completion={
                    'field': 'suggest',
                    'size': size,
                    'skip_duplicates': True
                }
            )

            response = search.execute()

            return [
                {
                    'text': option.text,
                    'score': option._score,
                    'product': option._source.to_dict()
                }
                for option in response.suggest.product_suggest[0].options
            ]

        except Exception as e:
            logger.error(f"获取建议失败: {e}")
            raise

# signals.py
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from .models import Product
from .services import ElasticsearchService
import logging

logger = logging.getLogger(__name__)
es_service = ElasticsearchService()

@receiver(post_save, sender=Product)
def update_product_index(sender, instance, created, **kwargs):
    """Re-index a product after it is saved.

    Best effort: an indexing failure is logged and not propagated to the
    code that saved the model.
    """
    product_id = instance.id
    try:
        es_service.index_product(instance)
        logger.info(f"产品 {product_id} 索引更新成功")
    except Exception as exc:
        logger.error(f"更新产品索引失败: {exc}")

@receiver(post_delete, sender=Product)
def delete_product_index(sender, instance, **kwargs):
    """Remove a product's document from the index after the row is deleted.

    Best effort: any failure (including a missing document) is logged and
    not propagated to the code that deleted the model.
    """
    try:
        # This module only imports Product at the top, so the bare
        # ProductDocument reference used to raise NameError; the except
        # below swallowed it and the document was never removed.
        from .models import ProductDocument

        # Use the service's client for both the lookup and the delete.
        doc = ProductDocument.get(id=instance.id, using=es_service.es)
        doc.delete(using=es_service.es)
        logger.info(f"产品 {instance.id} 从索引中删除成功")
    except Exception as e:
        logger.error(f"从索引删除产品失败: {e}")

# views.py
from django.http import JsonResponse
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from .services import ElasticsearchService
import json

es_service = ElasticsearchService()

@method_decorator(csrf_exempt, name='dispatch')
class ProductSearchView(View):
    """GET endpoint that searches products.

    Query parameters: ``q``, ``category``, ``brand``, ``min_price``,
    ``max_price``, ``in_stock`` ('true' enables the filter), ``page``
    (>= 1, default 1) and ``size`` (>= 0, default 20).
    """

    def get(self, request):
        try:
            params = request.GET
            query = params.get('q', '')
            category = params.get('category')
            brand = params.get('brand')
            in_stock = params.get('in_stock') == 'true'

            # Malformed numbers and out-of-range pagination are client
            # errors: answer 400 instead of letting them surface as a 500.
            try:
                page = int(params.get('page', 1))
                size = int(params.get('size', 20))
                price_bounds = {
                    key: float(params[key])
                    for key in ('min_price', 'max_price')
                    if params.get(key)
                }
                if page < 1 or size < 0:
                    raise ValueError('pagination out of range')
            except ValueError:
                return JsonResponse({
                    'success': False,
                    'error': '参数格式无效'
                }, status=400)

            # Only the filters the client actually supplied are forwarded.
            filters = {}
            if category:
                filters['category'] = category
            if brand:
                filters['brand'] = brand
            filters.update(price_bounds)
            if in_stock:
                filters['in_stock'] = in_stock

            result = es_service.search_products(
                query=query,
                filters=filters,
                page=page,
                size=size
            )

            return JsonResponse({
                'success': True,
                'data': result
            })

        except Exception as e:
            return JsonResponse({
                'success': False,
                'error': str(e)
            }, status=500)

class ProductSuggestView(View):
    """GET endpoint returning completion suggestions for ``q``."""

    def get(self, request):
        try:
            params = request.GET
            keyword = params.get('q', '')
            # Parsed before the emptiness check, so a malformed size is
            # reported through the generic error path below.
            limit = int(params.get('size', 10))

            if keyword:
                status = 200
                payload = {
                    'success': True,
                    'data': es_service.get_suggestions(keyword, limit)
                }
            else:
                status = 400
                payload = {
                    'success': False,
                    'error': '查询参数不能为空'
                }

            return JsonResponse(payload, status=status)

        except Exception as exc:
            return JsonResponse({
                'success': False,
                'error': str(exc)
            }, status=500)

# urls.py
from django.urls import path
from .views import ProductSearchView, ProductSuggestView

# URL routes of this app: search/ -> ProductSearchView, suggest/ -> ProductSuggestView.
urlpatterns = [
    path('search/', ProductSearchView.as_view(), name='product_search'),
    path('suggest/', ProductSuggestView.as_view(), name='product_suggest'),
]

# management/commands/rebuild_index.py
from django.core.management.base import BaseCommand
from myapp.models import Product
from myapp.services import ElasticsearchService

class Command(BaseCommand):
    """Management command that indexes every Product into Elasticsearch in batches."""

    help = '重建Elasticsearch索引'

    def add_arguments(self, parser):
        parser.add_argument(
            '--batch-size',
            type=int,
            default=1000,
            help='批量处理大小'
        )

    def handle(self, *args, **options):
        es_service = ElasticsearchService()
        batch_size = options['batch_size']

        self.stdout.write('开始重建索引...')

        # Make sure the index and its mapping exist before writing to it.
        es_service.create_index()

        total_products = Product.objects.count()
        self.stdout.write(f'总共 {total_products} 个产品需要索引')

        # Slicing an unordered queryset gives no guarantee that consecutive
        # slices are disjoint, so rows could be skipped or indexed twice.
        # A fixed ordering by primary key makes the batches stable.
        ordered_products = Product.objects.order_by('pk')

        for offset in range(0, total_products, batch_size):
            batch = ordered_products[offset:offset + batch_size]
            success, failed = es_service.bulk_index_products(batch)

            self.stdout.write(
                f'批次 {offset//batch_size + 1}: 成功 {success}, 失败 {len(failed)}'
            )

        self.stdout.write(
            self.style.SUCCESS('索引重建完成!')
        )

3.3.2 Flask集成

# app.py
from flask import Flask, request, jsonify
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Document, Text, Keyword, Integer, Float, Date, Boolean, Completion, connections
import os
from datetime import datetime

app = Flask(__name__)

# Elasticsearch settings, read from the environment with local fallbacks.
# NOTE(review): the fallback credentials are sample values; set the
# environment variables in any real deployment.
ES_HOST = os.getenv('ELASTICSEARCH_HOST', 'localhost:9200')
ES_USERNAME = os.getenv('ELASTICSEARCH_USERNAME', 'elastic')
ES_PASSWORD = os.getenv('ELASTICSEARCH_PASSWORD', 'password')

# Register the default elasticsearch_dsl connection at import time.
# NOTE(review): verify_certs=False turns off TLS certificate verification --
# confirm this is intended outside local development.
connections.create_connection(
    hosts=[ES_HOST],
    http_auth=(ES_USERNAME, ES_PASSWORD),
    use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False,
    timeout=30
)

class ProductDocument(Document):
    """Elasticsearch document mapping for products, stored in the ``products`` index."""
    # Full-text field with an exact-match (keyword) and a completion sub-field.
    name = Text(
        analyzer='standard',
        fields={
            'keyword': Keyword(),
            'suggest': Completion()
        }
    )
    description = Text(analyzer='standard')
    price = Float()
    # Keyword fields: used below for term filters and terms aggregations.
    category = Keyword()
    brand = Keyword()
    stock = Integer()
    rating = Float()
    reviews_count = Integer()
    active = Boolean()
    created_at = Date()
    updated_at = Date()
    # Completion field queried by ProductService.get_suggestions.
    suggest = Completion()
    
    class Index:
        name = 'products'
        settings = {
            'number_of_shards': 1,
            'number_of_replicas': 1
        }

class ProductService:
    """Best-effort product operations on the default Elasticsearch connection.

    Every method catches its own exceptions, prints the failure and returns
    a neutral value (``False``, an empty result or an empty list).
    """

    def __init__(self):
        self.es = connections.get_connection()
        self.index = 'products'

    def create_index(self):
        """Create the index from ``ProductDocument``; return True on success."""
        try:
            ProductDocument.init()
        except Exception as exc:
            print(f"创建索引失败: {exc}")
            return False
        return True

    def index_product(self, product_data):
        """Index one product given as a dict; return True on success."""
        try:
            # The dict's 'id' entry, if any, becomes the document id.
            document = ProductDocument(meta={'id': product_data.get('id')}, **product_data)
            document.save()
        except Exception as exc:
            print(f"索引产品失败: {exc}")
            return False
        return True

    def search_products(self, query=None, filters=None, page=1, size=20):
        """Search products; on any failure return an empty result set."""
        try:
            search = ProductDocument.search()

            # Full-text part: name and brand are boosted over description.
            if query:
                search = search.query(
                    'multi_match',
                    query=query,
                    fields=['name^3', 'description^1', 'brand^2'],
                    fuzziness='AUTO',
                )

            # Filters are applied in the dict's iteration order; falsy
            # values and unknown keys are ignored.
            for field, wanted in (filters or {}).items():
                if not wanted:
                    continue
                if field in ('category', 'brand'):
                    search = search.filter('term', **{field: wanted})
                elif field == 'price_range':
                    search = search.filter('range', price=wanted)
                elif field == 'in_stock':
                    search = search.filter('range', stock={'gt': 0})

            # Pagination via slicing.
            offset = (page - 1) * size
            search = search[offset:offset + size]

            # Facet counts; aggregations are attached to `search` in place.
            search.aggs.bucket('categories', 'terms', field='category')
            search.aggs.bucket('brands', 'terms', field='brand')

            response = search.execute()

            if hasattr(response, 'aggregations'):
                aggregations = response.aggregations.to_dict()
            else:
                aggregations = {}

            return {
                'total': response.hits.total.value,
                'products': [hit.to_dict() for hit in response],
                'aggregations': aggregations,
            }

        except Exception as exc:
            print(f"搜索失败: {exc}")
            return {'total': 0, 'products': [], 'aggregations': {}}

    def get_suggestions(self, query, size=10):
        """Return completion suggestions as ``{'text', 'score'}`` dicts."""
        try:
            search = ProductDocument.search().suggest(
                'product_suggest',
                query,
                completion={
                    'field': 'suggest',
                    'size': size,
                    'skip_duplicates': True,
                },
            )

            response = search.execute()

            # No suggest section in the response means no suggestions.
            if not (hasattr(response, 'suggest') and 'product_suggest' in response.suggest):
                return []

            return [
                {'text': option.text, 'score': option._score}
                for option in response.suggest.product_suggest[0].options
            ]

        except Exception as exc:
            print(f"获取建议失败: {exc}")
            return []

# 初始化服务
product_service = ProductService()

@app.route('/health')
def health_check():
    """Health check: 200 when Elasticsearch answers a ping, 500 otherwise."""
    try:
        es = connections.get_connection()
        # ping() reports an unreachable cluster by returning False rather
        # than raising, so its result must be checked explicitly; otherwise
        # this endpoint would claim 'connected' while Elasticsearch is down.
        if not es.ping():
            raise ConnectionError('Elasticsearch 无响应')
        return jsonify({
            'status': 'ok',
            'timestamp': datetime.now().isoformat(),
            'elasticsearch': 'connected'
        })
    except Exception as e:
        return jsonify({
            'status': 'error',
            'timestamp': datetime.now().isoformat(),
            'elasticsearch': 'disconnected',
            'error': str(e)
        }), 500

@app.route('/api/products/search')
def search_products():
    """Search products.

    Query parameters: ``q``, ``category``, ``brand``, ``min_price``,
    ``max_price``, ``in_stock`` ('true' enables the filter), ``page``
    (>= 1, default 1) and ``size`` (>= 0, default 20).
    """
    try:
        args = request.args
        query = args.get('q', '')
        category = args.get('category')
        brand = args.get('brand')
        min_price = args.get('min_price')
        max_price = args.get('max_price')
        in_stock = args.get('in_stock') == 'true'

        # Malformed numbers and out-of-range pagination are client errors:
        # answer 400 instead of a 500 or a silently empty result.
        try:
            page = int(args.get('page', 1))
            size = int(args.get('size', 20))
            price_range = {}
            if min_price:
                price_range['gte'] = float(min_price)
            if max_price:
                price_range['lte'] = float(max_price)
            if page < 1 or size < 0:
                raise ValueError('pagination out of range')
        except ValueError:
            return jsonify({
                'success': False,
                'error': '参数格式无效'
            }), 400

        # Only the filters the client actually supplied are forwarded.
        filters = {}
        if category:
            filters['category'] = category
        if brand:
            filters['brand'] = brand
        if price_range:
            filters['price_range'] = price_range
        if in_stock:
            filters['in_stock'] = in_stock

        result = product_service.search_products(
            query=query,
            filters=filters,
            page=page,
            size=size
        )

        return jsonify({
            'success': True,
            'data': result
        })

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

@app.route('/api/products/suggest')
def suggest_products():
    """Return completion suggestions for the ``q`` query parameter."""
    try:
        keyword = request.args.get('q', '')
        # Parsed before the emptiness check, so a malformed size is
        # reported through the generic error path below.
        limit = int(request.args.get('size', 10))

        if keyword:
            return jsonify({
                'success': True,
                'data': product_service.get_suggestions(keyword, limit)
            })

        return jsonify({
            'success': False,
            'error': '查询参数不能为空'
        }), 400

    except Exception as exc:
        return jsonify({
            'success': False,
            'error': str(exc)
        }), 500

@app.route('/api/products', methods=['POST'])
def create_product():
    """Create a product from a JSON object body and index it.

    Responds 201 on success, 400 when the body is not a JSON object and
    500 when indexing fails.
    """
    try:
        # silent=True yields None for a missing or malformed body instead
        # of raising, so the problem is reported as a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': '请求体必须是JSON对象'
            }), 400

        # One timestamp so created_at and updated_at are identical.
        now = datetime.now()
        data['created_at'] = now
        data['updated_at'] = now

        # Explicit nulls count as 0 so the weight arithmetic cannot fail on None.
        rating = data.get('rating') or 0
        reviews_count = data.get('reviews_count') or 0

        # Only non-empty name/brand values become suggestion inputs; with no
        # usable input the suggest entry is left out entirely.
        suggest_inputs = [
            value for value in (data.get('name'), data.get('brand')) if value
        ]
        if suggest_inputs:
            data['suggest'] = {
                'input': suggest_inputs,
                'weight': int(rating * 10 + reviews_count)
            }

        success = product_service.index_product(data)

        if success:
            return jsonify({
                'success': True,
                'message': '产品创建成功'
            }), 201
        else:
            return jsonify({
                'success': False,
                'error': '产品创建失败'
            }), 500

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

if __name__ == '__main__':
    # Create the index before serving requests.
    product_service.create_index()

    # Debug mode is opt-in through FLASK_DEBUG=1. The server binds to all
    # interfaces, and the interactive debugger enabled by debug=True would
    # let anyone who can reach the port execute code.
    app.run(debug=os.getenv('FLASK_DEBUG', '0') == '1', host='0.0.0.0', port=5000)

3.4 Go集成

// main.go
package main

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/gorilla/mux"
	"github.com/rs/cors"
)

// Product is the JSON shape of a product document stored in the index.
type Product struct {
	ID           int       `json:"id"`
	Name         string    `json:"name"`
	Description  string    `json:"description"`
	Price        float64   `json:"price"`
	Category     string    `json:"category"`
	Brand        string    `json:"brand"`
	Stock        int       `json:"stock"`
	Rating       float64   `json:"rating"`
	ReviewsCount int       `json:"reviews_count"`
	Active       bool      `json:"active"`
	CreatedAt    time.Time `json:"created_at"`
	UpdatedAt    time.Time `json:"updated_at"`
	Suggest      Suggest   `json:"suggest"`
}

// Suggest is the payload of the "suggest" completion field: the input
// strings and their weight.
type Suggest struct {
	Input  []string `json:"input"`
	Weight int      `json:"weight"`
}

// SearchResponse is the result returned by a product search: the total hit
// count, the products of the requested page and the raw aggregations.
type SearchResponse struct {
	Total        int                    `json:"total"`
	Products     []Product              `json:"products"`
	Aggregations map[string]interface{} `json:"aggregations"`
}

// ElasticsearchService bundles the Elasticsearch client with the name of
// the index it operates on.
type ElasticsearchService struct {
	client *elasticsearch.Client
	index  string
}

// NewElasticsearchService builds a client for https://localhost:9200 and
// returns a service bound to the "products" index.
//
// NOTE(review): the credentials are hard-coded sample values and
// InsecureSkipVerify disables TLS certificate verification; both look like
// local-development settings and should be confirmed before real use.
func NewElasticsearchService() (*ElasticsearchService, error) {
	cfg := elasticsearch.Config{
		Addresses: []string{
			"https://localhost:9200",
		},
		Username: "elastic",
		Password: "password",
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true,
			},
		},
	}

	client, err := elasticsearch.NewClient(cfg)
	if err != nil {
		// %w keeps the underlying error available to errors.Is / errors.As.
		return nil, fmt.Errorf("创建Elasticsearch客户端失败: %w", err)
	}

	return &ElasticsearchService{
		client: client,
		index:  "products",
	}, nil
}

// CreateIndex creates the service's index with the product settings and
// mapping below. It is idempotent: when the index already exists it logs
// that fact and returns nil, so it can run on every application start.
// Any other failure is returned as an error.
func (es *ElasticsearchService) CreateIndex() error {
	ctx := context.Background()

	// Creating an index that already exists is rejected by Elasticsearch,
	// so look before creating.
	existsReq := esapi.IndicesExistsRequest{
		Index: []string{es.index},
	}
	existsRes, err := existsReq.Do(ctx, es.client)
	if err != nil {
		return fmt.Errorf("检查索引请求失败: %w", err)
	}
	existsRes.Body.Close()

	switch existsRes.StatusCode {
	case http.StatusOK:
		log.Printf("索引 %s 已存在", es.index)
		return nil
	case http.StatusNotFound:
		// Index is missing: fall through to creation.
	default:
		return fmt.Errorf("检查索引失败: %s", existsRes.String())
	}

	mapping := `{
		"settings": {
			"number_of_shards": 1,
			"number_of_replicas": 1,
			"analysis": {
				"analyzer": {
					"product_analyzer": {
						"type": "custom",
						"tokenizer": "standard",
						"filter": ["lowercase", "stop", "snowball"]
					}
				}
			}
		},
		"mappings": {
			"properties": {
				"name": {
					"type": "text",
					"analyzer": "product_analyzer",
					"fields": {
						"keyword": {"type": "keyword"},
						"suggest": {"type": "completion"}
					}
				},
				"description": {
					"type": "text",
					"analyzer": "product_analyzer"
				},
				"price": {"type": "double"},
				"category": {"type": "keyword"},
				"brand": {"type": "keyword"},
				"stock": {"type": "integer"},
				"rating": {"type": "float"},
				"reviews_count": {"type": "integer"},
				"active": {"type": "boolean"},
				"created_at": {"type": "date"},
				"updated_at": {"type": "date"},
				"suggest": {
					"type": "completion",
					"analyzer": "simple",
					"preserve_separators": true,
					"preserve_position_increments": true,
					"max_input_length": 50
				}
			}
		}
	}`

	req := esapi.IndicesCreateRequest{
		Index: es.index,
		Body:  strings.NewReader(mapping),
	}

	res, err := req.Do(ctx, es.client)
	if err != nil {
		return fmt.Errorf("创建索引请求失败: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("创建索引失败: %s", res.String())
	}

	log.Printf("索引 %s 创建成功", es.index)
	return nil
}

// IndexProduct stores product in the service's index, using the product ID
// as the document ID. The request asks for an immediate refresh, so the
// document is searchable as soon as the call returns.
// Serialization and transport errors are wrapped with %w so callers can
// inspect the cause with errors.Is / errors.As.
func (es *ElasticsearchService) IndexProduct(product Product) error {
	productJSON, err := json.Marshal(product)
	if err != nil {
		return fmt.Errorf("序列化产品失败: %w", err)
	}

	req := esapi.IndexRequest{
		Index:      es.index,
		DocumentID: strconv.Itoa(product.ID),
		Body:       strings.NewReader(string(productJSON)),
		Refresh:    "true",
	}

	res, err := req.Do(context.Background(), es.client)
	if err != nil {
		return fmt.Errorf("索引产品请求失败: %w", err)
	}
	defer res.Body.Close()

	// A response was received but Elasticsearch reported an error status.
	if res.IsError() {
		return fmt.Errorf("索引产品失败: %s", res.String())
	}

	return nil
}

// SearchProducts runs a paginated product search against es.index and
// returns the matching products, the total hit count and the "categories" /
// "brands" terms aggregations.
//
// Parameters:
//   - query: free text matched with a fuzzy multi_match over name, description
//     and brand; an empty string matches all documents.
//   - filters: optional restrictions. Recognized keys are "category" and
//     "brand" (term filters), "min_price" and "max_price" (each contributes
//     one bound of a price range, independently of the other) and "in_stock"
//     (when it is the bool true, requires stock > 0). Only documents with
//     active == true are ever returned.
//   - page: 1-based page number; values below 1 are treated as 1.
//   - size: number of hits per page.
//
// An error is returned when the query cannot be serialized, the request
// fails, Elasticsearch reports an error, or the response cannot be decoded.
func (es *ElasticsearchService) SearchProducts(query string, filters map[string]interface{}, page, size int) (*SearchResponse, error) {
	// Pages are 1-based; clamp so that "from" never goes negative because of
	// a zero or negative page number.
	if page < 1 {
		page = 1
	}

	// Text clause: fuzzy multi-field match, or match_all without a query.
	var textClause map[string]interface{}
	if query != "" {
		textClause = map[string]interface{}{
			"multi_match": map[string]interface{}{
				"query": query,
				"fields": []string{
					"name^3",
					"description^1",
					"brand^2",
				},
				"fuzziness": "AUTO",
			},
		}
	} else {
		textClause = map[string]interface{}{
			"match_all": map[string]interface{}{},
		}
	}

	// Filter clauses: always restrict to active products.
	filterClauses := []interface{}{
		map[string]interface{}{
			"term": map[string]interface{}{
				"active": true,
			},
		},
	}

	for _, field := range []string{"category", "brand"} {
		if value, ok := filters[field]; ok && value != "" {
			filterClauses = append(filterClauses, map[string]interface{}{
				"term": map[string]interface{}{
					field: value,
				},
			})
		}
	}

	// Each price bound is applied on its own, so a request carrying only
	// max_price is still honored.
	priceRange := map[string]interface{}{}
	if minPrice, ok := filters["min_price"]; ok && minPrice != nil {
		priceRange["gte"] = minPrice
	}
	if maxPrice, ok := filters["max_price"]; ok && maxPrice != nil {
		priceRange["lte"] = maxPrice
	}
	if len(priceRange) > 0 {
		filterClauses = append(filterClauses, map[string]interface{}{
			"range": map[string]interface{}{
				"price": priceRange,
			},
		})
	}

	if inStock, ok := filters["in_stock"].(bool); ok && inStock {
		filterClauses = append(filterClauses, map[string]interface{}{
			"range": map[string]interface{}{
				"stock": map[string]interface{}{
					"gt": 0,
				},
			},
		})
	}

	searchQuery := map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"must":   []interface{}{textClause},
				"filter": filterClauses,
			},
		},
		"from": (page - 1) * size,
		"size": size,
		"aggs": map[string]interface{}{
			"categories": map[string]interface{}{
				"terms": map[string]interface{}{
					"field": "category",
					"size":  20,
				},
			},
			"brands": map[string]interface{}{
				"terms": map[string]interface{}{
					"field": "brand",
					"size":  20,
				},
			},
		},
	}

	// Execute the search.
	queryJSON, err := json.Marshal(searchQuery)
	if err != nil {
		return nil, fmt.Errorf("序列化查询失败: %v", err)
	}

	req := esapi.SearchRequest{
		Index: []string{es.index},
		Body:  strings.NewReader(string(queryJSON)),
	}

	res, err := req.Do(context.Background(), es.client)
	if err != nil {
		return nil, fmt.Errorf("搜索请求失败: %v", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, fmt.Errorf("搜索失败: %s", res.String())
	}

	// Decode only the parts of the response that are used. A typed target
	// turns an unexpected response shape into a decode error (or a zero
	// value) instead of a type-assertion panic.
	var parsed struct {
		Hits struct {
			Total json.RawMessage `json:"total"`
			Hits  []struct {
				Source json.RawMessage `json:"_source"`
			} `json:"hits"`
		} `json:"hits"`
		Aggregations map[string]interface{} `json:"aggregations"`
	}
	if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil {
		return nil, fmt.Errorf("解析搜索结果失败: %v", err)
	}

	// Extract the products.
	products := make([]Product, 0, len(parsed.Hits.Hits))
	for _, hit := range parsed.Hits.Hits {
		if len(hit.Source) == 0 {
			// Hit without a _source body: nothing to decode.
			continue
		}
		var product Product
		if err := json.Unmarshal(hit.Source, &product); err != nil {
			return nil, fmt.Errorf("解析产品失败: %v", err)
		}
		products = append(products, product)
	}

	// Extract the total. It is normally an object carrying "value"; a bare
	// number is accepted as well, and an absent total counts as 0.
	total := 0
	if len(parsed.Hits.Total) > 0 {
		var totalObj struct {
			Value int `json:"value"`
		}
		if err := json.Unmarshal(parsed.Hits.Total, &totalObj); err == nil {
			total = totalObj.Value
		} else if err := json.Unmarshal(parsed.Hits.Total, &total); err != nil {
			return nil, fmt.Errorf("解析搜索结果失败: %v", err)
		}
	}

	// Callers always receive a non-nil aggregations map.
	aggregations := parsed.Aggregations
	if aggregations == nil {
		aggregations = make(map[string]interface{})
	}

	return &SearchResponse{
		Total:        total,
		Products:     products,
		Aggregations: aggregations,
	}, nil
}

// GetSuggestions asks the completion suggester on the "suggest" field of
// es.index for up to size completions of the prefix query, with duplicates
// skipped.
//
// It returns one map per option, holding the option's "text" and its
// "_score" under the key "score". The result is an empty, non-nil slice when
// the response carries no options. An error is returned when the query
// cannot be serialized, the request fails, Elasticsearch reports an error,
// or the response cannot be decoded.
func (es *ElasticsearchService) GetSuggestions(query string, size int) ([]map[string]interface{}, error) {
	suggestQuery := map[string]interface{}{
		"suggest": map[string]interface{}{
			"product_suggest": map[string]interface{}{
				"prefix": query,
				"completion": map[string]interface{}{
					"field":           "suggest",
					"size":            size,
					"skip_duplicates": true,
				},
			},
		},
	}

	queryJSON, err := json.Marshal(suggestQuery)
	if err != nil {
		return nil, fmt.Errorf("序列化建议查询失败: %v", err)
	}

	req := esapi.SearchRequest{
		Index: []string{es.index},
		Body:  strings.NewReader(string(queryJSON)),
	}

	res, err := req.Do(context.Background(), es.client)
	if err != nil {
		return nil, fmt.Errorf("建议请求失败: %v", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, fmt.Errorf("获取建议失败: %s", res.String())
	}

	// Decode into a typed target so that a missing or differently shaped
	// section yields a decode error or an empty result rather than a
	// type-assertion / index-out-of-range panic.
	var parsed struct {
		Suggest map[string][]struct {
			Options []struct {
				Text  interface{} `json:"text"`
				Score interface{} `json:"_score"`
			} `json:"options"`
		} `json:"suggest"`
	}
	if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil {
		return nil, fmt.Errorf("解析建议结果失败: %v", err)
	}

	suggestions := make([]map[string]interface{}, 0)

	entries := parsed.Suggest["product_suggest"]
	if len(entries) == 0 {
		return suggestions, nil
	}

	// Only the first entry is read; the request above sends a single prefix.
	for _, option := range entries[0].Options {
		suggestions = append(suggestions, map[string]interface{}{
			"text":  option.Text,
			"score": option.Score,
		})
	}

	return suggestions, nil
}

// HTTP处理器
// SearchHandler serves product searches over HTTP.
//
// Query parameters:
//   - q: free-text query (optional).
//   - category, brand: exact-match filters (optional).
//   - min_price, max_price: price bounds parsed as floats; unparsable values
//     are ignored.
//   - in_stock: the literal "true" restricts results to products in stock.
//   - page: page number, default 1; non-numeric or < 1 falls back to 1.
//   - size: page size, default 20; non-numeric or negative falls back to 20.
//
// On success it writes {"success": true, "data": <search result>}. On
// failure it writes status 500 with {"success": false, "error": <message>}.
func (es *ElasticsearchService) SearchHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	params := r.URL.Query()

	// Parse pagination; strconv.Atoi fails on "" so absent parameters keep
	// their defaults, as do out-of-range values.
	page := 1
	if p, err := strconv.Atoi(params.Get("page")); err == nil && p >= 1 {
		page = p
	}

	size := 20
	if s, err := strconv.Atoi(params.Get("size")); err == nil && s >= 0 {
		size = s
	}

	filters := make(map[string]interface{})
	if category := params.Get("category"); category != "" {
		filters["category"] = category
	}
	if brand := params.Get("brand"); brand != "" {
		filters["brand"] = brand
	}
	if minPrice, err := strconv.ParseFloat(params.Get("min_price"), 64); err == nil {
		filters["min_price"] = minPrice
	}
	if maxPrice, err := strconv.ParseFloat(params.Get("max_price"), 64); err == nil {
		filters["max_price"] = maxPrice
	}
	if params.Get("in_stock") == "true" {
		filters["in_stock"] = true
	}

	result, err := es.SearchProducts(params.Get("q"), filters, page, size)
	if err != nil {
		// The error body is produced by the JSON encoder rather than by
		// string formatting: the message may contain quotes (it can embed
		// the Elasticsearch response) and must be escaped. http.Error is
		// avoided because it would reset Content-Type to text/plain.
		w.WriteHeader(http.StatusInternalServerError)
		if encErr := json.NewEncoder(w).Encode(map[string]interface{}{
			"success": false,
			"error":   err.Error(),
		}); encErr != nil {
			log.Printf("写入响应失败: %v", encErr)
		}
		return
	}

	response := map[string]interface{}{
		"success": true,
		"data":    result,
	}

	if err := json.NewEncoder(w).Encode(response); err != nil {
		log.Printf("写入响应失败: %v", err)
	}
}

// SuggestHandler serves completion suggestions over HTTP.
//
// Query parameters:
//   - q: the prefix to complete (required; an empty value yields status 400).
//   - size: maximum number of suggestions, default 10; non-numeric or
//     non-positive values fall back to 10.
//
// On success it writes {"success": true, "data": <suggestions>}. On failure
// it writes {"success": false, "error": <message>} with status 400 or 500.
func (es *ElasticsearchService) SuggestHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	// fail writes a JSON error body. The encoder escapes the message (it may
	// contain quotes), and http.Error is avoided because it would reset
	// Content-Type to text/plain.
	fail := func(status int, message string) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(map[string]interface{}{
			"success": false,
			"error":   message,
		}); err != nil {
			log.Printf("写入响应失败: %v", err)
		}
	}

	params := r.URL.Query()

	query := params.Get("q")
	if query == "" {
		fail(http.StatusBadRequest, "查询参数不能为空")
		return
	}

	// strconv.Atoi fails on "", so an absent size keeps the default.
	size := 10
	if s, err := strconv.Atoi(params.Get("size")); err == nil && s > 0 {
		size = s
	}

	suggestions, err := es.GetSuggestions(query, size)
	if err != nil {
		fail(http.StatusInternalServerError, err.Error())
		return
	}

	response := map[string]interface{}{
		"success": true,
		"data":    suggestions,
	}

	if err := json.NewEncoder(w).Encode(response); err != nil {
		log.Printf("写入响应失败: %v", err)
	}
}

// CreateProductHandler accepts a JSON product in a POST body, stamps its
// creation/update times, derives its completion-suggest entry and indexes it.
//
// Responses:
//   - 201 with {"success": true, "message": ..., "data": <product>} on success.
//   - 405 when the method is not POST.
//   - 400 when the body cannot be decoded as a product (including bodies
//     larger than 1 MiB).
//   - 500 when indexing fails.
//
// Error responses have the form {"success": false, "error": <message>}.
func (es *ElasticsearchService) CreateProductHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	// fail writes a JSON error body. The encoder escapes the message (it may
	// contain quotes), and http.Error is avoided because it would reset
	// Content-Type to text/plain.
	fail := func(status int, message string) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(map[string]interface{}{
			"success": false,
			"error":   message,
		}); err != nil {
			log.Printf("写入响应失败: %v", err)
		}
	}

	if r.Method != http.MethodPost {
		fail(http.StatusMethodNotAllowed, "方法不允许")
		return
	}

	// The body comes from an untrusted client; cap how much of it is read.
	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)

	var product Product
	if err := json.NewDecoder(r.Body).Decode(&product); err != nil {
		fail(http.StatusBadRequest, "解析请求失败: "+err.Error())
		return
	}

	// Set timestamps: a new product starts with identical created/updated
	// times.
	now := time.Now()
	product.CreatedAt = now
	product.UpdatedAt = now

	// Set the search suggestion: name and brand are the completion inputs,
	// weighted by rating (scaled by 10) plus the review count.
	product.Suggest = Suggest{
		Input:  []string{product.Name, product.Brand},
		Weight: int(product.Rating*10) + product.ReviewsCount,
	}

	if err := es.IndexProduct(product); err != nil {
		fail(http.StatusInternalServerError, err.Error())
		return
	}

	response := map[string]interface{}{
		"success": true,
		"message": "产品创建成功",
		"data":    product,
	}

	w.WriteHeader(http.StatusCreated)
	if err := json.NewEncoder(w).Encode(response); err != nil {
		log.Printf("写入响应失败: %v", err)
	}
}

// healthHandler answers liveness probes with a small JSON document holding a
// fixed "ok" status, the service name and the current time in RFC 3339 form.
// It does not contact Elasticsearch.
func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	payload := map[string]interface{}{
		"service":   "elasticsearch-api",
		"status":    "ok",
		"timestamp": time.Now().Format(time.RFC3339),
	}

	encoder := json.NewEncoder(w)
	encoder.Encode(payload)
}

// main wires up the Elasticsearch-backed product API and serves it on :8080.
//
// Startup sequence: create the Elasticsearch service (fatal on failure),
// try to create the index (a failure is only logged and startup continues;
// presumably so that an already-existing index does not block the server),
// register the API and health routes, wrap them in CORS middleware, and
// serve until the listener fails.
func main() {
	// Create the Elasticsearch service.
	esService, err := NewElasticsearchService()
	if err != nil {
		log.Fatalf("创建Elasticsearch服务失败: %v", err)
	}

	// Create the index.
	if err := esService.CreateIndex(); err != nil {
		log.Printf("创建索引失败: %v", err)
	}

	// Create the router.
	r := mux.NewRouter()

	// API routes.
	api := r.PathPrefix("/api").Subrouter()
	api.HandleFunc("/products/search", esService.SearchHandler).Methods("GET")
	api.HandleFunc("/products/suggest", esService.SuggestHandler).Methods("GET")
	api.HandleFunc("/products", esService.CreateProductHandler).Methods("POST")

	// Health check.
	r.HandleFunc("/health", healthHandler).Methods("GET")

	// CORS configuration: any origin and any header are allowed.
	c := cors.New(cors.Options{
		AllowedOrigins: []string{"*"},
		AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
		AllowedHeaders: []string{"*"},
	})

	handler := c.Handler(r)

	// Start the server.
	port := ":8080"
	log.Printf("服务器启动在端口 %s", port)
	log.Printf("健康检查: http://localhost%s/health", port)
	log.Printf("搜索API: http://localhost%s/api/products/search", port)

	// An explicit http.Server is used instead of http.ListenAndServe so that
	// timeouts can be set; without them a slow or stalled client can hold a
	// connection open indefinitely.
	server := &http.Server{
		Addr:              port,
		Handler:           handler,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       15 * time.Second,
		WriteTimeout:      30 * time.Second,
		IdleTimeout:       60 * time.Second,
	}

	if err := server.ListenAndServe(); err != nil {
		log.Fatalf("启动服务器失败: %v", err)
	}
}

4. 最佳实践

4.1 架构设计原则

  1. 分离关注点

    • 数据存储与搜索分离
    • 读写分离
    • 缓存层设计
  2. 可扩展性

    • 水平扩展设计
    • 微服务架构
    • 负载均衡
  3. 容错性

    • 故障转移
    • 降级策略
    • 监控告警

4.2 性能优化

  1. 索引优化

    • 合理的分片策略
    • 索引模板使用
    • 字段映射优化
  2. 查询优化

    • 查询缓存
    • 分页优化
    • 聚合优化
  3. 数据同步

    • 增量同步
    • 批量处理
    • 异步处理

4.3 安全考虑

  1. 访问控制

    • 用户认证
    • 权限管理
    • API密钥管理
  2. 数据保护

    • 传输加密
    • 存储加密
    • 敏感数据脱敏
  3. 审计日志

    • 操作记录
    • 异常监控
    • 安全事件追踪

5. 章节总结

5.1 核心知识点

  1. ELK Stack集成

    • Logstash数据处理
    • Kibana可视化
    • Beats数据收集
  2. 编程语言集成

    • Node.js客户端
    • Python客户端
    • Go客户端
  3. 最佳实践

    • 架构设计
    • 性能优化
    • 安全考虑

5.2 实践要点

  1. 选择合适的集成方案
  2. 遵循最佳实践
  3. 注重性能和安全
  4. 建立监控和告警

5.3 练习题

  1. 设计一个完整的ELK Stack日志分析系统
  2. 实现一个多语言的Elasticsearch搜索服务
  3. 建立Elasticsearch集群的监控和告警系统
  4. 设计一个高可用的搜索架构方案