概述
本章将通过实际案例深入分析ELK Stack在不同场景下的应用,总结最佳实践和优化策略。我们将从架构设计、性能优化、运维管理等多个维度,为您提供企业级ELK Stack部署和运维的完整指南。
架构设计最佳实践
1. 分层架构设计
企业级ELK架构图:
graph TB
subgraph "数据源层"
A1[Web服务器]
A2[应用服务器]
A3[数据库服务器]
A4[网络设备]
A5[安全设备]
end
subgraph "数据收集层"
B1[Filebeat]
B2[Metricbeat]
B3[Packetbeat]
B4[Winlogbeat]
B5[Heartbeat]
end
subgraph "数据缓冲层"
C1[Redis Cluster]
C2[Kafka Cluster]
end
subgraph "数据处理层"
D1[Logstash Node 1]
D2[Logstash Node 2]
D3[Logstash Node 3]
end
subgraph "数据存储层"
E1[ES Master Node 1]
E2[ES Master Node 2]
E3[ES Master Node 3]
E4[ES Data Node 1]
E5[ES Data Node 2]
E6[ES Data Node 3]
E7[ES Coordinating Node]
end
subgraph "数据展示层"
F1[Kibana Node 1]
F2[Kibana Node 2]
end
subgraph "负载均衡层"
G1[Nginx/HAProxy]
end
A1 --> B1
A2 --> B1
A3 --> B2
A4 --> B3
A5 --> B4
B1 --> C1
B2 --> C2
B3 --> C1
B4 --> C2
B5 --> C1
C1 --> D1
C1 --> D2
C2 --> D2
C2 --> D3
D1 --> E7
D2 --> E7
D3 --> E7
E7 --> E4
E7 --> E5
E7 --> E6
E1 -.-> E2
E2 -.-> E3
E3 -.-> E1
E4 --> F1
E5 --> F1
E6 --> F2
F1 --> G1
F2 --> G1
架构设计原则配置:
# architecture-config.yml
architecture:
design_principles:
- high_availability
- horizontal_scalability
- fault_tolerance
- performance_optimization
- security_first
layers:
data_collection:
components: ["filebeat", "metricbeat", "packetbeat"]
deployment_strategy: "distributed"
redundancy: "active-active"
data_buffering:
components: ["redis", "kafka"]
purpose: "decoupling and buffering"
capacity_planning: "3x peak load"
data_processing:
components: ["logstash"]
scaling_strategy: "horizontal"
resource_allocation: "cpu_intensive"
data_storage:
components: ["elasticsearch"]
node_types: ["master", "data", "coordinating"]
replication_factor: 1
shard_strategy: "time_based"
data_visualization:
components: ["kibana"]
load_balancing: "round_robin"
session_affinity: false
capacity_planning:
daily_log_volume: "500GB"
retention_period: "90_days"
peak_ingestion_rate: "100MB/s"
concurrent_users: 100
query_response_time: "<2s"
2. 容量规划
容量规划计算脚本:
#!/usr/bin/env python3
# capacity_planning.py
import json
import math
from typing import Dict, Any
from datetime import datetime, timedelta
class ELKCapacityPlanner:
def __init__(self):
self.sizing_factors = {
'compression_ratio': 0.3, # 压缩比
'replica_factor': 1, # 副本数
'overhead_factor': 1.2, # 系统开销
'growth_factor': 1.5, # 增长预留
'index_overhead': 0.1 # 索引开销
}
self.hardware_specs = {
'elasticsearch': {
'master_node': {
'cpu_cores': 4,
'memory_gb': 16,
'disk_gb': 100,
'disk_type': 'SSD'
},
'data_node': {
'cpu_cores': 16,
'memory_gb': 64,
'disk_gb': 2000,
'disk_type': 'SSD'
},
'coordinating_node': {
'cpu_cores': 8,
'memory_gb': 32,
'disk_gb': 200,
'disk_type': 'SSD'
}
},
'logstash': {
'cpu_cores': 8,
'memory_gb': 16,
'disk_gb': 100,
'disk_type': 'SSD'
},
'kibana': {
'cpu_cores': 4,
'memory_gb': 8,
'disk_gb': 50,
'disk_type': 'SSD'
}
}
def calculate_storage_requirements(self,
daily_volume_gb: float,
retention_days: int,
compression_ratio: float = None) -> Dict[str, Any]:
"""计算存储需求"""
if compression_ratio is None:
compression_ratio = self.sizing_factors['compression_ratio']
# 基础存储计算
raw_storage = daily_volume_gb * retention_days
compressed_storage = raw_storage * compression_ratio
# 考虑副本和开销
replica_factor = self.sizing_factors['replica_factor']
overhead_factor = self.sizing_factors['overhead_factor']
index_overhead = self.sizing_factors['index_overhead']
total_storage = compressed_storage * (1 + replica_factor) * overhead_factor
index_storage = total_storage * index_overhead
# 增长预留
growth_factor = self.sizing_factors['growth_factor']
recommended_storage = total_storage * growth_factor
return {
'raw_storage_gb': round(raw_storage, 2),
'compressed_storage_gb': round(compressed_storage, 2),
'total_storage_gb': round(total_storage, 2),
'index_storage_gb': round(index_storage, 2),
'recommended_storage_gb': round(recommended_storage, 2),
'compression_ratio': compression_ratio,
'replica_factor': replica_factor,
'retention_days': retention_days
}
def calculate_node_requirements(self,
daily_volume_gb: float,
peak_ingestion_rate_mb_s: float,
concurrent_users: int,
retention_days: int) -> Dict[str, Any]:
"""计算节点需求"""
storage_req = self.calculate_storage_requirements(daily_volume_gb, retention_days)
# 数据节点计算
data_node_capacity = self.hardware_specs['elasticsearch']['data_node']['disk_gb']
required_data_nodes = math.ceil(storage_req['recommended_storage_gb'] / data_node_capacity)
# 主节点计算(奇数个,最少3个)
master_nodes = max(3, 3 if required_data_nodes <= 10 else 5)
# 协调节点计算(基于并发用户数)
coordinating_nodes = max(1, math.ceil(concurrent_users / 50))
# Logstash节点计算(基于峰值摄取速率)
logstash_throughput_mb_s = 50 # 单个Logstash节点处理能力
logstash_nodes = max(1, math.ceil(peak_ingestion_rate_mb_s / logstash_throughput_mb_s))
# Kibana节点计算(基于并发用户数)
kibana_nodes = max(1, math.ceil(concurrent_users / 100))
return {
'elasticsearch': {
'master_nodes': master_nodes,
'data_nodes': required_data_nodes,
'coordinating_nodes': coordinating_nodes,
'total_nodes': master_nodes + required_data_nodes + coordinating_nodes
},
'logstash': {
'nodes': logstash_nodes
},
'kibana': {
'nodes': kibana_nodes
},
'storage_requirements': storage_req
}
def calculate_resource_requirements(self, node_requirements: Dict[str, Any]) -> Dict[str, Any]:
"""计算资源需求"""
total_resources = {
'cpu_cores': 0,
'memory_gb': 0,
'disk_gb': 0,
'estimated_cost_usd_month': 0
}
# Elasticsearch资源
es_req = node_requirements['elasticsearch']
# 主节点资源
master_spec = self.hardware_specs['elasticsearch']['master_node']
master_resources = {
'cpu_cores': master_spec['cpu_cores'] * es_req['master_nodes'],
'memory_gb': master_spec['memory_gb'] * es_req['master_nodes'],
'disk_gb': master_spec['disk_gb'] * es_req['master_nodes']
}
# 数据节点资源
data_spec = self.hardware_specs['elasticsearch']['data_node']
data_resources = {
'cpu_cores': data_spec['cpu_cores'] * es_req['data_nodes'],
'memory_gb': data_spec['memory_gb'] * es_req['data_nodes'],
'disk_gb': data_spec['disk_gb'] * es_req['data_nodes']
}
# 协调节点资源
coord_spec = self.hardware_specs['elasticsearch']['coordinating_node']
coord_resources = {
'cpu_cores': coord_spec['cpu_cores'] * es_req['coordinating_nodes'],
'memory_gb': coord_spec['memory_gb'] * es_req['coordinating_nodes'],
'disk_gb': coord_spec['disk_gb'] * es_req['coordinating_nodes']
}
# Logstash资源
logstash_spec = self.hardware_specs['logstash']
logstash_resources = {
'cpu_cores': logstash_spec['cpu_cores'] * node_requirements['logstash']['nodes'],
'memory_gb': logstash_spec['memory_gb'] * node_requirements['logstash']['nodes'],
'disk_gb': logstash_spec['disk_gb'] * node_requirements['logstash']['nodes']
}
# Kibana资源
kibana_spec = self.hardware_specs['kibana']
kibana_resources = {
'cpu_cores': kibana_spec['cpu_cores'] * node_requirements['kibana']['nodes'],
'memory_gb': kibana_spec['memory_gb'] * node_requirements['kibana']['nodes'],
'disk_gb': kibana_spec['disk_gb'] * node_requirements['kibana']['nodes']
}
# 汇总资源
for resource_type in ['cpu_cores', 'memory_gb', 'disk_gb']:
total_resources[resource_type] = (
master_resources[resource_type] +
data_resources[resource_type] +
coord_resources[resource_type] +
logstash_resources[resource_type] +
kibana_resources[resource_type]
)
# 估算成本(基于AWS实例价格)
cost_per_core_month = 50 # USD
cost_per_gb_memory_month = 10 # USD
cost_per_gb_ssd_month = 0.1 # USD
total_resources['estimated_cost_usd_month'] = (
total_resources['cpu_cores'] * cost_per_core_month +
total_resources['memory_gb'] * cost_per_gb_memory_month +
total_resources['disk_gb'] * cost_per_gb_ssd_month
)
return {
'total_resources': total_resources,
'breakdown': {
'elasticsearch_master': master_resources,
'elasticsearch_data': data_resources,
'elasticsearch_coordinating': coord_resources,
'logstash': logstash_resources,
'kibana': kibana_resources
}
}
def generate_sizing_report(self,
daily_volume_gb: float,
peak_ingestion_rate_mb_s: float,
concurrent_users: int,
retention_days: int,
output_file: str = None) -> Dict[str, Any]:
"""生成完整的容量规划报告"""
# 计算节点需求
node_req = self.calculate_node_requirements(
daily_volume_gb, peak_ingestion_rate_mb_s, concurrent_users, retention_days
)
# 计算资源需求
resource_req = self.calculate_resource_requirements(node_req)
# 生成报告
report = {
'planning_parameters': {
'daily_volume_gb': daily_volume_gb,
'peak_ingestion_rate_mb_s': peak_ingestion_rate_mb_s,
'concurrent_users': concurrent_users,
'retention_days': retention_days,
'planning_date': datetime.now().isoformat()
},
'node_requirements': node_req,
'resource_requirements': resource_req,
'recommendations': self._generate_recommendations(node_req, resource_req),
'sizing_factors': self.sizing_factors
}
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"Capacity planning report saved to {output_file}")
return report
def _generate_recommendations(self, node_req: Dict, resource_req: Dict) -> list:
"""生成优化建议"""
recommendations = []
total_nodes = node_req['elasticsearch']['total_nodes']
if total_nodes > 20:
recommendations.append({
'type': 'architecture',
'priority': 'high',
'message': f'Large cluster detected ({total_nodes} nodes). Consider implementing dedicated master nodes and hot/warm architecture.'
})
total_storage = resource_req['total_resources']['disk_gb']
if total_storage > 10000: # 10TB
recommendations.append({
'type': 'storage',
'priority': 'medium',
'message': 'Large storage requirement detected. Consider implementing tiered storage with hot/warm/cold architecture.'
})
memory_per_data_node = resource_req['breakdown']['elasticsearch_data']['memory_gb'] / node_req['elasticsearch']['data_nodes']
if memory_per_data_node > 64:
recommendations.append({
'type': 'memory',
'priority': 'medium',
'message': 'High memory requirement per data node. Consider increasing the number of data nodes instead.'
})
monthly_cost = resource_req['total_resources']['estimated_cost_usd_month']
if monthly_cost > 10000:
recommendations.append({
'type': 'cost',
'priority': 'high',
'message': f'High estimated monthly cost (${monthly_cost:,.2f}). Consider cost optimization strategies.'
})
return recommendations
# 使用示例
if __name__ == "__main__":
planner = ELKCapacityPlanner()
# 示例参数
report = planner.generate_sizing_report(
daily_volume_gb=500,
peak_ingestion_rate_mb_s=100,
concurrent_users=50,
retention_days=90,
output_file='elk_capacity_plan.json'
)
print("\n=== ELK Stack Capacity Planning Report ===")
print(f"Total Elasticsearch Nodes: {report['node_requirements']['elasticsearch']['total_nodes']}")
print(f"Total CPU Cores: {report['resource_requirements']['total_resources']['cpu_cores']}")
print(f"Total Memory: {report['resource_requirements']['total_resources']['memory_gb']} GB")
print(f"Total Storage: {report['resource_requirements']['total_resources']['disk_gb']} GB")
print(f"Estimated Monthly Cost: ${report['resource_requirements']['total_resources']['estimated_cost_usd_month']:,.2f}")
if report['recommendations']:
print("\n=== Recommendations ===")
for rec in report['recommendations']:
print(f"[{rec['priority'].upper()}] {rec['message']}")
性能优化最佳实践
1. 索引优化策略
索引模板优化配置:
{
"index_patterns": ["logs-*"],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"refresh_interval": "30s",
"index.codec": "best_compression",
"index.merge.policy.max_merge_at_once": 5,
"index.merge.policy.segments_per_tier": 5,
"index.translog.flush_threshold_size": "1gb",
"index.translog.sync_interval": "30s",
"index.query.default_field": ["message", "log.level"],
"index.mapping.total_fields.limit": 2000,
"index.mapping.depth.limit": 20,
"index.mapping.nested_fields.limit": 100,
"index.max_result_window": 50000,
"index.max_rescore_window": 50000,
"index.highlight.max_analyzed_offset": 1000000,
"index.max_terms_count": 100000,
"index.max_regex_length": 10000
},
"mappings": {
"dynamic_templates": [
{
"strings_as_keywords": {
"match_mapping_type": "string",
"match": "*_id",
"mapping": {
"type": "keyword",
"index": true,
"doc_values": true
}
}
},
{
"strings_as_text": {
"match_mapping_type": "string",
"mapping": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
{
"longs_as_integers": {
"match_mapping_type": "long",
"mapping": {
"type": "integer"
}
}
}
],
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"message": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 8192
}
}
},
"log": {
"properties": {
"level": {
"type": "keyword"
},
"logger": {
"type": "keyword"
},
"file": {
"properties": {
"path": {
"type": "keyword"
},
"line": {
"type": "integer"
}
}
}
}
},
"host": {
"properties": {
"name": {
"type": "keyword"
},
"ip": {
"type": "ip"
}
}
},
"user": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "keyword"
}
}
},
"http": {
"properties": {
"request": {
"properties": {
"method": {
"type": "keyword"
},
"url": {
"type": "keyword"
},
"bytes": {
"type": "long"
}
}
},
"response": {
"properties": {
"status_code": {
"type": "short"
},
"bytes": {
"type": "long"
},
"time": {
"type": "float"
}
}
}
}
}
}
}
},
"priority": 100,
"version": 1,
"_meta": {
"description": "Optimized template for log data",
"created_by": "elk-admin",
"created_at": "2024-01-01"
}
}
2. 查询优化策略
查询性能优化脚本:
#!/usr/bin/env python3
# query_optimizer.py
import json
import time
from typing import Dict, List, Any
from elasticsearch import Elasticsearch
from datetime import datetime, timedelta
class QueryOptimizer:
def __init__(self, es_host='localhost:9200', es_user=None, es_password=None):
if es_user and es_password:
self.es = Elasticsearch(
[es_host],
http_auth=(es_user, es_password),
verify_certs=True
)
else:
self.es = Elasticsearch([es_host])
self.optimization_rules = {
'time_range_optimization': {
'description': 'Optimize time range queries',
'max_range_days': 30,
'recommended_format': 'epoch_millis'
},
'field_optimization': {
'description': 'Optimize field queries',
'prefer_keyword_fields': True,
'avoid_wildcard_prefix': True
},
'aggregation_optimization': {
'description': 'Optimize aggregation queries',
'max_buckets': 10000,
'use_composite_aggs': True
},
'pagination_optimization': {
'description': 'Optimize pagination',
'max_from_size': 10000,
'use_search_after': True
}
}
def analyze_query(self, query: Dict[str, Any]) -> Dict[str, Any]:
"""分析查询并提供优化建议"""
analysis = {
'query': query,
'issues': [],
'recommendations': [],
'optimized_query': None,
'estimated_performance_gain': 0
}
# 检查时间范围
time_issues = self._check_time_range(query)
analysis['issues'].extend(time_issues)
# 检查字段使用
field_issues = self._check_field_usage(query)
analysis['issues'].extend(field_issues)
# 检查聚合
agg_issues = self._check_aggregations(query)
analysis['issues'].extend(agg_issues)
# 检查分页
pagination_issues = self._check_pagination(query)
analysis['issues'].extend(pagination_issues)
# 生成优化建议
analysis['recommendations'] = self._generate_recommendations(analysis['issues'])
# 生成优化后的查询
analysis['optimized_query'] = self._optimize_query(query, analysis['issues'])
# 估算性能提升
analysis['estimated_performance_gain'] = self._estimate_performance_gain(analysis['issues'])
return analysis
def _check_time_range(self, query: Dict) -> List[Dict]:
"""检查时间范围查询"""
issues = []
def check_range_recursive(obj, path=""):
if isinstance(obj, dict):
for key, value in obj.items():
current_path = f"{path}.{key}" if path else key
if key == "range" and "@timestamp" in value:
timestamp_range = value["@timestamp"]
# 检查时间范围是否过大
if "gte" in timestamp_range and "lte" in timestamp_range:
try:
start_time = datetime.fromisoformat(timestamp_range["gte"].replace('Z', '+00:00'))
end_time = datetime.fromisoformat(timestamp_range["lte"].replace('Z', '+00:00'))
range_days = (end_time - start_time).days
if range_days > self.optimization_rules['time_range_optimization']['max_range_days']:
issues.append({
'type': 'time_range_too_large',
'severity': 'medium',
'path': current_path,
'message': f'Time range is {range_days} days, consider reducing to improve performance',
'current_range_days': range_days,
'recommended_max_days': self.optimization_rules['time_range_optimization']['max_range_days']
})
except Exception:
pass
# 检查时间格式
for time_key in ['gte', 'lte', 'gt', 'lt']:
if time_key in timestamp_range:
time_value = timestamp_range[time_key]
if isinstance(time_value, str) and not time_value.isdigit():
issues.append({
'type': 'time_format_not_optimal',
'severity': 'low',
'path': f"{current_path}.{time_key}",
'message': 'Consider using epoch_millis format for better performance',
'current_format': 'iso_string',
'recommended_format': 'epoch_millis'
})
elif isinstance(value, (dict, list)):
check_range_recursive(value, current_path)
elif isinstance(obj, list):
for i, item in enumerate(obj):
check_range_recursive(item, f"{path}[{i}]")
check_range_recursive(query)
return issues
def _check_field_usage(self, query: Dict) -> List[Dict]:
"""检查字段使用"""
issues = []
def check_fields_recursive(obj, path=""):
if isinstance(obj, dict):
for key, value in obj.items():
current_path = f"{path}.{key}" if path else key
# 检查通配符查询
if key in ['wildcard', 'prefix'] and isinstance(value, dict):
for field_name, pattern in value.items():
if isinstance(pattern, str) and pattern.startswith('*'):
issues.append({
'type': 'wildcard_prefix',
'severity': 'high',
'path': current_path,
'field': field_name,
'message': 'Wildcard queries starting with * are very slow',
'pattern': pattern
})
# 检查文本字段的term查询
elif key == 'term' and isinstance(value, dict):
for field_name in value.keys():
if not field_name.endswith('.keyword') and '.' not in field_name:
issues.append({
'type': 'term_on_text_field',
'severity': 'medium',
'path': current_path,
'field': field_name,
'message': f'Consider using {field_name}.keyword for exact matches',
'recommendation': f'{field_name}.keyword'
})
elif isinstance(value, (dict, list)):
check_fields_recursive(value, current_path)
elif isinstance(obj, list):
for i, item in enumerate(obj):
check_fields_recursive(item, f"{path}[{i}]")
check_fields_recursive(query)
return issues
def _check_aggregations(self, query: Dict) -> List[Dict]:
"""检查聚合查询"""
issues = []
if 'aggs' in query or 'aggregations' in query:
aggs = query.get('aggs', query.get('aggregations', {}))
def check_aggs_recursive(agg_obj, path=""):
if isinstance(agg_obj, dict):
for agg_name, agg_config in agg_obj.items():
current_path = f"{path}.{agg_name}" if path else agg_name
if isinstance(agg_config, dict):
# 检查terms聚合的size
if 'terms' in agg_config:
terms_config = agg_config['terms']
size = terms_config.get('size', 10)
if size > self.optimization_rules['aggregation_optimization']['max_buckets']:
issues.append({
'type': 'large_aggregation_size',
'severity': 'high',
'path': current_path,
'message': f'Aggregation size {size} is too large, consider using composite aggregation',
'current_size': size,
'max_recommended': self.optimization_rules['aggregation_optimization']['max_buckets']
})
# 检查嵌套聚合
if 'aggs' in agg_config or 'aggregations' in agg_config:
nested_aggs = agg_config.get('aggs', agg_config.get('aggregations', {}))
check_aggs_recursive(nested_aggs, current_path)
check_aggs_recursive(aggs)
return issues
def _check_pagination(self, query: Dict) -> List[Dict]:
"""检查分页设置"""
issues = []
from_param = query.get('from', 0)
size_param = query.get('size', 10)
total_offset = from_param + size_param
if total_offset > self.optimization_rules['pagination_optimization']['max_from_size']:
issues.append({
'type': 'deep_pagination',
'severity': 'high',
'message': f'Deep pagination detected (from: {from_param}, size: {size_param})',
'total_offset': total_offset,
'recommendation': 'Use search_after for deep pagination',
'max_recommended_offset': self.optimization_rules['pagination_optimization']['max_from_size']
})
return issues
def _generate_recommendations(self, issues: List[Dict]) -> List[Dict]:
"""生成优化建议"""
recommendations = []
issue_types = set(issue['type'] for issue in issues)
if 'time_range_too_large' in issue_types:
recommendations.append({
'type': 'time_range',
'priority': 'high',
'action': 'Reduce time range or use time-based indices',
'description': 'Large time ranges can significantly impact query performance'
})
if 'wildcard_prefix' in issue_types:
recommendations.append({
'type': 'wildcard',
'priority': 'high',
'action': 'Avoid leading wildcards or use n-gram analysis',
'description': 'Leading wildcard queries require scanning all terms'
})
if 'term_on_text_field' in issue_types:
recommendations.append({
'type': 'field_mapping',
'priority': 'medium',
'action': 'Use keyword fields for exact matches',
'description': 'Term queries on analyzed text fields may not work as expected'
})
if 'large_aggregation_size' in issue_types:
recommendations.append({
'type': 'aggregation',
'priority': 'high',
'action': 'Use composite aggregations for large result sets',
'description': 'Large aggregations can consume significant memory'
})
if 'deep_pagination' in issue_types:
recommendations.append({
'type': 'pagination',
'priority': 'high',
'action': 'Implement search_after for deep pagination',
'description': 'Deep pagination with from/size is inefficient'
})
return recommendations
def _optimize_query(self, query: Dict, issues: List[Dict]) -> Dict:
"""生成优化后的查询"""
optimized = query.copy()
# 优化分页
deep_pagination_issues = [i for i in issues if i['type'] == 'deep_pagination']
if deep_pagination_issues:
# 移除from参数,添加search_after示例
if 'from' in optimized:
del optimized['from']
optimized['search_after'] = ['example_sort_value']
optimized['sort'] = [{'@timestamp': {'order': 'desc'}}]
# 优化字段查询
def optimize_fields_recursive(obj):
if isinstance(obj, dict):
new_obj = {}
for key, value in obj.items():
if key == 'term' and isinstance(value, dict):
new_term = {}
for field_name, term_value in value.items():
# 检查是否需要添加.keyword
term_on_text_issues = [
i for i in issues
if i['type'] == 'term_on_text_field' and i.get('field') == field_name
]
if term_on_text_issues:
new_term[f"{field_name}.keyword"] = term_value
else:
new_term[field_name] = term_value
new_obj[key] = new_term
elif isinstance(value, (dict, list)):
new_obj[key] = optimize_fields_recursive(value)
else:
new_obj[key] = value
return new_obj
elif isinstance(obj, list):
return [optimize_fields_recursive(item) for item in obj]
else:
return obj
optimized = optimize_fields_recursive(optimized)
return optimized
def _estimate_performance_gain(self, issues: List[Dict]) -> int:
"""估算性能提升百分比"""
gain = 0
severity_weights = {
'high': 30,
'medium': 15,
'low': 5
}
for issue in issues:
severity = issue.get('severity', 'low')
gain += severity_weights.get(severity, 5)
return min(gain, 80) # 最大80%的性能提升
def benchmark_query(self, query: Dict, iterations: int = 5) -> Dict[str, Any]:
"""基准测试查询性能"""
results = {
'query': query,
'iterations': iterations,
'execution_times': [],
'average_time': 0,
'min_time': 0,
'max_time': 0,
'total_hits': 0,
'errors': []
}
for i in range(iterations):
try:
start_time = time.time()
response = self.es.search(body=query, index='*')
end_time = time.time()
execution_time = (end_time - start_time) * 1000 # 转换为毫秒
results['execution_times'].append(execution_time)
if i == 0: # 只记录第一次的命中数
results['total_hits'] = response['hits']['total']['value']
except Exception as e:
results['errors'].append({
'iteration': i + 1,
'error': str(e)
})
if results['execution_times']:
results['average_time'] = sum(results['execution_times']) / len(results['execution_times'])
results['min_time'] = min(results['execution_times'])
results['max_time'] = max(results['execution_times'])
return results
def compare_queries(self, original_query: Dict, optimized_query: Dict, iterations: int = 5) -> Dict[str, Any]:
"""比较原始查询和优化后查询的性能"""
print("Benchmarking original query...")
original_results = self.benchmark_query(original_query, iterations)
print("Benchmarking optimized query...")
optimized_results = self.benchmark_query(optimized_query, iterations)
comparison = {
'original': original_results,
'optimized': optimized_results,
'improvement': {
'average_time_reduction_ms': original_results['average_time'] - optimized_results['average_time'],
'performance_improvement_percent': 0,
'faster': False
}
}
if original_results['average_time'] > 0:
improvement_percent = (
(original_results['average_time'] - optimized_results['average_time']) /
original_results['average_time']
) * 100
comparison['improvement']['performance_improvement_percent'] = improvement_percent
comparison['improvement']['faster'] = improvement_percent > 0
return comparison
# 使用示例
if __name__ == "__main__":
optimizer = QueryOptimizer()
# 示例查询
sample_query = {
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": "2024-01-01T00:00:00Z",
"lte": "2024-03-01T00:00:00Z"
}
}
},
{
"term": {
"message": "error"
}
},
{
"wildcard": {
"user.name": "*admin*"
}
}
],
"filter": [
{
"term": {
"log.level": "ERROR"
}
}
]
}
},
"aggs": {
"top_users": {
"terms": {
"field": "user.name",
"size": 50000
}
}
},
"from": 50000,
"size": 100
}
# 分析查询
analysis = optimizer.analyze_query(sample_query)
print("=== Query Analysis ===")
print(f"Issues found: {len(analysis['issues'])}")
for issue in analysis['issues']:
print(f" [{issue['severity'].upper()}] {issue['message']}")
print("\n=== Recommendations ===")
for rec in analysis['recommendations']:
print(f" [{rec['priority'].upper()}] {rec['action']}: {rec['description']}")
print(f"\nEstimated performance gain: {analysis['estimated_performance_gain']}%")
# 比较性能(如果Elasticsearch可用)
try:
comparison = optimizer.compare_queries(sample_query, analysis['optimized_query'])
print(f"\n=== Performance Comparison ===")
print(f"Original average time: {comparison['original']['average_time']:.2f}ms")
print(f"Optimized average time: {comparison['optimized']['average_time']:.2f}ms")
print(f"Performance improvement: {comparison['improvement']['performance_improvement_percent']:.1f}%")
except Exception as e:
print(f"\nCould not perform benchmark: {e}")
3. 数据生命周期管理
ILM策略配置:
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "50gb",
"max_age": "1d",
"max_docs": 100000000
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"set_priority": {
"priority": 50
},
"allocate": {
"number_of_replicas": 0,
"include": {
"box_type": "warm"
}
},
"forcemerge": {
"max_num_segments": 1
},
"shrink": {
"number_of_shards": 1
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"set_priority": {
"priority": 0
},
"allocate": {
"number_of_replicas": 0,
"include": {
"box_type": "cold"
}
}
}
},
"frozen": {
"min_age": "90d",
"actions": {
"freeze": {}
}
},
"delete": {
"min_age": "365d",
"actions": {
"delete": {}
}
}
}
}
}
运维管理最佳实践
1. 监控和告警
综合监控脚本:
#!/usr/bin/env python3
# elk_monitoring.py
import json
import time
import smtplib
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Any
from email.mime.text import MimeText
from email.mime.multipart import MimeMultipart
from elasticsearch import Elasticsearch
class ELKMonitor:
def __init__(self, config_file='monitoring_config.json'):
with open(config_file, 'r') as f:
self.config = json.load(f)
self.es = Elasticsearch(
[self.config['elasticsearch']['host']],
http_auth=(self.config['elasticsearch']['user'], self.config['elasticsearch']['password']),
verify_certs=True
)
self.alerts = []
self.metrics = {}
def check_cluster_health(self) -> Dict[str, Any]:
"""检查集群健康状态"""
try:
health = self.es.cluster.health()
health_status = health['status']
if health_status == 'red':
self.alerts.append({
'type': 'cluster_health',
'severity': 'critical',
'message': 'Cluster status is RED',
'details': health
})
elif health_status == 'yellow':
self.alerts.append({
'type': 'cluster_health',
'severity': 'warning',
'message': 'Cluster status is YELLOW',
'details': health
})
# 检查节点数量
if health['number_of_nodes'] < self.config['thresholds']['min_nodes']:
self.alerts.append({
'type': 'node_count',
'severity': 'critical',
'message': f"Node count ({health['number_of_nodes']}) below minimum ({self.config['thresholds']['min_nodes']})",
'current_nodes': health['number_of_nodes'],
'min_required': self.config['thresholds']['min_nodes']
})
return health
except Exception as e:
self.alerts.append({
'type': 'cluster_health_check_failed',
'severity': 'critical',
'message': f'Failed to check cluster health: {e}'
})
return {}
def check_node_stats(self) -> Dict[str, Any]:
"""检查节点统计信息"""
try:
stats = self.es.nodes.stats()
node_alerts = []
for node_id, node_stats in stats['nodes'].items():
node_name = node_stats['name']
# 检查JVM堆内存使用率
jvm_heap_percent = node_stats['jvm']['mem']['heap_used_percent']
if jvm_heap_percent > self.config['thresholds']['jvm_heap_percent']:
node_alerts.append({
'type': 'high_jvm_heap',
'severity': 'warning' if jvm_heap_percent < 90 else 'critical',
'node': node_name,
'message': f'High JVM heap usage: {jvm_heap_percent}%',
'current_percent': jvm_heap_percent,
'threshold': self.config['thresholds']['jvm_heap_percent']
})
# 检查磁盘使用率
fs_stats = node_stats.get('fs', {}).get('total', {})
if fs_stats:
disk_used_percent = (fs_stats['used_in_bytes'] / fs_stats['total_in_bytes']) * 100
if disk_used_percent > self.config['thresholds']['disk_usage_percent']:
node_alerts.append({
'type': 'high_disk_usage',
'severity': 'warning' if disk_used_percent < 90 else 'critical',
'node': node_name,
'message': f'High disk usage: {disk_used_percent:.1f}%',
'current_percent': disk_used_percent,
'threshold': self.config['thresholds']['disk_usage_percent']
})
# 检查CPU使用率
cpu_percent = node_stats.get('os', {}).get('cpu', {}).get('percent', 0)
if cpu_percent > self.config['thresholds']['cpu_percent']:
node_alerts.append({
'type': 'high_cpu_usage',
'severity': 'warning',
'node': node_name,
'message': f'High CPU usage: {cpu_percent}%',
'current_percent': cpu_percent,
'threshold': self.config['thresholds']['cpu_percent']
})
self.alerts.extend(node_alerts)
return stats
except Exception as e:
self.alerts.append({
'type': 'node_stats_check_failed',
'severity': 'warning',
'message': f'Failed to check node stats: {e}'
})
return {}
def check_index_health(self) -> Dict[str, Any]:
"""检查索引健康状态"""
try:
indices = self.es.cat.indices(format='json')
index_alerts = []
for index in indices:
index_name = index['index']
# 检查索引状态
if index['health'] == 'red':
index_alerts.append({
'type': 'index_red',
'severity': 'critical',
'index': index_name,
'message': f'Index {index_name} is in RED state'
})
elif index['health'] == 'yellow':
index_alerts.append({
'type': 'index_yellow',
'severity': 'warning',
'index': index_name,
'message': f'Index {index_name} is in YELLOW state'
})
# 检查索引大小
store_size = index.get('store.size', '0b')
if store_size.endswith('gb') or store_size.endswith('tb'):
size_value = float(store_size[:-2])
size_unit = store_size[-2:]
if size_unit == 'gb' and size_value > self.config['thresholds']['max_index_size_gb']:
index_alerts.append({
'type': 'large_index',
'severity': 'warning',
'index': index_name,
'message': f'Large index detected: {store_size}',
'current_size': store_size,
'threshold': f"{self.config['thresholds']['max_index_size_gb']}gb"
})
elif size_unit == 'tb':
index_alerts.append({
'type': 'very_large_index',
'severity': 'critical',
'index': index_name,
'message': f'Very large index detected: {store_size}',
'current_size': store_size
})
self.alerts.extend(index_alerts)
return {'indices': indices, 'total_indices': len(indices)}
except Exception as e:
self.alerts.append({
'type': 'index_health_check_failed',
'severity': 'warning',
'message': f'Failed to check index health: {e}'
})
return {}
def check_ingestion_rate(self) -> Dict[str, Any]:
"""检查数据摄取速率"""
try:
# 获取最近5分钟的文档数量
now = datetime.now()
five_minutes_ago = now - timedelta(minutes=5)
query = {
"query": {
"range": {
"@timestamp": {
"gte": five_minutes_ago.isoformat(),
"lte": now.isoformat()
}
}
},
"size": 0
}
result = self.es.search(index='*', body=query)
doc_count = result['hits']['total']['value']
docs_per_minute = doc_count / 5
# 检查摄取速率是否过低
if docs_per_minute < self.config['thresholds']['min_ingestion_rate_per_minute']:
self.alerts.append({
'type': 'low_ingestion_rate',
'severity': 'warning',
'message': f'Low ingestion rate: {docs_per_minute:.1f} docs/min',
'current_rate': docs_per_minute,
'threshold': self.config['thresholds']['min_ingestion_rate_per_minute']
})
return {
'docs_last_5_minutes': doc_count,
'docs_per_minute': docs_per_minute
}
except Exception as e:
self.alerts.append({
'type': 'ingestion_rate_check_failed',
'severity': 'warning',
'message': f'Failed to check ingestion rate: {e}'
})
return {}
def check_logstash_health(self) -> Dict[str, Any]:
"""检查Logstash健康状态"""
logstash_stats = []
for logstash_node in self.config['logstash']['nodes']:
try:
# 检查Logstash API
response = requests.get(
f"http://{logstash_node['host']}:{logstash_node['api_port']}/_node/stats",
timeout=10
)
if response.status_code == 200:
stats = response.json()
# 检查JVM堆内存
jvm_heap_percent = stats['jvm']['mem']['heap_used_percent']
if jvm_heap_percent > self.config['thresholds']['logstash_jvm_heap_percent']:
self.alerts.append({
'type': 'logstash_high_jvm_heap',
'severity': 'warning',
'node': logstash_node['host'],
'message': f'Logstash high JVM heap: {jvm_heap_percent}%',
'current_percent': jvm_heap_percent,
'threshold': self.config['thresholds']['logstash_jvm_heap_percent']
})
# 检查管道状态
pipelines = stats.get('pipelines', {})
for pipeline_id, pipeline_stats in pipelines.items():
events_out = pipeline_stats['events']['out']
events_in = pipeline_stats['events']['in']
if events_in > 0:
processing_ratio = events_out / events_in
if processing_ratio < 0.95: # 95%的处理率
self.alerts.append({
'type': 'logstash_low_processing_ratio',
'severity': 'warning',
'node': logstash_node['host'],
'pipeline': pipeline_id,
'message': f'Low processing ratio: {processing_ratio:.2%}',
'events_in': events_in,
'events_out': events_out
})
logstash_stats.append({
'node': logstash_node['host'],
'status': 'healthy',
'stats': stats
})
else:
self.alerts.append({
'type': 'logstash_api_error',
'severity': 'critical',
'node': logstash_node['host'],
'message': f'Logstash API returned status {response.status_code}'
})
except Exception as e:
self.alerts.append({
'type': 'logstash_connection_failed',
'severity': 'critical',
'node': logstash_node['host'],
'message': f'Failed to connect to Logstash: {e}'
})
return {'nodes': logstash_stats}
def check_kibana_health(self) -> Dict[str, Any]:
"""检查Kibana健康状态"""
kibana_stats = []
for kibana_node in self.config['kibana']['nodes']:
try:
# 检查Kibana状态API
response = requests.get(
f"http://{kibana_node['host']}:{kibana_node['port']}/api/status",
timeout=10
)
if response.status_code == 200:
status = response.json()
overall_status = status['status']['overall']['state']
if overall_status != 'green':
self.alerts.append({
'type': 'kibana_unhealthy',
'severity': 'warning' if overall_status == 'yellow' else 'critical',
'node': kibana_node['host'],
'message': f'Kibana status is {overall_status}',
'status_details': status
})
kibana_stats.append({
'node': kibana_node['host'],
'status': overall_status,
'details': status
})
else:
self.alerts.append({
'type': 'kibana_api_error',
'severity': 'critical',
'node': kibana_node['host'],
'message': f'Kibana API returned status {response.status_code}'
})
except Exception as e:
self.alerts.append({
'type': 'kibana_connection_failed',
'severity': 'critical',
'node': kibana_node['host'],
'message': f'Failed to connect to Kibana: {e}'
})
return {'nodes': kibana_stats}
def send_alerts(self):
"""发送告警通知"""
if not self.alerts:
return
# 按严重程度分组
critical_alerts = [a for a in self.alerts if a['severity'] == 'critical']
warning_alerts = [a for a in self.alerts if a['severity'] == 'warning']
if critical_alerts or warning_alerts:
self._send_email_alert(critical_alerts, warning_alerts)
self._send_slack_alert(critical_alerts, warning_alerts)
def _send_email_alert(self, critical_alerts: List, warning_alerts: List):
"""发送邮件告警"""
try:
email_config = self.config['alerting']['email']
msg = MimeMultipart()
msg['From'] = email_config['from']
msg['To'] = ', '.join(email_config['to'])
msg['Subject'] = f'ELK Stack Alert - {len(critical_alerts)} Critical, {len(warning_alerts)} Warning'
body = f"""ELK Stack Monitoring Alert
Timestamp: {datetime.now().isoformat()}
Critical Alerts ({len(critical_alerts)}):
"""
for alert in critical_alerts:
body += f"- {alert['message']}\n"
body += f"\nWarning Alerts ({len(warning_alerts)}):\n"
for alert in warning_alerts:
body += f"- {alert['message']}\n"
msg.attach(MimeText(body, 'plain'))
server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
server.starttls()
server.login(email_config['username'], email_config['password'])
server.send_message(msg)
server.quit()
print(f"Email alert sent to {email_config['to']}")
except Exception as e:
print(f"Failed to send email alert: {e}")
def _send_slack_alert(self, critical_alerts: List, warning_alerts: List):
"""发送Slack告警"""
try:
slack_config = self.config['alerting']['slack']
color = 'danger' if critical_alerts else 'warning'
payload = {
'channel': slack_config['channel'],
'username': 'ELK Monitor',
'icon_emoji': ':warning:',
'attachments': [{
'color': color,
'title': f'ELK Stack Alert - {len(critical_alerts)} Critical, {len(warning_alerts)} Warning',
'text': f'Monitoring detected {len(self.alerts)} issues',
'fields': [],
'ts': int(time.time())
}]
}
if critical_alerts:
payload['attachments'][0]['fields'].append({
'title': 'Critical Alerts',
'value': '\n'.join([a['message'] for a in critical_alerts[:5]]),
'short': False
})
if warning_alerts:
payload['attachments'][0]['fields'].append({
'title': 'Warning Alerts',
'value': '\n'.join([a['message'] for a in warning_alerts[:5]]),
'short': False
})
response = requests.post(slack_config['webhook_url'], json=payload)
if response.status_code == 200:
print("Slack alert sent successfully")
else:
print(f"Failed to send Slack alert: {response.status_code}")
except Exception as e:
print(f"Failed to send Slack alert: {e}")
def run_monitoring_cycle(self):
"""运行一次完整的监控周期"""
print(f"Starting monitoring cycle at {datetime.now()}")
# 清空之前的告警
self.alerts = []
# 执行各项检查
self.metrics['cluster_health'] = self.check_cluster_health()
self.metrics['node_stats'] = self.check_node_stats()
self.metrics['index_health'] = self.check_index_health()
self.metrics['ingestion_rate'] = self.check_ingestion_rate()
self.metrics['logstash_health'] = self.check_logstash_health()
self.metrics['kibana_health'] = self.check_kibana_health()
# 发送告警
self.send_alerts()
print(f"Monitoring cycle completed. Found {len(self.alerts)} alerts.")
return {
'timestamp': datetime.now().isoformat(),
'alerts': self.alerts,
'metrics': self.metrics
}
def start_continuous_monitoring(self, interval_minutes: int = 5):
"""启动持续监控"""
print(f"Starting continuous monitoring with {interval_minutes} minute intervals")
while True:
try:
self.run_monitoring_cycle()
time.sleep(interval_minutes * 60)
except KeyboardInterrupt:
print("Monitoring stopped by user")
break
except Exception as e:
print(f"Error in monitoring cycle: {e}")
time.sleep(60) # 等待1分钟后重试
# 监控配置文件示例
monitoring_config = {
"elasticsearch": {
"host": "localhost:9200",
"user": "elastic",
"password": "your_password"
},
"logstash": {
"nodes": [
{"host": "logstash1.example.com", "api_port": 9600},
{"host": "logstash2.example.com", "api_port": 9600}
]
},
"kibana": {
"nodes": [
{"host": "kibana1.example.com", "port": 5601},
{"host": "kibana2.example.com", "port": 5601}
]
},
"thresholds": {
"min_nodes": 3,
"jvm_heap_percent": 80,
"disk_usage_percent": 85,
"cpu_percent": 80,
"max_index_size_gb": 50,
"min_ingestion_rate_per_minute": 100,
"logstash_jvm_heap_percent": 75
},
"alerting": {
"email": {
"enabled": True,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "your_email@gmail.com",
"password": "your_password",
"from": "elk-monitor@company.com",
"to": ["admin@company.com", "ops@company.com"]
},
"slack": {
"enabled": True,
"webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
"channel": "#elk-alerts"
}
}
}
# 保存配置文件
with open('monitoring_config.json', 'w') as f:
json.dump(monitoring_config, f, indent=2)
# 使用示例
if __name__ == "__main__":
monitor = ELKMonitor()
# 运行单次监控
result = monitor.run_monitoring_cycle()
# 或启动持续监控
# monitor.start_continuous_monitoring(interval_minutes=5)
2. 备份和恢复策略
自动化备份脚本:
#!/usr/bin/env python3
# elk_backup.py
import json
import os
import time
import boto3
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Any
from elasticsearch import Elasticsearch
class ELKBackupManager:
def __init__(self, config_file='backup_config.json'):
with open(config_file, 'r') as f:
self.config = json.load(f)
self.es = Elasticsearch(
[self.config['elasticsearch']['host']],
http_auth=(self.config['elasticsearch']['user'], self.config['elasticsearch']['password']),
verify_certs=True
)
# 初始化S3客户端(如果配置了)
if self.config.get('s3', {}).get('enabled', False):
self.s3_client = boto3.client(
's3',
aws_access_key_id=self.config['s3']['access_key'],
aws_secret_access_key=self.config['s3']['secret_key'],
region_name=self.config['s3']['region']
)
def create_snapshot_repository(self, repo_name: str, repo_config: Dict) -> bool:
"""创建快照仓库"""
try:
# 检查仓库是否已存在
try:
existing_repo = self.es.snapshot.get_repository(repository=repo_name)
print(f"Repository {repo_name} already exists")
return True
except:
pass
# 创建新仓库
self.es.snapshot.create_repository(
repository=repo_name,
body=repo_config
)
print(f"Created snapshot repository: {repo_name}")
return True
except Exception as e:
print(f"Failed to create repository {repo_name}: {e}")
return False
def create_snapshot(self, repo_name: str, snapshot_name: str, indices: List[str] = None) -> Dict[str, Any]:
"""创建快照"""
try:
snapshot_body = {
"ignore_unavailable": True,
"include_global_state": True,
"metadata": {
"created_by": "elk_backup_manager",
"created_at": datetime.now().isoformat(),
"description": f"Automated backup - {snapshot_name}"
}
}
if indices:
snapshot_body["indices"] = ",".join(indices)
# 创建快照
response = self.es.snapshot.create(
repository=repo_name,
snapshot=snapshot_name,
body=snapshot_body,
wait_for_completion=False
)
print(f"Started snapshot creation: {snapshot_name}")
return response
except Exception as e:
print(f"Failed to create snapshot {snapshot_name}: {e}")
return {}
def wait_for_snapshot_completion(self, repo_name: str, snapshot_name: str, timeout_minutes: int = 60) -> bool:
"""等待快照完成"""
start_time = time.time()
timeout_seconds = timeout_minutes * 60
while time.time() - start_time < timeout_seconds:
try:
snapshot_status = self.es.snapshot.status(
repository=repo_name,
snapshot=snapshot_name
)
if snapshot_status['snapshots']:
state = snapshot_status['snapshots'][0]['state']
if state == 'SUCCESS':
print(f"Snapshot {snapshot_name} completed successfully")
return True
elif state == 'FAILED':
print(f"Snapshot {snapshot_name} failed")
return False
elif state in ['IN_PROGRESS', 'STARTED']:
# 显示进度
stats = snapshot_status['snapshots'][0]['stats']
total_size = stats.get('total', {}).get('size_in_bytes', 0)
processed_size = stats.get('processed', {}).get('size_in_bytes', 0)
if total_size > 0:
progress = (processed_size / total_size) * 100
print(f"Snapshot progress: {progress:.1f}%")
time.sleep(30) # 等待30秒后再检查
else:
print(f"Unknown snapshot state: {state}")
time.sleep(10)
else:
print(f"No snapshot status found for {snapshot_name}")
time.sleep(10)
except Exception as e:
print(f"Error checking snapshot status: {e}")
time.sleep(10)
print(f"Snapshot {snapshot_name} timed out after {timeout_minutes} minutes")
return False
def list_snapshots(self, repo_name: str) -> List[Dict]:
"""列出快照"""
try:
response = self.es.snapshot.get(
repository=repo_name,
snapshot="*"
)
snapshots = response.get('snapshots', [])
# 按创建时间排序
snapshots.sort(key=lambda x: x['start_time'], reverse=True)
return snapshots
except Exception as e:
print(f"Failed to list snapshots: {e}")
return []
def delete_old_snapshots(self, repo_name: str, retention_days: int = 30) -> int:
"""删除旧快照"""
try:
snapshots = self.list_snapshots(repo_name)
cutoff_date = datetime.now() - timedelta(days=retention_days)
deleted_count = 0
for snapshot in snapshots:
# 解析快照创建时间
start_time_str = snapshot['start_time']
start_time = datetime.fromisoformat(start_time_str.replace('Z', '+00:00'))
if start_time < cutoff_date:
try:
self.es.snapshot.delete(
repository=repo_name,
snapshot=snapshot['snapshot']
)
print(f"Deleted old snapshot: {snapshot['snapshot']}")
deleted_count += 1
except Exception as e:
print(f"Failed to delete snapshot {snapshot['snapshot']}: {e}")
print(f"Deleted {deleted_count} old snapshots")
return deleted_count
except Exception as e:
print(f"Failed to delete old snapshots: {e}")
return 0
def restore_snapshot(self, repo_name: str, snapshot_name: str, indices: List[str] = None,
rename_pattern: str = None, rename_replacement: str = None) -> bool:
"""恢复快照"""
try:
restore_body = {
"ignore_unavailable": True,
"include_global_state": False
}
if indices:
restore_body["indices"] = ",".join(indices)
if rename_pattern and rename_replacement:
restore_body["rename_pattern"] = rename_pattern
restore_body["rename_replacement"] = rename_replacement
response = self.es.snapshot.restore(
repository=repo_name,
snapshot=snapshot_name,
body=restore_body,
wait_for_completion=True
)
print(f"Snapshot {snapshot_name} restored successfully")
return True
except Exception as e:
print(f"Failed to restore snapshot {snapshot_name}: {e}")
return False
def backup_kibana_objects(self, output_dir: str) -> bool:
"""备份Kibana对象"""
try:
os.makedirs(output_dir, exist_ok=True)
kibana_config = self.config['kibana']
kibana_url = f"http://{kibana_config['host']}:{kibana_config['port']}"
# 导出所有保存的对象
export_url = f"{kibana_url}/api/saved_objects/_export"
export_body = {
"type": [
"dashboard",
"visualization",
"search",
"index-pattern",
"config"
],
"includeReferencesDeep": True
}
import requests
response = requests.post(
export_url,
json=export_body,
headers={'Content-Type': 'application/json'},
timeout=300
)
if response.status_code == 200:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = os.path.join(output_dir, f'kibana_objects_{timestamp}.ndjson')
with open(backup_file, 'w') as f:
f.write(response.text)
print(f"Kibana objects backed up to: {backup_file}")
return True
else:
print(f"Failed to export Kibana objects: {response.status_code}")
return False
except Exception as e:
print(f"Failed to backup Kibana objects: {e}")
return False
def backup_logstash_configs(self, output_dir: str) -> bool:
"""备份Logstash配置"""
try:
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = os.path.join(output_dir, f'logstash_configs_{timestamp}.tar.gz')
# 压缩Logstash配置目录
logstash_config_dir = self.config['logstash']['config_dir']
subprocess.run([
'tar', '-czf', backup_file, '-C',
os.path.dirname(logstash_config_dir),
os.path.basename(logstash_config_dir)
], check=True)
print(f"Logstash configs backed up to: {backup_file}")
return True
except Exception as e:
print(f"Failed to backup Logstash configs: {e}")
return False
def upload_to_s3(self, local_file: str, s3_key: str) -> bool:
"""上传文件到S3"""
if not self.config.get('s3', {}).get('enabled', False):
return True
try:
bucket_name = self.config['s3']['bucket']
self.s3_client.upload_file(
local_file,
bucket_name,
s3_key
)
print(f"Uploaded {local_file} to s3://{bucket_name}/{s3_key}")
return True
except Exception as e:
print(f"Failed to upload to S3: {e}")
return False
def run_full_backup(self) -> Dict[str, Any]:
"""运行完整备份"""
backup_result = {
'timestamp': datetime.now().isoformat(),
'elasticsearch_snapshot': False,
'kibana_objects': False,
'logstash_configs': False,
's3_upload': False,
'errors': []
}
try:
# 1. 创建Elasticsearch快照
repo_name = self.config['elasticsearch']['snapshot_repository']
snapshot_name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# 创建快照仓库(如果不存在)
repo_config = self.config['elasticsearch']['repository_config']
if self.create_snapshot_repository(repo_name, repo_config):
# 创建快照
if self.create_snapshot(repo_name, snapshot_name):
# 等待快照完成
if self.wait_for_snapshot_completion(repo_name, snapshot_name):
backup_result['elasticsearch_snapshot'] = True
# 清理旧快照
retention_days = self.config['elasticsearch'].get('retention_days', 30)
self.delete_old_snapshots(repo_name, retention_days)
else:
backup_result['errors'].append('Elasticsearch snapshot timed out')
else:
backup_result['errors'].append('Failed to create Elasticsearch snapshot')
else:
backup_result['errors'].append('Failed to create snapshot repository')
# 2. 备份Kibana对象
backup_dir = self.config['backup']['local_directory']
if self.backup_kibana_objects(backup_dir):
backup_result['kibana_objects'] = True
else:
backup_result['errors'].append('Failed to backup Kibana objects')
# 3. 备份Logstash配置
if self.backup_logstash_configs(backup_dir):
backup_result['logstash_configs'] = True
else:
backup_result['errors'].append('Failed to backup Logstash configs')
# 4. 上传到S3(如果配置了)
if self.config.get('s3', {}).get('enabled', False):
s3_success = True
# 上传Kibana备份
for file in os.listdir(backup_dir):
if file.startswith('kibana_objects_') or file.startswith('logstash_configs_'):
local_file = os.path.join(backup_dir, file)
s3_key = f"elk-backups/{datetime.now().strftime('%Y/%m/%d')}/{file}"
if not self.upload_to_s3(local_file, s3_key):
s3_success = False
backup_result['errors'].append(f'Failed to upload {file} to S3')
backup_result['s3_upload'] = s3_success
else:
backup_result['s3_upload'] = True # 未配置S3,视为成功
except Exception as e:
backup_result['errors'].append(f'Backup process failed: {e}')
# 生成备份报告
self._generate_backup_report(backup_result)
return backup_result
def _generate_backup_report(self, backup_result: Dict[str, Any]):
"""生成备份报告"""
report = f"""ELK Stack Backup Report
========================
Timestamp: {backup_result['timestamp']}
Backup Status:
- Elasticsearch Snapshot: {'✓' if backup_result['elasticsearch_snapshot'] else '✗'}
- Kibana Objects: {'✓' if backup_result['kibana_objects'] else '✗'}
- Logstash Configs: {'✓' if backup_result['logstash_configs'] else '✗'}
- S3 Upload: {'✓' if backup_result['s3_upload'] else '✗'}
Errors:
"""
if backup_result['errors']:
for error in backup_result['errors']:
report += f"- {error}\n"
else:
report += "No errors occurred.\n"
# 保存报告
report_dir = self.config['backup']['local_directory']
os.makedirs(report_dir, exist_ok=True)
report_file = os.path.join(
report_dir,
f"backup_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
)
with open(report_file, 'w') as f:
f.write(report)
print(f"Backup report saved to: {report_file}")
print(report)
# 备份配置文件示例
backup_config = {
"elasticsearch": {
"host": "localhost:9200",
"user": "elastic",
"password": "your_password",
"snapshot_repository": "backup_repository",
"repository_config": {
"type": "fs",
"settings": {
"location": "/opt/elasticsearch/backups",
"compress": True
}
},
"retention_days": 30
},
"kibana": {
"host": "localhost",
"port": 5601
},
"logstash": {
"config_dir": "/etc/logstash/conf.d"
},
"backup": {
"local_directory": "/opt/elk-backups"
},
"s3": {
"enabled": True,
"bucket": "my-elk-backups",
"access_key": "your_access_key",
"secret_key": "your_secret_key",
"region": "us-west-2"
}
}
# 保存配置文件
with open('backup_config.json', 'w') as f:
json.dump(backup_config, f, indent=2)
# 使用示例
if __name__ == "__main__":
backup_manager = ELKBackupManager()
# 运行完整备份
result = backup_manager.run_full_backup()
if all([result['elasticsearch_snapshot'], result['kibana_objects'], result['logstash_configs']]):
print("Backup completed successfully!")
else:
print("Backup completed with errors. Check the report for details.")
实战案例分析
案例1:电商平台日志分析系统
业务场景: - 日均订单量:100万+ - 日志类型:应用日志、访问日志、错误日志、性能日志 - 数据量:每日500GB+ - 查询需求:实时监控、业务分析、故障排查
架构设计:
# 电商平台ELK架构
apiVersion: v1
kind: ConfigMap
metadata:
name: ecommerce-elk-config
data:
elasticsearch.yml: |
cluster.name: ecommerce-logs
node.name: ${HOSTNAME}
network.host: 0.0.0.0
discovery.seed_hosts: ["es-master-1", "es-master-2", "es-master-3"]
cluster.initial_master_nodes: ["es-master-1", "es-master-2", "es-master-3"]
# 索引模板配置
index.number_of_shards: 3
index.number_of_replicas: 1
index.refresh_interval: 30s
# 内存设置
indices.memory.index_buffer_size: 20%
indices.memory.min_index_buffer_size: 96mb
# 查询缓存
indices.queries.cache.size: 20%
logstash.conf: |
input {
# 应用日志
beats {
port => 5044
type => "application"
}
# Nginx访问日志
beats {
port => 5045
type => "nginx"
}
# 数据库日志
beats {
port => 5046
type => "database"
}
}
filter {
# 应用日志处理
if [type] == "application" {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{DATA:logger} - %{GREEDYDATA:message}"
}
}
# 解析JSON格式的应用日志
if [message] =~ /^\{.*\}$/ {
json {
source => "message"
}
}
# 提取订单信息
if [message] =~ /order/ {
grok {
match => {
"message" => "order_id:%{DATA:order_id} user_id:%{DATA:user_id} amount:%{NUMBER:amount:float}"
}
}
}
# 提取支付信息
if [message] =~ /payment/ {
grok {
match => {
"message" => "payment_id:%{DATA:payment_id} method:%{DATA:payment_method} status:%{DATA:payment_status}"
}
}
}
}
# Nginx访问日志处理
if [type] == "nginx" {
grok {
match => {
"message" => "%{NGINXACCESS}"
}
}
# 解析用户代理
useragent {
source => "agent"
}
# GeoIP解析
geoip {
source => "clientip"
target => "geoip"
}
# 计算响应时间分类
if [response_time] {
mutate {
convert => { "response_time" => "float" }
}
if [response_time] < 0.1 {
mutate { add_field => { "response_category" => "fast" } }
} else if [response_time] < 1.0 {
mutate { add_field => { "response_category" => "normal" } }
} else {
mutate { add_field => { "response_category" => "slow" } }
}
}
}
# 数据库日志处理
if [type] == "database" {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} %{DATA:query_type} %{DATA:table} %{NUMBER:duration:float}ms"
}
}
# 慢查询标记
if [duration] and [duration] > 1000 {
mutate {
add_field => { "slow_query" => "true" }
add_tag => [ "slow_query" ]
}
}
}
# 通用字段处理
date {
match => [ "timestamp", "ISO8601" ]
}
# 添加环境标签
mutate {
add_field => {
"environment" => "${ENVIRONMENT:production}"
"datacenter" => "${DATACENTER:dc1}"
}
}
# 移除不需要的字段
mutate {
remove_field => [ "host", "agent", "ecs", "log", "input" ]
}
}
output {
# 根据日志类型分发到不同索引
if [type] == "application" {
elasticsearch {
hosts => ["${ELASTICSEARCH_HOSTS}"]
index => "app-logs-%{+YYYY.MM.dd}"
template_name => "app-logs"
template_pattern => "app-logs-*"
template => "/etc/logstash/templates/app-logs.json"
}
} else if [type] == "nginx" {
elasticsearch {
hosts => ["${ELASTICSEARCH_HOSTS}"]
index => "nginx-logs-%{+YYYY.MM.dd}"
template_name => "nginx-logs"
template_pattern => "nginx-logs-*"
template => "/etc/logstash/templates/nginx-logs.json"
}
} else if [type] == "database" {
elasticsearch {
hosts => ["${ELASTICSEARCH_HOSTS}"]
index => "db-logs-%{+YYYY.MM.dd}"
template_name => "db-logs"
template_pattern => "db-logs-*"
template => "/etc/logstash/templates/db-logs.json"
}
}
# 错误日志额外发送到告警索引
if [level] == "ERROR" or [response] >= 500 {
elasticsearch {
hosts => ["${ELASTICSEARCH_HOSTS}"]
index => "alerts-%{+YYYY.MM.dd}"
}
}
}
kibana.yml: |
server.name: ecommerce-kibana
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://elasticsearch:9200"]
# 安全配置
xpack.security.enabled: true
xpack.security.encryptionKey: "your-encryption-key-here"
# 监控配置
xpack.monitoring.enabled: true
# 告警配置
xpack.actions.enabled: true
性能优化配置:
#!/usr/bin/env python3
# ecommerce_optimization.py
import json
from elasticsearch import Elasticsearch
def setup_ecommerce_templates(es_client):
"""设置电商平台索引模板"""
# 应用日志模板
app_logs_template = {
"index_patterns": ["app-logs-*"],
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "30s",
"index.codec": "best_compression",
"index.mapping.total_fields.limit": 2000,
"index.max_result_window": 50000
},
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"level": {"type": "keyword"},
"logger": {"type": "keyword"},
"thread": {"type": "keyword"},
"message": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {"type": "keyword", "ignore_above": 256}
}
},
"order_id": {"type": "keyword"},
"user_id": {"type": "keyword"},
"amount": {"type": "float"},
"payment_id": {"type": "keyword"},
"payment_method": {"type": "keyword"},
"payment_status": {"type": "keyword"},
"environment": {"type": "keyword"},
"datacenter": {"type": "keyword"}
}
}
}
# Nginx访问日志模板
nginx_logs_template = {
"index_patterns": ["nginx-logs-*"],
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1,
"refresh_interval": "10s",
"index.codec": "best_compression"
},
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"clientip": {"type": "ip"},
"request": {
"type": "text",
"fields": {
"keyword": {"type": "keyword", "ignore_above": 256}
}
},
"response": {"type": "short"},
"bytes": {"type": "long"},
"response_time": {"type": "float"},
"response_category": {"type": "keyword"},
"referrer": {"type": "keyword"},
"useragent": {
"properties": {
"device": {"type": "keyword"},
"name": {"type": "keyword"},
"os": {"type": "keyword"},
"version": {"type": "keyword"}
}
},
"geoip": {
"properties": {
"country_name": {"type": "keyword"},
"city_name": {"type": "keyword"},
"location": {"type": "geo_point"}
}
}
}
}
}
# 数据库日志模板
db_logs_template = {
"index_patterns": ["db-logs-*"],
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1,
"refresh_interval": "60s"
},
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"query_type": {"type": "keyword"},
"table": {"type": "keyword"},
"duration": {"type": "float"},
"slow_query": {"type": "boolean"}
}
}
}
# 创建模板
templates = {
"app-logs": app_logs_template,
"nginx-logs": nginx_logs_template,
"db-logs": db_logs_template
}
for template_name, template_body in templates.items():
try:
es_client.indices.put_template(
name=template_name,
body=template_body
)
print(f"Created template: {template_name}")
except Exception as e:
print(f"Failed to create template {template_name}: {e}")
def setup_ilm_policies(es_client):
"""设置索引生命周期管理策略"""
# 应用日志ILM策略
app_logs_policy = {
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "30gb",
"max_age": "1d"
},
"set_priority": {"priority": 100}
}
},
"warm": {
"min_age": "3d",
"actions": {
"allocate": {"number_of_replicas": 0},
"forcemerge": {"max_num_segments": 1},
"set_priority": {"priority": 50}
}
},
"cold": {
"min_age": "30d",
"actions": {
"allocate": {"number_of_replicas": 0},
"set_priority": {"priority": 0}
}
},
"delete": {
"min_age": "90d",
"actions": {"delete": {}}
}
}
}
}
# Nginx日志ILM策略(保留时间更短)
nginx_logs_policy = {
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50gb",
"max_age": "1d"
},
"set_priority": {"priority": 100}
}
},
"warm": {
"min_age": "1d",
"actions": {
"allocate": {"number_of_replicas": 0},
"forcemerge": {"max_num_segments": 1},
"set_priority": {"priority": 50}
}
},
"cold": {
"min_age": "7d",
"actions": {
"allocate": {"number_of_replicas": 0},
"set_priority": {"priority": 0}
}
},
"delete": {
"min_age": "30d",
"actions": {"delete": {}}
}
}
}
}
# 创建ILM策略
policies = {
"app-logs-policy": app_logs_policy,
"nginx-logs-policy": nginx_logs_policy
}
for policy_name, policy_body in policies.items():
try:
es_client.ilm.put_lifecycle(
policy=policy_name,
body=policy_body
)
print(f"Created ILM policy: {policy_name}")
except Exception as e:
print(f"Failed to create ILM policy {policy_name}: {e}")
# 使用示例
if __name__ == "__main__":
es = Elasticsearch(['localhost:9200'])
print("Setting up ecommerce ELK configuration...")
setup_ecommerce_templates(es)
setup_ilm_policies(es)
print("Configuration completed!")
关键指标监控:
#!/usr/bin/env python3
# ecommerce_monitoring.py
import json
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
class EcommerceMonitoring:
def __init__(self, es_host='localhost:9200'):
self.es = Elasticsearch([es_host])
def get_order_metrics(self, time_range='1h'):
"""获取订单相关指标"""
query = {
"query": {
"bool": {
"must": [
{"exists": {"field": "order_id"}},
{
"range": {
"@timestamp": {
"gte": f"now-{time_range}"
}
}
}
]
}
},
"aggs": {
"total_orders": {"cardinality": {"field": "order_id"}},
"total_amount": {"sum": {"field": "amount"}},
"avg_amount": {"avg": {"field": "amount"}},
"orders_by_hour": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1h"
},
"aggs": {
"order_count": {"cardinality": {"field": "order_id"}},
"revenue": {"sum": {"field": "amount"}}
}
}
},
"size": 0
}
result = self.es.search(index='app-logs-*', body=query)
return result['aggregations']
def get_performance_metrics(self, time_range='1h'):
"""获取性能指标"""
query = {
"query": {
"range": {
"@timestamp": {
"gte": f"now-{time_range}"
}
}
},
"aggs": {
"avg_response_time": {"avg": {"field": "response_time"}},
"95th_response_time": {
"percentiles": {
"field": "response_time",
"percents": [95]
}
},
"response_categories": {
"terms": {"field": "response_category"}
},
"error_rate": {
"filters": {
"filters": {
"errors": {"range": {"response": {"gte": 400}}},
"total": {"match_all": {}}
}
}
}
},
"size": 0
}
result = self.es.search(index='nginx-logs-*', body=query)
return result['aggregations']
def get_error_analysis(self, time_range='1h'):
"""获取错误分析"""
query = {
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": f"now-{time_range}"
}
}
}
],
"should": [
{"term": {"level": "ERROR"}},
{"range": {"response": {"gte": 400}}}
],
"minimum_should_match": 1
}
},
"aggs": {
"error_types": {
"terms": {"field": "level"}
},
"http_errors": {
"terms": {"field": "response"}
},
"error_timeline": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "5m"
}
}
},
"size": 10,
"sort": [{"@timestamp": {"order": "desc"}}]
}
result = self.es.search(index=['app-logs-*', 'nginx-logs-*'], body=query)
return {
'aggregations': result['aggregations'],
'recent_errors': result['hits']['hits']
}
def generate_dashboard_data(self):
"""生成仪表板数据"""
return {
'order_metrics': self.get_order_metrics(),
'performance_metrics': self.get_performance_metrics(),
'error_analysis': self.get_error_analysis(),
'timestamp': datetime.now().isoformat()
}
# 使用示例
if __name__ == "__main__":
monitor = EcommerceMonitoring()
dashboard_data = monitor.generate_dashboard_data()
print(json.dumps(dashboard_data, indent=2, default=str))
案例2:金融风控日志分析
业务场景: - 实时交易监控 - 异常行为检测 - 合规审计要求 - 高可用性要求
核心配置:
# 金融风控ELK配置
apiVersion: v1
kind: ConfigMap
metadata:
name: fintech-elk-config
data:
logstash-security.conf: |
input {
# 交易日志
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
topics => ["transaction-logs"]
group_id => "elk-consumer"
codec => json
type => "transaction"
}
# 用户行为日志
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
topics => ["user-behavior"]
group_id => "elk-consumer"
codec => json
type => "behavior"
}
# 系统安全日志
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
topics => ["security-logs"]
group_id => "elk-consumer"
codec => json
type => "security"
}
}
filter {
# 交易日志处理
if [type] == "transaction" {
# 敏感数据脱敏
mutate {
gsub => [
"card_number", "(\d{4})(\d{8})(\d{4})", "\1****\3",
"account_number", "(\d{4})(\d+)(\d{4})", "\1****\3"
]
}
# 风险评分计算
ruby {
code => "
amount = event.get('amount').to_f
hour = Time.parse(event.get('@timestamp')).hour
risk_score = 0
risk_score += 30 if amount > 10000
risk_score += 20 if hour < 6 || hour > 22
risk_score += 15 if event.get('country') != 'CN'
event.set('risk_score', risk_score)
if risk_score > 50
event.set('risk_level', 'high')
event.tag('high_risk')
elsif risk_score > 30
event.set('risk_level', 'medium')
else
event.set('risk_level', 'low')
end
"
}
}
# 用户行为分析
if [type] == "behavior" {
# 异常登录检测
if [event_type] == "login" {
# 检查IP地理位置变化
geoip {
source => "ip_address"
target => "geoip"
}
# 检查设备指纹
fingerprint {
source => ["user_agent", "screen_resolution", "timezone"]
target => "device_fingerprint"
}
}
# 频率限制检查
throttle {
before_count => 10
after_count => 5
period => 60
key => "%{user_id}"
add_tag => [ "frequent_activity" ]
}
}
# 安全事件处理
if [type] == "security" {
# 威胁情报匹配
translate {
field => "ip_address"
destination => "threat_intel"
dictionary_path => "/etc/logstash/threat_intel.yml"
fallback => "clean"
}
# 安全等级分类
if [event_type] == "failed_login" {
mutate { add_field => { "security_level" => "warning" } }
}
if [event_type] == "privilege_escalation" {
mutate { add_field => { "security_level" => "critical" } }
}
}
# 合规性标记
mutate {
add_field => {
"compliance_required" => "true"
"retention_period" => "7_years"
"data_classification" => "confidential"
}
}
}
output {
# 正常日志输出
elasticsearch {
hosts => ["${ELASTICSEARCH_HOSTS}"]
index => "%{type}-logs-%{+YYYY.MM.dd}"
# 安全配置
ssl => true
ssl_certificate_verification => true
user => "${ELASTICSEARCH_USER}"
password => "${ELASTICSEARCH_PASSWORD}"
}
# 高风险交易实时告警
if "high_risk" in [tags] {
http {
url => "${ALERT_WEBHOOK_URL}"
http_method => "post"
format => "json"
mapping => {
"alert_type" => "high_risk_transaction"
"user_id" => "%{user_id}"
"amount" => "%{amount}"
"risk_score" => "%{risk_score}"
"timestamp" => "%{@timestamp}"
}
}
}
# 安全事件告警
if [security_level] == "critical" {
http {
url => "${SECURITY_WEBHOOK_URL}"
http_method => "post"
format => "json"
}
}
# 合规审计日志
file {
path => "/var/log/compliance/audit-%{+YYYY-MM-dd}.log"
codec => json_lines
}
}
总结
本章详细介绍了ELK Stack的最佳实践与案例分析,涵盖了以下核心内容:
技术亮点
分层架构设计
- 完整的容量规划方法
- 性能优化索引模板
- 查询优化策略和工具
查询优化
- 智能查询分析器
- 性能基准测试
- 自动化优化建议
运维管理
- 全面的监控告警系统
- 自动化备份恢复策略
- 多渠道告警通知
实战案例
- 电商平台日志分析系统
- 金融风控日志分析
- 行业最佳实践
实践价值
- 性能提升:通过查询优化和架构设计,显著提升系统性能
- 运维效率:自动化监控和备份,降低运维成本
- 业务价值:实时分析和告警,支持业务决策
- 合规保障:完整的审计和数据保护机制
下一步学习
在下一章中,我们将学习ELK Stack的未来发展与新特性,包括: - Elastic Stack 8.x新特性 - 云原生部署策略 - AI/ML集成应用 - 边缘计算场景 - 技术发展趋势
通过本章的学习,您已经掌握了ELK Stack的最佳实践和实战经验,能够设计和实施企业级的日志分析解决方案。