1. ELK Stack集成
1.1 Logstash集成
基础配置
# logstash.conf
input {
  # File input: tails JSON-formatted application log files.
  file {
    path => "/var/log/application/*.log"
    start_position => "beginning"
    # Persist read offsets so a restart resumes where it stopped. With
    # "/dev/null" (only suitable for one-off tests) every restart re-reads all
    # files from the beginning and indexes every line again.
    sincedb_path => "/var/lib/logstash/sincedb_application"
    codec => "json"
    tags => ["application"]
  }

  # Beats input.
  beats {
    port => 5044
  }

  # Syslog input.
  # NOTE: ports below 1024 need root or CAP_NET_BIND_SERVICE; use a high port
  # (e.g. 5514) when Logstash runs as an unprivileged user.
  syslog {
    port => 514
    tags => ["syslog"]
  }

  # HTTP input accepting JSON request bodies.
  http {
    port => 8080
    codec => "json"
    tags => ["http_input"]
  }
}
filter {
  # Application logs (tagged "application" by the file input).
  if "application" in [tags] {
    # Use the application's own timestamp as the event time.
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss.SSS" ]
      target => "@timestamp"
    }

    # Normalise the log level so exact matches such as "ERROR" / "DEBUG" work.
    mutate {
      uppercase => [ "level" ]
    }

    # Static metadata, declared as a single add_field hash.
    mutate {
      add_field => {
        "log_type" => "application"
        "environment" => "production"
      }
    }

    # Tag messages mentioning an exception and try to extract class and message.
    if [message] =~ /Exception/ {
      mutate {
        add_tag => [ "exception" ]
      }
      grok {
        match => { "message" => "%{JAVACLASS:exception_class}: %{GREEDYDATA:exception_message}" }
      }
    }
  }

  # Web access logs.
  # ecs_compatibility is disabled on grok/useragent/geoip so they emit the legacy
  # field names (clientip, agent, response, bytes, geoip.*, user_agent.*) that the
  # rest of this filter and the "logs" index template use; with ECS enabled
  # (the Logstash 8 default) they emit ECS names such as [source][address].
  if "nginx" in [tags] {
    grok {
      match => {
        "message" => "%{COMBINEDAPACHELOG}"
      }
      ecs_compatibility => disabled
    }

    # Parse the User-Agent header.
    useragent {
      source => "agent"
      target => "user_agent"
      ecs_compatibility => disabled
    }

    # Geo-locate the client IP.
    geoip {
      source => "clientip"
      target => "geoip"
      ecs_compatibility => disabled
    }

    # Cast the HTTP status code and the response size to integers
    # (they are captured as strings by grok).
    mutate {
      convert => {
        "response" => "integer"
        "bytes" => "integer"
      }
    }
  }

  # Common filters
  # Drop top-level fields whose value is nil or empty (empty string/array/hash).
  ruby {
    code => "
      event.to_hash.each { |k, v|
        if v.nil? || (v.respond_to?(:empty?) && v.empty?)
          event.remove(k)
        end
      }
    "
  }

  # Record when Logstash processed the event (UTC, ISO-8601).
  ruby {
    code => "event.set('processed_at', Time.now.utc.iso8601)"
  }
}
output {
  # Main sink: one Elasticsearch index per day.
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"

    # Index template installed by the plugin. The index pattern it applies to is
    # declared inside the file ("index_patterns"); this plugin has no
    # `template_pattern` option. The file uses the legacy template layout
    # (top-level settings/mappings), so install it through the legacy API.
    template => "/etc/logstash/templates/logs.json"
    template_name => "logs"
    template_api => "legacy"

    # Authentication
    user => "logstash_writer"
    password => "${LOGSTASH_PASSWORD}"

    # Batching/concurrency: `flush_size` and `idle_flush_time` are obsolete in
    # this plugin (the pipeline refuses to load with them) and per-output
    # `workers` is deprecated. Tune pipeline.workers, pipeline.batch.size and
    # pipeline.batch.delay in logstash.yml instead.
  }

  # Keep events that grok could not parse for later inspection.
  if "_grokparsefailure" in [tags] {
    file {
      path => "/var/log/logstash/grok_failures.log"
      codec => "json_lines"
    }
  }

  # Debug output. The filter stores the (upper-cased) level in [level];
  # [log_level] is never set, so the old condition could never be true.
  if [level] == "DEBUG" {
    stdout {
      codec => "rubydebug"
    }
  }
}
Elasticsearch索引模板配置(即 Logstash 输出中 template 选项加载的 /etc/logstash/templates/logs.json)
{
"index_patterns": ["logs-*"],
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"index.refresh_interval": "5s",
"index.codec": "best_compression"
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"level": {
"type": "keyword"
},
"message": {
"type": "text",
"analyzer": "standard"
},
"service": {
"type": "keyword"
},
"host": {
"type": "keyword"
},
"clientip": {
"type": "ip"
},
"response": {
"type": "integer"
},
"bytes": {
"type": "long"
},
"geoip": {
"properties": {
"location": {
"type": "geo_point"
},
"country_name": {
"type": "keyword"
},
"city_name": {
"type": "keyword"
}
}
},
"user_agent": {
"properties": {
"name": {
"type": "keyword"
},
"version": {
"type": "keyword"
},
"os": {
"type": "keyword"
}
}
}
}
}
}
1.2 Kibana集成
Kibana配置
# kibana.yml
server.port: 5601
server.host: "0.0.0.0"
server.name: "kibana-server"

# Elasticsearch connection
# NOTE(review): Elasticsearch 8.x serves HTTPS by default; switch to https:// and
# configure elasticsearch.ssl.* if the cluster uses the default security setup.
elasticsearch.hosts: ["http://localhost:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "${KIBANA_PASSWORD}"

# Security
# `xpack.security.enabled` is not set: Kibana 8 removed this setting (security
# follows the Elasticsearch cluster) and rejects it at startup.
# The encryption key must be at least 32 characters and identical on every
# Kibana instance; read it from the environment instead of committing it.
xpack.security.encryptionKey: "${KIBANA_ENCRYPTION_KEY}"
xpack.security.session.idleTimeout: "1h"
xpack.security.session.lifespan: "30d"

# Stack Monitoring (current names of the former xpack.monitoring.* keys)
monitoring.ui.enabled: true
monitoring.kibana.collection.enabled: true

# Logging: JSON-formatted file appender used by the root logger.
logging.appenders:
  file:
    type: file
    fileName: /var/log/kibana/kibana.log
    layout:
      type: json
logging.root:
  level: info
  appenders: [file]
Kibana仪表板自动化
import requests
import json
from typing import Dict, List, Any, Optional
class KibanaManager:
    """Create Kibana saved objects (index patterns, visualizations, dashboards)
    through the Kibana saved objects HTTP API.

    Public methods report their outcome with print() and return a falsy value on
    failure instead of raising.
    """

    # Fixed saved-object id of the logs-* index pattern created by
    # setup_log_monitoring_dashboard(): visualizations reference it by id, and
    # re-running the setup overwrites it instead of creating a duplicate.
    LOGS_INDEX_PATTERN_ID = "logs-index-pattern"

    # Reference name linking a visualization's searchSourceJSON to its index pattern.
    _INDEX_REF_NAME = "kibanaSavedObjectMeta.searchSourceJSON.index"

    def __init__(self, kibana_url: str, username: str, password: str,
                 timeout: float = 30.0):
        """
        Args:
            kibana_url: Kibana base URL; a trailing slash is removed.
            username: Basic-auth user.
            password: Basic-auth password.
            timeout: Per-request timeout in seconds, so an unresponsive Kibana
                cannot block the caller forever.
        """
        self.kibana_url = kibana_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (username, password)
        # kbn-xsrf is required by Kibana for state-changing API calls.
        self.session.headers.update({
            'Content-Type': 'application/json',
            'kbn-xsrf': 'true'
        })

    def _post_saved_object(self, obj_type: str, body: Dict[str, Any],
                           obj_id: Optional[str] = None,
                           overwrite: bool = False) -> Any:
        """POST one saved object of `obj_type`; returns the HTTP response.

        Network errors propagate to the caller.
        """
        from urllib.parse import quote

        url = f"{self.kibana_url}/api/saved_objects/{obj_type}"
        if obj_id:
            url = f"{url}/{quote(obj_id, safe='')}"
        params = {'overwrite': 'true'} if overwrite else None
        return self.session.post(url, json=body, params=params, timeout=self.timeout)

    def create_index_pattern(self, pattern_name: str, time_field: str = '@timestamp',
                             pattern_id: Optional[str] = None,
                             overwrite: bool = False) -> bool:
        """Create an index pattern.

        Args:
            pattern_name: Pattern title, e.g. "logs-*".
            time_field: Name of the time field.
            pattern_id: Optional explicit saved-object id; when omitted the id is
                chosen by Kibana.
            overwrite: Replace an existing object with the same id.

        Returns:
            True when Kibana answers 200/201.
        """
        index_pattern_data = {
            "attributes": {
                "title": pattern_name,
                "timeFieldName": time_field
            }
        }
        try:
            response = self._post_saved_object(
                "index-pattern", index_pattern_data, pattern_id, overwrite)
            if response.status_code in [200, 201]:
                print(f"索引模式 {pattern_name} 创建成功")
                return True
            print(f"创建索引模式失败: {response.text}")
            return False
        except Exception as e:
            print(f"创建索引模式异常: {e}")
            return False

    def create_visualization(self, viz_config: Dict[str, Any]) -> Optional[str]:
        """Create a visualization from a saved-object body ("attributes" and
        optionally "references").

        Returns:
            The new object's id, or None on failure.
        """
        try:
            response = self._post_saved_object("visualization", viz_config)
            if response.status_code in [200, 201]:
                viz_id = response.json()['id']
                print(f"可视化 {viz_config['attributes']['title']} 创建成功,ID: {viz_id}")
                return viz_id
            print(f"创建可视化失败: {response.text}")
            return None
        except Exception as e:
            print(f"创建可视化异常: {e}")
            return None

    def create_dashboard(self, dashboard_config: Dict[str, Any]) -> Optional[str]:
        """Create a dashboard from a saved-object body.

        Returns:
            The new object's id, or None on failure.
        """
        try:
            response = self._post_saved_object("dashboard", dashboard_config)
            if response.status_code in [200, 201]:
                dashboard_id = response.json()['id']
                print(f"仪表板 {dashboard_config['attributes']['title']} 创建成功,ID: {dashboard_id}")
                return dashboard_id
            print(f"创建仪表板失败: {response.text}")
            return None
        except Exception as e:
            print(f"创建仪表板异常: {e}")
            return None

    @classmethod
    def _viz_search_source(cls, kuery: str) -> Dict[str, str]:
        """kibanaSavedObjectMeta for a visualization: a KQL query plus a reference
        (by name) to the index pattern listed in the object's `references`."""
        return {
            "searchSourceJSON": json.dumps({
                "query": {"query": kuery, "language": "kuery"},
                "filter": [],
                "indexRefName": cls._INDEX_REF_NAME,
            })
        }

    @classmethod
    def _index_pattern_refs(cls, pattern_id: str) -> List[Dict[str, str]]:
        """`references` entry that resolves _INDEX_REF_NAME to the index pattern id."""
        return [{"name": cls._INDEX_REF_NAME, "type": "index-pattern", "id": pattern_id}]

    def _log_dashboard_visualizations(self, pattern_id: str) -> List[Any]:
        """(saved-object body, dashboard gridData) pairs for the log dashboard.

        Field names follow the logs-* index template, where `level` and `service`
        are keyword fields with no `.keyword` sub-field.
        """
        count_metric = {"id": "1", "type": "count", "schema": "metric", "params": {}}
        references = self._index_pattern_refs(pattern_id)

        # 1. Log level distribution (pie chart)
        log_level_pie = {
            "attributes": {
                "title": "日志级别分布",
                "type": "pie",
                "visState": json.dumps({
                    "title": "日志级别分布",
                    "type": "pie",
                    "aggs": [
                        count_metric,
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "segment",
                            "params": {"field": "level", "size": 10,
                                       "order": "desc", "orderBy": "1"}
                        }
                    ]
                }),
                "kibanaSavedObjectMeta": self._viz_search_source(""),
            },
            "references": references,
        }

        # 2. Log volume over time (histogram)
        timeline_chart = {
            "attributes": {
                "title": "日志时间线",
                "type": "histogram",
                "visState": json.dumps({
                    "title": "日志时间线",
                    "type": "histogram",
                    "aggs": [
                        count_metric,
                        {
                            "id": "2",
                            "type": "date_histogram",
                            "schema": "segment",
                            "params": {"field": "@timestamp", "interval": "auto",
                                       "min_doc_count": 1}
                        }
                    ]
                }),
                "kibanaSavedObjectMeta": self._viz_search_source(""),
            },
            "references": references,
        }

        # 3. ERROR-level logs per service (table)
        error_table = {
            "attributes": {
                "title": "错误日志详情",
                "type": "table",
                "visState": json.dumps({
                    "title": "错误日志详情",
                    "type": "table",
                    "aggs": [
                        count_metric,
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "bucket",
                            "params": {"field": "service", "size": 10,
                                       "order": "desc", "orderBy": "1"}
                        }
                    ]
                }),
                "kibanaSavedObjectMeta": self._viz_search_source('level : "ERROR"'),
            },
            "references": references,
        }

        return [
            (log_level_pie, {"x": 0, "y": 0, "w": 24, "h": 15}),
            (timeline_chart, {"x": 24, "y": 0, "w": 24, "h": 15}),
            (error_table, {"x": 0, "y": 15, "w": 48, "h": 15}),
        ]

    def setup_log_monitoring_dashboard(self) -> bool:
        """Create the logs-* index pattern, three visualizations and a dashboard.

        Visualizations that fail to be created are skipped; the dashboard gets a
        panel for each one that succeeded.

        Returns:
            True when both the index pattern and the dashboard were created.
        """
        pattern_id = self.LOGS_INDEX_PATTERN_ID
        if not self.create_index_pattern("logs-*", pattern_id=pattern_id, overwrite=True):
            return False

        panels = []
        references = []
        for viz_config, grid in self._log_dashboard_visualizations(pattern_id):
            viz_id = self.create_visualization(viz_config)
            if not viz_id:
                continue
            # Panels point at visualizations via named references; gridData.i must
            # equal panelIndex.
            panel_index = str(len(panels) + 1)
            ref_name = f"panel_{len(panels)}"
            panels.append({
                "panelIndex": panel_index,
                "gridData": dict(grid, i=panel_index),
                "embeddableConfig": {},
                "panelRefName": ref_name,
            })
            references.append({"name": ref_name, "type": "visualization", "id": viz_id})

        dashboard_config = {
            "attributes": {
                "title": "日志监控仪表板",
                "type": "dashboard",
                "panelsJSON": json.dumps(panels),
                "kibanaSavedObjectMeta": {
                    "searchSourceJSON": json.dumps({
                        "query": {"query": "", "language": "kuery"},
                        "filter": [],
                    })
                },
            },
            "references": references,
        }
        return self.create_dashboard(dashboard_config) is not None
# Usage example
if __name__ == "__main__":
    import os
    import sys

    # Connection settings come from the environment so credentials are not
    # hard-coded; the fallbacks are placeholders for a local test instance.
    kibana_manager = KibanaManager(
        kibana_url=os.environ.get("KIBANA_URL", "http://localhost:5601"),
        username=os.environ.get("KIBANA_USERNAME", "elastic"),
        password=os.environ.get("KIBANA_PASSWORD", "your_password"),
    )

    # Set up the log monitoring dashboard.
    success = kibana_manager.setup_log_monitoring_dashboard()
    if success:
        print("日志监控仪表板设置成功")
    else:
        print("日志监控仪表板设置失败")
    # A non-zero exit status lets scripts/CI detect a failed setup.
    sys.exit(0 if success else 1)
1.3 Beats集成
Filebeat配置
# filebeat.yml
filebeat.inputs:
  # NOTE(review): the `log` input type is deprecated in Filebeat 8.x in favour of
  # `filestream`; kept for compatibility with older agents.
  - type: log
    enabled: true
    paths:
      - /var/log/application/*.log
    # `service` is an object in the ECS-based Filebeat template (service.name,
    # service.type, ...); a plain string at the root causes mapping errors, so the
    # service name goes to service.name.
    fields:
      service:
        name: application
      environment: production
    fields_under_root: true
    # Lines that do not start with a date (e.g. stack-trace lines) are appended
    # to the previous event.
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after

  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      service:
        name: nginx
      log_type: access
    fields_under_root: true

  - type: log
    enabled: true
    paths:
      - /var/log/nginx/error.log
    fields:
      service:
        name: nginx
      log_type: error
    fields_under_root: true
    # nginx error-log entries start with "YYYY/MM/DD".
    multiline.pattern: '^\d{4}/\d{2}/\d{2}'
    multiline.negate: true
    multiline.match: after

# Processors
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
  - drop_fields:
      fields: ["agent", "ecs", "host.architecture"]
      # host.* is not added for forwarded events; don't report missing fields.
      ignore_missing: true

# Output
output.elasticsearch:
  hosts: ["localhost:9200"]
  username: "filebeat_writer"
  password: "${FILEBEAT_PASSWORD}"
  index: "filebeat-%{+yyyy.MM.dd}"

# A custom `index` requires matching template settings. They belong under
# `setup.template.*`; they are not options of output.elasticsearch.
setup.template.name: "filebeat"
setup.template.pattern: "filebeat-*"
setup.template.settings:
  index.number_of_shards: 1
  index.number_of_replicas: 1
  index.refresh_interval: 5s
# NOTE(review): on 7.x the custom `index` is ignored while ILM is enabled;
# disabling ILM keeps the daily index naming — confirm for your Beats version.
setup.ilm.enabled: false

# Filebeat's own logging
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644

# Stack Monitoring
monitoring.enabled: true
monitoring.elasticsearch:
  hosts: ["localhost:9200"]
  username: "beats_system"
  password: "${BEATS_PASSWORD}"
Metricbeat配置
# metricbeat.yml
# Additional module configs, reloaded every 10s.
metricbeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s

metricbeat.modules:
  # Host metrics
  - module: system
    metricsets:
      - cpu
      - load
      - memory
      - network
      - process
      - process_summary
      - socket_summary
      - filesystem
      - fsstat
    enabled: true
    period: 10s
    processes: ['.*']

  # Elasticsearch node/cluster/index metrics
  - module: elasticsearch
    metricsets:
      - node
      - node_stats
      - cluster_stats
      - index
      - index_recovery
      - index_summary
      - shard
    enabled: true
    period: 10s
    hosts: ["localhost:9200"]
    username: "beats_system"
    password: "${BEATS_PASSWORD}"

  # nginx stub_status endpoint
  - module: nginx
    metricsets: ["stubstatus"]
    enabled: true
    period: 10s
    hosts: ["http://localhost/nginx_status"]

  # Docker daemon via its Unix socket
  - module: docker
    metricsets:
      - container
      - cpu
      - diskio
      - healthcheck
      - info
      - memory
      - network
    enabled: true
    period: 10s
    hosts: ["unix:///var/run/docker.sock"]

# Output
output.elasticsearch:
  hosts: ["localhost:9200"]
  username: "metricbeat_writer"
  password: "${METRICBEAT_PASSWORD}"
  index: "metricbeat-%{+yyyy.MM.dd}"

# A custom `index` requires setup.template.name and setup.template.pattern.
setup.template.name: "metricbeat"
setup.template.pattern: "metricbeat-*"
# NOTE(review): on 7.x the custom `index` is ignored while ILM is enabled —
# confirm for your Beats version.
setup.ilm.enabled: false

# Processors
processors:
  - add_host_metadata: ~
  - add_docker_metadata: ~

# Stack Monitoring
monitoring.enabled: true
monitoring.elasticsearch:
  hosts: ["localhost:9200"]
  username: "beats_system"
  password: "${BEATS_PASSWORD}"
2. 应用程序集成
2.1 Java应用集成
Spring Boot集成
// pom.xml依赖
/*
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.11.0</version>
</dependency>
*/
// ElasticsearchConfig.java
/**
 * Builds the Elasticsearch Java API client and the Spring Data
 * {@link ElasticsearchTemplate} on top of it.
 */
@Configuration
@EnableElasticsearchRepositories
public class ElasticsearchConfig {
    @Value("${elasticsearch.host:localhost}")
    private String host;

    @Value("${elasticsearch.port:9200}")
    private int port;

    // URL scheme of the cluster endpoint (previously hard-coded to "https").
    @Value("${elasticsearch.scheme:https}")
    private String scheme;

    @Value("${elasticsearch.username:}")
    private String username;

    @Value("${elasticsearch.password:}")
    private String password;

    // When true, server certificates and host names are NOT verified. The default
    // keeps the previous behaviour (self-signed development clusters); set
    // elasticsearch.ssl.trust-all=false in production.
    @Value("${elasticsearch.ssl.trust-all:true}")
    private boolean trustAllCertificates;

    /**
     * Creates the low-level REST client, the Jackson transport and the typed client.
     *
     * @return client for {@code scheme://host:port}
     */
    @Bean
    public ElasticsearchClient elasticsearchClient() {
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, scheme));

        // RestClientBuilder keeps a single HttpClientConfigCallback: a second
        // setHttpClientConfigCallback() call replaces the first. Credentials and TLS
        // must therefore be configured in ONE callback — previously the TLS callback
        // replaced the credentials callback and requests were sent unauthenticated.
        final CredentialsProvider credentialsProvider = createCredentialsProvider();
        final SSLContext sslContext = trustAllCertificates ? createTrustAllSslContext() : null;
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            if (credentialsProvider != null) {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
            if (sslContext != null) {
                httpClientBuilder.setSSLContext(sslContext)
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
            }
            return httpClientBuilder;
        });

        RestClient restClient = builder.build();
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper()
        );
        return new ElasticsearchClient(transport);
    }

    @Bean
    public ElasticsearchTemplate elasticsearchTemplate() {
        return new ElasticsearchTemplate(elasticsearchClient());
    }

    // Basic-auth credentials, or null when username or password is not configured.
    private CredentialsProvider createCredentialsProvider() {
        if (username.isEmpty() || password.isEmpty()) {
            return null;
        }
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        return provider;
    }

    // SSLContext that accepts every server certificate (development only).
    private static SSLContext createTrustAllSslContext() {
        try {
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, new TrustManager[] {
                new X509TrustManager() {
                    // The X509TrustManager contract requires a non-null (possibly empty) array.
                    public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
                    public void checkClientTrusted(X509Certificate[] certs, String authType) {}
                    public void checkServerTrusted(X509Certificate[] certs, String authType) {}
                }
            }, new SecureRandom());
            return sslContext;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
// Entity
/** Product stored in the "products" index. */
@Document(indexName = "products")
public class Product {
    @Id
    private String id;

    @Field(type = FieldType.Text, analyzer = "standard")
    private String name;

    @Field(type = FieldType.Text, analyzer = "standard")
    private String description;

    @Field(type = FieldType.Double)
    private Double price;

    // Keyword field without sub-fields: query/aggregate it as "category"
    // (there is no "category.keyword").
    @Field(type = FieldType.Keyword)
    private String category;

    @Field(type = FieldType.Integer)
    private Integer stock;

    // LocalDateTime has no zone offset, so it cannot be written with
    // DateFormat.date_time (whose pattern includes an offset). Use an
    // offset-free built-in format instead.
    @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis)
    private LocalDateTime createdAt;

    @Field(type = FieldType.Boolean)
    private Boolean active;

    @Field(type = FieldType.Nested)
    private List<Tag> tags;

    public Product() {}

    /** Creates an active product stamped with the current local time and no tags. */
    public Product(String name, String description, Double price, String category) {
        this.name = name;
        this.description = description;
        this.price = price;
        this.category = category;
        this.createdAt = LocalDateTime.now();
        this.active = true;
        this.tags = new ArrayList<>();
    }

    // getters and setters omitted...
}
/**
 * Tag embedded in Product#tags as a nested object.
 * Not annotated with {@code @Document}: it has no index of its own, and
 * {@code @Document} requires an {@code indexName}.
 */
public class Tag {
    @Field(type = FieldType.Keyword)
    private String name;

    @Field(type = FieldType.Double)
    private Double weight;

    // constructors, getters and setters omitted
}
// Repository interface
/**
 * Spring Data repository for {@code Product} documents keyed by String id.
 */
@Repository
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
    // Derived queries (generated from the method names)
    List<Product> findByName(String name);
    List<Product> findByCategory(String category);
    List<Product> findByPriceBetween(Double minPrice, Double maxPrice);
    List<Product> findByActiveTrue();
    // Custom query: match on name (?0) AND price in [?1, ?2].
    // NOTE(review): ?0 is substituted into the JSON text — confirm the Spring Data
    // Elasticsearch version escapes quotes in the argument.
    @Query("{\"bool\": {\"must\": [{\"match\": {\"name\": \"?0\"}}, {\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}]}}")
    List<Product> findByNameAndPriceRange(String name, Double minPrice, Double maxPrice);
    // Aggregation "query".
    // NOTE(review): @Query holds only the query clause, so an "aggs" object here is
    // unlikely to produce an aggregation. Also, Product.category is a Keyword field
    // with no "category.keyword" sub-field. Verify before use; ProductService
    // builds this aggregation with the Java client instead.
    @Query("{\"aggs\": {\"categories\": {\"terms\": {\"field\": \"category.keyword\"}}}}")
    SearchHits<Product> findCategoryAggregation();
}
// Service layer
/**
 * Product persistence and search on the "products" index. Search failures are
 * rethrown as RuntimeException.
 */
@Service
public class ProductService {
    // Product#category is mapped as FieldType.Keyword without sub-fields, so it is
    // queried/aggregated as "category"; "category.keyword" does not exist and
    // silently matched nothing.
    private static final String CATEGORY_FIELD = "category";

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private ElasticsearchClient elasticsearchClient;

    /** Saves (indexes) a product through the Spring Data repository. */
    public Product saveProduct(Product product) {
        return productRepository.save(product);
    }

    /**
     * Searches active products.
     *
     * @param query    free text matched fuzzily on name and description; ignored if null/empty
     * @param category exact category filter; ignored if null/empty
     * @param minPrice inclusive lower bound on price; ignored if null
     * @param maxPrice inclusive upper bound on price; ignored if null
     * @return up to 100 products ordered by score
     */
    public List<Product> searchProducts(String query, String category, Double minPrice, Double maxPrice) {
        try {
            BoolQuery.Builder boolQuery = new BoolQuery.Builder();

            if (query != null && !query.isEmpty()) {
                boolQuery.must(m -> m
                        .multiMatch(mm -> mm
                                .fields("name", "description")
                                .query(query)
                                .fuzziness("AUTO")));
            }

            if (category != null && !category.isEmpty()) {
                boolQuery.filter(f -> f
                        .term(t -> t
                                .field(CATEGORY_FIELD)
                                .value(category)));
            }

            if (minPrice != null || maxPrice != null) {
                RangeQuery.Builder rangeQuery = new RangeQuery.Builder().field("price");
                if (minPrice != null) {
                    rangeQuery.gte(JsonData.of(minPrice));
                }
                if (maxPrice != null) {
                    rangeQuery.lte(JsonData.of(maxPrice));
                }
                boolQuery.filter(f -> f.range(rangeQuery.build()));
            }

            // Only active products are returned.
            boolQuery.filter(f -> f.term(t -> t.field("active").value(true)));

            SearchRequest searchRequest = SearchRequest.of(s -> s
                    .index("products")
                    .query(q -> q.bool(boolQuery.build()))
                    .sort(sort -> sort.field(f -> f.field("_score").order(SortOrder.Desc)))
                    .size(100));

            return sourcesOf(elasticsearchClient.search(searchRequest, Product.class));
        } catch (Exception e) {
            throw new RuntimeException("搜索产品失败", e);
        }
    }

    /**
     * Counts products per category (up to 50 categories).
     *
     * @return category -> document count; empty if the aggregation is absent
     */
    public Map<String, Long> getCategoryStatistics() {
        try {
            SearchRequest searchRequest = SearchRequest.of(s -> s
                    .index("products")
                    .size(0)
                    .aggregations("categories", a -> a
                            .terms(t -> t.field(CATEGORY_FIELD).size(50))));

            SearchResponse<Product> response = elasticsearchClient.search(searchRequest, Product.class);

            Map<String, Long> categoryStats = new HashMap<>();
            // Look the entry up defensively instead of dereferencing a possibly missing one.
            if (response.aggregations().containsKey("categories")) {
                StringTermsAggregate categories = response.aggregations().get("categories").sterms();
                for (StringTermsBucket bucket : categories.buckets().array()) {
                    categoryStats.put(bucket.key().stringValue(), bucket.docCount());
                }
            }
            return categoryStats;
        } catch (Exception e) {
            throw new RuntimeException("获取分类统计失败", e);
        }
    }

    /**
     * Recommends active products similar to {@code productId}: same category
     * (boosted) and/or similar name/description text. The product itself is excluded.
     *
     * @return up to {@code size} products; empty if the product does not exist
     */
    public List<Product> getRecommendations(String productId, int size) {
        try {
            GetResponse<Product> getResponse = elasticsearchClient.get(
                    g -> g.index("products").id(productId),
                    Product.class);
            if (!getResponse.found() || getResponse.source() == null) {
                return Collections.emptyList();
            }
            String category = getResponse.source().getCategory();

            SearchRequest searchRequest = SearchRequest.of(s -> s
                    .index("products")
                    .query(q -> q.bool(b -> {
                        // Same category, weighted higher (skipped when the product has none).
                        if (category != null) {
                            b.should(sh -> sh.term(t -> t
                                    .field(CATEGORY_FIELD)
                                    .value(category)
                                    .boost(2.0f)));
                        }
                        return b
                                .should(sh -> sh.moreLikeThis(mlt -> mlt
                                        .fields("name", "description")
                                        .like(l -> l.document(d -> d
                                                .index("products")
                                                .id(productId)))
                                        .minTermFreq(1)
                                        .maxQueryTerms(12)))
                                // The bool query also has a filter clause, so without this
                                // the should clauses are optional and unrelated products
                                // would be returned as "recommendations".
                                .minimumShouldMatch("1")
                                .mustNot(mn -> mn.term(t -> t.field("_id").value(productId)))
                                .filter(f -> f.term(t -> t.field("active").value(true)));
                    }))
                    .size(size));

            return sourcesOf(elasticsearchClient.search(searchRequest, Product.class));
        } catch (Exception e) {
            throw new RuntimeException("获取推荐产品失败", e);
        }
    }

    // Extracts the _source of each hit, skipping hits that came back without one.
    private static List<Product> sourcesOf(SearchResponse<Product> response) {
        return response.hits().hits().stream()
                .map(hit -> hit.source())
                .filter(product -> product != null)
                .collect(Collectors.toList());
    }
}
// Controller
/** REST endpoints under /api/products, delegating to ProductService. */
@RestController
@RequestMapping("/api/products")
public class ProductController {
    @Autowired
    private ProductService productService;

    /** POST /api/products: saves the request body and echoes the saved product. */
    @PostMapping
    public ResponseEntity<Product> createProduct(@RequestBody Product product) {
        return ResponseEntity.ok(productService.saveProduct(product));
    }

    /** GET /api/products/search: all filters are optional query parameters. */
    @GetMapping("/search")
    public ResponseEntity<List<Product>> searchProducts(
            @RequestParam(required = false) String q,
            @RequestParam(required = false) String category,
            @RequestParam(required = false) Double minPrice,
            @RequestParam(required = false) Double maxPrice) {
        return ResponseEntity.ok(productService.searchProducts(q, category, minPrice, maxPrice));
    }

    /** GET /api/products/categories/stats: product count per category. */
    @GetMapping("/categories/stats")
    public ResponseEntity<Map<String, Long>> getCategoryStatistics() {
        return ResponseEntity.ok(productService.getCategoryStatistics());
    }

    /** GET /api/products/{id}/recommendations?size=N (default 5). */
    @GetMapping("/{id}/recommendations")
    public ResponseEntity<List<Product>> getRecommendations(
            @PathVariable String id,
            @RequestParam(defaultValue = "5") int size) {
        return ResponseEntity.ok(productService.getRecommendations(id, size));
    }
}
2.2 Python应用集成
Django集成
# settings.py
# Connection options for django_elasticsearch_dsl (connection alias "default").
# NOTE(review): 'hosts' without a scheme, 'http_auth', 'use_ssl' and 'timeout' are
# elasticsearch-py 7.x client options; the 8.x client expects e.g.
# 'https://localhost:9200', 'basic_auth' and 'request_timeout' — confirm against
# the installed client version.
ELASTICSEARCH_DSL = {
    'default': {
        'hosts': 'localhost:9200',
        # NOTE(review): hard-coded placeholder credentials; load them from the
        # environment in real deployments.
        'http_auth': ('username', 'password'),
        'use_ssl': True,
        # TLS certificate verification is disabled and its warning suppressed;
        # only acceptable for local self-signed clusters.
        'verify_certs': False,
        'ssl_show_warn': False,
        'timeout': 30,
        'max_retries': 3,
        'retry_on_timeout': True
    },
}
INSTALLED_APPS = [
    # ... other apps
    'django_elasticsearch_dsl',
    'myapp',
]
# models.py
from django.db import models
from django.contrib.auth.models import User
class Article(models.Model):
    """Article written by a Django auth User, optionally tagged and published."""
    title = models.CharField(max_length=200)
    content = models.TextField()
    # Deleting the user deletes their articles.
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    category = models.CharField(max_length=50)
    # 'Tag' is referenced by name because it is defined later in this module.
    tags = models.ManyToManyField('Tag', blank=True)
    published = models.BooleanField(default=False)
    # auto_now_add: set once on creation; auto_now: refreshed on every save().
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    view_count = models.IntegerField(default=0)
    def __str__(self):
        """Display the article by its title."""
        return self.title
class Tag(models.Model):
    """Tag attached to articles through Article.tags; names are unique."""
    name = models.CharField(max_length=50, unique=True)
    def __str__(self):
        """Display the tag by its name."""
        return self.name
class Comment(models.Model):
    """A user's comment on an article (reverse accessor: article.comments)."""

    article = models.ForeignKey(Article, on_delete=models.CASCADE, related_name='comments')
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    content = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        """Human-readable label: commenter's username and article title."""
        who = self.author.username
        where = self.article.title
        return 'Comment by {} on {}'.format(who, where)
# documents.py
from django_elasticsearch_dsl import Document, Index, fields
from django_elasticsearch_dsl.registries import registry
from .models import Article, Tag, Comment
# Index holding ArticleDocument, with its index-level settings.
articles_index = Index('articles')
articles_index.settings(**{
    'number_of_shards': 1,
    'number_of_replicas': 1,
    'refresh_interval': '5s',
})
@registry.register_document
@articles_index.doc_type
class ArticleDocument(Document):
    """Elasticsearch document for Article, stored in the 'articles' index."""

    # Full-text title with an exact-match keyword and a completion (suggest) sub-field.
    title = fields.TextField(
        analyzer='standard',
        fields={
            'keyword': fields.KeywordField(),
            'suggest': fields.CompletionField()
        }
    )
    content = fields.TextField(
        analyzer='standard'
    )
    # Keyword field: query/aggregate it as 'category' (no '.keyword' sub-field).
    category = fields.KeywordField()

    # Author as an object field, filled by prepare_author().
    author = fields.ObjectField(
        properties={
            'id': fields.IntegerField(),
            'username': fields.KeywordField(),
            'email': fields.KeywordField(),
            'first_name': fields.TextField(),
            'last_name': fields.TextField(),
        }
    )

    # Tags as nested documents, filled by prepare_tags(); query them with
    # nested queries/aggregations.
    tags = fields.NestedField(
        properties={
            'id': fields.IntegerField(),
            'name': fields.KeywordField(),
        }
    )

    # Comments as nested documents, filled by prepare_comments().
    comments = fields.NestedField(
        properties={
            'id': fields.IntegerField(),
            'content': fields.TextField(),
            'author_username': fields.KeywordField(),
            'created_at': fields.DateField(),
        }
    )

    created_at = fields.DateField()
    updated_at = fields.DateField()
    published = fields.BooleanField()
    view_count = fields.IntegerField()

    class Django:
        model = Article
        # All indexed fields are declared explicitly above.
        fields = []
        # Models whose changes are mapped back to articles by get_instances_from_related().
        related_models = [Tag, Comment]

    def get_queryset(self):
        """Queryset to index, with related rows loaded up front.

        'comments__author' is prefetched (which also prefetches 'comments')
        because prepare_comments() reads comment.author.username for every
        comment; prefetching only 'comments' cost one extra query per comment.
        """
        return (
            super().get_queryset()
            .select_related('author')
            .prefetch_related('tags', 'comments__author')
        )

    def get_instances_from_related(self, related_instance):
        """Return the articles to re-index when a related Tag or Comment changes."""
        if isinstance(related_instance, Tag):
            return related_instance.article_set.all()
        if isinstance(related_instance, Comment):
            return [related_instance.article]
        return []

    def prepare_author(self, instance):
        """Serialize the article's author for the `author` object field."""
        author = instance.author
        return {
            'id': author.id,
            'username': author.username,
            'email': author.email,
            'first_name': author.first_name,
            'last_name': author.last_name,
        }

    def prepare_tags(self, instance):
        """Serialize the article's tags for the nested `tags` field."""
        return [
            {
                'id': tag.id,
                'name': tag.name,
            }
            for tag in instance.tags.all()
        ]

    def prepare_comments(self, instance):
        """Serialize the article's comments for the nested `comments` field."""
        return [
            {
                'id': comment.id,
                'content': comment.content,
                'author_username': comment.author.username,
                'created_at': comment.created_at,
            }
            for comment in instance.comments.all()
        ]
# search.py
from elasticsearch_dsl import Q, A
from django_elasticsearch_dsl_drf.constants import LOOKUP_FILTER_RANGE, LOOKUP_QUERY_IN
from django_elasticsearch_dsl_drf.filter_backends import (
FilteringFilterBackend,
SearchFilterBackend,
SuggesterFilterBackend,
FunctionalSuggesterFilterBackend,
OrderingFilterBackend,
HighlightBackend,
FacetedSearchFilterBackend,
)
from django_elasticsearch_dsl_drf.viewsets import DocumentViewSet
from django_elasticsearch_dsl_drf.pagination import PageNumberPagination
from rest_framework import serializers
from .documents import ArticleDocument
class ArticleDocumentSerializer(serializers.Serializer):
    """Read-only serializer for ArticleDocument search hits."""

    id = serializers.IntegerField(read_only=True)
    title = serializers.CharField(read_only=True)
    content = serializers.CharField(read_only=True)
    category = serializers.CharField(read_only=True)
    # Object/nested values come back from elasticsearch_dsl as AttrDict/AttrList
    # wrappers rather than dict/list. NOTE(review): these are assumed to lack the
    # mapping API DictField/ListField and the JSON renderer rely on, so they are
    # converted explicitly below.
    author = serializers.SerializerMethodField()
    tags = serializers.SerializerMethodField()
    comments = serializers.SerializerMethodField()
    published = serializers.BooleanField(read_only=True)
    created_at = serializers.DateTimeField(read_only=True)
    updated_at = serializers.DateTimeField(read_only=True)
    view_count = serializers.IntegerField(read_only=True)

    # Search metadata
    score = serializers.SerializerMethodField()
    highlight = serializers.SerializerMethodField()

    @staticmethod
    def _to_plain(value):
        """Recursively turn AttrDict/AttrList-like values into plain dicts/lists."""
        if value is None:
            return None
        if hasattr(value, 'to_dict'):
            return value.to_dict()
        if isinstance(value, (str, bytes, dict)):
            return value
        if hasattr(value, '__iter__'):
            return [ArticleDocumentSerializer._to_plain(item) for item in value]
        return value

    def get_author(self, obj):
        """Author object as a plain dict (None if absent)."""
        return self._to_plain(getattr(obj, 'author', None))

    def get_tags(self, obj):
        """Nested tags as a list of plain dicts (None if absent)."""
        return self._to_plain(getattr(obj, 'tags', None))

    def get_comments(self, obj):
        """Nested comments as a list of plain dicts (None if absent)."""
        return self._to_plain(getattr(obj, 'comments', None))

    def get_score(self, obj):
        """Relevance score of the hit, or None."""
        return getattr(obj.meta, 'score', None)

    def get_highlight(self, obj):
        """Highlight fragments as a plain dict; {} when none were returned."""
        highlight = getattr(obj.meta, 'highlight', None)
        if highlight is None:
            return {}
        return self._to_plain(highlight)
class ArticleDocumentViewSet(DocumentViewSet):
    """Article search endpoint built on django_elasticsearch_dsl_drf backends
    (filtering, search, suggestions, ordering, highlighting, facets).

    Field paths follow ArticleDocument: category, author.username and tags.name
    are KeywordFields without a '.keyword' sub-field, so the former '*.keyword'
    paths matched nothing.
    """

    document = ArticleDocument
    serializer_class = ArticleDocumentSerializer
    pagination_class = PageNumberPagination

    filter_backends = [
        FilteringFilterBackend,
        SearchFilterBackend,
        SuggesterFilterBackend,
        FunctionalSuggesterFilterBackend,
        OrderingFilterBackend,
        HighlightBackend,
        FacetedSearchFilterBackend,
    ]

    # Full-text search fields with boosts.
    # NOTE(review): 'tags.name' is a nested field; a top-level search will not
    # match it — consider the library's nested search support.
    search_fields = {
        'title': {'boost': 2.0},
        'content': {'boost': 1.0},
        'author.username': {'boost': 1.5},
        'tags.name': {'boost': 1.2},
    }

    # Filter parameters -> document fields
    filter_fields = {
        'category': 'category',
        'published': 'published',
        'author_id': 'author.id',
        'created_at': {
            'field': 'created_at',
            'lookups': [LOOKUP_FILTER_RANGE],
        },
        'view_count': {
            'field': 'view_count',
            'lookups': [LOOKUP_FILTER_RANGE],
        },
        # NOTE(review): tags is nested; a plain term filter on 'tags.name' does not
        # reach nested documents — NestedFilteringFilterBackend with
        # nested_filter_fields is likely required.
        'tags': {
            'field': 'tags.name',
            'lookups': [LOOKUP_QUERY_IN],
        },
    }

    # Ordering parameters -> document fields
    ordering_fields = {
        'created_at': 'created_at',
        'updated_at': 'updated_at',
        'view_count': 'view_count',
        'score': '_score',
    }
    # Newest first by default.
    ordering = ('-created_at',)

    # Highlighted fields, wrapped in <mark>…</mark>
    highlight_fields = {
        'title': {
            'enabled': True,
            'options': {
                'pre_tags': ['<mark>'],
                'post_tags': ['</mark>'],
            }
        },
        'content': {
            'enabled': True,
            'options': {
                'pre_tags': ['<mark>'],
                'post_tags': ['</mark>'],
                'fragment_size': 200,
                'number_of_fragments': 3,
            }
        },
    }

    # Completion suggester on the title's 'suggest' sub-field.
    suggester_fields = {
        'title_suggest': {
            'field': 'title.suggest',
            'suggesters': [
                'completion',
            ],
        },
    }

    # Facets
    # NOTE(review): the tags facet targets a nested field and probably needs a
    # nested facet to return buckets.
    faceted_search_fields = {
        'category': {
            'field': 'category',
            'enabled': True,
        },
        'tags': {
            'field': 'tags.name',
            'enabled': True,
        },
        'author': {
            'field': 'author.username',
            'enabled': True,
        },
        'published': {
            'field': 'published',
            'enabled': True,
        },
    }

    def get_queryset(self):
        """Restrict non-staff users to published articles."""
        queryset = super().get_queryset()
        if not self.request.user.is_staff:
            queryset = queryset.filter('term', published=True)
        return queryset
# Advanced search service
class AdvancedSearchService:
    """Hand-written queries over ArticleDocument; every query returns only
    published articles.

    Field names follow ArticleDocument: category, author.username and tags.name
    are KeywordFields (no '.keyword' sub-field), and tags is a NestedField that
    must be queried/aggregated through 'nested'.
    """

    @staticmethod
    def semantic_search(query: str, size: int = 10):
        """Relevance search over title, content, author and tags.

        Args:
            query: User search text.
            size: Maximum number of hits.

        Returns:
            The executed elasticsearch_dsl response, with title/content highlights.
        """
        search = ArticleDocument.search()

        # Fuzzy match across top-level fields.
        multi_match = Q(
            'multi_match',
            query=query,
            fields=[
                'title^3',
                'content^1',
                'author.username^1.5'
            ],
            type='best_fields',
            fuzziness='AUTO'
        )

        # tags.name lives in nested documents, which a top-level multi_match never
        # sees; match it through a nested query (weight 2, as before).
        tag_match = Q(
            'nested',
            path='tags',
            query=Q('match', **{'tags.name': {'query': query, 'fuzziness': 'AUTO'}}),
            boost=2.0
        )

        # Exact phrase matches rank higher.
        phrase_match = Q(
            'multi_match',
            query=query,
            fields=['title', 'content'],
            type='phrase',
            boost=2.0
        )

        combined_query = Q(
            'bool',
            should=[multi_match, tag_match, phrase_match],
            minimum_should_match=1
        )

        search = search.query(combined_query)
        search = search.filter('term', published=True)
        search = search[:size]
        search = search.highlight(
            'title',
            'content',
            pre_tags=['<mark>'],
            post_tags=['</mark>'],
            fragment_size=200,
            number_of_fragments=3
        )
        return search.execute()

    @staticmethod
    def trending_articles(days: int = 7, size: int = 10):
        """Articles created in the last `days` days, most viewed first."""
        from datetime import datetime, timedelta

        since = datetime.now() - timedelta(days=days)
        search = ArticleDocument.search()
        search = search.filter('range', created_at={'gte': since})
        search = search.sort('-view_count', '-created_at')
        search = search.filter('term', published=True)
        search = search[:size]
        return search.execute()

    @staticmethod
    def related_articles(article_id: int, size: int = 5):
        """Articles similar to `article_id` (more_like_this), excluding itself.

        Returns [] if the article cannot be fetched.
        """
        try:
            ArticleDocument.get(id=article_id)
        except Exception:
            # Unknown id or lookup failure: nothing to relate to. (A bare except
            # would also have swallowed KeyboardInterrupt/SystemExit.)
            return []

        search = ArticleDocument.search()
        # NOTE(review): 'tags.name' is nested; more_like_this on it is likely ineffective.
        mlt_query = Q(
            'more_like_this',
            fields=['title', 'content', 'tags.name'],
            like=[
                {
                    '_index': 'articles',
                    '_id': article_id
                }
            ],
            min_term_freq=1,
            max_query_terms=12,
            min_doc_freq=1
        )
        search = search.query(mlt_query)
        search = search.exclude('term', _id=article_id)
        search = search.filter('term', published=True)
        search = search[:size]
        return search.execute()

    @staticmethod
    def category_statistics():
        """Aggregate published articles by category, tag, author and month.

        Returns:
            Dict with 'categories', 'tags', 'authors' and 'monthly_posts' bucket lists.
        """
        search = ArticleDocument.search()
        search = search.filter('term', published=True)

        search.aggs.bucket('categories', 'terms', field='category', size=50)
        # tags is nested: the terms aggregation must run inside a nested aggregation.
        search.aggs.bucket('tags', 'nested', path='tags') \
            .bucket('names', 'terms', field='tags.name', size=20)
        search.aggs.bucket('authors', 'terms', field='author.username', size=10)
        search.aggs.bucket(
            'monthly_posts',
            'date_histogram',
            field='created_at',
            calendar_interval='month'
        )

        search = search[:0]  # aggregations only, no hits
        aggs = search.execute().aggregations

        return {
            'categories': [
                {'name': bucket.key, 'count': bucket.doc_count}
                for bucket in aggs.categories.buckets
            ],
            'tags': [
                {'name': bucket.key, 'count': bucket.doc_count}
                for bucket in aggs.tags.names.buckets
            ],
            'authors': [
                {'username': bucket.key, 'count': bucket.doc_count}
                for bucket in aggs.authors.buckets
            ],
            'monthly_posts': [
                {'date': bucket.key_as_string, 'count': bucket.doc_count}
                for bucket in aggs.monthly_posts.buckets
            ]
        }
# views.py
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from .search import AdvancedSearchService
import json
@require_http_methods(["GET"])
def semantic_search(request):
    """Article search API: GET ?q=<text>&size=<n> (size defaults to 10).

    Responds 400 for a non-integer size or an empty query, and 500 with the error
    text when the search fails.
    """
    query = request.GET.get('q', '')
    try:
        size = int(request.GET.get('size', 10))
    except ValueError:
        # Bad client input is a 400, not an unhandled server error.
        return JsonResponse({'error': 'size参数必须是整数'}, status=400)

    if not query:
        return JsonResponse({'error': '查询参数不能为空'}, status=400)

    try:
        results = AdvancedSearchService.semantic_search(query, size)
        articles = []
        for hit in results:
            content = hit.content or ''
            author = hit.author
            article = {
                'id': hit.meta.id,
                'title': hit.title,
                'content': content[:200] + '...' if len(content) > 200 else content,
                'category': hit.category,
                # Object values from elasticsearch_dsl are AttrDict-like wrappers that
                # JsonResponse's encoder cannot serialize; convert to a plain dict.
                'author': author.to_dict() if hasattr(author, 'to_dict') else author,
                'created_at': hit.created_at,
                'view_count': hit.view_count,
                'score': hit.meta.score,
            }
            if hasattr(hit.meta, 'highlight'):
                highlight = hit.meta.highlight
                article['highlight'] = (
                    highlight.to_dict() if hasattr(highlight, 'to_dict') else highlight
                )
            articles.append(article)

        return JsonResponse({
            'total': results.hits.total.value,
            'articles': articles
        })
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@require_http_methods(["GET"])
def trending_articles(request):
    """Trending articles API: GET ?days=<n>&size=<n> (defaults 7 and 10).

    Responds 400 for non-integer parameters and 500 with the error text on failure.
    """
    try:
        days = int(request.GET.get('days', 7))
        size = int(request.GET.get('size', 10))
    except ValueError:
        return JsonResponse({'error': 'days和size参数必须是整数'}, status=400)

    try:
        results = AdvancedSearchService.trending_articles(days, size)
        articles = []
        for hit in results:
            author = hit.author
            articles.append({
                'id': hit.meta.id,
                'title': hit.title,
                'category': hit.category,
                # AttrDict-like object -> plain dict so JsonResponse can encode it.
                'author': author.to_dict() if hasattr(author, 'to_dict') else author,
                'created_at': hit.created_at,
                'view_count': hit.view_count,
            })
        return JsonResponse({'articles': articles})
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@require_http_methods(["GET"])
def related_articles(request, article_id):
    """Related articles API: GET ?size=<n> (default 5) for the article in the URL.

    Responds 400 for a non-integer size and 500 with the error text on failure.
    """
    try:
        size = int(request.GET.get('size', 5))
    except ValueError:
        return JsonResponse({'error': 'size参数必须是整数'}, status=400)

    try:
        results = AdvancedSearchService.related_articles(article_id, size)
        articles = []
        for hit in results:
            author = hit.author
            articles.append({
                'id': hit.meta.id,
                'title': hit.title,
                'category': hit.category,
                # AttrDict-like object -> plain dict so JsonResponse can encode it.
                'author': author.to_dict() if hasattr(author, 'to_dict') else author,
                'created_at': hit.created_at,
            })
        return JsonResponse({'articles': articles})
    except Exception as e:
        return JsonResponse({'error': str(e)}, status=500)
@require_http_methods(["GET"])
def category_statistics(request):
    """Aggregation statistics API.

    Returns the dict produced by AdvancedSearchService.category_statistics(), or
    a 500 response carrying the error text if building or encoding it fails.
    """
    try:
        return JsonResponse(AdvancedSearchService.category_statistics())
    except Exception as exc:
        return JsonResponse({'error': str(exc)}, status=500)
# urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from . import views
from .search import ArticleDocumentViewSet
# DRF router for the document viewset (served under api/search/articles/).
router = DefaultRouter()
router.register(
    prefix=r'search/articles',
    viewset=ArticleDocumentViewSet,
    basename='article-search',
)

urlpatterns = [
    # Router-generated viewset routes
    path(route='api/', view=include(router.urls)),
    # Hand-written JSON endpoints from views.py
    path(route='api/semantic-search/', view=views.semantic_search, name='semantic-search'),
    path(route='api/trending/', view=views.trending_articles, name='trending-articles'),
    path(route='api/articles/<int:article_id>/related/', view=views.related_articles, name='related-articles'),
    path(route='api/statistics/', view=views.category_statistics, name='category-statistics'),
]
2.3 Node.js集成
Express.js集成
// package.json
/*
{
"dependencies": {
"@elastic/elasticsearch": "^8.11.0",
"express": "^4.18.0",
"body-parser": "^1.20.0",
"cors": "^2.8.5",
"helmet": "^7.0.0",
"morgan": "^1.10.0",
"dotenv": "^16.0.0"
}
}
*/
// config/elasticsearch.js
const { Client } = require('@elastic/elasticsearch');
require('dotenv').config();
/**
 * Owns a single configured Elasticsearch client.
 *
 * Connection settings come from environment variables (ELASTICSEARCH_URL,
 * ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD, ELASTICSEARCH_TLS_VERIFY),
 * falling back to the local development defaults below.
 */
class ElasticsearchClient {
  constructor() {
    this.client = new Client({
      node: process.env.ELASTICSEARCH_URL || 'https://localhost:9200',
      auth: {
        username: process.env.ELASTICSEARCH_USERNAME || 'elastic',
        password: process.env.ELASTICSEARCH_PASSWORD || 'password'
      },
      tls: {
        // Certificate verification stays off by default (as in the original
        // example) but can be enabled without code changes via
        // ELASTICSEARCH_TLS_VERIFY=true. Production should enable it.
        rejectUnauthorized: process.env.ELASTICSEARCH_TLS_VERIFY === 'true'
      },
      requestTimeout: 30000,
      maxRetries: 3,
      resurrectStrategy: 'ping'
    });
  }

  /**
   * Ping the cluster once.
   * @returns {Promise<boolean>} true if the ping succeeded, false if it threw.
   */
  async testConnection() {
    try {
      await this.client.ping();
      console.log('Elasticsearch连接成功');
      return true;
    } catch (error) {
      console.error('Elasticsearch连接失败:', error);
      return false;
    }
  }

  /** @returns {Client} the shared underlying client instance. */
  getClient() {
    return this.client;
  }
}
module.exports = new ElasticsearchClient();
// models/Product.js
/**
 * Product domain model and its mapping to/from Elasticsearch documents.
 */
class Product {
  /**
   * @param {object} data raw product fields. Missing optional fields get defaults:
   *   tags [], stock/rating/reviews_count 0, created_at/updated_at now, active true.
   */
  constructor(data) {
    this.id = data.id;
    this.name = data.name;
    this.description = data.description;
    this.price = data.price;
    this.category = data.category;
    this.brand = data.brand;
    this.tags = data.tags || [];
    this.stock = data.stock || 0;
    this.rating = data.rating || 0;
    this.reviews_count = data.reviews_count || 0;
    this.created_at = data.created_at || new Date();
    this.updated_at = data.updated_at || new Date();
    this.active = data.active !== undefined ? data.active : true;
  }

  /**
   * Build the document body stored in the `products` index.
   *
   * The `suggest` entry feeds the completion suggester:
   * - Inputs are restricted to non-empty values, because Elasticsearch rejects
   *   empty completion inputs.
   * - The weight is truncated to a non-negative integer. Completion weights must
   *   be integers, so e.g. rating 4.35 -> 43.5 would fail the whole index call.
   * When no usable input exists, `suggest` is omitted.
   * @returns {object}
   */
  toElasticsearchDoc() {
    const doc = {
      id: this.id,
      name: this.name,
      description: this.description,
      price: this.price,
      category: this.category,
      brand: this.brand,
      tags: this.tags,
      stock: this.stock,
      rating: this.rating,
      reviews_count: this.reviews_count,
      created_at: this.created_at,
      updated_at: this.updated_at,
      active: this.active
    };
    const tagList = Array.isArray(this.tags) ? this.tags : [this.tags];
    const inputs = [this.name, this.brand, ...tagList]
      .filter((value) => value !== undefined && value !== null && String(value).trim() !== '')
      .map(String);
    if (inputs.length > 0) {
      const rawWeight = Math.trunc(Number(this.rating) * 10 + Number(this.reviews_count));
      doc.suggest = {
        input: inputs,
        weight: Number.isFinite(rawWeight) && rawWeight > 0 ? rawWeight : 0
      };
    }
    return doc;
  }

  /**
   * Rebuild a Product from a search/get hit (uses `_source` when present)
   * or from a plain source object.
   */
  static fromElasticsearchDoc(doc) {
    return new Product(doc._source || doc);
  }
}
module.exports = Product;
// services/ProductService.js
const elasticsearchClient = require('../config/elasticsearch');
const Product = require('../models/Product');
/**
 * Data-access service for the `products` Elasticsearch index.
 *
 * Covers index creation, single and bulk indexing, faceted search, completion
 * suggestions and similarity-based recommendations. Every method logs and
 * rethrows client errors.
 */
class ProductService {
  constructor() {
    this.client = elasticsearchClient.getClient();
    this.index = 'products';
  }

  /**
   * Create the `products` index (custom analyzer + mappings) if it does not
   * exist yet. An existing index is left untouched.
   */
  async createIndex() {
    try {
      const exists = await this.client.indices.exists({ index: this.index });
      if (!exists) {
        await this.client.indices.create({
          index: this.index,
          body: {
            settings: {
              number_of_shards: 1,
              number_of_replicas: 1,
              analysis: {
                analyzer: {
                  product_analyzer: {
                    type: 'custom',
                    tokenizer: 'standard',
                    filter: ['lowercase', 'stop', 'snowball']
                  }
                }
              }
            },
            mappings: {
              properties: {
                name: {
                  type: 'text',
                  analyzer: 'product_analyzer',
                  fields: {
                    keyword: { type: 'keyword' },
                    suggest: { type: 'completion' }
                  }
                },
                description: {
                  type: 'text',
                  analyzer: 'product_analyzer'
                },
                price: { type: 'double' },
                category: { type: 'keyword' },
                brand: { type: 'keyword' },
                tags: { type: 'keyword' },
                stock: { type: 'integer' },
                rating: { type: 'float' },
                reviews_count: { type: 'integer' },
                created_at: { type: 'date' },
                updated_at: { type: 'date' },
                active: { type: 'boolean' },
                suggest: {
                  type: 'completion',
                  analyzer: 'simple',
                  preserve_separators: true,
                  preserve_position_increments: true,
                  max_input_length: 50
                }
              }
            }
          }
        });
        console.log(`索引 ${this.index} 创建成功`);
      }
    } catch (error) {
      console.error('创建索引失败:', error);
      throw error;
    }
  }

  /**
   * Index (create or replace) one product under its `id`.
   * @param {Product} product
   * @returns {Promise<object>} the client's index response.
   */
  async indexProduct(product) {
    try {
      const response = await this.client.index({
        index: this.index,
        id: product.id,
        body: product.toElasticsearchDoc()
      });
      return response;
    } catch (error) {
      console.error('索引产品失败:', error);
      throw error;
    }
  }

  /**
   * Bulk-index products.
   *
   * Per-item failures are logged (with the offending action/document), not thrown.
   * @param {Product[]} products
   * @returns {Promise<object>} the bulk response (`errors`, `items`, ...).
   */
  async bulkIndexProducts(products) {
    // Elasticsearch rejects a bulk request with no actions ("no requests added").
    // Short-circuit with a response of the same shape callers inspect.
    if (!products || products.length === 0) {
      return { took: 0, errors: false, items: [] };
    }
    try {
      // Action line followed by source line for each product.
      const body = products.flatMap(product => [
        { index: { _index: this.index, _id: product.id } },
        product.toElasticsearchDoc()
      ]);
      const response = await this.client.bulk({ body });
      if (response.errors) {
        const erroredDocuments = [];
        response.items.forEach((action, i) => {
          const operation = Object.keys(action)[0];
          if (action[operation].error) {
            erroredDocuments.push({
              status: action[operation].status,
              error: action[operation].error,
              // Item i corresponds to body[2i] (action) and body[2i + 1] (document).
              operation: body[i * 2],
              document: body[i * 2 + 1]
            });
          }
        });
        console.error('批量索引部分失败:', erroredDocuments);
      }
      return response;
    } catch (error) {
      console.error('批量索引产品失败:', error);
      throw error;
    }
  }

  /**
   * Full-text search over active products with optional filters, sorting,
   * pagination, highlighting and facet aggregations.
   *
   * @param {string} [query] text matched against name/description/brand/tags;
   *   match_all when empty.
   * @param {object} [filters] category, brand, minPrice, maxPrice, minRating,
   *   tags (array), inStock.
   * @param {object} [options] from, size, sortBy. Each sortBy value implies its
   *   own order: price_asc, price_desc, rating, newest, popularity, or relevance.
   * @returns {Promise<{total: number, products: object[], aggregations: object}>}
   */
  async searchProducts(query, filters = {}, options = {}) {
    try {
      const {
        category,
        brand,
        minPrice,
        maxPrice,
        minRating,
        tags,
        inStock
      } = filters;
      const {
        from = 0,
        size = 20,
        sortBy = 'relevance'
      } = options;
      // Scoring clauses (must) and non-scoring clauses (filter).
      const must = [];
      const filter = [];
      if (query) {
        must.push({
          multi_match: {
            query: query,
            fields: [
              'name^3',
              'description^1',
              'brand^2',
              'tags^1.5'
            ],
            type: 'best_fields',
            fuzziness: 'AUTO'
          }
        });
      } else {
        must.push({ match_all: {} });
      }
      filter.push({ term: { active: true } });
      if (category) {
        filter.push({ term: { category } });
      }
      if (brand) {
        filter.push({ term: { brand } });
      }
      if (minPrice !== undefined || maxPrice !== undefined) {
        const priceRange = {};
        if (minPrice !== undefined) priceRange.gte = minPrice;
        if (maxPrice !== undefined) priceRange.lte = maxPrice;
        filter.push({ range: { price: priceRange } });
      }
      if (minRating !== undefined) {
        filter.push({ range: { rating: { gte: minRating } } });
      }
      if (tags && tags.length > 0) {
        filter.push({ terms: { tags } });
      }
      if (inStock) {
        filter.push({ range: { stock: { gt: 0 } } });
      }
      let sort = [];
      switch (sortBy) {
        case 'price_asc':
          sort = [{ price: { order: 'asc' } }];
          break;
        case 'price_desc':
          sort = [{ price: { order: 'desc' } }];
          break;
        case 'rating':
          sort = [{ rating: { order: 'desc' } }];
          break;
        case 'newest':
          sort = [{ created_at: { order: 'desc' } }];
          break;
        case 'popularity':
          sort = [{ reviews_count: { order: 'desc' } }];
          break;
        default:
          sort = ['_score'];
      }
      const searchBody = {
        query: {
          bool: {
            must,
            filter
          }
        },
        sort,
        from,
        size,
        highlight: {
          fields: {
            name: {},
            description: {
              fragment_size: 150,
              number_of_fragments: 3
            }
          },
          pre_tags: ['<mark>'],
          post_tags: ['</mark>']
        },
        aggs: {
          categories: {
            terms: {
              field: 'category',
              size: 20
            }
          },
          brands: {
            terms: {
              field: 'brand',
              size: 20
            }
          },
          price_ranges: {
            range: {
              field: 'price',
              ranges: [
                { to: 50 },
                { from: 50, to: 100 },
                { from: 100, to: 200 },
                { from: 200, to: 500 },
                { from: 500 }
              ]
            }
          },
          avg_price: {
            avg: { field: 'price' }
          }
        }
      };
      const response = await this.client.search({
        index: this.index,
        body: searchBody
      });
      return {
        total: response.hits.total.value,
        products: response.hits.hits.map(hit => ({
          ...Product.fromElasticsearchDoc(hit),
          score: hit._score,
          highlight: hit.highlight
        })),
        aggregations: response.aggregations
      };
    } catch (error) {
      console.error('搜索产品失败:', error);
      throw error;
    }
  }

  /**
   * Completion-suggester lookup on the `suggest` field.
   * @param {string} query prefix typed by the user.
   * @param {number} [size=10] maximum suggestions.
   * @returns {Promise<Array<{text: string, score: number, product: Product}>>}
   */
  async getProductSuggestions(query, size = 10) {
    try {
      const response = await this.client.search({
        index: this.index,
        body: {
          suggest: {
            product_suggest: {
              prefix: query,
              completion: {
                field: 'suggest',
                size: size,
                skip_duplicates: true
              }
            }
          }
        }
      });
      return response.suggest.product_suggest[0].options.map(option => ({
        text: option.text,
        score: option._score,
        product: Product.fromElasticsearchDoc(option._source)
      }));
    } catch (error) {
      console.error('获取产品建议失败:', error);
      throw error;
    }
  }

  /**
   * Recommend active products similar to `productId`.
   *
   * Ranking uses same category, same brand, shared tags and more_like_this on
   * name/description. The source product itself is excluded.
   * @param {string|number} productId
   * @param {number} [size=5]
   */
  async getRecommendations(productId, size = 5) {
    try {
      const productResponse = await this.client.get({
        index: this.index,
        id: productId
      });
      const product = Product.fromElasticsearchDoc(productResponse);
      // Add attribute clauses only when the source product has the attribute.
      // A term query with an undefined value serializes without "value", and
      // Elasticsearch rejects it, failing the whole request.
      const should = [];
      if (product.category) {
        should.push({ term: { category: { value: product.category, boost: 2.0 } } });
      }
      if (product.brand) {
        should.push({ term: { brand: { value: product.brand, boost: 1.5 } } });
      }
      if (Array.isArray(product.tags) && product.tags.length > 0) {
        should.push({ terms: { tags: product.tags, boost: 1.2 } });
      }
      should.push({
        more_like_this: {
          fields: ['name', 'description'],
          like: [
            {
              _index: this.index,
              _id: productId
            }
          ],
          min_term_freq: 1,
          max_query_terms: 12
        }
      });
      const response = await this.client.search({
        index: this.index,
        body: {
          query: {
            bool: {
              should,
              must_not: [
                { term: { _id: productId } }
              ],
              filter: [
                { term: { active: true } }
              ]
            }
          },
          size
        }
      });
      return response.hits.hits.map(hit => ({
        ...Product.fromElasticsearchDoc(hit),
        score: hit._score
      }));
    } catch (error) {
      console.error('获取推荐产品失败:', error);
      throw error;
    }
  }
}
module.exports = ProductService;
// routes/products.js
const express = require('express');
const router = express.Router();
const ProductService = require('../services/ProductService');
const Product = require('../models/Product');
const productService = new ProductService();

/**
 * GET /search: full-text product search with filters, sorting and pagination.
 *
 * Query parameters:
 * - q: search text
 * - category, brand
 * - min_price, max_price, min_rating
 * - tags: comma-separated and/or repeated
 * - in_stock: 'true' to require stock > 0
 * - page: 1-based, default 1
 * - size: default 20
 * - sort_by: default 'relevance'
 *
 * Responds 400 when page/size are not positive integers or a numeric filter
 * is not a number, rather than forwarding NaN to Elasticsearch.
 */
router.get('/search', async (req, res) => {
  try {
    const {
      q: query,
      category,
      brand,
      min_price: minPrice,
      max_price: maxPrice,
      min_rating: minRating,
      tags,
      in_stock: inStock,
      page = 1,
      size = 20,
      sort_by: sortBy = 'relevance'
    } = req.query;

    const pageNum = Number.parseInt(page, 10);
    const sizeNum = Number.parseInt(size, 10);
    // `!(x >= 1)` is also true for NaN.
    if (!(pageNum >= 1) || !(sizeNum >= 1)) {
      return res.status(400).json({
        success: false,
        error: 'page和size必须是正整数'
      });
    }

    // Absent/empty -> undefined (no filter); malformed -> NaN (rejected below).
    const optionalNumber = (value) => (value ? Number.parseFloat(value) : undefined);
    const filters = {
      category,
      brand,
      minPrice: optionalNumber(minPrice),
      maxPrice: optionalNumber(maxPrice),
      minRating: optionalNumber(minRating),
      // Express yields an array for repeated ?tags=; accept both forms.
      tags: tags
        ? [].concat(tags).flatMap((t) => String(t).split(',')).map((t) => t.trim()).filter(Boolean)
        : undefined,
      inStock: inStock === 'true'
    };
    if ([filters.minPrice, filters.maxPrice, filters.minRating].some(Number.isNaN)) {
      return res.status(400).json({
        success: false,
        error: '价格和评分参数必须是数字'
      });
    }

    const options = {
      from: (pageNum - 1) * sizeNum,
      size: sizeNum,
      sortBy
    };
    const result = await productService.searchProducts(query, filters, options);
    res.json({
      success: true,
      data: {
        total: result.total,
        page: pageNum,
        size: sizeNum,
        products: result.products,
        aggregations: result.aggregations
      }
    });
  } catch (error) {
    console.error('搜索产品错误:', error);
    res.status(500).json({
      success: false,
      error: '搜索产品失败'
    });
  }
});
/**
 * GET /suggest: completion suggestions for the prefix `q`.
 * Query: q (required), size (positive integer, default 10).
 * Responds 400 on a missing q or an invalid size.
 */
router.get('/suggest', async (req, res) => {
  try {
    const { q: query, size = 10 } = req.query;
    if (!query) {
      return res.status(400).json({
        success: false,
        error: '查询参数不能为空'
      });
    }
    const sizeNum = Number.parseInt(size, 10);
    // Without this check a NaN size was forwarded to Elasticsearch (500).
    if (!(sizeNum >= 1)) {
      return res.status(400).json({
        success: false,
        error: 'size必须是正整数'
      });
    }
    const suggestions = await productService.getProductSuggestions(query, sizeNum);
    res.json({
      success: true,
      data: suggestions
    });
  } catch (error) {
    console.error('获取产品建议错误:', error);
    res.status(500).json({
      success: false,
      error: '获取产品建议失败'
    });
  }
});
/**
 * GET /:id/recommendations: products similar to product `id`.
 * Query: size (positive integer, default 5).
 * Responds 400 on an invalid size, 404 when the source product does not exist.
 */
router.get('/:id/recommendations', async (req, res) => {
  try {
    const { id } = req.params;
    const { size = 5 } = req.query;
    const sizeNum = Number.parseInt(size, 10);
    if (!(sizeNum >= 1)) {
      return res.status(400).json({
        success: false,
        error: 'size必须是正整数'
      });
    }
    const recommendations = await productService.getRecommendations(id, sizeNum);
    res.json({
      success: true,
      data: recommendations
    });
  } catch (error) {
    // The client's ResponseError carries the HTTP status in meta.statusCode.
    // A missing source document is a client error, not a server failure.
    if (error && error.meta && error.meta.statusCode === 404) {
      return res.status(404).json({
        success: false,
        error: '产品不存在'
      });
    }
    console.error('获取推荐产品错误:', error);
    res.status(500).json({
      success: false,
      error: '获取推荐产品失败'
    });
  }
});
/**
 * POST /: index a single product built from the JSON request body.
 * Responds 201 with the document id assigned by Elasticsearch plus the product.
 */
router.post('/', async (req, res) => {
  let result;
  let product;
  try {
    product = new Product(req.body);
    result = await productService.indexProduct(product);
  } catch (error) {
    console.error('创建产品错误:', error);
    res.status(500).json({
      success: false,
      error: '创建产品失败'
    });
    return;
  }
  res.status(201).json({
    success: true,
    data: { id: result._id, product }
  });
});
/**
 * POST /bulk: bulk-index `products` (array) from the JSON body.
 *
 * Response data:
 * - indexed: successfully indexed items
 * - failed: items Elasticsearch rejected
 * - errors: the bulk response's error flag
 */
router.post('/bulk', async (req, res) => {
  try {
    const { products } = req.body;
    if (!Array.isArray(products)) {
      return res.status(400).json({
        success: false,
        error: '产品数据必须是数组格式'
      });
    }
    // Elasticsearch rejects a bulk request with no actions; nothing to do.
    if (products.length === 0) {
      return res.json({
        success: true,
        data: { indexed: 0, failed: 0, errors: false }
      });
    }
    const productInstances = products.map(data => new Product(data));
    const result = await productService.bulkIndexProducts(productInstances);
    const items = result.items || [];
    // Each item is { <operation>: { status, error? } }. Previously every item,
    // including rejected ones, was counted as indexed.
    const failed = items.filter((item) => {
      const operation = Object.keys(item)[0];
      return Boolean(item[operation] && item[operation].error);
    }).length;
    res.json({
      success: true,
      data: {
        indexed: items.length - failed,
        failed,
        errors: result.errors
      }
    });
  } catch (error) {
    console.error('批量创建产品错误:', error);
    res.status(500).json({
      success: false,
      error: '批量创建产品失败'
    });
  }
});
module.exports = router;
// app.js
const express = require('express');
const cors = require('cors');
const helmet = require('helmet');
const morgan = require('morgan');
const bodyParser = require('body-parser');
require('dotenv').config();
const elasticsearchClient = require('./config/elasticsearch');
const ProductService = require('./services/ProductService');
const productRoutes = require('./routes/products');
// Express application; the listen port comes from PORT (default 3000).
const app = express();
const PORT = process.env.PORT || 3000;
// Middleware, applied in registration order: security headers (helmet), CORS,
// access logging (morgan), then JSON (up to 10mb) and urlencoded body parsing.
app.use(helmet());
app.use(cors());
app.use(morgan('combined'));
app.use(bodyParser.json({ limit: '10mb' }));
app.use(bodyParser.urlencoded({ extended: true }));
// Routes: product endpoints are mounted under /api/products.
app.use('/api/products', productRoutes);
// Health check: 200 when Elasticsearch answers a ping, 503 when it does not.
// Previously this answered 200/"ok" even with Elasticsearch down, so status-code
// based monitors could never see the outage.
app.get('/health', async (req, res) => {
  try {
    const esConnected = await elasticsearchClient.testConnection();
    res.status(esConnected ? 200 : 503).json({
      status: esConnected ? 'ok' : 'error',
      timestamp: new Date().toISOString(),
      services: {
        elasticsearch: esConnected ? 'connected' : 'disconnected'
      }
    });
  } catch (error) {
    res.status(500).json({
      status: 'error',
      timestamp: new Date().toISOString(),
      error: error.message
    });
  }
});
// Error-handling middleware. Express recognises it by its four-argument
// signature, so `next` must stay in the parameter list even though it is unused.
app.use((err, req, res, next) => {
  // Errors from middleware such as body-parser carry an HTTP status (e.g. 400
  // for malformed JSON, 413 for an oversized body). Report those as client
  // errors instead of a blanket 500.
  const status = err && (err.status || err.statusCode);
  if (Number.isInteger(status) && status >= 400 && status < 500) {
    return res.status(status).json({
      success: false,
      error: err.expose ? err.message : '请求无效'
    });
  }
  console.error('未处理的错误:', err);
  res.status(500).json({
    success: false,
    error: '服务器内部错误'
  });
});
// Fallback for any request that no earlier route matched: JSON 404.
function handleNotFound(req, res) {
  const body = { success: false, error: '接口不存在' };
  res.status(404).json(body);
}
app.use(handleNotFound);
/**
 * Startup sequence:
 * 1. Require a successful Elasticsearch ping.
 * 2. Ensure the products index exists.
 * 3. Start listening on PORT.
 * Any failure logs and exits the process with status 1.
 */
async function startServer() {
  try {
    const reachable = await elasticsearchClient.testConnection();
    if (!reachable) {
      console.error('无法连接到Elasticsearch,服务器启动失败');
      process.exit(1);
    }
    await new ProductService().createIndex();
    const onListening = () => {
      console.log(`服务器运行在端口 ${PORT}`);
      console.log(`健康检查: http://localhost:${PORT}/health`);
      console.log(`API文档: http://localhost:${PORT}/api/products/search`);
    };
    app.listen(PORT, onListening);
  } catch (startupError) {
    console.error('启动服务器失败:', startupError);
    process.exit(1);
  }
}
// Start the server as soon as this module is loaded. This also happens when it
// is require()d, e.g. from tests; the app object is still exported for reuse.
startServer();
module.exports = app;
3.3 Python集成
3.3.1 Django集成
# settings.py
from elasticsearch import Elasticsearch
from elasticsearch_dsl import connections
import os

# Elasticsearch connection settings used by elasticsearch_dsl.
# Host and credentials can be overridden through environment variables (the same
# names as the Flask example), so real secrets need not be committed. The
# fallbacks are the original local development values.
ELASTICSEARCH_DSL = {
    'default': {
        'hosts': os.getenv('ELASTICSEARCH_HOST', 'localhost:9200'),
        'http_auth': (
            os.getenv('ELASTICSEARCH_USERNAME', 'elastic'),
            os.getenv('ELASTICSEARCH_PASSWORD', 'password'),
        ),
        'use_ssl': True,
        # NOTE: certificate verification is disabled, as in the original example;
        # enable it (with a proper CA) in production.
        'verify_certs': False,
        'ssl_show_warn': False,
        'timeout': 30,
        'max_retries': 3,
        'retry_on_timeout': True
    },
}
# Register the connection alias(es) with elasticsearch_dsl.
connections.configure(**ELASTICSEARCH_DSL)
# models.py
from django.db import models
from elasticsearch_dsl import Document, Text, Keyword, Integer, Float, Date, Boolean, Completion
from elasticsearch_dsl.connections import connections
class Product(models.Model):
    """Relational product record; ``to_elasticsearch`` builds its search document."""

    name = models.CharField(max_length=200)
    description = models.TextField()
    price = models.DecimalField(max_digits=10, decimal_places=2)
    category = models.CharField(max_length=100)
    brand = models.CharField(max_length=100)
    stock = models.IntegerField(default=0)
    rating = models.FloatField(default=0.0)
    reviews_count = models.IntegerField(default=0)
    active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def __str__(self):
        return self.name

    def to_elasticsearch(self):
        """Return this product as a plain dict for indexing.

        ``price`` is converted from Decimal to float. ``suggest`` holds
        completion inputs (name and brand) and an integer weight of
        ``rating * 10 + reviews_count``.
        """
        document = dict(
            id=self.id,
            name=self.name,
            description=self.description,
            price=float(self.price),
            category=self.category,
            brand=self.brand,
            stock=self.stock,
            rating=self.rating,
            reviews_count=self.reviews_count,
            active=self.active,
            created_at=self.created_at,
            updated_at=self.updated_at,
        )
        suggest_weight = int(self.rating * 10 + self.reviews_count)
        document['suggest'] = {'input': [self.name, self.brand], 'weight': suggest_weight}
        return document
class ProductDocument(Document):
    """Elasticsearch document for a product, stored in the ``products`` index.

    Fields mirror ``Product.to_elasticsearch()``. ``suggest`` and the
    ``name.suggest`` sub-field are completion fields.
    """

    id = Integer()
    name = Text(analyzer='standard', fields={'keyword': Keyword(), 'suggest': Completion()})
    description = Text(analyzer='standard')
    price = Float()
    category = Keyword()
    brand = Keyword()
    stock = Integer()
    rating = Float()
    reviews_count = Integer()
    active = Boolean()
    created_at = Date()
    updated_at = Date()
    suggest = Completion()

    class Index:
        name = 'products'
        settings = {'number_of_shards': 1, 'number_of_replicas': 1}

    @classmethod
    def from_django_model(cls, product):
        """Build an unsaved document from a Django ``Product``, using ``product.id`` as its id."""
        fields = product.to_elasticsearch()
        return cls(meta={'id': product.id}, **fields)
# services.py
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from django.conf import settings
from .models import Product, ProductDocument
import logging
logger = logging.getLogger(__name__)


class ElasticsearchService:
    """Index and query Django ``Product`` rows in the ``products`` index.

    Methods log failures and re-raise them.
    """

    def __init__(self):
        # Same connection kwargs that settings.ELASTICSEARCH_DSL['default'] defines.
        self.es = Elasticsearch(**settings.ELASTICSEARCH_DSL['default'])
        self.index = 'products'

    def create_index(self):
        """Create (or update the mapping of) the index via ``ProductDocument.init``."""
        try:
            ProductDocument.init(using=self.es)
            logger.info(f"索引 {self.index} 创建成功")
        except Exception as e:
            logger.error(f"创建索引失败: {e}")
            raise

    def index_product(self, product):
        """Index a single Django ``Product``."""
        try:
            doc = ProductDocument.from_django_model(product)
            doc.save(using=self.es)
            logger.info(f"产品 {product.id} 索引成功")
        except Exception as e:
            logger.error(f"索引产品失败: {e}")
            raise

    def bulk_index_products(self, products):
        """Bulk-index an iterable of ``Product``.

        Returns:
            ``(success_count, errors)`` as returned by ``elasticsearch.helpers.bulk``.
        """
        try:
            actions = [
                {
                    '_index': self.index,
                    '_id': product.id,
                    '_source': product.to_elasticsearch(),
                }
                for product in products
            ]
            success, failed = bulk(self.es, actions)
            logger.info(f"批量索引完成: 成功 {success}, 失败 {len(failed)}")
            return success, failed
        except Exception as e:
            logger.error(f"批量索引失败: {e}")
            raise

    @staticmethod
    def _price_range(filters):
        """Build the body of a ``range`` clause from ``min_price``/``max_price``.

        Bounds are tested with ``is not None`` so a bound of 0 is honoured; a
        truthiness test would silently drop ``max_price=0``.
        Returns ``{}`` when neither bound is set.
        """
        price_range = {}
        if filters.get('min_price') is not None:
            price_range['gte'] = filters['min_price']
        if filters.get('max_price') is not None:
            price_range['lte'] = filters['max_price']
        return price_range

    def search_products(self, query=None, filters=None, page=1, size=20):
        """Search products with optional text query, filters and pagination.

        Args:
            query: text matched against name/description/brand; all documents when empty.
            filters: optional dict with category, brand, min_price, max_price, in_stock.
            page: 1-based page number; values below 1 are treated as 1.
            size: page size; negative values are treated as 0.

        Returns:
            dict with ``total``, ``products`` and ``aggregations``
            (category/brand term buckets).
        """
        try:
            search = ProductDocument.search(using=self.es)
            if query:
                search = search.query(
                    'multi_match',
                    query=query,
                    fields=['name^3', 'description^1', 'brand^2'],
                    fuzziness='AUTO'
                )
            if filters:
                if filters.get('category'):
                    search = search.filter('term', category=filters['category'])
                if filters.get('brand'):
                    search = search.filter('term', brand=filters['brand'])
                price_range = self._price_range(filters)
                if price_range:
                    search = search.filter('range', price=price_range)
                if filters.get('in_stock'):
                    search = search.filter('range', stock={'gt': 0})
            # elasticsearch_dsl rejects negative slice bounds, so clamp page/size.
            page = max(page, 1)
            size = max(size, 0)
            start = (page - 1) * size
            search = search[start:start + size]
            search.aggs.bucket('categories', 'terms', field='category')
            search.aggs.bucket('brands', 'terms', field='brand')
            response = search.execute()
            return {
                'total': response.hits.total.value,
                'products': [hit.to_dict() for hit in response],
                'aggregations': response.aggregations.to_dict()
            }
        except Exception as e:
            logger.error(f"搜索产品失败: {e}")
            raise

    def get_suggestions(self, query, size=10):
        """Return completion suggestions (text, score, source product) for ``query``."""
        try:
            search = ProductDocument.search(using=self.es)
            search = search.suggest(
                'product_suggest',
                query,
                completion={
                    'field': 'suggest',
                    'size': size,
                    'skip_duplicates': True
                }
            )
            response = search.execute()
            suggestions = []
            for option in response.suggest.product_suggest[0].options:
                suggestions.append({
                    'text': option.text,
                    'score': option._score,
                    'product': option._source.to_dict()
                })
            return suggestions
        except Exception as e:
            logger.error(f"获取建议失败: {e}")
            raise
# signals.py
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from .models import Product
from .services import ElasticsearchService
import logging
logger = logging.getLogger(__name__)
# Service instance shared by this module's signal handlers.
es_service = ElasticsearchService()


@receiver(post_save, sender=Product)
def update_product_index(sender, instance, created, **kwargs):
    """Re-index a product after each save, whether it was created or updated.

    Indexing errors are logged and swallowed, so they never propagate to the
    code that called ``save()``.
    """
    try:
        es_service.index_product(instance)
    except Exception as exc:
        logger.error(f"更新产品索引失败: {exc}")
        return
    logger.info(f"产品 {instance.id} 索引更新成功")
@receiver(post_delete, sender=Product)
def delete_product_index(sender, instance, **kwargs):
    """Remove a deleted product's document from Elasticsearch.

    Failures (including a document that was never indexed) are logged and
    swallowed.
    """
    # signals.py only imports Product from .models. Without this import the
    # lookup below raised NameError, which the except clause merely logged, so
    # documents were never removed.
    from .models import ProductDocument

    try:
        document = ProductDocument.get(id=instance.id, using=es_service.es)
        # Delete through the same client used for the lookup.
        document.delete(using=es_service.es)
        logger.info(f"产品 {instance.id} 从索引中删除成功")
    except Exception as e:
        logger.error(f"从索引删除产品失败: {e}")
# views.py
from django.http import JsonResponse
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from .services import ElasticsearchService
import json
es_service = ElasticsearchService()


@method_decorator(csrf_exempt, name='dispatch')
class ProductSearchView(View):
    """JSON product search endpoint (GET).

    Query parameters:
        q: search text.
        category, brand: exact-match filters.
        min_price, max_price: numeric price bounds.
        in_stock: ``'true'`` to require stock.
        page: 1-based, default 1.
        size: default 20.

    Malformed or non-positive numeric parameters are answered with 400.
    Search failures return 500.
    """

    def get(self, request):
        params = request.GET
        # Parse up front so bad input becomes a 400 rather than an int()/float()
        # ValueError reported as a server error.
        try:
            page = int(params.get('page', 1))
            size = int(params.get('size', 20))
            min_price = float(params['min_price']) if params.get('min_price') else None
            max_price = float(params['max_price']) if params.get('max_price') else None
        except ValueError:
            return JsonResponse({
                'success': False,
                'error': 'page/size必须是整数,价格必须是数字'
            }, status=400)
        if page < 1 or size < 1:
            return JsonResponse({
                'success': False,
                'error': 'page和size必须是正整数'
            }, status=400)

        filters = {}
        category = params.get('category')
        if category:
            filters['category'] = category
        brand = params.get('brand')
        if brand:
            filters['brand'] = brand
        # `is not None` keeps a legitimate bound of 0.
        if min_price is not None:
            filters['min_price'] = min_price
        if max_price is not None:
            filters['max_price'] = max_price
        if params.get('in_stock') == 'true':
            filters['in_stock'] = True

        try:
            result = es_service.search_products(
                query=params.get('q', ''),
                filters=filters,
                page=page,
                size=size
            )
            return JsonResponse({
                'success': True,
                'data': result
            })
        except Exception as e:
            return JsonResponse({
                'success': False,
                'error': str(e)
            }, status=500)
class ProductSuggestView(View):
    """JSON completion-suggestion endpoint (GET).

    Query parameters:
        q: prefix, required.
        size: positive integer, default 10.

    Answers 400 on a missing ``q`` or an invalid ``size``, and 500 on search failure.
    """

    def get(self, request):
        query = request.GET.get('q', '')
        if not query:
            return JsonResponse({
                'success': False,
                'error': '查询参数不能为空'
            }, status=400)
        # A malformed size used to raise ValueError inside the try and surface as 500.
        try:
            size = int(request.GET.get('size', 10))
        except ValueError:
            size = 0
        if size < 1:
            return JsonResponse({
                'success': False,
                'error': 'size必须是正整数'
            }, status=400)
        try:
            suggestions = es_service.get_suggestions(query, size)
            return JsonResponse({
                'success': True,
                'data': suggestions
            })
        except Exception as e:
            return JsonResponse({
                'success': False,
                'error': str(e)
            }, status=500)
# urls.py
from django.urls import path
from .views import ProductSearchView, ProductSuggestView
# Routes for the product search and suggestion class-based views.
urlpatterns = [
path('search/', ProductSearchView.as_view(), name='product_search'),
path('suggest/', ProductSuggestView.as_view(), name='product_suggest'),
]
# management/commands/rebuild_index.py
from django.core.management.base import BaseCommand
from myapp.models import Product
from myapp.services import ElasticsearchService
class Command(BaseCommand):
    """Management command: (re)create the index and bulk-index every Product."""

    help = '重建Elasticsearch索引'

    def add_arguments(self, parser):
        parser.add_argument(
            '--batch-size',
            type=int,
            default=1000,
            help='批量处理大小'
        )

    def handle(self, *args, **options):
        from django.core.management.base import CommandError

        batch_size = options['batch_size']
        if batch_size < 1:
            raise CommandError('--batch-size must be a positive integer')

        es_service = ElasticsearchService()
        self.stdout.write('开始重建索引...')
        es_service.create_index()
        total_products = Product.objects.count()
        self.stdout.write(f'总共 {total_products} 个产品需要索引')

        # Keyset pagination on the primary key. Slicing an unordered queryset by
        # OFFSET gives no stable order between batches, so rows could be skipped
        # or indexed twice. An explicit order plus pk > last_pk fixes that and
        # avoids ever-growing OFFSET scans.
        queryset = Product.objects.order_by('pk')
        last_pk = None
        batch_number = 0
        while True:
            window = queryset if last_pk is None else queryset.filter(pk__gt=last_pk)
            products = list(window[:batch_size])
            if not products:
                break
            batch_number += 1
            success, failed = es_service.bulk_index_products(products)
            self.stdout.write(
                f'批次 {batch_number}: 成功 {success}, 失败 {len(failed)}'
            )
            last_pk = products[-1].pk

        self.stdout.write(
            self.style.SUCCESS('索引重建完成!')
        )
3.3.2 Flask集成
# app.py
from flask import Flask, request, jsonify
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Document, Text, Keyword, Integer, Float, Date, Boolean, Completion, connections
import os
from datetime import datetime
app = Flask(__name__)
# Elasticsearch connection settings; each can be overridden via environment variables.
ES_HOST = os.getenv('ELASTICSEARCH_HOST', 'localhost:9200')
ES_USERNAME = os.getenv('ELASTICSEARCH_USERNAME', 'elastic')
ES_PASSWORD = os.getenv('ELASTICSEARCH_PASSWORD', 'password')
# TLS certificate verification stays off by default, as in the original example.
# Set ELASTICSEARCH_VERIFY_CERTS=true to enable it (recommended in production).
ES_VERIFY_CERTS = os.getenv('ELASTICSEARCH_VERIFY_CERTS', 'false').lower() in ('1', 'true')
# Register the default elasticsearch_dsl connection used by ProductDocument.
connections.create_connection(
    hosts=[ES_HOST],
    http_auth=(ES_USERNAME, ES_PASSWORD),
    use_ssl=True,
    verify_certs=ES_VERIFY_CERTS,
    ssl_show_warn=False,
    timeout=30
)
class ProductDocument(Document):
    """Product document stored in the ``products`` index.

    ``name`` is full-text, with ``keyword`` and ``suggest`` (completion)
    sub-fields. The top-level ``suggest`` field backs the completion suggester.
    """

    name = Text(analyzer='standard', fields={'keyword': Keyword(), 'suggest': Completion()})
    description = Text(analyzer='standard')
    price = Float()
    category = Keyword()
    brand = Keyword()
    stock = Integer()
    rating = Float()
    reviews_count = Integer()
    active = Boolean()
    created_at = Date()
    updated_at = Date()
    suggest = Completion()

    class Index:
        name = 'products'
        settings = dict(number_of_shards=1, number_of_replicas=1)
class ProductService:
    """Index and query products through the default elasticsearch_dsl connection.

    Methods are best-effort. Failures are printed and reported through the
    return value (False, or empty results) instead of being raised.
    """

    def __init__(self):
        self.es = connections.get_connection()
        self.index = 'products'

    @staticmethod
    def _page_bounds(page, size):
        """Translate a 1-based ``page`` and a ``size`` into ``(start, stop)`` slice bounds.

        Inputs are clamped to page >= 1 and size >= 0. elasticsearch_dsl raises
        on negative slice bounds, so ``page=0`` would otherwise be swallowed by
        the except clause as an empty result.
        """
        page = max(page, 1)
        size = max(size, 0)
        start = (page - 1) * size
        return start, start + size

    def create_index(self):
        """Create the index (or update its mapping); return True on success."""
        try:
            ProductDocument.init()
            return True
        except Exception as e:
            print(f"创建索引失败: {e}")
            return False

    def index_product(self, product_data):
        """Index ``product_data`` under its ``id`` (if any); return True on success."""
        try:
            doc = ProductDocument(
                meta={'id': product_data.get('id')},
                **product_data
            )
            doc.save()
            return True
        except Exception as e:
            print(f"索引产品失败: {e}")
            return False

    def search_products(self, query=None, filters=None, page=1, size=20):
        """Search products.

        ``filters`` may contain ``category``, ``brand``, ``price_range`` (a range
        dict) and ``in_stock``. Returns ``{'total', 'products', 'aggregations'}``,
        or an empty result on failure.
        """
        try:
            search = ProductDocument.search()
            if query:
                search = search.query(
                    'multi_match',
                    query=query,
                    fields=['name^3', 'description^1', 'brand^2'],
                    fuzziness='AUTO'
                )
            if filters:
                for key, value in filters.items():
                    if key == 'category' and value:
                        search = search.filter('term', category=value)
                    elif key == 'brand' and value:
                        search = search.filter('term', brand=value)
                    elif key == 'price_range' and value:
                        search = search.filter('range', price=value)
                    elif key == 'in_stock' and value:
                        search = search.filter('range', stock={'gt': 0})
            start, stop = self._page_bounds(page, size)
            search = search[start:stop]
            search.aggs.bucket('categories', 'terms', field='category')
            search.aggs.bucket('brands', 'terms', field='brand')
            response = search.execute()
            return {
                'total': response.hits.total.value,
                'products': [hit.to_dict() for hit in response],
                'aggregations': response.aggregations.to_dict() if hasattr(response, 'aggregations') else {}
            }
        except Exception as e:
            print(f"搜索失败: {e}")
            return {'total': 0, 'products': [], 'aggregations': {}}

    def get_suggestions(self, query, size=10):
        """Return completion suggestions (text and score) for ``query``; [] on failure."""
        try:
            search = ProductDocument.search()
            search = search.suggest(
                'product_suggest',
                query,
                completion={
                    'field': 'suggest',
                    'size': size,
                    'skip_duplicates': True
                }
            )
            response = search.execute()
            suggestions = []
            if hasattr(response, 'suggest') and 'product_suggest' in response.suggest:
                for option in response.suggest.product_suggest[0].options:
                    suggestions.append({
                        'text': option.text,
                        'score': option._score
                    })
            return suggestions
        except Exception as e:
            print(f"获取建议失败: {e}")
            return []
# Shared service instance used by the route handlers.
product_service = ProductService()


@app.route('/health')
def health_check():
    """Health check.

    Returns 200 when Elasticsearch answers a ping, 503 when the ping fails,
    and 500 when checking raises.
    """
    try:
        es = connections.get_connection()
        # Elasticsearch.ping() reports failure by returning False rather than
        # raising, so the result must be checked. Previously an unreachable
        # cluster was still reported as "connected".
        reachable = es.ping()
    except Exception as e:
        return jsonify({
            'status': 'error',
            'timestamp': datetime.now().isoformat(),
            'elasticsearch': 'disconnected',
            'error': str(e)
        }), 500
    if not reachable:
        return jsonify({
            'status': 'error',
            'timestamp': datetime.now().isoformat(),
            'elasticsearch': 'disconnected'
        }), 503
    return jsonify({
        'status': 'ok',
        'timestamp': datetime.now().isoformat(),
        'elasticsearch': 'connected'
    })
@app.route('/api/products/search')
def search_products():
    """Product search endpoint.

    Query parameters:
        q: search text.
        category, brand: exact-match filters.
        min_price, max_price: numeric price bounds.
        in_stock: ``'true'`` to require stock.
        page: 1-based, default 1.
        size: default 20.

    Malformed or non-positive numeric parameters yield 400.
    """
    # Parse numbers first: a ValueError here is client error, not a server failure.
    try:
        page = int(request.args.get('page', 1))
        size = int(request.args.get('size', 20))
        min_price = request.args.get('min_price')
        max_price = request.args.get('max_price')
        min_price = float(min_price) if min_price else None
        max_price = float(max_price) if max_price else None
    except ValueError:
        return jsonify({
            'success': False,
            'error': 'page/size必须是整数,价格必须是数字'
        }), 400
    if page < 1 or size < 1:
        return jsonify({
            'success': False,
            'error': 'page和size必须是正整数'
        }), 400

    filters = {}
    category = request.args.get('category')
    if category:
        filters['category'] = category
    brand = request.args.get('brand')
    if brand:
        filters['brand'] = brand
    if min_price is not None or max_price is not None:
        price_range = {}
        if min_price is not None:
            price_range['gte'] = min_price
        if max_price is not None:
            price_range['lte'] = max_price
        filters['price_range'] = price_range
    if request.args.get('in_stock') == 'true':
        filters['in_stock'] = True

    try:
        result = product_service.search_products(
            query=request.args.get('q', ''),
            filters=filters,
            page=page,
            size=size
        )
        return jsonify({
            'success': True,
            'data': result
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@app.route('/api/products/suggest')
def suggest_products():
    """Completion-suggestion endpoint.

    Query parameters:
        q: prefix, required.
        size: positive integer, default 10.

    Answers 400 on a missing ``q`` or an invalid ``size``.
    """
    query = request.args.get('q', '')
    if not query:
        return jsonify({
            'success': False,
            'error': '查询参数不能为空'
        }), 400
    try:
        size = int(request.args.get('size', 10))
    except ValueError:
        size = 0
    if size < 1:
        return jsonify({
            'success': False,
            'error': 'size必须是正整数'
        }), 400
    try:
        suggestions = product_service.get_suggestions(query, size)
        return jsonify({
            'success': True,
            'data': suggestions
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@app.route('/api/products', methods=['POST'])
def create_product():
    """Create (index) a product from the JSON request body.

    Before indexing, the handler adds ``created_at``/``updated_at`` (one shared
    timestamp) and a completion ``suggest`` entry.

    Returns:
        201 on success;
        400 when the body is not a JSON object or ``rating``/``reviews_count``
        are not numeric;
        500 when indexing fails.
    """
    # silent=True returns None for a missing/invalid JSON body instead of raising,
    # so the client gets a JSON 400 rather than a TypeError-driven 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({
            'success': False,
            'error': '请求体必须是JSON对象'
        }), 400
    try:
        rating = float(data.get('rating', 0) or 0)
        reviews_count = float(data.get('reviews_count', 0) or 0)
    except (TypeError, ValueError):
        return jsonify({
            'success': False,
            'error': 'rating和reviews_count必须是数字'
        }), 400
    try:
        now = datetime.now()
        data['created_at'] = now
        data['updated_at'] = now
        # Completion inputs must be non-empty strings; Elasticsearch rejects "".
        # Weights must be non-negative integers.
        inputs = [v for v in (data.get('name'), data.get('brand')) if isinstance(v, str) and v.strip()]
        if inputs:
            data['suggest'] = {
                'input': inputs,
                'weight': max(int(rating * 10 + reviews_count), 0)
            }
        success = product_service.index_product(data)
        if success:
            return jsonify({
                'success': True,
                'message': '产品创建成功'
            }), 201
        else:
            return jsonify({
                'success': False,
                'error': '产品创建失败'
            }), 500
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
if __name__ == '__main__':
    # Ensure the products index exists before serving requests.
    product_service.create_index()
    # The Werkzeug debugger permits arbitrary code execution, so it must never be
    # enabled implicitly on a server bound to all interfaces. Opt in with
    # FLASK_DEBUG=true (or 1) for local development.
    debug_enabled = os.getenv('FLASK_DEBUG', 'false').lower() in ('1', 'true')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)
3.4 Go集成
// main.go
package main
import (
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/gorilla/mux"
	"github.com/rs/cors"
)
// Product is the document stored in the products index; the JSON tags are the
// field names used in the index mapping.
type Product struct {
ID int `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
Category string `json:"category"`
Brand string `json:"brand"`
Stock int `json:"stock"`
Rating float64 `json:"rating"`
ReviewsCount int `json:"reviews_count"`
Active bool `json:"active"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
// Suggest populates the "suggest" completion field.
Suggest Suggest `json:"suggest"`
}
// Suggest is the value of a completion field: the input strings and their ranking weight.
type Suggest struct {
Input []string `json:"input"`
Weight int `json:"weight"`
}
// SearchResponse is the result type returned by SearchProducts: hit count,
// decoded products and the raw aggregations.
type SearchResponse struct {
Total int `json:"total"`
Products []Product `json:"products"`
Aggregations map[string]interface{} `json:"aggregations"`
}
// ElasticsearchService bundles a go-elasticsearch client with the name of the
// index its methods operate on.
type ElasticsearchService struct {
client *elasticsearch.Client
index string
}
// NewElasticsearchService returns a service bound to the "products" index on the
// local HTTPS cluster.
//
// NOTE(review): credentials are hard-coded and TLS certificate verification is
// disabled (InsecureSkipVerify). This is suitable for local demos only.
func NewElasticsearchService() (*ElasticsearchService, error) {
	insecureTransport := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"https://localhost:9200"},
		Username:  "elastic",
		Password:  "password",
		Transport: insecureTransport,
	})
	if err != nil {
		return nil, fmt.Errorf("创建Elasticsearch客户端失败: %v", err)
	}
	svc := &ElasticsearchService{client: client, index: "products"}
	return svc, nil
}
// CreateIndex creates the products index with its custom analyzer and mappings.
//
// If the index already exists this is a no-op, matching the Node.js and Python
// examples. Previously a second call (e.g. on restart) failed with
// resource_already_exists_exception.
func (es *ElasticsearchService) CreateIndex() error {
	existsReq := esapi.IndicesExistsRequest{Index: []string{es.index}}
	existsRes, err := existsReq.Do(context.Background(), es.client)
	if err != nil {
		return fmt.Errorf("检查索引是否存在失败: %v", err)
	}
	if existsRes.Body != nil {
		existsRes.Body.Close()
	}
	switch existsRes.StatusCode {
	case http.StatusOK:
		log.Printf("索引 %s 已存在", es.index)
		return nil
	case http.StatusNotFound:
		// Not there yet: fall through to creation.
	default:
		return fmt.Errorf("检查索引是否存在失败: %s", existsRes.Status())
	}

	mapping := `{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"product_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "stop", "snowball"]
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "product_analyzer",
"fields": {
"keyword": {"type": "keyword"},
"suggest": {"type": "completion"}
}
},
"description": {
"type": "text",
"analyzer": "product_analyzer"
},
"price": {"type": "double"},
"category": {"type": "keyword"},
"brand": {"type": "keyword"},
"stock": {"type": "integer"},
"rating": {"type": "float"},
"reviews_count": {"type": "integer"},
"active": {"type": "boolean"},
"created_at": {"type": "date"},
"updated_at": {"type": "date"},
"suggest": {
"type": "completion",
"analyzer": "simple",
"preserve_separators": true,
"preserve_position_increments": true,
"max_input_length": 50
}
}
}
}`
	req := esapi.IndicesCreateRequest{
		Index: es.index,
		Body:  strings.NewReader(mapping),
	}
	res, err := req.Do(context.Background(), es.client)
	if err != nil {
		return fmt.Errorf("创建索引请求失败: %v", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmt.Errorf("创建索引失败: %s", res.String())
	}
	log.Printf("索引 %s 创建成功", es.index)
	return nil
}
// IndexProduct stores product as a document whose ID is the decimal product ID.
// It requests an immediate refresh (Refresh: "true") before returning.
func (es *ElasticsearchService) IndexProduct(product Product) error {
	payload, marshalErr := json.Marshal(product)
	if marshalErr != nil {
		return fmt.Errorf("序列化产品失败: %v", marshalErr)
	}
	indexReq := esapi.IndexRequest{
		Index:      es.index,
		DocumentID: strconv.Itoa(product.ID),
		Body:       strings.NewReader(string(payload)),
		Refresh:    "true",
	}
	res, reqErr := indexReq.Do(context.Background(), es.client)
	if reqErr != nil {
		return fmt.Errorf("索引产品请求失败: %v", reqErr)
	}
	defer res.Body.Close()
	if !res.IsError() {
		return nil
	}
	return fmt.Errorf("索引产品失败: %s", res.String())
}
// SearchProducts runs a product search against es.index. It returns one page
// of matching products, the total hit count and the "categories" / "brands"
// terms aggregations.
//
// Parameters:
//   - query: free text matched with fuzziness AUTO against name (boost 3),
//     brand (boost 2) and description. An empty string matches all documents.
//   - filters: optional keys are "category" and "brand" (term filters),
//     "min_price" / "max_price" (range filter on price, each bound applied
//     independently) and "in_stock" (when exactly true, only documents with
//     stock > 0 match). A nil map means no extra filters.
//   - page: 1-based page number; values below 1 are treated as 1.
//   - size: page size; negative values are treated as 0.
//
// Only documents whose "active" field is true are returned. An error is
// returned when the query cannot be serialised, the request fails,
// Elasticsearch answers with an error status, or the response cannot be decoded.
func (es *ElasticsearchService) SearchProducts(query string, filters map[string]interface{}, page, size int) (*SearchResponse, error) {
	queryJSON, err := json.Marshal(buildProductSearchQuery(query, filters, page, size))
	if err != nil {
		return nil, fmt.Errorf("序列化查询失败: %v", err)
	}

	req := esapi.SearchRequest{
		Index: []string{es.index},
		Body:  strings.NewReader(string(queryJSON)),
	}
	res, err := req.Do(context.Background(), es.client)
	if err != nil {
		return nil, fmt.Errorf("搜索请求失败: %v", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return nil, fmt.Errorf("搜索失败: %s", res.String())
	}

	// Decode into a typed envelope rather than chaining type assertions. A
	// missing or differently shaped field then becomes an error, not a panic.
	var searchResult struct {
		Hits struct {
			Total json.RawMessage `json:"total"`
			Hits  []struct {
				Source json.RawMessage `json:"_source"`
			} `json:"hits"`
		} `json:"hits"`
		Aggregations map[string]interface{} `json:"aggregations"`
	}
	if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil {
		return nil, fmt.Errorf("解析搜索结果失败: %v", err)
	}

	products := make([]Product, 0, len(searchResult.Hits.Hits))
	for _, hit := range searchResult.Hits.Hits {
		var product Product
		if err := json.Unmarshal(hit.Source, &product); err != nil {
			return nil, fmt.Errorf("解析产品数据失败: %v", err)
		}
		products = append(products, product)
	}

	total, err := parseHitsTotal(searchResult.Hits.Total)
	if err != nil {
		return nil, fmt.Errorf("解析命中总数失败: %v", err)
	}

	aggregations := searchResult.Aggregations
	if aggregations == nil {
		// Keep a non-nil map so callers and JSON output see {} rather than null.
		aggregations = make(map[string]interface{})
	}

	return &SearchResponse{
		Total:        total,
		Products:     products,
		Aggregations: aggregations,
	}, nil
}

// buildProductSearchQuery assembles the request body used by SearchProducts.
// The body contains a bool query (a full-text "must" clause plus "filter"
// clauses), from/size pagination, and terms aggregations on "category" and
// "brand". See SearchProducts for the meaning of the parameters.
func buildProductSearchQuery(query string, filters map[string]interface{}, page, size int) map[string]interface{} {
	// Negative from/size values are rejected by Elasticsearch, so clamp them.
	if page < 1 {
		page = 1
	}
	if size < 0 {
		size = 0
	}

	var textClause map[string]interface{}
	if query != "" {
		textClause = map[string]interface{}{
			"multi_match": map[string]interface{}{
				"query": query,
				"fields": []string{
					"name^3",
					"description^1",
					"brand^2",
				},
				"fuzziness": "AUTO",
			},
		}
	} else {
		textClause = map[string]interface{}{
			"match_all": map[string]interface{}{},
		}
	}

	filterClauses := []interface{}{
		map[string]interface{}{
			"term": map[string]interface{}{
				"active": true,
			},
		},
	}

	// Exact-match filters; nil or empty-string values are ignored.
	for _, field := range []string{"category", "brand"} {
		if value := filters[field]; value != nil && value != "" {
			filterClauses = append(filterClauses, map[string]interface{}{
				"term": map[string]interface{}{
					field: value,
				},
			})
		}
	}

	// min_price and max_price are independent bounds: either may be given alone.
	priceRange := map[string]interface{}{}
	if minPrice := filters["min_price"]; minPrice != nil {
		priceRange["gte"] = minPrice
	}
	if maxPrice := filters["max_price"]; maxPrice != nil {
		priceRange["lte"] = maxPrice
	}
	if len(priceRange) > 0 {
		filterClauses = append(filterClauses, map[string]interface{}{
			"range": map[string]interface{}{
				"price": priceRange,
			},
		})
	}

	if inStock, ok := filters["in_stock"]; ok && inStock == true {
		filterClauses = append(filterClauses, map[string]interface{}{
			"range": map[string]interface{}{
				"stock": map[string]interface{}{
					"gt": 0,
				},
			},
		})
	}

	return map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"must":   []interface{}{textClause},
				"filter": filterClauses,
			},
		},
		"from": (page - 1) * size,
		"size": size,
		"aggs": map[string]interface{}{
			"categories": map[string]interface{}{
				"terms": map[string]interface{}{
					"field": "category",
					"size":  20,
				},
			},
			"brands": map[string]interface{}{
				"terms": map[string]interface{}{
					"field": "brand",
					"size":  20,
				},
			},
		},
	}
}

// parseHitsTotal extracts the hit count from a search response's hits.total.
// Elasticsearch 7+ sends an object {"value": N, "relation": ...}; older
// versions (or rest_total_hits_as_int) send a plain number. A missing or null
// field yields 0.
func parseHitsTotal(raw json.RawMessage) (int, error) {
	if len(raw) == 0 || string(raw) == "null" {
		return 0, nil
	}
	var withRelation struct {
		Value int `json:"value"`
	}
	if err := json.Unmarshal(raw, &withRelation); err == nil {
		return withRelation.Value, nil
	}
	var plain int
	if err := json.Unmarshal(raw, &plain); err != nil {
		return 0, err
	}
	return plain, nil
}
// GetSuggestions asks the completion suggester on the "suggest" field of
// es.index for up to size suggestions matching the prefix query, with
// skip_duplicates enabled.
//
// Each returned element is a map with keys "text" and "score", copied from the
// suggester option's "text" and "_score". A response without suggestions
// yields an empty, non-nil slice. An error is returned when the request cannot
// be built or sent, Elasticsearch answers with an error status, or the
// response body has an unexpected shape.
func (es *ElasticsearchService) GetSuggestions(query string, size int) ([]map[string]interface{}, error) {
	suggestQuery := map[string]interface{}{
		"suggest": map[string]interface{}{
			"product_suggest": map[string]interface{}{
				"prefix": query,
				"completion": map[string]interface{}{
					"field":           "suggest",
					"size":            size,
					"skip_duplicates": true,
				},
			},
		},
	}
	queryJSON, err := json.Marshal(suggestQuery)
	if err != nil {
		return nil, fmt.Errorf("序列化建议查询失败: %v", err)
	}

	req := esapi.SearchRequest{
		Index: []string{es.index},
		Body:  strings.NewReader(string(queryJSON)),
	}
	res, err := req.Do(context.Background(), es.client)
	if err != nil {
		return nil, fmt.Errorf("建议请求失败: %v", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return nil, fmt.Errorf("获取建议失败: %s", res.String())
	}

	// Decode into a typed envelope. An empty "product_suggest" array is then
	// simply iterated zero times instead of being indexed at [0], and a
	// mis-shaped body surfaces as a decode error instead of a panic.
	var suggestResult struct {
		Suggest map[string][]struct {
			Options []struct {
				Text  interface{} `json:"text"`
				Score interface{} `json:"_score"`
			} `json:"options"`
		} `json:"suggest"`
	}
	if err := json.NewDecoder(res.Body).Decode(&suggestResult); err != nil {
		return nil, fmt.Errorf("解析建议结果失败: %v", err)
	}

	suggestions := make([]map[string]interface{}, 0)
	for _, entry := range suggestResult.Suggest["product_suggest"] {
		for _, option := range entry.Options {
			suggestions = append(suggestions, map[string]interface{}{
				"text":  option.Text,
				"score": option.Score,
			})
		}
	}
	return suggestions, nil
}
// HTTP handlers

// SearchHandler handles product search requests; main registers it for
// GET /api/products/search. Query parameters:
//   - q: search text (empty matches everything).
//   - category, brand: exact-match filters; empty values are ignored.
//   - min_price, max_price: optional price bounds; ignored if not numbers.
//   - in_stock: only the literal value "true" enables the in-stock filter.
//   - page (default 1), size (default 20): non-numeric or non-positive values
//     fall back to the defaults.
//
// On success it writes {"success": true, "data": <SearchResponse>}. If the
// search fails it writes {"success": false, "error": "..."} with status 500.
func (es *ElasticsearchService) SearchHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	// Encode errors with encoding/json. http.Error would reset Content-Type to
	// text/plain, and formatting the message into a JSON template breaks as
	// soon as the message contains a quote.
	writeError := func(status int, msg string) {
		w.WriteHeader(status)
		json.NewEncoder(w).Encode(map[string]interface{}{
			"success": false,
			"error":   msg,
		})
	}

	params := r.URL.Query()

	// Non-positive values would produce a negative "from"/"size" in the query.
	page := 1
	if p, err := strconv.Atoi(params.Get("page")); err == nil && p > 0 {
		page = p
	}
	size := 20
	if s, err := strconv.Atoi(params.Get("size")); err == nil && s > 0 {
		size = s
	}

	filters := make(map[string]interface{})
	if category := params.Get("category"); category != "" {
		filters["category"] = category
	}
	if brand := params.Get("brand"); brand != "" {
		filters["brand"] = brand
	}
	if minPrice, err := strconv.ParseFloat(params.Get("min_price"), 64); err == nil {
		filters["min_price"] = minPrice
	}
	if maxPrice, err := strconv.ParseFloat(params.Get("max_price"), 64); err == nil {
		filters["max_price"] = maxPrice
	}
	if params.Get("in_stock") == "true" {
		filters["in_stock"] = true
	}

	result, err := es.SearchProducts(params.Get("q"), filters, page, size)
	if err != nil {
		writeError(http.StatusInternalServerError, err.Error())
		return
	}

	json.NewEncoder(w).Encode(map[string]interface{}{
		"success": true,
		"data":    result,
	})
}
// SuggestHandler handles autocomplete requests; main registers it for
// GET /api/products/suggest.
//   - q: the prefix to complete (required).
//   - size: maximum number of suggestions (default 10); non-numeric or
//     non-positive values fall back to the default.
//
// On success it writes {"success": true, "data": [...]} with the suggestions
// from GetSuggestions. Errors are written as {"success": false, "error": "..."}
// with status 400 (missing q) or 500 (suggestion lookup failed).
func (es *ElasticsearchService) SuggestHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	// Encode errors with encoding/json: http.Error would reset Content-Type to
	// text/plain, and a JSON template breaks when the message contains quotes.
	writeError := func(status int, msg string) {
		w.WriteHeader(status)
		json.NewEncoder(w).Encode(map[string]interface{}{
			"success": false,
			"error":   msg,
		})
	}

	query := r.URL.Query().Get("q")
	if query == "" {
		writeError(http.StatusBadRequest, "查询参数不能为空")
		return
	}

	size := 10
	if s, err := strconv.Atoi(r.URL.Query().Get("size")); err == nil && s > 0 {
		size = s
	}

	suggestions, err := es.GetSuggestions(query, size)
	if err != nil {
		writeError(http.StatusInternalServerError, err.Error())
		return
	}

	json.NewEncoder(w).Encode(map[string]interface{}{
		"success": true,
		"data":    suggestions,
	})
}
// CreateProductHandler creates a product; main registers it for
// POST /api/products.
//
// It decodes a Product from the JSON request body and sets CreatedAt and
// UpdatedAt to the current time. It then fills the completion-suggester input
// with the product's name and brand, weighted int(Rating*10) + ReviewsCount,
// indexes the product via IndexProduct, and replies 201 with
// {"success": true, "message": ..., "data": <product>}.
//
// Errors are written as {"success": false, "error": "..."} with status 405
// (method is not POST), 400 (body could not be decoded) or 500 (indexing
// failed).
func (es *ElasticsearchService) CreateProductHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	// Encode errors with encoding/json: http.Error would reset Content-Type to
	// text/plain, and a JSON template breaks when the message contains quotes.
	writeError := func(status int, msg string) {
		w.WriteHeader(status)
		json.NewEncoder(w).Encode(map[string]interface{}{
			"success": false,
			"error":   msg,
		})
	}

	if r.Method != http.MethodPost {
		writeError(http.StatusMethodNotAllowed, "方法不允许")
		return
	}

	var product Product
	if err := json.NewDecoder(r.Body).Decode(&product); err != nil {
		writeError(http.StatusBadRequest, "解析请求失败: "+err.Error())
		return
	}

	// Stamp creation and update times.
	now := time.Now()
	product.CreatedAt = now
	product.UpdatedAt = now

	// Populate the completion-suggester input and weight.
	product.Suggest = Suggest{
		Input:  []string{product.Name, product.Brand},
		Weight: int(product.Rating*10) + product.ReviewsCount,
	}

	if err := es.IndexProduct(product); err != nil {
		writeError(http.StatusInternalServerError, err.Error())
		return
	}

	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(map[string]interface{}{
		"success": true,
		"message": "产品创建成功",
		"data":    product,
	})
}
// healthHandler is the service's health-check endpoint. It ignores the request
// and writes a JSON object with three fields: status "ok", the current time
// formatted as RFC 3339, and the service name "elasticsearch-api".
func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	nowStamp := time.Now().Format(time.RFC3339)
	status := map[string]interface{}{}
	status["status"] = "ok"
	status["timestamp"] = nowStamp
	status["service"] = "elasticsearch-api"

	enc := json.NewEncoder(w)
	enc.Encode(status)
}
// main wires up the Elasticsearch-backed product API and serves it on :8080.
//
// Startup steps:
//  1. Build the ElasticsearchService; this is fatal on failure.
//  2. Try to create the index. A failure is only logged, presumably so the
//     service still starts when the index already exists (TODO: confirm
//     against CreateIndex).
//  3. Register the /api routes and /health behind a permissive CORS handler.
//  4. Serve with explicit timeouts.
func main() {
	// Create the Elasticsearch service.
	esService, err := NewElasticsearchService()
	if err != nil {
		log.Fatalf("创建Elasticsearch服务失败: %v", err)
	}

	// Create the index (best effort).
	if err := esService.CreateIndex(); err != nil {
		log.Printf("创建索引失败: %v", err)
	}

	// Routes.
	r := mux.NewRouter()

	// API routes.
	api := r.PathPrefix("/api").Subrouter()
	api.HandleFunc("/products/search", esService.SearchHandler).Methods("GET")
	api.HandleFunc("/products/suggest", esService.SuggestHandler).Methods("GET")
	api.HandleFunc("/products", esService.CreateProductHandler).Methods("POST")

	// Health check.
	r.HandleFunc("/health", healthHandler).Methods("GET")

	// CORS: any origin and any header are allowed.
	c := cors.New(cors.Options{
		AllowedOrigins: []string{"*"},
		AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
		AllowedHeaders: []string{"*"},
	})
	handler := c.Handler(r)

	// Start the server.
	port := ":8080"
	log.Printf("服务器启动在端口 %s", port)
	log.Printf("健康检查: http://localhost%s/health", port)
	log.Printf("搜索API: http://localhost%s/api/products/search", port)

	// http.ListenAndServe uses a server with no timeouts, so a slow or idle
	// client can hold a connection open indefinitely. Bound each phase explicitly.
	srv := &http.Server{
		Addr:              port,
		Handler:           handler,
		ReadHeaderTimeout: 10 * time.Second,
		ReadTimeout:       30 * time.Second,
		WriteTimeout:      30 * time.Second,
		IdleTimeout:       120 * time.Second,
	}
	if err := srv.ListenAndServe(); err != nil {
		log.Fatalf("启动服务器失败: %v", err)
	}
}
4. 最佳实践
4.1 架构设计原则
分离关注点
- 数据存储与搜索分离
- 读写分离
- 缓存层设计
可扩展性
- 水平扩展设计
- 微服务架构
- 负载均衡
容错性
- 故障转移
- 降级策略
- 监控告警
4.2 性能优化
索引优化
- 合理的分片策略
- 索引模板使用
- 字段映射优化
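查询优化
- 查询缓存（将不参与评分的条件放入 filter 上下文，便于被查询缓存复用）
- 分页优化（深分页使用 search_after 代替 from/size）
- 聚合优化（对 keyword 等 doc_values 字段聚合，并控制 terms 聚合的 size）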
数据同步
- 增量同步
- 批量处理
- 异步处理
4.3 安全考虑
访问控制
- 用户认证
- 权限管理
- API密钥管理
数据保护
- 传输加密
- 存储加密
- 敏感数据脱敏
审计日志
- 操作记录
- 异常监控
- 安全事件追踪
5. 章节总结
5.1 核心知识点
ELK Stack集成
- Logstash数据处理
- Kibana可视化
- Beats数据收集
编程语言集成
- Node.js客户端
- Python客户端
- Go客户端
最佳实践
- 架构设计
- 性能优化
- 安全考虑
5.2 实践要点
- 选择合适的集成方案
- 遵循最佳实践
- 注重性能和安全
- 建立监控和告警
5.3 练习题
- 设计一个完整的ELK Stack日志分析系统
- 实现一个多语言的Elasticsearch搜索服务
- 建立Elasticsearch集群的监控和告警系统
- 设计一个高可用的搜索架构方案