Microservices Architecture Integration

1. Microservices Log Aggregation Architecture

```mermaid
graph TB
    subgraph "Microservice Cluster"
        A[API Gateway] --> B[User Service]
        A --> C[Order Service]
        A --> D[Payment Service]
        A --> E[Notification Service]
        B --> F[User DB]
        C --> G[Order DB]
        D --> H[Payment DB]
    end
    subgraph "Log Collection Layer"
        I[Filebeat Agent]
        J[Metricbeat Agent]
        K[APM Agent]
    end
    subgraph "Message Queue"
        L[Kafka/RabbitMQ]
    end
    subgraph "ELK Stack"
        M[Logstash Cluster]
        N[Elasticsearch Cluster]
        O[Kibana]
    end
    B --> I
    C --> I
    D --> I
    E --> I
    B --> J
    C --> J
    D --> J
    E --> J
    B --> K
    C --> K
    D --> K
    E --> K
    I --> L
    J --> L
    K --> L
    L --> M
    M --> N
    N --> O
```
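In this topology every service writes structured logs that Filebeat tails and forwards. Services typically emit one JSON object per line so the pipeline can index fields without grok parsing. A minimal stdlib-only sketch — the `service` field name and value here are illustrative, not mandated by the stack:

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line (ECS-like field names)."""

    def format(self, record: logging.LogRecord) -> str:
        doc = {
            "@timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "log.level": record.levelname,
            "message": record.getMessage(),
            "service": "user-service",  # hypothetical service name
        }
        return json.dumps(doc)


# Wire the formatter to a console handler, as a container would log to stdout
logger = logging.getLogger("demo")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("user %s fetched", 42)
```

Filebeat's `json` decoding (or Logstash's `json` codec) can then ingest these lines directly.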
2. Distributed Tracing Integration

Jaeger integration configuration:

```yaml
# docker-compose.yml
version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - SPAN_STORAGE_TYPE=elasticsearch
      - ES_SERVER_URLS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
      - ./logstash/config:/usr/share/logstash/config
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch
```
Spring Boot microservice integration:

```xml
<!-- pom.xml -->
<dependencies>
  <dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-api</artifactId>
  </dependency>
  <dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-jaeger</artifactId>
  </dependency>
  <dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>7.4</version>
  </dependency>
</dependencies>
```
```java
// TracingConfiguration.java
@Configuration
public class TracingConfiguration {

    @Bean
    public OpenTelemetry openTelemetry() {
        return OpenTelemetrySdk.builder()
            .setTracerProvider(
                SdkTracerProvider.builder()
                    .addSpanProcessor(BatchSpanProcessor.builder(
                        JaegerGrpcSpanExporter.builder()
                            .setEndpoint("http://jaeger:14250")
                            .build())
                        .build())
                    .setResource(Resource.getDefault()
                        .merge(Resource.create(
                            Attributes.of(ResourceAttributes.SERVICE_NAME, "user-service"))))
                    .build())
            .build();
    }
}
```
```java
// UserController.java
@RestController
@RequestMapping("/api/users")
public class UserController {

    private static final Logger logger = LoggerFactory.getLogger(UserController.class);

    private final Tracer tracer;
    private final UserService userService;

    public UserController(OpenTelemetry openTelemetry, UserService userService) {
        this.tracer = openTelemetry.getTracer("user-service");
        this.userService = userService;
    }

    @GetMapping("/{id}")
    public ResponseEntity<User> getUser(@PathVariable Long id) {
        Span span = tracer.spanBuilder("get-user")
            .setAttribute("user.id", id)
            .startSpan();
        try (Scope scope = span.makeCurrent()) {
            logger.info("Getting user with id: {}", id);
            // Business logic
            User user = userService.findById(id);
            span.setStatus(StatusCode.OK);
            return ResponseEntity.ok(user);
        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, e.getMessage());
            logger.error("Error getting user: {}", e.getMessage(), e);
            throw e;
        } finally {
            span.end();
        }
    }
}
```
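The OpenTelemetry SDK propagates this trace context between services via the W3C `traceparent` header. As a rough illustration of what travels on the wire — the header layout follows the W3C Trace Context specification, while the helper names are our own:

```python
import re


def build_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    """Compose a W3C traceparent header: version-traceid-spanid-flags."""
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"


def parse_traceparent(header: str):
    """Return (trace_id, span_id, sampled), or None if the header is malformed."""
    m = re.fullmatch(r"00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})", header)
    if not m:
        return None
    return m.group(1), m.group(2), m.group(3) == "01"
```

Logging the parsed trace and span IDs into MDC is what makes the `trace_id`/`span_id` fields in the Logback config below line up with the spans shown in Jaeger.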
Logback configuration:

```xml
<!-- logback-spring.xml -->
<configuration>
  <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
  <springProfile name="!local">
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
      <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
        <providers>
          <timestamp/>
          <logLevel/>
          <loggerName/>
          <message/>
          <mdc/>
          <arguments/>
          <stackTrace/>
          <pattern>
            <pattern>
              {
                "service": "user-service",
                "version": "${app.version:-unknown}",
                "environment": "${spring.profiles.active:-unknown}",
                "trace_id": "%X{traceId:-}",
                "span_id": "%X{spanId:-}"
              }
            </pattern>
          </pattern>
        </providers>
      </encoder>
    </appender>
  </springProfile>
  <springProfile name="local">
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
      <encoder>
        <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level [%X{traceId:-},%X{spanId:-}] %logger{36} - %msg%n</pattern>
      </encoder>
    </appender>
  </springProfile>
  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
```
3. Service Mesh Integration

Istio + ELK integration:

```yaml
# istio-telemetry.yaml
apiVersion: telemetry.istio.io/v1alpha1
kind: Telemetry
metadata:
  name: default
  namespace: istio-system
spec:
  accessLogging:
    - providers:
        - name: otel
    - providers:
        - name: elasticsearch
  metrics:
    - providers:
        - name: prometheus
  tracing:
    - providers:
        - name: jaeger
---
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
metadata:
  name: control-plane
spec:
  meshConfig:
    extensionProviders:
      - name: elasticsearch
        # Note: envoyFileAccessLog writes to a local path; shipping to a remote
        # endpoint is normally done with the envoyHttpAls (gRPC ALS) provider.
        envoyFileAccessLog:
          service: elasticsearch.logging.svc.cluster.local
          port: 9200
          logFormat:
            labels:
              custom_label: "istio-access-log"
      - name: jaeger
        zipkin:
          service: jaeger-collector.istio-system.svc.cluster.local
          port: 9411
```
Envoy access log configuration:

```json
{
  "access_log": [
    {
      "name": "envoy.access_loggers.http_grpc",
      "typed_config": {
        "@type": "type.googleapis.com/envoy.extensions.access_loggers.grpc.v3.HttpGrpcAccessLogConfig",
        "common_config": {
          "log_name": "istio-access-log",
          "grpc_service": {
            "envoy_grpc": {
              "cluster_name": "logstash-cluster"
            }
          }
        },
        "additional_request_headers_to_log": [
          "x-request-id",
          "x-trace-id",
          "user-agent"
        ],
        "additional_response_headers_to_log": [
          "x-response-time"
        ]
      }
    }
  ]
}
```
Cloud Platform Deployment

1. AWS Deployment Architecture

```yaml
# aws-elk-stack.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'ELK Stack on AWS'

Parameters:
  VpcId:
    Type: AWS::EC2::VPC::Id
    Description: VPC ID for ELK deployment
  SubnetIds:
    Type: List<AWS::EC2::Subnet::Id>
    Description: Subnet IDs for ELK deployment
  InstanceType:
    Type: String
    Default: t3.large
    Description: EC2 instance type

Resources:
  # Elasticsearch Service
  ElasticsearchDomain:
    Type: AWS::Elasticsearch::Domain
    Properties:
      DomainName: !Sub '${AWS::StackName}-elasticsearch'
      ElasticsearchVersion: '7.10'
      ElasticsearchClusterConfig:
        # Note: Elasticsearch domains expect instance types with an
        # ".elasticsearch" suffix (e.g. t3.large.elasticsearch)
        InstanceType: !Ref InstanceType
        InstanceCount: 3
        DedicatedMasterEnabled: true
        MasterInstanceType: t3.small
        MasterInstanceCount: 3
      EBSOptions:
        EBSEnabled: true
        VolumeType: gp3
        VolumeSize: 100
      VPCOptions:
        SubnetIds: !Ref SubnetIds
        SecurityGroupIds:
          - !Ref ElasticsearchSecurityGroup
      AccessPolicies:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              AWS: '*'
            Action: 'es:*'
            Resource: !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${AWS::StackName}-elasticsearch/*'
      EncryptionAtRestOptions:
        Enabled: true
      NodeToNodeEncryptionOptions:
        Enabled: true
      DomainEndpointOptions:
        EnforceHTTPS: true

  # Kibana ALB
  KibanaLoadBalancer:
    Type: AWS::ElasticLoadBalancingV2::LoadBalancer
    Properties:
      Name: !Sub '${AWS::StackName}-kibana-alb'
      Type: application
      Scheme: internet-facing
      Subnets: !Ref SubnetIds
      SecurityGroups:
        - !Ref KibanaSecurityGroup

  # Logstash Auto Scaling Group
  LogstashLaunchTemplate:
    Type: AWS::EC2::LaunchTemplate
    Properties:
      LaunchTemplateName: !Sub '${AWS::StackName}-logstash'
      LaunchTemplateData:
        ImageId: ami-0abcdef1234567890  # Amazon Linux 2
        InstanceType: !Ref InstanceType
        SecurityGroupIds:
          - !Ref LogstashSecurityGroup
        UserData:
          Fn::Base64: !Sub |
            #!/bin/bash
            yum update -y
            yum install -y docker
            service docker start
            usermod -a -G docker ec2-user
            # Install Logstash
            docker run -d \
              --name logstash \
              --restart unless-stopped \
              -p 5044:5044 \
              -e ELASTICSEARCH_HOSTS=${ElasticsearchDomain.DomainEndpoint} \
              docker.elastic.co/logstash/logstash:8.11.0

  LogstashAutoScalingGroup:
    Type: AWS::AutoScaling::AutoScalingGroup
    Properties:
      AutoScalingGroupName: !Sub '${AWS::StackName}-logstash-asg'
      LaunchTemplate:
        LaunchTemplateId: !Ref LogstashLaunchTemplate
        Version: !GetAtt LogstashLaunchTemplate.LatestVersionNumber
      MinSize: 2
      MaxSize: 10
      DesiredCapacity: 3
      VPCZoneIdentifier: !Ref SubnetIds
      TargetGroupARNs:
        - !Ref LogstashTargetGroup  # target group not shown in this template

  # Security Groups
  ElasticsearchSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Elasticsearch
      VpcId: !Ref VpcId
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          SourceSecurityGroupId: !Ref KibanaSecurityGroup
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          SourceSecurityGroupId: !Ref LogstashSecurityGroup

  KibanaSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Kibana
      VpcId: !Ref VpcId
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 80
          ToPort: 80
          CidrIp: 0.0.0.0/0
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0

  LogstashSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Logstash
      VpcId: !Ref VpcId
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 5044
          ToPort: 5044
          CidrIp: 10.0.0.0/8

Outputs:
  ElasticsearchEndpoint:
    Description: Elasticsearch domain endpoint
    Value: !GetAtt ElasticsearchDomain.DomainEndpoint
    Export:
      Name: !Sub '${AWS::StackName}-elasticsearch-endpoint'
  KibanaURL:
    Description: Kibana URL
    Value: !Sub 'https://${ElasticsearchDomain.DomainEndpoint}/_plugin/kibana/'
    Export:
      Name: !Sub '${AWS::StackName}-kibana-url'
```
2. Kubernetes Deployment

Helm chart configuration:

```yaml
# values.yaml
elasticsearch:
  enabled: true
  replicas: 3
  minimumMasterNodes: 2
  esConfig:
    elasticsearch.yml: |
      cluster.name: "elasticsearch"
      network.host: 0.0.0.0
      discovery.seed_hosts: "elasticsearch-master-headless"
      cluster.initial_master_nodes: "elasticsearch-master-0,elasticsearch-master-1,elasticsearch-master-2"
      # Security
      xpack.security.enabled: true
      xpack.security.transport.ssl.enabled: true
      xpack.security.transport.ssl.verification_mode: certificate
      xpack.security.transport.ssl.keystore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
      xpack.security.transport.ssl.truststore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
      # Monitoring
      xpack.monitoring.collection.enabled: true
  resources:
    requests:
      cpu: "1000m"
      memory: "2Gi"
    limits:
      cpu: "2000m"
      memory: "4Gi"
  volumeClaimTemplate:
    accessModes: ["ReadWriteOnce"]
    storageClassName: "fast-ssd"
    resources:
      requests:
        storage: 100Gi

kibana:
  enabled: true
  replicas: 2
  kibanaConfig:
    kibana.yml: |
      server.host: 0.0.0.0
      elasticsearch.hosts: ["https://elasticsearch-master:9200"]
      elasticsearch.username: "kibana_system"
      elasticsearch.password: "${ELASTICSEARCH_PASSWORD}"
      # Security
      xpack.security.enabled: true
      xpack.security.encryptionKey: "${KIBANA_ENCRYPTION_KEY}"
      # Monitoring
      xpack.monitoring.ui.container.elasticsearch.enabled: true
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1000m"
      memory: "2Gi"
  service:
    type: LoadBalancer
    port: 5601

logstash:
  enabled: true
  replicas: 3
  logstashConfig:
    logstash.yml: |
      http.host: 0.0.0.0
      xpack.monitoring.elasticsearch.hosts: ["https://elasticsearch-master:9200"]
      xpack.monitoring.elasticsearch.username: "logstash_system"
      xpack.monitoring.elasticsearch.password: "${ELASTICSEARCH_PASSWORD}"
  logstashPipeline:
    logstash.conf: |
      input {
        beats {
          port => 5044
        }
        http {
          port => 8080
        }
      }
      filter {
        if [fields][log_type] == "application" {
          grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
          }
          date {
            match => [ "timestamp", "ISO8601" ]
          }
        }
      }
      output {
        elasticsearch {
          hosts => ["https://elasticsearch-master:9200"]
          user => "logstash_writer"
          password => "${ELASTICSEARCH_PASSWORD}"
          index => "logs-%{+YYYY.MM.dd}"
        }
      }
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1000m"
      memory: "2Gi"
  service:
    type: ClusterIP
    ports:
      - name: beats
        port: 5044
        protocol: TCP
      - name: http
        port: 8080
        protocol: TCP

filebeat:
  enabled: true
  daemonset:
    enabled: true
  filebeatConfig:
    filebeat.yml: |
      filebeat.inputs:
        - type: container
          paths:
            - /var/log/containers/*.log
          processors:
            - add_kubernetes_metadata:
                host: ${NODE_NAME}
                matchers:
                  - logs_path:
                      logs_path: "/var/log/containers/"
      output.logstash:
        hosts: ["logstash:5044"]
      setup.kibana:
        host: "kibana:5601"
  resources:
    requests:
      cpu: "100m"
      memory: "100Mi"
    limits:
      cpu: "200m"
      memory: "200Mi"

metricbeat:
  enabled: true
  daemonset:
    enabled: true
  metricbeatConfig:
    metricbeat.yml: |
      metricbeat.modules:
        - module: system
          metricsets:
            - cpu
            - load
            - memory
            - network
            - process
            - process_summary
          enabled: true
          period: 10s
          processes: ['.*']
        - module: kubernetes
          metricsets:
            - node
            - system
            - pod
            - container
            - volume
          enabled: true
          period: 10s
          host: ${NODE_NAME}
          hosts: ["https://${NODE_NAME}:10250"]
      output.elasticsearch:
        hosts: ["https://elasticsearch-master:9200"]
        username: "metricbeat_writer"
        password: "${ELASTICSEARCH_PASSWORD}"
        index: "metricbeat-%{+yyyy.MM.dd}"
  resources:
    requests:
      cpu: "100m"
      memory: "100Mi"
    limits:
      cpu: "200m"
      memory: "200Mi"
```
Deployment script:

```bash
#!/bin/bash
# deploy-elk-k8s.sh
set -e

NAMESPACE="elk-stack"
RELEASE_NAME="elk"
CHART_VERSION="8.11.0"

# Create the namespace
kubectl create namespace $NAMESPACE --dry-run=client -o yaml | kubectl apply -f -

# Create secrets
kubectl create secret generic elasticsearch-credentials \
  --from-literal=username=elastic \
  --from-literal=password=$(openssl rand -base64 32) \
  --namespace=$NAMESPACE

kubectl create secret generic kibana-encryption-key \
  --from-literal=encryptionkey=$(openssl rand -base64 32) \
  --namespace=$NAMESPACE

# Add the Elastic Helm repository
helm repo add elastic https://helm.elastic.co
helm repo update

# Deploy Elasticsearch
helm upgrade --install elasticsearch elastic/elasticsearch \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=elasticsearch-values.yaml \
  --wait

# Deploy Kibana
helm upgrade --install kibana elastic/kibana \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=kibana-values.yaml \
  --wait

# Deploy Logstash
helm upgrade --install logstash elastic/logstash \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=logstash-values.yaml \
  --wait

# Deploy Filebeat
helm upgrade --install filebeat elastic/filebeat \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=filebeat-values.yaml \
  --wait

# Deploy Metricbeat
helm upgrade --install metricbeat elastic/metricbeat \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=metricbeat-values.yaml \
  --wait

echo "ELK Stack deployed successfully!"
echo "Kibana URL: http://$(kubectl get svc kibana-kibana -n $NAMESPACE -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):5601"
echo "Elasticsearch URL: http://$(kubectl get svc elasticsearch-master -n $NAMESPACE -o jsonpath='{.spec.clusterIP}'):9200"
```
Third-Party Tool Integration

1. Prometheus Integration

Prometheus configuration:

```yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "elk_rules.yml"

scrape_configs:
  # Note: Elasticsearch and Logstash do not expose Prometheus-format metrics
  # natively; these paths assume an exporter plugin. Alternatively, use the
  # standalone elasticsearch-exporter deployed below.
  - job_name: 'elasticsearch'
    static_configs:
      - targets: ['elasticsearch:9200']
    metrics_path: /_prometheus/metrics
    scrape_interval: 30s
  - job_name: 'kibana'
    static_configs:
      - targets: ['kibana:5601']
    metrics_path: /api/status
    scrape_interval: 30s
  - job_name: 'logstash'
    static_configs:
      - targets: ['logstash:9600']
    metrics_path: /_node/stats
    scrape_interval: 30s

remote_write:
  - url: "http://elasticsearch:9200/_prometheus/api/v1/write"
    queue_config:
      max_samples_per_send: 1000
      max_shards: 200
      capacity: 2500
```
Elasticsearch Exporter:

```yaml
# elasticsearch-exporter.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: elasticsearch-exporter
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: elasticsearch-exporter
  template:
    metadata:
      labels:
        app: elasticsearch-exporter
    spec:
      containers:
        - name: elasticsearch-exporter
          image: quay.io/prometheuscommunity/elasticsearch-exporter:latest
          args:
            - '--es.uri=http://elasticsearch:9200'
            - '--es.all'
            - '--es.indices'
            - '--es.indices_settings'
            - '--es.shards'
            - '--es.snapshots'
            - '--es.timeout=30s'
          ports:
            - containerPort: 9114
              name: http
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 200m
              memory: 256Mi
---
apiVersion: v1
kind: Service
metadata:
  name: elasticsearch-exporter
  namespace: monitoring
  labels:
    app: elasticsearch-exporter
spec:
  ports:
    - port: 9114
      targetPort: 9114
      name: http
  selector:
    app: elasticsearch-exporter
```
2. Grafana Integration

Grafana data source configuration:

```json
{
  "datasources": [
    {
      "name": "Elasticsearch",
      "type": "elasticsearch",
      "access": "proxy",
      "url": "http://elasticsearch:9200",
      "database": "logs-*",
      "basicAuth": false,
      "isDefault": false,
      "jsonData": {
        "timeField": "@timestamp",
        "esVersion": "8.0.0",
        "maxConcurrentShardRequests": 5,
        "logMessageField": "message",
        "logLevelField": "level"
      }
    },
    {
      "name": "Prometheus",
      "type": "prometheus",
      "access": "proxy",
      "url": "http://prometheus:9090",
      "isDefault": true,
      "jsonData": {
        "timeInterval": "15s"
      }
    }
  ]
}
```
ELK monitoring dashboard:

```json
{
  "dashboard": {
    "title": "ELK Stack Monitoring",
    "tags": ["elk", "monitoring"],
    "timezone": "browser",
    "panels": [
      {
        "title": "Elasticsearch Cluster Health",
        "type": "stat",
        "targets": [
          {
            "expr": "elasticsearch_cluster_health_status",
            "legendFormat": "Cluster Status"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "mappings": [
              {"options": {"0": {"text": "Red", "color": "red"}}},
              {"options": {"1": {"text": "Yellow", "color": "yellow"}}},
              {"options": {"2": {"text": "Green", "color": "green"}}}
            ]
          }
        }
      },
      {
        "title": "Elasticsearch Nodes",
        "type": "stat",
        "targets": [
          {
            "expr": "elasticsearch_cluster_health_number_of_nodes",
            "legendFormat": "Total Nodes"
          },
          {
            "expr": "elasticsearch_cluster_health_number_of_data_nodes",
            "legendFormat": "Data Nodes"
          }
        ]
      },
      {
        "title": "Indexing Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(elasticsearch_indices_indexing_index_total[5m])",
            "legendFormat": "{{node}}"
          }
        ]
      },
      {
        "title": "Search Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(elasticsearch_indices_search_query_total[5m])",
            "legendFormat": "{{node}}"
          }
        ]
      },
      {
        "title": "JVM Heap Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "elasticsearch_jvm_memory_used_bytes{area=\"heap\"} / elasticsearch_jvm_memory_max_bytes{area=\"heap\"} * 100",
            "legendFormat": "{{node}}"
          }
        ],
        "yAxes": [
          {
            "unit": "percent",
            "max": 100
          }
        ]
      },
      {
        "title": "Disk Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "elasticsearch_filesystem_data_used_bytes / elasticsearch_filesystem_data_size_bytes * 100",
            "legendFormat": "{{node}}"
          }
        ]
      }
    ]
  }
}
```
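The JVM heap panel divides used heap bytes by max heap bytes and scales to a percentage. The same arithmetic, spelled out for clarity:

```python
def heap_used_percent(used_bytes: float, max_bytes: float) -> float:
    """Mirror of the panel's PromQL expression: used / max * 100."""
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    return used_bytes / max_bytes * 100.0
```

A node reporting 1 GiB used out of a 2 GiB heap therefore shows 50% on the panel; sustained values above ~75% are the usual signal to scale the heap or the node count.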
3. Slack Integration

Slack notification configuration:

```json
{
  "webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
  "channel": "#elk-alerts",
  "username": "ELK-Bot",
  "icon_emoji": ":warning:",
  "templates": {
    "error_alert": {
      "color": "danger",
      "title": "ELK Stack Alert",
      "fields": [
        {"title": "Alert Type", "value": "{{alert_type}}", "short": true},
        {"title": "Severity", "value": "{{severity}}", "short": true},
        {"title": "Message", "value": "{{message}}", "short": false},
        {"title": "Timestamp", "value": "{{timestamp}}", "short": true}
      ]
    }
  }
}
```
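The `{{...}}` placeholders in the alert template are filled in at notification time. A minimal renderer illustrating that substitution — the placeholder syntax is this configuration's own convention, not a Slack feature:

```python
import re


def render_template(template: str, values: dict) -> str:
    """Substitute {{name}} placeholders; unknown names are left intact."""
    def repl(match: re.Match) -> str:
        return str(values.get(match.group(1), match.group(0)))
    return re.sub(r"\{\{(\w+)\}\}", repl, template)
```

Leaving unknown placeholders intact (rather than replacing them with an empty string) makes missing fields visible in the delivered alert, which is usually preferable for debugging.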
Logstash Slack output plugin:

```conf
# logstash-slack-output.conf
filter {
  if [level] == "ERROR" or [level] == "FATAL" {
    mutate {
      add_tag => [ "alert" ]
    }
  }
}

output {
  if "alert" in [tags] {
    slack {
      url => "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
      channel => "#alerts"
      username => "Logstash"
      icon_emoji => ":exclamation:"
      format => "Error detected: %{message} from %{host} at %{@timestamp}"
    }
  }
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
}
```
Machine Learning Applications

1. Anomaly Detection

ML job configuration:

```json
{
  "job_id": "log_anomaly_detection",
  "description": "Detect anomalous patterns in log data",
  "analysis_config": {
    "bucket_span": "15m",
    "detectors": [
      {
        "detector_description": "count by level",
        "function": "count",
        "by_field_name": "level.keyword"
      },
      {
        "detector_description": "rare message",
        "function": "rare",
        "by_field_name": "message.keyword"
      },
      {
        "detector_description": "high response time",
        "function": "high_mean",
        "field_name": "response_time",
        "over_field_name": "service.keyword"
      }
    ],
    "influencers": [
      "level.keyword",
      "service.keyword",
      "host.name"
    ]
  },
  "data_description": {
    "time_field": "@timestamp",
    "time_format": "epoch_ms"
  },
  "model_plot_config": {
    "enabled": true,
    "terms": "level.keyword,service.keyword"
  },
  "analysis_limits": {
    "model_memory_limit": "256mb",
    "categorization_examples_limit": 4
  },
  "datafeed_config": {
    "datafeed_id": "datafeed-log_anomaly_detection",
    "indices": ["logs-*"],
    "query": {
      "bool": {
        "must": [
          {
            "range": {
              "@timestamp": {
                "gte": "now-30d"
              }
            }
          }
        ]
      }
    },
    "scroll_size": 1000,
    "frequency": "150s",
    "query_delay": "60s"
  }
}
```
Using the anomaly detection API:

```python
# ml_anomaly_detection.py
import requests
from datetime import datetime, timedelta


class ElasticsearchMLClient:
    def __init__(self, host, username, password):
        self.host = host
        self.auth = (username, password)
        self.headers = {'Content-Type': 'application/json'}

    def create_ml_job(self, job_config):
        """Create an ML job."""
        url = f"{self.host}/_ml/anomaly_detectors/{job_config['job_id']}"
        response = requests.put(url, json=job_config,
                                auth=self.auth, headers=self.headers)
        return response.json()

    def start_datafeed(self, datafeed_id, start_time=None):
        """Start a datafeed."""
        url = f"{self.host}/_ml/datafeeds/{datafeed_id}/_start"
        params = {}
        if start_time:
            params['start'] = start_time
        response = requests.post(url, params=params,
                                 auth=self.auth, headers=self.headers)
        return response.json()

    def get_anomalies(self, job_id, start_time, end_time, anomaly_score_threshold=50):
        """Fetch anomaly detection results."""
        url = f"{self.host}/_ml/anomaly_detectors/{job_id}/results/records"
        query = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "timestamp": {
                                    "gte": start_time,
                                    "lte": end_time
                                }
                            }
                        },
                        {
                            "range": {
                                "record_score": {
                                    "gte": anomaly_score_threshold
                                }
                            }
                        }
                    ]
                }
            },
            "sort": [
                {
                    "record_score": {
                        "order": "desc"
                    }
                }
            ]
        }
        response = requests.post(url, json=query,
                                 auth=self.auth, headers=self.headers)
        return response.json()

    def send_alert(self, anomaly):
        """Send an anomaly alert."""
        alert_message = {
            "timestamp": anomaly['timestamp'],
            "job_id": anomaly['job_id'],
            "record_score": anomaly['record_score'],
            "typical": anomaly.get('typical', []),
            "actual": anomaly.get('actual', []),
            "field_name": anomaly.get('field_name', ''),
            "by_field_value": anomaly.get('by_field_value', '')
        }
        # Forward to Slack (or another alerting system)
        slack_webhook = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
        slack_message = {
            "text": f"Anomaly alert: {alert_message['field_name']} "
                    f"score: {alert_message['record_score']}",
            "attachments": [
                {
                    "color": "danger",
                    "fields": [
                        {"title": "Job ID", "value": anomaly['job_id'], "short": True},
                        {"title": "Time", "value": anomaly['timestamp'], "short": True},
                        {"title": "Anomaly score", "value": str(anomaly['record_score']), "short": True}
                    ]
                }
            ]
        }
        requests.post(slack_webhook, json=slack_message)


# Usage example
if __name__ == "__main__":
    client = ElasticsearchMLClient(
        host="http://localhost:9200",
        username="elastic",
        password="your_password"
    )

    # Fetch anomalies from the last hour
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=1)
    anomalies = client.get_anomalies(
        job_id="log_anomaly_detection",
        start_time=start_time.isoformat(),
        end_time=end_time.isoformat(),
        anomaly_score_threshold=75
    )

    # Alert on the worst anomalies
    for anomaly in anomalies.get('records', []):
        if anomaly['record_score'] > 90:
            client.send_alert(anomaly)
```
2. Log Classification

Text categorization job:

```json
{
  "job_id": "log_classification",
  "description": "Log message classification",
  "analysis_config": {
    "categorization_field_name": "message",
    "bucket_span": "1h",
    "detectors": [
      {
        "detector_description": "categorization",
        "function": "count",
        "by_field_name": "mlcategory"
      }
    ],
    "categorization_filters": [
      "\\b\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\b",
      "\\b\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}",
      "\\b[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}\\b"
    ]
  },
  "data_description": {
    "time_field": "@timestamp"
  },
  "analysis_limits": {
    "model_memory_limit": "512mb",
    "categorization_examples_limit": 10
  }
}
```
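The `categorization_filters` above mask volatile tokens (IPv4 addresses, ISO timestamps, UUIDs) so that messages differing only in those tokens fall into the same category. The same masking, applied directly in Python with the identical three regexes:

```python
import re

# The three filter patterns from categorization_filters above
CATEGORIZATION_FILTERS = [
    r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",                                            # IPv4
    r"\b\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}",                                             # ISO timestamp
    r"\b[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}\b",  # UUID
]


def mask_message(message: str, token: str = "*") -> str:
    """Replace volatile tokens so similar messages categorize together."""
    for pattern in CATEGORIZATION_FILTERS:
        message = re.sub(pattern, token, message)
    return message
```

Two login messages from different hosts at different times thus reduce to the same masked string, which is what lets `mlcategory` group them.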
Security Analytics

1. SIEM Integration

Security event detection rules:

```json
{
  "rules": [
    {
      "name": "Brute Force Attack Detection",
      "description": "Detect brute-force login attacks",
      "query": {
        "bool": {
          "must": [
            {"term": {"event.action": "login"}},
            {"term": {"event.outcome": "failure"}}
          ],
          "filter": [
            {"range": {"@timestamp": {"gte": "now-5m"}}}
          ]
        }
      },
      "aggregation": {
        "terms": {
          "field": "source.ip",
          "min_doc_count": 10
        }
      },
      "threshold": 10,
      "severity": "high",
      "actions": ["block_ip", "send_alert"]
    },
    {
      "name": "Privilege Escalation",
      "description": "Detect privilege escalation",
      "query": {
        "bool": {
          "should": [
            {"match": {"message": "sudo"}},
            {"match": {"message": "su -"}},
            {"match": {"process.name": "sudo"}}
          ],
          "minimum_should_match": 1
        }
      },
      "severity": "medium",
      "actions": ["log_event", "send_notification"]
    },
    {
      "name": "Suspicious File Access",
      "description": "Detect suspicious file access",
      "query": {
        "bool": {
          "must": [
            {"term": {"event.action": "file_access"}},
            {
              "bool": {
                "should": [
                  {"wildcard": {"file.path": "/etc/passwd*"}},
                  {"wildcard": {"file.path": "/etc/shadow*"}},
                  {"wildcard": {"file.path": "/root/.ssh/*"}}
                ]
              }
            }
          ]
        }
      },
      "severity": "critical",
      "actions": ["immediate_alert", "block_user"]
    }
  ]
}
```
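The brute-force rule amounts to counting failed logins per source IP inside a sliding window and flagging IPs over a threshold. A self-contained sketch of that logic on simplified event dicts — the field names here are stand-ins for the ECS documents the rule actually queries:

```python
from collections import defaultdict


def brute_force_ips(events, window_seconds=300, threshold=10, now=None):
    """Return source IPs with >= threshold failed logins inside the window.

    `events` are dicts with 'ts' (epoch seconds), 'source_ip', and 'outcome',
    mirroring the rule's 5-minute window and 10-event threshold.
    """
    if now is None:
        now = max((e["ts"] for e in events), default=0)
    counts = defaultdict(int)
    for e in events:
        if e["outcome"] == "failure" and now - e["ts"] <= window_seconds:
            counts[e["source_ip"]] += 1
    return {ip for ip, n in counts.items() if n >= threshold}
```

The production rule delegates exactly this counting to the `terms` aggregation with `min_doc_count: 10`, which scales to event volumes a client-side loop cannot.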
Automated security event response:

```python
# security_response.py
import requests
from datetime import datetime


class SecurityEventHandler:
    def __init__(self, elasticsearch_host, firewall_api, notification_api):
        self.es_host = elasticsearch_host
        self.firewall_api = firewall_api
        self.notification_api = notification_api

    def handle_brute_force_attack(self, source_ip, event_count):
        """Respond to a detected brute-force attack."""
        # 1. Record the security event
        security_event = {
            "@timestamp": datetime.utcnow().isoformat(),
            "event": {
                "type": "security",
                "action": "brute_force_detected",
                "severity": "high"
            },
            "source": {
                "ip": source_ip
            },
            "threat": {
                "technique": "T1110",  # MITRE ATT&CK
                "tactic": "Credential Access"
            },
            "event_count": event_count,
            "response_actions": []
        }

        # 2. Block the IP address
        if event_count > 20:
            block_result = self.block_ip(source_ip)
            security_event["response_actions"].append({
                "action": "ip_blocked",
                "result": block_result
            })

        # 3. Send an alert
        alert_result = self.send_security_alert(
            title="Brute Force Attack Detected",
            message=f"IP {source_ip} attempted {event_count} failed logins",
            severity="high"
        )
        security_event["response_actions"].append({
            "action": "alert_sent",
            "result": alert_result
        })

        # 4. Index the event into Elasticsearch
        self.log_security_event(security_event)
        return security_event

    def block_ip(self, ip_address):
        """Block an IP address via the firewall API."""
        try:
            response = requests.post(
                f"{self.firewall_api}/block",
                json={"ip": ip_address, "duration": "1h"},
                timeout=10
            )
            return {"success": True, "response": response.json()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def send_security_alert(self, title, message, severity):
        """Send a security alert."""
        try:
            alert_data = {
                "title": title,
                "message": message,
                "severity": severity,
                "timestamp": datetime.utcnow().isoformat(),
                "source": "ELK-SIEM"
            }
            response = requests.post(
                f"{self.notification_api}/alert",
                json=alert_data,
                timeout=10
            )
            return {"success": True, "response": response.json()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def log_security_event(self, event):
        """Index the security event into Elasticsearch."""
        try:
            response = requests.post(
                f"{self.es_host}/security-events/_doc",
                json=event,
                headers={"Content-Type": "application/json"}
            )
            return response.json()
        except Exception as e:
            print(f"Failed to log security event: {e}")
            return None
```
2. Threat Intelligence Integration

Threat intelligence enrichment:

```conf
# logstash-threat-intel.conf
filter {
  # IP reputation lookup
  if [source][ip] {
    translate {
      source => "[source][ip]"
      target => "[threat][reputation]"
      dictionary_path => "/etc/logstash/threat_intel/ip_reputation.yml"
      fallback => "unknown"
    }
    # Query an external threat-intel API
    http {
      url => "https://api.threatintel.com/v1/ip/%{[source][ip]}"
      headers => {
        "Authorization" => "Bearer YOUR_API_KEY"
      }
      target_body => "[threat][intel]"
      verb => "GET"
    }
  }

  # Domain reputation lookup
  if [url][domain] {
    translate {
      source => "[url][domain]"
      target => "[threat][domain_reputation]"
      dictionary_path => "/etc/logstash/threat_intel/domain_reputation.yml"
      fallback => "unknown"
    }
  }

  # File-hash lookup
  if [file][hash][sha256] {
    http {
      url => "https://api.virustotal.com/api/v3/files/%{[file][hash][sha256]}"
      headers => {
        "x-apikey" => "YOUR_VIRUSTOTAL_API_KEY"
      }
      target_body => "[threat][virustotal]"
      verb => "GET"
    }
  }

  # Threat score calculation
  ruby {
    code => "
      threat_score = 0
      # IP score
      if event.get('[threat][reputation]') == 'malicious'
        threat_score += 50
      elsif event.get('[threat][reputation]') == 'suspicious'
        threat_score += 25
      end
      # Domain score
      if event.get('[threat][domain_reputation]') == 'malicious'
        threat_score += 30
      end
      # VirusTotal score
      vt_data = event.get('[threat][virustotal]')
      if vt_data && vt_data['data'] && vt_data['data']['attributes']
        malicious_count = vt_data['data']['attributes']['last_analysis_stats']['malicious']
        if malicious_count && malicious_count > 0
          threat_score += [malicious_count * 5, 40].min
        end
      end
      event.set('[threat][score]', threat_score)
      # Threat level
      if threat_score >= 80
        event.set('[threat][level]', 'critical')
      elsif threat_score >= 60
        event.set('[threat][level]', 'high')
      elsif threat_score >= 40
        event.set('[threat][level]', 'medium')
      elsif threat_score >= 20
        event.set('[threat][level]', 'low')
      else
        event.set('[threat][level]', 'info')
      end
    "
  }
}

output {
  # Alert immediately on high-threat events
  if [threat][level] == "critical" or [threat][level] == "high" {
    http {
      url => "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
      http_method => "post"
      format => "json"
      mapping => {
        "text" => "🚨 High Threat Event Detected"
        "attachments" => [{
          "color" => "danger"
          "fields" => [{
            "title" => "Threat Level"
            "value" => "%{[threat][level]}"
            "short" => true
          }, {
            "title" => "Threat Score"
            "value" => "%{[threat][score]}"
            "short" => true
          }, {
            "title" => "Source IP"
            "value" => "%{[source][ip]}"
            "short" => true
          }, {
            "title" => "Timestamp"
            "value" => "%{@timestamp}"
            "short" => true
          }]
        }]
      }
    }
  }
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "security-events-%{+YYYY.MM.dd}"
  }
}
```
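The Ruby scoring block can be ported to Python for unit testing the weights before deploying the pipeline; the weights and level cut-offs below are exactly those used in the `ruby` filter above:

```python
def threat_score(reputation=None, domain_reputation=None, vt_malicious=0):
    """Combine the three enrichment signals into one score (same weights as the pipeline)."""
    score = 0
    if reputation == "malicious":
        score += 50
    elif reputation == "suspicious":
        score += 25
    if domain_reputation == "malicious":
        score += 30
    if vt_malicious and vt_malicious > 0:
        # 5 points per VirusTotal detection, capped at 40
        score += min(vt_malicious * 5, 40)
    return score


def threat_level(score: int) -> str:
    """Map a score onto the pipeline's five severity bands."""
    if score >= 80:
        return "critical"
    if score >= 60:
        return "high"
    if score >= 40:
        return "medium"
    if score >= 20:
        return "low"
    return "info"
```

Keeping the scoring table in one tested function also makes it easy to tune a weight and see exactly which historical events would change level.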
APM Performance Monitoring

1. APM Agent Integration

Java APM agent configuration:

```properties
# elastic-apm-agent.properties
service_name=user-service
service_version=1.0.0
environment=production
server_urls=http://apm-server:8200
secret_token=your_secret_token

# Performance
transaction_sample_rate=1.0
capture_body=all
capture_headers=true
stack_trace_limit=50
span_frames_min_duration=5ms

# Log integration
log_correlation_enabled=true
log_ecs_reformatting=true

# Metrics
metrics_interval=30s
disable_metrics=system.cpu.total.norm.pct,system.memory.usage,system.memory.actual.free

# Distributed tracing
distributed_tracing_enabled=true
trace_methods=com.example.service.*

# Errors
capture_exceptions=true
ignore_exceptions=java.lang.IllegalArgumentException
```
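The agent reads this file as plain `key=value` properties. A tiny parser of that shape, useful for validating the config in CI before rollout — a hypothetical helper, not part of the agent itself:

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props
```

A CI check could then assert, for example, that `transaction_sample_rate` is lowered before the config reaches a high-traffic environment.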
Spring Boot integration:

```xml
<!-- pom.xml -->
<dependency>
  <groupId>co.elastic.apm</groupId>
  <artifactId>apm-agent-attach</artifactId>
  <version>1.34.1</version>
</dependency>
```

```java
// Application.java
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        // Attach the APM agent programmatically
        ElasticApmAttacher.attach();
        SpringApplication.run(Application.class, args);
    }
}

// APMConfiguration.java
@Configuration
public class APMConfiguration {

    @EventListener
    public void handleApplicationReady(ApplicationReadyEvent event) {
        // Set labels on the current transaction; ElasticApm is a static
        // entry point and is not instantiable as a bean
        ElasticApm.currentTransaction()
            .setLabel("service.version", "1.0.0")
            .setLabel("deployment.environment", "production");
    }
}

// CustomMetrics.java
@Component
public class CustomMetrics {

    // The public APM API exposes no metric registry; track the count locally
    // (or report it via Micrometer) and record each error as a custom span.
    private final AtomicLong errorCounter = new AtomicLong();

    @EventListener
    public void handleError(ErrorEvent event) { // ErrorEvent is application-defined
        errorCounter.incrementAndGet();
        // Record a custom child span
        Span span = ElasticApm.currentSpan()
            .startSpan("custom", "error", null)
            .setName("error.processing");
        try {
            span.setLabel("error.type", event.getErrorType());
            span.setLabel("error.message", event.getMessage());
        } finally {
            span.end();
        }
    }
}

// TransactionInterceptor.java
// Note: the agent auto-instruments Spring MVC; a manual interceptor like this
// is only needed for frameworks the agent does not cover.
@Component
public class TransactionInterceptor implements HandlerInterceptor {

    private static final String APM_TX = "apmTransaction";

    @Override
    public boolean preHandle(HttpServletRequest request,
                             HttpServletResponse response,
                             Object handler) {
        Transaction transaction = ElasticApm.startTransaction();
        transaction.setName(request.getMethod() + " " + request.getRequestURI());
        transaction.setType(Transaction.TYPE_REQUEST);
        // Attach user information
        String userId = request.getHeader("X-User-ID");
        if (userId != null) {
            transaction.setUser(userId, null, null);
        }
        // Keep the transaction with the request so afterCompletion can end it
        request.setAttribute(APM_TX, transaction);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request,
                                HttpServletResponse response,
                                Object handler, Exception ex) {
        Transaction transaction = (Transaction) request.getAttribute(APM_TX);
        if (transaction != null) {
            transaction.setResult(String.valueOf(response.getStatus()));
            if (ex != null) {
                transaction.captureException(ex);
            }
            transaction.end();
        }
    }
}
```
2. APM Server配置
APM Server配置文件:
# apm-server.yml
apm-server:
host: "0.0.0.0:8200"
# 认证配置
auth:
secret_token: "your_secret_token"
api_key:
enabled: true
limit: 100
# Request handling settings
max_request_size: 1048576
max_header_size: 1048576
idle_timeout: 45s
read_timeout: 30s
write_timeout: 30s
shutdown_timeout: 5s
# Sampling settings
sampling:
keep_unsampled: false
# RUM (Real User Monitoring) settings
rum:
enabled: true
event_rate:
limit: 300
lru_size: 1000
allow_origins: ["*"]
allow_headers: ["*"]
library_pattern: "node_modules|bower_components|~"
exclude_from_grouping: "^/webpack"
# Data stream settings
data_streams:
enabled: true
namespace: "default"
# Elasticsearch output settings
output.elasticsearch:
hosts: ["elasticsearch:9200"]
username: "apm_system"
password: "${ELASTICSEARCH_PASSWORD}"
# Index settings
indices:
- index: "apm-%{[observer.version]}-sourcemap"
when.contains:
processor.event: "sourcemap"
- index: "apm-%{[observer.version]}-error-%{+yyyy.MM.dd}"
when.contains:
processor.event: "error"
- index: "apm-%{[observer.version]}-transaction-%{+yyyy.MM.dd}"
when.contains:
processor.event: "transaction"
- index: "apm-%{[observer.version]}-span-%{+yyyy.MM.dd}"
when.contains:
processor.event: "span"
- index: "apm-%{[observer.version]}-metric-%{+yyyy.MM.dd}"
when.contains:
processor.event: "metric"
# Template settings
template:
enabled: true
pattern: "apm-*"
settings:
index:
number_of_shards: 1
number_of_replicas: 1
refresh_interval: "5s"
# Monitoring settings
monitoring:
enabled: true
elasticsearch:
hosts: ["elasticsearch:9200"]
username: "apm_system"
password: "${ELASTICSEARCH_PASSWORD}"
# Logging settings
logging:
level: info
to_files: true
files:
path: /var/log/apm-server
name: apm-server
keepfiles: 7
permissions: 0644
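The `indices` section above routes each APM event to an index by `processor.event` and interpolates the observer version and date. A minimal Python sketch of that routing logic (the helper name `apm_index_for` is illustrative, not part of APM Server):

```python
from datetime import date

# Event types that get dated indices, mirroring the `when.contains` rules above
DATED_EVENT_TYPES = {"error", "transaction", "span", "metric"}

def apm_index_for(event: dict, version: str = "8.11.0", day: date = None) -> str:
    """Return the index name an event would be routed to."""
    event_type = event.get("processor", {}).get("event", "")
    if event_type == "sourcemap":
        return f"apm-{version}-sourcemap"  # sourcemap indices are not dated
    if event_type in DATED_EVENT_TYPES:
        suffix = (day or date.today()).strftime("%Y.%m.%d")
        return f"apm-{version}-{event_type}-{suffix}"
    raise ValueError(f"unknown processor.event: {event_type!r}")

print(apm_index_for({"processor": {"event": "error"}}, day=date(2024, 1, 15)))
# apm-8.11.0-error-2024.01.15
```

Daily indices keep error/transaction/span/metric data separable for retention, at the cost of more shards; the sourcemap index is undated because sourcemaps are long-lived lookup data.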
3. Performance Monitoring Dashboards
Kibana APM dashboard configuration:
{
"version": "8.11.0",
"objects": [
{
"id": "apm-service-overview",
"type": "dashboard",
"attributes": {
"title": "APM Service Overview",
"hits": 0,
"description": "Application performance monitoring overview",
"panelsJSON": "[\n {\n \"version\": \"8.11.0\",\n \"gridData\": {\n \"x\": 0,\n \"y\": 0,\n \"w\": 24,\n \"h\": 15,\n \"i\": \"1\"\n },\n \"panelIndex\": \"1\",\n \"embeddableConfig\": {},\n \"panelRefName\": \"panel_1\"\n }\n]",
"timeRestore": false,
"timeTo": "now",
"timeFrom": "now-24h",
"refreshInterval": {
"pause": false,
"value": 10000
},
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"query\":{\"query\":\"\",\"language\":\"kuery\"},\"filter\":[]}"
}
},
"references": [
{
"name": "panel_1",
"type": "visualization",
"id": "apm-transaction-overview"
}
]
},
{
"id": "apm-transaction-overview",
"type": "visualization",
"attributes": {
"title": "Transaction Overview",
"visState": "{\"title\":\"Transaction Overview\",\"type\":\"line\",\"params\":{\"grid\":{\"categoryLines\":false,\"style\":{\"color\":\"#eee\"}},\"categoryAxes\":[{\"id\":\"CategoryAxis-1\",\"type\":\"category\",\"position\":\"bottom\",\"show\":true,\"style\":{},\"scale\":{\"type\":\"linear\"},\"labels\":{\"show\":true,\"truncate\":100},\"title\":{}}],\"valueAxes\":[{\"id\":\"ValueAxis-1\",\"name\":\"LeftAxis-1\",\"type\":\"value\",\"position\":\"left\",\"show\":true,\"style\":{},\"scale\":{\"type\":\"linear\",\"mode\":\"normal\"},\"labels\":{\"show\":true,\"rotate\":0,\"filter\":false,\"truncate\":100},\"title\":{\"text\":\"Count\"}}],\"seriesParams\":[{\"show\":true,\"type\":\"line\",\"mode\":\"normal\",\"data\":{\"label\":\"Count\",\"id\":\"1\"},\"valueAxis\":\"ValueAxis-1\",\"drawLinesBetweenPoints\":true,\"showCircles\":true}],\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"times\":[],\"addTimeMarker\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{}}}]}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"apm-*-transaction-*\",\"query\":{\"match_all\":{}},\"filter\":[]}"
}
}
}
]
}
Data Pipeline Optimization
1. Data Flow Optimization
Optimized Logstash pipeline configuration:
# logstash.yml
node.name: logstash-node-1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d
path.logs: /var/log/logstash
# Pipeline settings
pipeline:
workers: 4
batch:
size: 1000
delay: 50
# Queue settings
queue:
type: persisted
max_bytes: 1gb
checkpoint.writes: 1024
# Config reload settings (JVM options are configured in jvm.options, not here)
config:
reload:
automatic: true
interval: 3s
# Monitoring settings
monitoring:
enabled: true
elasticsearch:
hosts: ["http://elasticsearch:9200"]
username: "logstash_system"
password: "${LOGSTASH_SYSTEM_PASSWORD}"
# Dead letter queue settings
dead_letter_queue:
enable: true
max_bytes: 1024mb
# Logging and API settings
log:
level: warn
api:
http:
host: 0.0.0.0
port: 9600
Multi-pipeline configuration:
# pipelines.yml
- pipeline.id: web-logs
path.config: "/etc/logstash/conf.d/web-logs.conf"
pipeline.workers: 2
pipeline.batch.size: 500
queue.type: persisted
queue.max_bytes: 512mb
- pipeline.id: app-logs
path.config: "/etc/logstash/conf.d/app-logs.conf"
pipeline.workers: 3
pipeline.batch.size: 1000
queue.type: persisted
queue.max_bytes: 1gb
- pipeline.id: security-logs
path.config: "/etc/logstash/conf.d/security-logs.conf"
pipeline.workers: 1
pipeline.batch.size: 100
queue.type: memory
- pipeline.id: metrics
path.config: "/etc/logstash/conf.d/metrics.conf"
pipeline.workers: 2
pipeline.batch.size: 2000
queue.type: persisted
queue.max_bytes: 2gb
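Each `queue.type: persisted` pipeline can occupy up to its `queue.max_bytes` on disk, so the path.data volume must be sized for the sum. A quick sketch (helper names are illustrative) to estimate the total from the pipelines above:

```python
# Unit multipliers for Logstash-style size strings
UNITS = {"kb": 1024, "mb": 1024**2, "gb": 1024**3}

def parse_bytes(size: str) -> int:
    """Parse size strings such as '512mb' or '1gb' into bytes."""
    size = size.strip().lower()
    for unit, factor in UNITS.items():
        if size.endswith(unit):
            return int(float(size[:-len(unit)]) * factor)
    return int(size)  # plain byte count

pipelines = [
    {"id": "web-logs",      "queue": "persisted", "max_bytes": "512mb"},
    {"id": "app-logs",      "queue": "persisted", "max_bytes": "1gb"},
    {"id": "security-logs", "queue": "memory",    "max_bytes": "0"},
    {"id": "metrics",       "queue": "persisted", "max_bytes": "2gb"},
]

# Only persisted queues consume disk; memory queues do not
total = sum(parse_bytes(p["max_bytes"]) for p in pipelines if p["queue"] == "persisted")
print(f"{total / 1024**3:.1f} GiB")  # 3.5 GiB
```

Note the trade-off encoded above: the low-volume security pipeline uses a memory queue for latency, accepting data loss on restart, while high-volume pipelines persist their queues.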
2. Cache Optimization
Redis cache integration (note: Logstash bundles redis only as an input/output plugin, not a filter; the configuration below assumes a community redis filter providing `get`/`set` with a `ttl` option. The bundled `memcached` filter offers equivalent get/set caching out of the box):
# logstash-redis-cache.conf
filter {
# IP geolocation cache
if [source][ip] {
redis {
host => "redis"
port => 6379
db => 0
key => "geoip:%{[source][ip]}"
data_type => "get"
target => "[source][geo_cache]"
}
if ![source][geo_cache] {
geoip {
source => "[source][ip]"
target => "[source][geo]"
}
# Cache the resolved geolocation
redis {
host => "redis"
port => 6379
db => 0
key => "geoip:%{[source][ip]}"
data_type => "set"
value => "%{[source][geo]}"
ttl => 86400 # expire after 24 hours
}
} else {
mutate {
add_field => { "[source][geo]" => "%{[source][geo_cache]}" }
}
}
}
# User agent cache
if [user_agent][original] {
redis {
host => "redis"
port => 6379
db => 1
key => "useragent:%{[user_agent][original]}"
data_type => "get"
target => "[user_agent][cache]"
}
if ![user_agent][cache] {
useragent {
source => "[user_agent][original]"
target => "[user_agent]"
}
redis {
host => "redis"
port => 6379
db => 1
key => "useragent:%{[user_agent][original]}"
data_type => "set"
value => "%{[user_agent]}"
ttl => 604800 # expire after 7 days
}
} else {
mutate {
add_field => { "[user_agent]" => "%{[user_agent][cache]}" }
}
}
}
}
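Both branches above implement the cache-aside pattern: look the key up first, and only on a miss run the expensive enrichment (geoip or useragent parsing) and store the result with a TTL. A minimal self-contained sketch of that pattern (class and function names are illustrative):

```python
import time

class TTLCache:
    """A minimal in-process stand-in for the Redis cache above."""
    def __init__(self):
        self._store = {}  # key -> (value, expires_at)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # expired: behave like a miss
            return None
        return value

    def set(self, key, value, ttl):
        self._store[key] = (value, time.monotonic() + ttl)

def lookup_geo(ip, cache, resolve):
    """Return geo data for `ip`, consulting the cache first (cache-aside)."""
    geo = cache.get(f"geoip:{ip}")
    if geo is None:
        geo = resolve(ip)                         # expensive lookup (geoip filter)
        cache.set(f"geoip:{ip}", geo, ttl=86400)  # cache for 24 hours
    return geo
```

The TTLs differ deliberately: IP-to-location mappings can change, so 24 hours is a safe bound, while user-agent strings parse deterministically and can be cached for a week.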
3. Data Compression and Archiving
Index lifecycle management (ILM) policy:
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "10gb",
"max_age": "1d",
"max_docs": 10000000
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "1d",
"actions": {
"set_priority": {
"priority": 50
},
"allocate": {
"number_of_replicas": 0,
"include": {},
"exclude": {},
"require": {
"data_tier": "warm"
}
},
"forcemerge": {
"max_num_segments": 1
}
}
},
"cold": {
"min_age": "7d",
"actions": {
"set_priority": {
"priority": 0
},
"allocate": {
"number_of_replicas": 0,
"include": {},
"exclude": {},
"require": {
"data_tier": "cold"
}
}
}
},
"frozen": {
"min_age": "30d",
"actions": {
"searchable_snapshot": {
"snapshot_repository": "s3-repository"
}
}
},
"delete": {
"min_age": "365d",
"actions": {
"delete": {}
}
}
}
}
}
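The `min_age` thresholds above define when an index becomes eligible for each phase. A small sketch of that timeline (illustrative helper names; real ILM transitions also wait for rollover and for each phase's actions to complete):

```python
def parse_min_age(value: str) -> float:
    """Convert ILM age strings such as '0ms', '1d', '365d' to days."""
    if value.endswith("ms"):
        return float(value[:-2]) / 86_400_000
    if value.endswith("d"):
        return float(value[:-1])
    raise ValueError(f"unsupported age string: {value!r}")

# (phase, min_age) pairs taken from the policy above, in order
PHASES = [("hot", "0ms"), ("warm", "1d"), ("cold", "7d"),
          ("frozen", "30d"), ("delete", "365d")]

def phase_for_age(age_days: float) -> str:
    """Return the last phase whose min_age threshold the index has passed."""
    current = "hot"
    for name, min_age in PHASES:
        if age_days >= parse_min_age(min_age):
            current = name
    return current

print(phase_for_age(3))    # warm
print(phase_for_age(400))  # delete
```

This makes the retention story explicit: one day hot with replicas, a week on warm nodes force-merged to one segment, a month on cold nodes, searchable snapshots in S3 until day 365, then deletion.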
Enterprise-Grade Solutions
1. High-Availability Architecture
Multi-datacenter deployment:
# elasticsearch-cluster.yml
cluster:
name: production-elk-cluster
initial_master_nodes:
- es-master-dc1-1
- es-master-dc1-2
- es-master-dc2-1
- es-master-dc2-2
- es-master-dc3-1
routing:
allocation:
awareness:
attributes: rack_id,zone
same_shard:
host: true
node:
name: ${NODE_NAME}
roles: ["master", "data", "ingest"]
attr:
rack_id: ${RACK_ID}
zone: ${ZONE}
network:
host: 0.0.0.0
publish_host: ${PUBLISH_HOST}
discovery:
seed_hosts:
- es-master-dc1-1:9300
- es-master-dc1-2:9300
- es-master-dc2-1:9300
- es-master-dc2-2:9300
- es-master-dc3-1:9300
gateway:
expected_nodes: 9
expected_master_nodes: 5
expected_data_nodes: 6
recover_after_time: 5m
path:
data: ["/data1/elasticsearch", "/data2/elasticsearch"]
logs: /var/log/elasticsearch
bootstrap:
memory_lock: true
xpack:
security:
enabled: true
transport:
ssl:
enabled: true
verification_mode: certificate
keystore:
path: certs/elastic-certificates.p12
truststore:
path: certs/elastic-certificates.p12
http:
ssl:
enabled: true
keystore:
path: certs/elastic-certificates.p12
monitoring:
collection:
enabled: true
ml:
enabled: true
watcher:
enabled: true
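The layout above puts five master-eligible nodes across three datacenters (2 + 2 + 1) because master election requires a strict majority of master-eligible nodes. A quick sketch of why any single datacenter can be lost without losing the quorum:

```python
def quorum(master_eligible: int) -> int:
    """Strict majority needed for master election."""
    return master_eligible // 2 + 1

# Master-eligible node counts per datacenter, as in the config above
masters_per_dc = {"dc1": 2, "dc2": 2, "dc3": 1}
total = sum(masters_per_dc.values())
assert quorum(total) == 3  # any two datacenters together hold a quorum

for lost_dc, count in masters_per_dc.items():
    remaining = total - count
    status = "quorum reachable" if remaining >= quorum(total) else "quorum LOST"
    print(f"{lost_dc} down -> {remaining} masters left, {status}")
```

With an even split of four masters across two datacenters, losing either site would drop the cluster below quorum; the odd fifth node in a third site is what makes the topology survivable.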
2. Disaster Recovery
Backup and restore strategy:
#!/bin/bash
# backup-elk-cluster.sh
set -e
BACKUP_DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_REPO="s3-backup-repo"
SNAPSHOT_NAME="elk-backup-${BACKUP_DATE}"
ES_HOST="https://elasticsearch:9200"
ES_USER="elastic"
ES_PASS="${ELASTICSEARCH_PASSWORD}"
# Create the snapshot repository (if it does not exist)
create_repository() {
echo "Creating snapshot repository..."
curl -X PUT "${ES_HOST}/_snapshot/${BACKUP_REPO}" \
-u "${ES_USER}:${ES_PASS}" \
-H "Content-Type: application/json" \
-d '{
"type": "s3",
"settings": {
"bucket": "elk-backups",
"region": "us-west-2",
"base_path": "elasticsearch-snapshots",
"compress": true,
"server_side_encryption": true
}
}'
}
# Create the snapshot
create_snapshot() {
echo "Creating snapshot: ${SNAPSHOT_NAME}"
curl -X PUT "${ES_HOST}/_snapshot/${BACKUP_REPO}/${SNAPSHOT_NAME}" \
-u "${ES_USER}:${ES_PASS}" \
-H "Content-Type: application/json" \
-d '{
"indices": "*",
"ignore_unavailable": true,
"include_global_state": true,
"metadata": {
"taken_by": "backup-script",
"taken_because": "scheduled backup"
}
}'
}
# Check snapshot status
check_snapshot_status() {
echo "Checking snapshot status..."
while true; do
STATUS=$(curl -s "${ES_HOST}/_snapshot/${BACKUP_REPO}/${SNAPSHOT_NAME}" \
-u "${ES_USER}:${ES_PASS}" | \
jq -r '.snapshots[0].state')
if [ "$STATUS" = "SUCCESS" ]; then
echo "Snapshot completed successfully"
break
elif [ "$STATUS" = "FAILED" ]; then
echo "Snapshot failed"
exit 1
else
echo "Snapshot in progress... Status: $STATUS"
sleep 30
fi
done
}
# Clean up old snapshots
cleanup_old_snapshots() {
echo "Cleaning up old snapshots..."
CUTOFF_DATE=$(date -d "30 days ago" +%Y%m%d)
curl -s "${ES_HOST}/_snapshot/${BACKUP_REPO}/_all" \
-u "${ES_USER}:${ES_PASS}" | \
jq -r '.snapshots[] | select(.start_time_in_millis < ('$(date -d "$CUTOFF_DATE" +%s)' * 1000)) | .snapshot' | \
while read -r old_snapshot; do
echo "Deleting old snapshot: $old_snapshot"
curl -X DELETE "${ES_HOST}/_snapshot/${BACKUP_REPO}/${old_snapshot}" \
-u "${ES_USER}:${ES_PASS}"
done
}
# Back up Kibana configuration (note: in 8.x direct access to the .kibana
# system index is restricted; prefer Kibana's saved objects export API)
backup_kibana_config() {
echo "Backing up Kibana configuration..."
curl -X POST "${ES_HOST}/.kibana/_search" \
-u "${ES_USER}:${ES_PASS}" \
-H "Content-Type: application/json" \
-d '{
"query": {"match_all": {}},
"size": 10000
}' > "/backup/kibana-config-${BACKUP_DATE}.json"
}
# Main flow
main() {
echo "Starting ELK backup process..."
create_repository
create_snapshot
check_snapshot_status
cleanup_old_snapshots
backup_kibana_config
echo "Backup process completed successfully"
# Send a notification
curl -X POST "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK" \
-H "Content-Type: application/json" \
-d "{
\"text\": \"✅ ELK Cluster backup completed successfully\",
\"attachments\": [{
\"color\": \"good\",
\"fields\": [{
\"title\": \"Snapshot Name\",
\"value\": \"${SNAPSHOT_NAME}\",
\"short\": true
}, {
\"title\": \"Backup Date\",
\"value\": \"${BACKUP_DATE}\",
\"short\": true
}]
}]
}"
}
main "$@"
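The retention logic in `cleanup_old_snapshots()` compares each snapshot's `start_time_in_millis` against a 30-day cutoff. The same selection, sketched in Python for clarity (`snapshots_to_delete` is an illustrative helper, not part of the Elasticsearch API):

```python
from datetime import datetime, timedelta

def snapshots_to_delete(snapshots, now=None, keep_days=30):
    """Return names of snapshots older than the retention window."""
    now = now or datetime.now()
    cutoff_ms = (now - timedelta(days=keep_days)).timestamp() * 1000
    return [s["snapshot"] for s in snapshots
            if s["start_time_in_millis"] < cutoff_ms]

# Sample payload shaped like GET _snapshot/<repo>/_all
now = datetime(2024, 3, 1)
snaps = [
    {"snapshot": "elk-backup-20240101",
     "start_time_in_millis": datetime(2024, 1, 1).timestamp() * 1000},
    {"snapshot": "elk-backup-20240228",
     "start_time_in_millis": datetime(2024, 2, 28).timestamp() * 1000},
]
print(snapshots_to_delete(snaps, now=now))  # ['elk-backup-20240101']
```

In production, Elasticsearch's built-in snapshot lifecycle management (SLM) can enforce the same retention declaratively instead of an external script.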
3. Performance Monitoring and Alerting
Comprehensive monitoring script:
# elk_monitoring.py
import requests
import json
import time
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class ELKMonitor:
def __init__(self, config):
self.config = config
self.es_host = config['elasticsearch']['host']
self.es_auth = (config['elasticsearch']['username'],
config['elasticsearch']['password'])
def check_cluster_health(self):
"""Check cluster health."""
try:
response = requests.get(
f"{self.es_host}/_cluster/health",
auth=self.es_auth,
timeout=10
)
health = response.json()
alerts = []
if health['status'] == 'red':
alerts.append({
'severity': 'critical',
'message': 'Cluster status is RED',
'details': health
})
elif health['status'] == 'yellow':
alerts.append({
'severity': 'warning',
'message': 'Cluster status is YELLOW',
'details': health
})
if health['number_of_nodes'] < self.config['thresholds']['min_nodes']:
alerts.append({
'severity': 'critical',
'message': f"Node count below threshold: {health['number_of_nodes']}",
'details': health
})
return alerts
except Exception as e:
return [{
'severity': 'critical',
'message': f'Failed to check cluster health: {str(e)}',
'details': {}
}]
def check_node_stats(self):
"""Check per-node statistics."""
try:
response = requests.get(
f"{self.es_host}/_nodes/stats",
auth=self.es_auth,
timeout=10
)
stats = response.json()
alerts = []
for node_id, node_stats in stats['nodes'].items():
node_name = node_stats['name']
# Check JVM heap usage
heap_used_percent = node_stats['jvm']['mem']['heap_used_percent']
if heap_used_percent > self.config['thresholds']['heap_usage']:
alerts.append({
'severity': 'warning',
'message': f'High heap usage on {node_name}: {heap_used_percent}%',
'details': node_stats['jvm']['mem']
})
# Check disk usage; note that fs.data in _nodes/stats is a list
# (one entry per data path), not a dict
for fs_stats in node_stats['fs']['data']:
disk_used_percent = (fs_stats['total_in_bytes'] - fs_stats['available_in_bytes']) / fs_stats['total_in_bytes'] * 100
if disk_used_percent > self.config['thresholds']['disk_usage']:
alerts.append({
'severity': 'warning',
'message': f'High disk usage on {node_name}: {disk_used_percent:.1f}%',
'details': fs_stats
})
return alerts
except Exception as e:
return [{
'severity': 'critical',
'message': f'Failed to check node stats: {str(e)}',
'details': {}
}]
def check_index_stats(self):
"""Check index statistics."""
try:
response = requests.get(
f"{self.es_host}/_cat/indices?format=json&bytes=b",
auth=self.es_auth,
timeout=10
)
indices = response.json()
alerts = []
for index in indices:
# Check index size
if int(index['store.size']) > self.config['thresholds']['index_size']:
alerts.append({
'severity': 'info',
'message': f'Large index detected: {index["index"]} ({index["store.size"]} bytes)',
'details': index
})
# Check index health
if index['health'] == 'red':
alerts.append({
'severity': 'critical',
'message': f'Index {index["index"]} is in RED state',
'details': index
})
return alerts
except Exception as e:
return [{
'severity': 'critical',
'message': f'Failed to check index stats: {str(e)}',
'details': {}
}]
def send_alert(self, alerts):
"""Dispatch alerts."""
if not alerts:
return
# Group by severity
critical_alerts = [a for a in alerts if a['severity'] == 'critical']
warning_alerts = [a for a in alerts if a['severity'] == 'warning']
info_alerts = [a for a in alerts if a['severity'] == 'info']
# Build the alert message
message = f"ELK Cluster Alert - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
if critical_alerts:
message += "🚨 CRITICAL ALERTS:\n"
for alert in critical_alerts:
message += f"- {alert['message']}\n"
message += "\n"
if warning_alerts:
message += "⚠️ WARNING ALERTS:\n"
for alert in warning_alerts:
message += f"- {alert['message']}\n"
message += "\n"
if info_alerts:
message += "ℹ️ INFO ALERTS:\n"
for alert in info_alerts:
message += f"- {alert['message']}\n"
# Send email
if self.config.get('email', {}).get('enabled', False):
self.send_email_alert(message, critical_alerts)
# Send Slack notification
if self.config.get('slack', {}).get('enabled', False):
self.send_slack_alert(message, critical_alerts)
def send_email_alert(self, message, critical_alerts):
"""Send an email alert."""
try:
email_config = self.config['email']
msg = MIMEMultipart()
msg['From'] = email_config['from']
msg['To'] = ', '.join(email_config['to'])
msg['Subject'] = f"ELK Cluster Alert - {len(critical_alerts)} Critical Issues"
msg.attach(MIMEText(message, 'plain'))
server = smtplib.SMTP(email_config['smtp_host'], email_config['smtp_port'])
server.starttls()
server.login(email_config['username'], email_config['password'])
server.send_message(msg)
server.quit()
except Exception as e:
print(f"Failed to send email alert: {e}")
def send_slack_alert(self, message, critical_alerts):
"""Send a Slack alert."""
try:
slack_config = self.config['slack']
color = "danger" if critical_alerts else "warning"
payload = {
"text": "ELK Cluster Alert",
"attachments": [{
"color": color,
"text": message,
"footer": "ELK Monitor",
"ts": int(time.time())
}]
}
requests.post(slack_config['webhook_url'], json=payload)
except Exception as e:
print(f"Failed to send Slack alert: {e}")
def run_monitoring(self):
"""Run all monitoring checks."""
print(f"Starting ELK monitoring at {datetime.now()}")
all_alerts = []
# Cluster health
all_alerts.extend(self.check_cluster_health())
# Node statistics
all_alerts.extend(self.check_node_stats())
# Index statistics
all_alerts.extend(self.check_index_stats())
# Dispatch alerts
if all_alerts:
self.send_alert(all_alerts)
print(f"Found {len(all_alerts)} alerts")
else:
print("No alerts found")
return all_alerts
# Example configuration
config = {
"elasticsearch": {
"host": "https://elasticsearch:9200",
"username": "elastic",
"password": "your_password"
},
"thresholds": {
"min_nodes": 3,
"heap_usage": 85,
"disk_usage": 85,
"index_size": 10737418240 # 10GB
},
"email": {
"enabled": True,
"smtp_host": "smtp.gmail.com",
"smtp_port": 587,
"username": "your_email@gmail.com",
"password": "your_password",
"from": "elk-monitor@company.com",
"to": ["admin@company.com"]
},
"slack": {
"enabled": True,
"webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
}
}
if __name__ == "__main__":
monitor = ELKMonitor(config)
# Run a one-off check
monitor.run_monitoring()
# Or monitor continuously:
# while True:
#     monitor.run_monitoring()
#     time.sleep(300)  # check every 5 minutes
Summary
This chapter covered advanced ELK Stack applications and integrations, including:
Core Topics
- Microservice architecture integration - distributed tracing, service mesh integration
- Cloud platform deployment - AWS and Kubernetes deployment options
- Third-party tool integration - Prometheus, Grafana, and Slack integration
- Machine learning applications - anomaly detection, log classification
- Security analytics - SIEM integration, threat intelligence
- APM performance monitoring - application performance monitoring integration
- Data pipeline optimization - caching, compression, and archiving strategies
- Enterprise-grade solutions - high availability, disaster recovery, monitoring and alerting
Technical Highlights
- A complete microservice log aggregation architecture
- Cloud-native deployment best practices
- Intelligent anomaly detection and alerting
- Enterprise-grade security analytics
- A comprehensive performance monitoring system
Practical Value
- Production-grade deployment guidance
- Intelligent operations management
- A complete observability stack
- Coverage of enterprise security and compliance requirements
The next chapter covers ELK Stack troubleshooting and performance tuning, helping you resolve common problems in production environments.