概述
随着大数据、云计算、人工智能等技术的快速发展,ELK Stack也在不断演进,推出了许多令人兴奋的新特性和功能。本章将深入探讨Elastic Stack 8.x的新特性、云原生部署策略、AI/ML集成应用、边缘计算场景以及技术发展趋势。
Elastic Stack 8.x 新特性
1. 安全性增强
默认启用安全功能:
# elasticsearch.yml - 8.x默认安全配置
xpack.security.enabled: true
xpack.security.enrollment.enabled: true
# 自动生成证书
xpack.security.http.ssl:
enabled: true
keystore.path: certs/http.p12
xpack.security.transport.ssl:
enabled: true
verification_mode: certificate
keystore.path: certs/transport.p12
truststore.path: certs/transport.p12
# API密钥认证
xpack.security.authc.api_key.enabled: true
增强的身份验证:
#!/usr/bin/env python3
# elastic_8x_security.py
import json
import requests
from elasticsearch import Elasticsearch
from elasticsearch.client import SecurityClient
class Elastic8xSecurity:
def __init__(self, host='localhost:9200', username='elastic', password=None):
self.host = host
self.username = username
self.password = password
# 使用API密钥认证
self.es = Elasticsearch(
[f'https://{host}'],
api_key=('api_key_id', 'api_key_secret'),
verify_certs=True,
ca_certs='/path/to/ca.crt'
)
self.security = SecurityClient(self.es)
def create_service_token(self, service_name: str, token_name: str) -> dict:
"""创建服务令牌(8.x新特性)"""
try:
response = self.security.create_service_token(
namespace='elastic',
service=service_name,
name=token_name
)
return {
'token': response['token']['value'],
'name': response['token']['name'],
'created': True
}
except Exception as e:
return {'error': str(e), 'created': False}
def setup_fleet_server_token(self) -> dict:
"""设置Fleet Server令牌"""
return self.create_service_token('fleet-server', 'fleet-server-token')
def setup_kibana_service_account(self) -> dict:
"""设置Kibana服务账户"""
try:
# 创建Kibana系统用户的服务令牌
response = self.security.create_service_token(
namespace='elastic',
service='kibana',
name='kibana-token'
)
return {
'service_token': response['token']['value'],
'setup_complete': True
}
except Exception as e:
return {'error': str(e), 'setup_complete': False}
def configure_cross_cluster_api_key(self, cluster_name: str) -> dict:
"""配置跨集群API密钥"""
try:
api_key_body = {
'name': f'cross-cluster-{cluster_name}',
'role_descriptors': {
'cross_cluster_search': {
'cluster': ['cross_cluster_search'],
'indices': [
{
'names': ['*'],
'privileges': ['read', 'read_cross_cluster']
}
]
}
},
'metadata': {
'cluster': cluster_name,
'purpose': 'cross_cluster_search'
}
}
response = self.security.create_api_key(body=api_key_body)
return {
'api_key_id': response['id'],
'api_key': response['api_key'],
'encoded': response['encoded'],
'cluster': cluster_name
}
except Exception as e:
return {'error': str(e)}
# 使用示例
if __name__ == "__main__":
security = Elastic8xSecurity()
# 创建Fleet Server令牌
fleet_token = security.setup_fleet_server_token()
print(f"Fleet Server Token: {fleet_token}")
# 设置Kibana服务账户
kibana_setup = security.setup_kibana_service_account()
print(f"Kibana Setup: {kibana_setup}")
2. 性能优化
向量搜索支持:
#!/usr/bin/env python3
# vector_search_8x.py
import numpy as np
from datetime import datetime, timezone
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
class VectorSearchEngine:
def __init__(self, es_host='localhost:9200'):
self.es = Elasticsearch([es_host])
self.model = SentenceTransformer('all-MiniLM-L6-v2')
def create_vector_index(self, index_name: str):
"""创建支持向量搜索的索引"""
mapping = {
"mappings": {
"properties": {
"title": {"type": "text"},
"content": {"type": "text"},
"title_vector": {
"type": "dense_vector",
"dims": 384, # MiniLM模型的维度
"index": True,
"similarity": "cosine"
},
"content_vector": {
"type": "dense_vector",
"dims": 384,
"index": True,
"similarity": "cosine"
},
"timestamp": {"type": "date"}
}
},
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
}
}
try:
self.es.indices.create(index=index_name, body=mapping)
print(f"Created vector index: {index_name}")
return True
except Exception as e:
print(f"Failed to create index: {e}")
return False
def index_document_with_vectors(self, index_name: str, doc_id: str,
title: str, content: str):
"""索引文档并生成向量"""
# 生成向量
title_vector = self.model.encode(title).tolist()
content_vector = self.model.encode(content).tolist()
doc = {
"title": title,
"content": content,
"title_vector": title_vector,
"content_vector": content_vector,
"timestamp": "now"
}
try:
response = self.es.index(
index=index_name,
id=doc_id,
body=doc
)
return response
except Exception as e:
print(f"Failed to index document: {e}")
return None
def vector_search(self, index_name: str, query_text: str,
field: str = "content_vector", size: int = 10):
"""执行向量搜索"""
# 生成查询向量
query_vector = self.model.encode(query_text).tolist()
search_body = {
"knn": {
"field": field,
"query_vector": query_vector,
"k": size,
"num_candidates": size * 10
},
"_source": ["title", "content", "timestamp"]
}
try:
response = self.es.search(
index=index_name,
body=search_body
)
results = []
for hit in response['hits']['hits']:
results.append({
'id': hit['_id'],
'score': hit['_score'],
'title': hit['_source']['title'],
'content': hit['_source']['content'][:200] + '...',
'timestamp': hit['_source']['timestamp']
})
return results
except Exception as e:
print(f"Vector search failed: {e}")
return []
def hybrid_search(self, index_name: str, query_text: str, size: int = 10):
"""混合搜索:结合文本搜索和向量搜索"""
query_vector = self.model.encode(query_text).tolist()
search_body = {
"query": {
"bool": {
"should": [
{
"multi_match": {
"query": query_text,
"fields": ["title^2", "content"],
"type": "best_fields"
}
}
]
}
},
"knn": {
"field": "content_vector",
"query_vector": query_vector,
"k": size,
"num_candidates": size * 10,
"boost": 0.5
},
"size": size,
"_source": ["title", "content", "timestamp"]
}
try:
response = self.es.search(
index=index_name,
body=search_body
)
results = []
for hit in response['hits']['hits']:
results.append({
'id': hit['_id'],
'score': hit['_score'],
'title': hit['_source']['title'],
'content': hit['_source']['content'][:200] + '...',
'timestamp': hit['_source']['timestamp']
})
return results
except Exception as e:
print(f"Hybrid search failed: {e}")
return []
# 使用示例
if __name__ == "__main__":
vector_engine = VectorSearchEngine()
# 创建向量索引
vector_engine.create_vector_index("documents")
# 索引一些示例文档
documents = [
("1", "Elasticsearch Vector Search", "Elasticsearch 8.x introduces native vector search capabilities for semantic search and machine learning applications."),
("2", "Kibana Data Visualization", "Kibana provides powerful data visualization tools for exploring and analyzing your Elasticsearch data."),
("3", "Logstash Data Processing", "Logstash is a data processing pipeline that ingests data from multiple sources simultaneously.")
]
for doc_id, title, content in documents:
vector_engine.index_document_with_vectors("documents", doc_id, title, content)
# 执行向量搜索
results = vector_engine.vector_search("documents", "machine learning search")
print("Vector Search Results:")
for result in results:
print(f"- {result['title']} (Score: {result['score']:.4f})")
# 执行混合搜索
hybrid_results = vector_engine.hybrid_search("documents", "data visualization")
print("\nHybrid Search Results:")
for result in hybrid_results:
print(f"- {result['title']} (Score: {result['score']:.4f})")
TSDB(时间序列数据库)模式:
# 时间序列索引模板
PUT _index_template/metrics-template
{
"index_patterns": ["metrics-*"],
"data_stream": {},
"template": {
"settings": {
"index": {
"mode": "time_series",
"routing_path": ["host.name", "service.name"],
"number_of_shards": 2,
"number_of_replicas": 0
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"host.name": {
"type": "keyword",
"time_series_dimension": true
},
"service.name": {
"type": "keyword",
"time_series_dimension": true
},
"cpu.usage": {
"type": "double",
"time_series_metric": "gauge"
},
"memory.usage": {
"type": "double",
"time_series_metric": "gauge"
},
"request.count": {
"type": "long",
"time_series_metric": "counter"
}
}
}
}
}
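创建上述模板后,写入匹配 metrics-* 的数据流即会进入时序(TSDB)模式:@timestamp 和所有维度字段必须随文档写入,且数据流只接受 create 操作。下面是一个最小化的写入与聚合查询示意(索引名 metrics-demo 及连接参数均为示例假设):
#!/usr/bin/env python3
# tsdb_demo.py - TSDB 数据流写入与查询示意(仅为示例草图)
from datetime import datetime, timezone
from elasticsearch import Elasticsearch

es = Elasticsearch("https://localhost:9200", api_key="<api-key>", ca_certs="/path/to/ca.crt")

# 写入一条指标文档: 必须带上 @timestamp 与全部 time_series_dimension 字段(host.name / service.name)
doc = {
    "@timestamp": datetime.now(timezone.utc).isoformat(),
    "host": {"name": "web-01"},
    "service": {"name": "checkout"},
    "cpu": {"usage": 0.42},
    "memory": {"usage": 0.63},
    "request": {"count": 128},
}
es.index(index="metrics-demo", document=doc, op_type="create")  # 数据流只接受 create 操作

# 按主机聚合最近一小时的平均 CPU 使用率
resp = es.search(
    index="metrics-demo",
    size=0,
    query={"range": {"@timestamp": {"gte": "now-1h"}}},
    aggs={
        "per_host": {
            "terms": {"field": "host.name"},
            "aggs": {"avg_cpu": {"avg": {"field": "cpu.usage"}}},
        }
    },
)
print(resp["aggregations"]["per_host"]["buckets"])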
3. Kibana 8.x 新特性
Lens可视化增强:
#!/usr/bin/env python3
# kibana_8x_lens.py
import json
import requests
from typing import Dict, List, Any
class KibanaLensManager:
def __init__(self, kibana_url='http://localhost:5601',
username='elastic', password='changeme'):
self.kibana_url = kibana_url
self.auth = (username, password)
self.headers = {
'Content-Type': 'application/json',
'kbn-xsrf': 'true'
}
def create_advanced_lens_visualization(self, space_id='default') -> Dict[str, Any]:
"""创建高级Lens可视化"""
lens_config = {
"attributes": {
"title": "Advanced Metrics Dashboard",
"type": "lens",
"visualizationType": "lnsXY",
"state": {
"datasourceStates": {
"indexpattern": {
"layers": {
"layer1": {
"columnOrder": ["col1", "col2", "col3"],
"columns": {
"col1": {
"label": "@timestamp",
"dataType": "date",
"operationType": "date_histogram",
"sourceField": "@timestamp",
"isBucketed": True,
"scale": "interval",
"params": {
"interval": "auto"
}
},
"col2": {
"label": "Average CPU Usage",
"dataType": "number",
"operationType": "average",
"sourceField": "cpu.usage",
"isBucketed": False,
"scale": "ratio"
},
"col3": {
"label": "95th Percentile Response Time",
"dataType": "number",
"operationType": "percentile",
"sourceField": "response_time",
"isBucketed": False,
"scale": "ratio",
"params": {
"percentile": 95
}
}
}
}
}
}
},
"visualization": {
"legend": {
"isVisible": True,
"position": "right"
},
"valueLabels": "hide",
"fittingFunction": "Linear",
"axisTitlesVisibilitySettings": {
"x": True,
"yLeft": True,
"yRight": True
},
"tickLabelsVisibilitySettings": {
"x": True,
"yLeft": True,
"yRight": True
},
"layers": [
{
"layerId": "layer1",
"accessors": ["col2", "col3"],
"position": "top",
"seriesType": "line",
"showGridlines": False,
"layerType": "data",
"xAccessor": "col1"
}
]
},
"query": {
"query": "",
"language": "kuery"
},
"filters": []
},
"references": [
{
"type": "index-pattern",
"id": "metrics-*",
"name": "indexpattern-datasource-current-indexpattern"
}
]
}
}
url = f"{self.kibana_url}/s/{space_id}/api/saved_objects/lens"
try:
response = requests.post(
url,
json=lens_config,
auth=self.auth,
headers=self.headers
)
if response.status_code == 200:
return response.json()
else:
return {'error': f'Failed to create visualization: {response.status_code}'}
except Exception as e:
return {'error': str(e)}
def create_runtime_field_visualization(self, space_id='default') -> Dict[str, Any]:
"""创建使用运行时字段的可视化"""
lens_config = {
"attributes": {
"title": "Runtime Field Analysis",
"type": "lens",
"visualizationType": "lnsPie",
"state": {
"datasourceStates": {
"indexpattern": {
"layers": {
"layer1": {
"columnOrder": ["col1", "col2"],
"columns": {
"col1": {
"label": "Error Category",
"dataType": "string",
"operationType": "terms",
"sourceField": "error_category",
"isBucketed": True,
"scale": "ordinal",
"params": {
"size": 10,
"orderBy": {
"type": "column",
"columnId": "col2"
},
"orderDirection": "desc"
}
},
"col2": {
"label": "Count",
"dataType": "number",
"operationType": "count",
"isBucketed": False,
"scale": "ratio"
}
}
}
}
}
},
"visualization": {
"shape": "pie",
"layers": [
{
"layerId": "layer1",
"groups": ["col1"],
"metric": "col2",
"numberDisplay": "percent",
"categoryDisplay": "default",
"legendDisplay": "visible",
"nestedLegend": False,
"layerType": "data"
}
]
},
"query": {
"query": "level:ERROR",
"language": "kuery"
},
"filters": []
},
"references": [
{
"type": "index-pattern",
"id": "logs-*",
"name": "indexpattern-datasource-current-indexpattern"
}
]
}
}
url = f"{self.kibana_url}/s/{space_id}/api/saved_objects/lens"
try:
response = requests.post(
url,
json=lens_config,
auth=self.auth,
headers=self.headers
)
return response.json() if response.status_code == 200 else {'error': response.text}
except Exception as e:
return {'error': str(e)}
def create_runtime_field(self, index_pattern: str, field_name: str,
script: str, field_type: str = 'keyword') -> Dict[str, Any]:
"""创建运行时字段"""
runtime_field_config = {
"runtimeField": {
"name": field_name,
"type": field_type,
"script": {
"source": script
}
}
}
url = f"{self.kibana_url}/api/index_patterns/{index_pattern}/runtime_field"
try:
response = requests.post(
url,
json=runtime_field_config,
auth=self.auth,
headers=self.headers
)
return response.json() if response.status_code == 200 else {'error': response.text}
except Exception as e:
return {'error': str(e)}
# 使用示例
if __name__ == "__main__":
lens_manager = KibanaLensManager()
# 创建运行时字段
error_category_script = """
if (doc['message.keyword'].size() > 0) {
String message = doc['message.keyword'].value;
if (message.contains('OutOfMemoryError')) {
emit('Memory Error');
} else if (message.contains('ConnectionTimeout')) {
emit('Network Error');
} else if (message.contains('SQLException')) {
emit('Database Error');
} else {
emit('Other Error');
}
}
"""
runtime_field = lens_manager.create_runtime_field(
'logs-*',
'error_category',
error_category_script
)
print(f"Runtime field created: {runtime_field}")
# 创建高级可视化
advanced_viz = lens_manager.create_advanced_lens_visualization()
print(f"Advanced visualization created: {advanced_viz}")
# 创建运行时字段可视化
runtime_viz = lens_manager.create_runtime_field_visualization()
print(f"Runtime field visualization created: {runtime_viz}")
云原生部署策略
1. Kubernetes Operator
ECK (Elastic Cloud on Kubernetes) 部署:
# eck-deployment.yaml
apiVersion: v1
kind: Namespace
metadata:
name: elastic-system
---
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
name: elasticsearch-cluster
namespace: elastic-system
spec:
version: 8.11.0
nodeSets:
- name: master
count: 3
config:
node.roles: ["master"]
xpack.security.enabled: true
xpack.security.enrollment.enabled: true
podTemplate:
spec:
containers:
- name: elasticsearch
resources:
requests:
memory: 4Gi
cpu: 2
limits:
memory: 4Gi
cpu: 2
env:
- name: ES_JAVA_OPTS
value: "-Xms2g -Xmx2g"
volumeClaimTemplates:
- metadata:
name: elasticsearch-data
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
storageClassName: fast-ssd
- name: data
count: 6
config:
node.roles: ["data", "ingest"]
xpack.security.enabled: true
podTemplate:
spec:
containers:
- name: elasticsearch
resources:
requests:
memory: 8Gi
cpu: 4
limits:
memory: 8Gi
cpu: 4
env:
- name: ES_JAVA_OPTS
value: "-Xms4g -Xmx4g"
volumeClaimTemplates:
- metadata:
name: elasticsearch-data
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 500Gi
storageClassName: fast-ssd
- name: coordinating
count: 2
config:
node.roles: []
xpack.security.enabled: true
podTemplate:
spec:
containers:
- name: elasticsearch
resources:
requests:
memory: 2Gi
cpu: 1
limits:
memory: 2Gi
cpu: 1
env:
- name: ES_JAVA_OPTS
value: "-Xms1g -Xmx1g"
http:
tls:
selfSignedCertificate:
disabled: false
service:
spec:
type: LoadBalancer
ports:
- name: https
port: 9200
targetPort: 9200
---
apiVersion: kibana.k8s.elastic.co/v1
kind: Kibana
metadata:
name: kibana
namespace: elastic-system
spec:
version: 8.11.0
count: 2
elasticsearchRef:
name: elasticsearch-cluster
config:
server.publicBaseUrl: "https://kibana.example.com"
xpack.security.enabled: true
xpack.fleet.enabled: true
xpack.encryptedSavedObjects.encryptionKey: "your-encryption-key-32-chars-long"
podTemplate:
spec:
containers:
- name: kibana
resources:
requests:
memory: 2Gi
cpu: 1
limits:
memory: 2Gi
cpu: 1
http:
tls:
selfSignedCertificate:
disabled: false
service:
spec:
type: LoadBalancer
ports:
- name: https
port: 5601
targetPort: 5601
---
apiVersion: beat.k8s.elastic.co/v1beta1
kind: Beat
metadata:
name: filebeat
namespace: elastic-system
spec:
type: filebeat
version: 8.11.0
elasticsearchRef:
name: elasticsearch-cluster
kibanaRef:
name: kibana
config:
filebeat.autodiscover:
providers:
- type: kubernetes
node: ${NODE_NAME}
hints.enabled: true
hints.default_config:
type: container
paths:
- /var/log/containers/*${data.kubernetes.container.id}.log
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: "/var/log/containers/"
- drop_event:
when:
equals:
kubernetes.container.name: "filebeat"
output.elasticsearch:
hosts: ["elasticsearch-cluster-es-http:9200"]
protocol: "https"
ssl.certificate_authorities:
- /mnt/elastic/tls.crt
daemonSet:
podTemplate:
spec:
serviceAccountName: filebeat
automountServiceAccountToken: true
terminationGracePeriodSeconds: 30
dnsPolicy: ClusterFirstWithHostNet
hostNetwork: true
containers:
- name: filebeat
securityContext:
runAsUser: 0
volumeMounts:
- name: varlogcontainers
mountPath: /var/log/containers
- name: varlogpods
mountPath: /var/log/pods
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumes:
- name: varlogcontainers
hostPath:
path: /var/log/containers
- name: varlogpods
hostPath:
path: /var/log/pods
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: filebeat
namespace: elastic-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: filebeat
rules:
- apiGroups: [""]
resources:
- nodes
- namespaces
- events
- pods
- services
- configmaps
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
- daemonsets
- deployments
- replicasets
- statefulsets
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: filebeat
subjects:
- kind: ServiceAccount
name: filebeat
namespace: elastic-system
roleRef:
kind: ClusterRole
name: filebeat
apiGroup: rbac.authorization.k8s.io
自动化部署脚本:
#!/usr/bin/env python3
# eck_deployment_manager.py
import yaml
import subprocess
import time
import requests
from kubernetes import client, config
from typing import Dict, List, Any
class ECKDeploymentManager:
def __init__(self, kubeconfig_path=None):
if kubeconfig_path:
config.load_kube_config(config_file=kubeconfig_path)
else:
config.load_incluster_config()
self.v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.custom_api = client.CustomObjectsApi()
def install_eck_operator(self) -> bool:
"""安装ECK Operator"""
try:
# 安装ECK CRDs
subprocess.run([
'kubectl', 'create', '-f',
'https://download.elastic.co/downloads/eck/2.10.0/crds.yaml'
], check=True)
# 安装ECK Operator
subprocess.run([
'kubectl', 'apply', '-f',
'https://download.elastic.co/downloads/eck/2.10.0/operator.yaml'
], check=True)
print("ECK Operator installed successfully")
return True
except subprocess.CalledProcessError as e:
print(f"Failed to install ECK Operator: {e}")
return False
def wait_for_operator_ready(self, namespace='elastic-system', timeout=300) -> bool:
"""等待Operator就绪"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
pods = self.v1.list_namespaced_pod(
namespace=namespace,
label_selector='control-plane=elastic-operator'
)
if pods.items:
pod = pods.items[0]
if pod.status.phase == 'Running':
print("ECK Operator is ready")
return True
time.sleep(10)
except Exception as e:
print(f"Error checking operator status: {e}")
time.sleep(10)
print("Timeout waiting for ECK Operator")
return False
def deploy_elasticsearch_cluster(self, cluster_config: Dict[str, Any]) -> bool:
"""部署Elasticsearch集群"""
try:
self.custom_api.create_namespaced_custom_object(
group='elasticsearch.k8s.elastic.co',
version='v1',
namespace=cluster_config['metadata']['namespace'],
plural='elasticsearches',
body=cluster_config
)
print(f"Elasticsearch cluster {cluster_config['metadata']['name']} deployed")
return True
except Exception as e:
print(f"Failed to deploy Elasticsearch cluster: {e}")
return False
def deploy_kibana(self, kibana_config: Dict[str, Any]) -> bool:
"""部署Kibana"""
try:
self.custom_api.create_namespaced_custom_object(
group='kibana.k8s.elastic.co',
version='v1',
namespace=kibana_config['metadata']['namespace'],
plural='kibanas',
body=kibana_config
)
print(f"Kibana {kibana_config['metadata']['name']} deployed")
return True
except Exception as e:
print(f"Failed to deploy Kibana: {e}")
return False
def deploy_beats(self, beats_config: Dict[str, Any]) -> bool:
"""部署Beats"""
try:
self.custom_api.create_namespaced_custom_object(
group='beat.k8s.elastic.co',
version='v1beta1',
namespace=beats_config['metadata']['namespace'],
plural='beats',
body=beats_config
)
print(f"Beat {beats_config['metadata']['name']} deployed")
return True
except Exception as e:
print(f"Failed to deploy Beat: {e}")
return False
def get_elasticsearch_password(self, cluster_name: str, namespace: str) -> str:
"""获取Elasticsearch密码"""
try:
secret = self.v1.read_namespaced_secret(
name=f"{cluster_name}-es-elastic-user",
namespace=namespace
)
password = secret.data['elastic']
import base64
return base64.b64decode(password).decode('utf-8')
except Exception as e:
print(f"Failed to get Elasticsearch password: {e}")
return None
def wait_for_cluster_ready(self, cluster_name: str, namespace: str, timeout=600) -> bool:
"""等待集群就绪"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
cluster = self.custom_api.get_namespaced_custom_object(
group='elasticsearch.k8s.elastic.co',
version='v1',
namespace=namespace,
plural='elasticsearches',
name=cluster_name
)
if cluster.get('status', {}).get('phase') == 'Ready':
print(f"Elasticsearch cluster {cluster_name} is ready")
return True
print(f"Cluster status: {cluster.get('status', {}).get('phase', 'Unknown')}")
time.sleep(30)
except Exception as e:
print(f"Error checking cluster status: {e}")
time.sleep(30)
print(f"Timeout waiting for cluster {cluster_name}")
return False
def setup_monitoring(self, namespace: str) -> bool:
"""设置监控"""
monitoring_config = {
'apiVersion': 'beat.k8s.elastic.co/v1beta1',
'kind': 'Beat',
'metadata': {
'name': 'metricbeat',
'namespace': namespace
},
'spec': {
'type': 'metricbeat',
'version': '8.11.0',
'elasticsearchRef': {
'name': 'elasticsearch-cluster'
},
'kibanaRef': {
'name': 'kibana'
},
'config': {
'metricbeat.autodiscover': {
'providers': [
{
'type': 'kubernetes',
'scope': 'cluster',
'hints.enabled': True,
'templates': [
{
'condition': {
'contains': {
'kubernetes.labels.app': 'elasticsearch'
}
},
'config': [
{
'module': 'elasticsearch',
'metricsets': ['node', 'node_stats'],
'period': '10s',
'hosts': ['${data.host}:${data.ports.http}']
}
]
}
]
}
]
},
'processors': [
{
'add_kubernetes_metadata': {}
}
]
},
'deployment': {
'podTemplate': {
'spec': {
'serviceAccountName': 'metricbeat',
'automountServiceAccountToken': True,
'containers': [
{
'name': 'metricbeat',
'securityContext': {
'runAsUser': 0
}
}
]
}
}
}
}
}
return self.deploy_beats(monitoring_config)
def full_deployment(self, config_file: str) -> Dict[str, Any]:
"""完整部署流程"""
deployment_result = {
'operator_installed': False,
'elasticsearch_deployed': False,
'kibana_deployed': False,
'beats_deployed': False,
'monitoring_setup': False,
'cluster_ready': False,
'elasticsearch_password': None
}
try:
# 读取配置文件
with open(config_file, 'r') as f:
configs = list(yaml.safe_load_all(f))
# 1. 安装ECK Operator
if self.install_eck_operator():
deployment_result['operator_installed'] = True
# 等待Operator就绪
if self.wait_for_operator_ready():
# 2. 部署Elasticsearch
es_config = next((c for c in configs if c['kind'] == 'Elasticsearch'), None)
if es_config and self.deploy_elasticsearch_cluster(es_config):
deployment_result['elasticsearch_deployed'] = True
# 等待集群就绪
if self.wait_for_cluster_ready(
es_config['metadata']['name'],
es_config['metadata']['namespace']
):
deployment_result['cluster_ready'] = True
# 获取密码
password = self.get_elasticsearch_password(
es_config['metadata']['name'],
es_config['metadata']['namespace']
)
deployment_result['elasticsearch_password'] = password
# 3. 部署Kibana
kibana_config = next((c for c in configs if c['kind'] == 'Kibana'), None)
if kibana_config and self.deploy_kibana(kibana_config):
deployment_result['kibana_deployed'] = True
# 4. 部署Beats
beats_configs = [c for c in configs if c['kind'] == 'Beat']
if all(self.deploy_beats(beat_config) for beat_config in beats_configs):
deployment_result['beats_deployed'] = True
# 5. 设置监控
if self.setup_monitoring(es_config['metadata']['namespace']):
deployment_result['monitoring_setup'] = True
except Exception as e:
deployment_result['error'] = str(e)
return deployment_result
# 使用示例
if __name__ == "__main__":
manager = ECKDeploymentManager()
# 执行完整部署
result = manager.full_deployment('eck-deployment.yaml')
print("Deployment Result:")
for key, value in result.items():
print(f" {key}: {value}")
if result.get('elasticsearch_password'):
print(f"\nElasticsearch Password: {result['elasticsearch_password']}")
print("Save this password securely!")
2. 多云部署策略
Terraform多云部署:
# terraform/main.tf
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
azurerm = {
source = "hashicorp/azurerm"
version = "~> 3.0"
}
google = {
source = "hashicorp/google"
version = "~> 4.0"
}
}
}
# AWS Provider
provider "aws" {
region = var.aws_region
}
# Azure Provider
provider "azurerm" {
features {}
}
# GCP Provider
provider "google" {
project = var.gcp_project
region = var.gcp_region
}
# Variables
variable "aws_region" {
description = "AWS region"
type = string
default = "us-west-2"
}
variable "azure_location" {
description = "Azure location"
type = string
default = "West US 2"
}
variable "gcp_project" {
description = "GCP project ID"
type = string
}
variable "gcp_region" {
description = "GCP region"
type = string
default = "us-west1"
}
variable "cluster_name" {
description = "ELK cluster name"
type = string
default = "elk-multicloud"
}
# AWS EKS Cluster
module "aws_eks" {
source = "./modules/aws-eks"
cluster_name = "${var.cluster_name}-aws"
region = var.aws_region
node_groups = {
elasticsearch = {
instance_types = ["r5.2xlarge"]
min_size = 3
max_size = 6
desired_size = 3
labels = {
role = "elasticsearch"
}
taints = [
{
key = "elasticsearch"
value = "true"
effect = "NO_SCHEDULE"
}
]
}
kibana = {
instance_types = ["m5.large"]
min_size = 2
max_size = 4
desired_size = 2
labels = {
role = "kibana"
}
}
logstash = {
instance_types = ["c5.xlarge"]
min_size = 2
max_size = 8
desired_size = 3
labels = {
role = "logstash"
}
}
}
}
# Azure AKS Cluster
module "azure_aks" {
source = "./modules/azure-aks"
cluster_name = "${var.cluster_name}-azure"
location = var.azure_location
resource_group_name = "${var.cluster_name}-rg"
node_pools = {
elasticsearch = {
vm_size = "Standard_D8s_v3"
node_count = 3
node_labels = {
role = "elasticsearch"
}
node_taints = [
"elasticsearch=true:NoSchedule"
]
}
kibana = {
vm_size = "Standard_D4s_v3"
node_count = 2
node_labels = {
role = "kibana"
}
}
}
}
# GCP GKE Cluster
module "gcp_gke" {
source = "./modules/gcp-gke"
cluster_name = "${var.cluster_name}-gcp"
project = var.gcp_project
region = var.gcp_region
node_pools = {
elasticsearch = {
machine_type = "n1-standard-8"
node_count = 3
node_config = {
labels = {
role = "elasticsearch"
}
taint = [
{
key = "elasticsearch"
value = "true"
effect = "NO_SCHEDULE"
}
]
}
}
kibana = {
machine_type = "n1-standard-4"
node_count = 2
node_config = {
labels = {
role = "kibana"
}
}
}
}
}
# Cross-cluster networking
# 注意: AWS VPC Peering 只能在 AWS VPC 之间建立, 无法直接对接 Azure VNet 或 GCP VPC。
# 跨云互联通常通过 Site-to-Site VPN、专线(Direct Connect / ExpressRoute / Cloud Interconnect)
# 或 SD-WAN 方案实现; 这里仅保留开关变量, 具体实现放在各云的网络模块中。
variable "enable_cross_cloud_networking" {
  description = "是否启用跨云网络互联(VPN/专线等)"
  type        = bool
  default     = false
}
# Outputs
output "aws_cluster_endpoint" {
value = module.aws_eks.cluster_endpoint
}
output "azure_cluster_endpoint" {
value = module.azure_aks.cluster_endpoint
}
output "gcp_cluster_endpoint" {
value = module.gcp_gke.cluster_endpoint
}
output "kubeconfig_commands" {
value = {
aws = "aws eks update-kubeconfig --region ${var.aws_region} --name ${module.aws_eks.cluster_name}"
azure = "az aks get-credentials --resource-group ${module.azure_aks.resource_group_name} --name ${module.azure_aks.cluster_name}"
gcp = "gcloud container clusters get-credentials ${module.gcp_gke.cluster_name} --region ${var.gcp_region} --project ${var.gcp_project}"
}
}
多云管理脚本:
#!/usr/bin/env python3
# multicloud_elk_manager.py
import json
import os
import subprocess
import yaml
from typing import Dict, List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
class MultiCloudELKManager:
def __init__(self, config_file='multicloud-config.yaml'):
with open(config_file, 'r') as f:
self.config = yaml.safe_load(f)
self.clusters = self.config['clusters']
def execute_kubectl_command(self, cluster: str, command: List[str]) -> Dict[str, Any]:
"""在指定集群执行kubectl命令"""
try:
# 切换到指定集群的kubeconfig
env = os.environ.copy()
env['KUBECONFIG'] = self.clusters[cluster]['kubeconfig']
result = subprocess.run(
command,
capture_output=True,
text=True,
env=env,
check=True
)
return {
'cluster': cluster,
'success': True,
'output': result.stdout,
'error': None
}
except subprocess.CalledProcessError as e:
return {
'cluster': cluster,
'success': False,
'output': e.stdout,
'error': e.stderr
}
def deploy_to_cluster(self, cluster: str, manifest_file: str) -> Dict[str, Any]:
"""部署到指定集群"""
command = ['kubectl', 'apply', '-f', manifest_file]
return self.execute_kubectl_command(cluster, command)
def deploy_to_all_clusters(self, manifest_file: str) -> List[Dict[str, Any]]:
"""并行部署到所有集群"""
results = []
with ThreadPoolExecutor(max_workers=len(self.clusters)) as executor:
futures = {
executor.submit(self.deploy_to_cluster, cluster, manifest_file): cluster
for cluster in self.clusters.keys()
}
for future in as_completed(futures):
result = future.result()
results.append(result)
return results
def get_cluster_status(self, cluster: str) -> Dict[str, Any]:
"""获取集群状态"""
commands = {
'nodes': ['kubectl', 'get', 'nodes', '-o', 'json'],
'elasticsearch': ['kubectl', 'get', 'elasticsearch', '-A', '-o', 'json'],
'kibana': ['kubectl', 'get', 'kibana', '-A', '-o', 'json'],
'beats': ['kubectl', 'get', 'beats', '-A', '-o', 'json']
}
status = {'cluster': cluster}
        for resource, command in commands.items():
            result = self.execute_kubectl_command(cluster, command)
            if result['success']:
                try:
                    status[resource] = json.loads(result['output'])
                except json.JSONDecodeError:
                    status[resource] = {'error': 'Invalid JSON response'}
            else:
                status[resource] = {'error': result['error']}
        return status
    def get_all_clusters_status(self) -> List[Dict[str, Any]]:
        """获取所有集群状态"""
        results = []
        with ThreadPoolExecutor(max_workers=len(self.clusters)) as executor:
            futures = {
                executor.submit(self.get_cluster_status, cluster): cluster
                for cluster in self.clusters.keys()
            }
            for future in as_completed(futures):
                results.append(future.result())
        return results
    def setup_cross_cluster_search(self) -> Dict[str, Any]:
        """设置跨集群搜索"""
        primary_cluster = self.config.get('primary_cluster', list(self.clusters.keys())[0])
        remote_clusters = [c for c in self.clusters.keys() if c != primary_cluster]
        setup_results = []
        for remote_cluster in remote_clusters:
            # 获取远程集群的连接信息
            remote_config = self.clusters[remote_cluster]
            # 在主集群中配置远程集群
            cross_cluster_config = {
                'persistent': {
                    f'cluster.remote.{remote_cluster}': {
                        'seeds': remote_config['seeds'],
                        'transport.ping_schedule': '30s'
                    }
                }
            }
            # 应用配置
            config_json = json.dumps(cross_cluster_config)
            command = [
                'kubectl', 'exec', '-n', 'elastic-system',
                'elasticsearch-cluster-es-default-0', '--',
                'curl', '-X', 'PUT',
                'https://localhost:9200/_cluster/settings',
                '-H', 'Content-Type: application/json',
                '-d', config_json,
                '--insecure', '-u', 'elastic:${ELASTIC_PASSWORD}'
            ]
            result = self.execute_kubectl_command(primary_cluster, command)
            setup_results.append({
                'remote_cluster': remote_cluster,
                'setup_result': result
            })
        return {
            'primary_cluster': primary_cluster,
            'remote_clusters': remote_clusters,
            'setup_results': setup_results
        }
    def create_global_index_template(self) -> List[Dict[str, Any]]:
        """创建全局索引模板"""
        template = {
            'index_patterns': ['logs-*', 'metrics-*'],
            'template': {
                'settings': {
                    'number_of_shards': 3,
                    'number_of_replicas': 1,
                    'index.lifecycle.name': 'global-policy',
                    'index.lifecycle.rollover_alias': 'logs'
                },
                'mappings': {
                    'properties': {
                        '@timestamp': {'type': 'date'},
                        'cluster': {'type': 'keyword'},
                        'cloud.provider': {'type': 'keyword'},
                        'cloud.region': {'type': 'keyword'},
                        'message': {
                            'type': 'text',
                            'fields': {
                                'keyword': {
                                    'type': 'keyword',
                                    'ignore_above': 256
                                }
                            }
                        }
                    }
                }
            }
        }
        results = []
        for cluster in self.clusters.keys():
            template_json = json.dumps(template)
            command = [
                'kubectl', 'exec', '-n', 'elastic-system',
                'elasticsearch-cluster-es-default-0', '--',
                'curl', '-X', 'PUT',
                'https://localhost:9200/_index_template/global-template',
                '-H', 'Content-Type: application/json',
                '-d', template_json,
                '--insecure', '-u', 'elastic:${ELASTIC_PASSWORD}'
            ]
            result = self.execute_kubectl_command(cluster, command)
            results.append({
                'cluster': cluster,
                'template_result': result
            })
        return results
def monitor_cross_cluster_health(self) -> Dict[str, Any]:
"""监控跨集群健康状态"""
health_status = {}
for cluster in self.clusters.keys():
command = [
'kubectl', 'exec', '-n', 'elastic-system',
'elasticsearch-cluster-es-default-0', '--',
'curl', '-X', 'GET',
'https://localhost:9200/_cluster/health',
'--insecure', '-u', 'elastic:${ELASTIC_PASSWORD}'
]
result = self.execute_kubectl_command(cluster, command)
if result['success']:
try:
health_status[cluster] = json.loads(result['output'])
except json.JSONDecodeError:
health_status[cluster] = {'error': 'Invalid JSON response'}
else:
health_status[cluster] = {'error': result['error']}
return health_status
# 使用示例
if __name__ == "__main__":
import os
manager = MultiCloudELKManager()
# 获取所有集群状态
print("Getting cluster status...")
status = manager.get_all_clusters_status()
for cluster_status in status:
print(f"Cluster {cluster_status['cluster']}: {len(cluster_status.get('nodes', {}).get('items', []))} nodes")
# 设置跨集群搜索
print("\nSetting up cross-cluster search...")
cross_cluster_result = manager.setup_cross_cluster_search()
print(f"Primary cluster: {cross_cluster_result['primary_cluster']}")
print(f"Remote clusters: {cross_cluster_result['remote_clusters']}")
# 创建全局索引模板
print("\nCreating global index template...")
template_results = manager.create_global_index_template()
for result in template_results:
print(f"Template creation for {result['cluster']}: {'Success' if result['template_result']['success'] else 'Failed'}")
AI/ML集成应用
1. 机器学习异常检测
Elasticsearch ML作业配置:
#!/usr/bin/env python3
# elasticsearch_ml_manager.py
import json
import requests
from elasticsearch import Elasticsearch
from elasticsearch.client import MlClient
from typing import Dict, List, Any
class ElasticsearchMLManager:
def __init__(self, es_host='localhost:9200', username='elastic', password='changeme'):
self.es = Elasticsearch(
[es_host],
http_auth=(username, password),
verify_certs=False
)
self.ml_client = MlClient(self.es)
def create_anomaly_detection_job(self, job_id: str, index_pattern: str) -> Dict[str, Any]:
"""创建异常检测作业"""
job_config = {
'job_id': job_id,
'description': 'Anomaly detection for system metrics',
'analysis_config': {
'bucket_span': '15m',
'detectors': [
{
'function': 'mean',
'field_name': 'cpu.usage',
'detector_description': 'CPU usage anomaly detection'
},
{
'function': 'mean',
'field_name': 'memory.usage',
'detector_description': 'Memory usage anomaly detection'
},
{
'function': 'count',
'detector_description': 'Event count anomaly detection'
},
{
'function': 'rare',
'by_field_name': 'user.name',
'detector_description': 'Rare user activity detection'
}
],
'influencers': ['host.name', 'user.name', 'process.name']
},
'data_description': {
'time_field': '@timestamp',
'time_format': 'epoch_ms'
},
'model_plot_config': {
'enabled': True,
'terms': 'host.name'
},
'analysis_limits': {
'model_memory_limit': '512mb'
}
}
try:
response = self.ml_client.put_job(job_id=job_id, body=job_config)
# 创建数据馈送
datafeed_config = {
'datafeed_id': f'{job_id}-datafeed',
'job_id': job_id,
'indices': [index_pattern],
'query': {
'match_all': {}
},
'scroll_size': 1000,
'frequency': '150s',
'query_delay': '60s'
}
datafeed_response = self.ml_client.put_datafeed(
datafeed_id=f'{job_id}-datafeed',
body=datafeed_config
)
return {
'job_created': True,
'job_response': response,
'datafeed_created': True,
'datafeed_response': datafeed_response
}
except Exception as e:
return {'error': str(e), 'job_created': False}
def start_ml_job(self, job_id: str) -> Dict[str, Any]:
"""启动ML作业"""
try:
# 打开作业
open_response = self.ml_client.open_job(job_id=job_id)
# 启动数据馈送
start_response = self.ml_client.start_datafeed(
datafeed_id=f'{job_id}-datafeed',
start='now-7d'
)
return {
'job_opened': True,
'datafeed_started': True,
'open_response': open_response,
'start_response': start_response
}
except Exception as e:
return {'error': str(e), 'started': False}
def get_anomalies(self, job_id: str, start_time: str = 'now-24h',
end_time: str = 'now') -> List[Dict[str, Any]]:
"""获取异常检测结果"""
try:
response = self.ml_client.get_records(
job_id=job_id,
start=start_time,
end=end_time,
record_score=50  # 只返回异常分数大于50的记录
)
anomalies = []
for record in response.get('records', []):
anomaly = {
'timestamp': record['timestamp'],
'anomaly_score': record['record_score'],
'probability': record['probability'],
'function': record['function'],
'field_name': record.get('field_name'),
'actual': record.get('actual'),
'typical': record.get('typical'),
'influencers': record.get('influencers', [])
}
anomalies.append(anomaly)
return sorted(anomalies, key=lambda x: x['anomaly_score'], reverse=True)
except Exception as e:
return [{'error': str(e)}]
def create_data_frame_analytics_job(self, job_id: str, source_index: str,
dest_index: str, analysis_type: str = 'outlier_detection') -> Dict[str, Any]:
"""创建数据框分析作业"""
if analysis_type == 'outlier_detection':
analysis_config = {
'outlier_detection': {
'n_neighbors': 20,
'method': 'lof',
'feature_influence_threshold': 0.1
}
}
elif analysis_type == 'regression':
analysis_config = {
'regression': {
'dependent_variable': 'response_time',
'training_percent': 80,
'prediction_field_name': 'predicted_response_time'
}
}
elif analysis_type == 'classification':
analysis_config = {
'classification': {
'dependent_variable': 'error_type',
'training_percent': 80,
'prediction_field_name': 'predicted_error_type',
'num_top_classes': 5
}
}
else:
return {'error': 'Unsupported analysis type'}
job_config = {
'id': job_id,
'source': {
'index': [source_index],
'query': {
'match_all': {}
}
},
'dest': {
'index': dest_index
},
'analysis': analysis_config,
'analyzed_fields': {
'includes': ['cpu.usage', 'memory.usage', 'response_time', 'request_count'],
'excludes': ['@timestamp', 'host.name']
},
'model_memory_limit': '1gb'
}
try:
response = self.ml_client.put_data_frame_analytics(id=job_id, body=job_config)
# 启动作业
start_response = self.ml_client.start_data_frame_analytics(id=job_id)
return {
'job_created': True,
'job_started': True,
'create_response': response,
'start_response': start_response
}
except Exception as e:
return {'error': str(e), 'job_created': False}
def create_ml_pipeline(self, pipeline_id: str, model_id: str) -> Dict[str, Any]:
"""创建ML推理管道"""
pipeline_config = {
'description': 'ML inference pipeline for real-time predictions',
'processors': [
{
'inference': {
'model_id': model_id,
'target_field': 'ml_prediction',
'field_map': {
'cpu.usage': 'cpu_usage',
'memory.usage': 'memory_usage',
'response_time': 'response_time'
}
}
},
{
'script': {
'source': """
if (ctx.ml_prediction.predicted_value > 0.8) {
ctx.alert_level = 'high';
} else if (ctx.ml_prediction.predicted_value > 0.5) {
ctx.alert_level = 'medium';
} else {
ctx.alert_level = 'low';
}
"""
}
}
]
}
try:
response = self.es.ingest.put_pipeline(
id=pipeline_id,
body=pipeline_config
)
return {'pipeline_created': True, 'response': response}
except Exception as e:
return {'error': str(e), 'pipeline_created': False}
# 使用示例
if __name__ == "__main__":
ml_manager = ElasticsearchMLManager()
# 创建异常检测作业
anomaly_job = ml_manager.create_anomaly_detection_job(
'system-metrics-anomaly',
'metrics-*'
)
print(f"Anomaly detection job: {anomaly_job}")
# 启动作业
if anomaly_job.get('job_created'):
start_result = ml_manager.start_ml_job('system-metrics-anomaly')
print(f"Job start result: {start_result}")
# 创建离群值检测作业
outlier_job = ml_manager.create_data_frame_analytics_job(
'outlier-detection-job',
'metrics-*',
'outlier-results',
'outlier_detection'
)
print(f"Outlier detection job: {outlier_job}")
2. 自然语言处理集成
NLP管道配置:
#!/usr/bin/env python3
# nlp_pipeline_manager.py
import json
import requests
from elasticsearch import Elasticsearch
from transformers import pipeline, AutoTokenizer, AutoModel
import torch
from typing import Dict, List, Any
class NLPPipelineManager:
def __init__(self, es_host='localhost:9200', username='elastic', password='changeme'):
self.es = Elasticsearch(
[es_host],
http_auth=(username, password),
verify_certs=False
)
# 初始化NLP模型
self.sentiment_analyzer = pipeline('sentiment-analysis')
self.ner_pipeline = pipeline('ner', aggregation_strategy='simple')
self.summarizer = pipeline('summarization')
def create_nlp_ingest_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
"""创建NLP摄取管道"""
pipeline_config = {
'description': 'NLP processing pipeline for log analysis',
'processors': [
{
'script': {
'source': """
// 文本预处理
if (ctx.message != null) {
ctx.processed_message = ctx.message.toLowerCase();
ctx.message_length = ctx.message.length();
}
"""
}
},
{
'inference': {
'model_id': 'sentiment-analysis-model',
'target_field': 'sentiment',
'field_map': {
'message': 'text_field'
}
}
},
{
'inference': {
'model_id': 'ner-model',
'target_field': 'entities',
'field_map': {
'message': 'text_field'
}
}
},
{
'script': {
'source': """
// 提取关键词
if (ctx.message != null) {
String[] words = ctx.message.split(' ');
List keywords = new ArrayList();
for (String word : words) {
if (word.length() > 5 && !word.matches('.*\\d.*')) {
keywords.add(word.toLowerCase());
}
}
ctx.keywords = keywords;
}
"""
}
},
{
'set': {
'field': 'nlp_processed',
'value': True
}
}
]
}
try:
response = self.es.ingest.put_pipeline(
id=pipeline_id,
body=pipeline_config
)
return {'pipeline_created': True, 'response': response}
except Exception as e:
return {'error': str(e), 'pipeline_created': False}
def analyze_log_sentiment(self, log_messages: List[str]) -> List[Dict[str, Any]]:
"""分析日志情感"""
results = []
for message in log_messages:
try:
sentiment_result = self.sentiment_analyzer(message)
analysis = {
'message': message,
'sentiment': sentiment_result[0]['label'],
'confidence': sentiment_result[0]['score'],
'message_length': len(message)
}
results.append(analysis)
except Exception as e:
results.append({
'message': message,
'error': str(e)
})
return results
def extract_entities(self, log_messages: List[str]) -> List[Dict[str, Any]]:
"""提取命名实体"""
results = []
for message in log_messages:
try:
entities = self.ner_pipeline(message)
extracted_entities = []
for entity in entities:
extracted_entities.append({
'text': entity['word'],
'label': entity['entity_group'],
'confidence': entity['score'],
'start': entity['start'],
'end': entity['end']
})
results.append({
'message': message,
'entities': extracted_entities,
'entity_count': len(extracted_entities)
})
except Exception as e:
results.append({
'message': message,
'error': str(e)
})
return results
def summarize_logs(self, log_messages: List[str], max_length: int = 150) -> Dict[str, Any]:
"""总结日志内容"""
try:
# 合并所有日志消息
combined_text = ' '.join(log_messages)
# 如果文本太长,截取前1000个字符
if len(combined_text) > 1000:
combined_text = combined_text[:1000]
summary_result = self.summarizer(
combined_text,
max_length=max_length,
min_length=30,
do_sample=False
)
return {
'original_message_count': len(log_messages),
'original_text_length': len(combined_text),
'summary': summary_result[0]['summary_text'],
'summary_length': len(summary_result[0]['summary_text'])
}
except Exception as e:
return {'error': str(e)}
def create_intelligent_alerting(self, index_pattern: str) -> Dict[str, Any]:
"""创建智能告警"""
watcher_config = {
'trigger': {
'schedule': {
'interval': '5m'
}
},
'input': {
'search': {
'request': {
'search_type': 'query_then_fetch',
'indices': [index_pattern],
'body': {
'query': {
'bool': {
'must': [
{
'range': {
'@timestamp': {
'gte': 'now-5m'
}
}
},
{
'term': {
'sentiment.keyword': 'NEGATIVE'
}
}
]
}
},
'aggs': {
'error_types': {
'terms': {
'field': 'entities.label.keyword',
'size': 10
}
},
'avg_confidence': {
'avg': {
'field': 'sentiment.confidence'
}
}
}
}
}
}
},
'condition': {
'compare': {
'ctx.payload.hits.total': {
'gt': 10
}
}
},
'actions': {
'send_email': {
'email': {
'to': ['admin@example.com'],
'subject': 'High Volume of Negative Sentiment Logs Detected',
'body': """
Alert: {{ctx.payload.hits.total}} negative sentiment logs detected in the last 5 minutes.
Average confidence: {{ctx.payload.aggregations.avg_confidence.value}}
Top error types:
{{#ctx.payload.aggregations.error_types.buckets}}
- {{key}}: {{doc_count}} occurrences
{{/ctx.payload.aggregations.error_types.buckets}}
Please investigate immediately.
"""
}
},
'create_incident': {
'webhook': {
'scheme': 'https',
'host': 'api.pagerduty.com',
'port': 443,
'method': 'post',
'path': '/incidents',
'params': {},
'headers': {
'Content-Type': 'application/json',
'Authorization': 'Token token={{ctx.metadata.pagerduty_token}}'
},
'body': '''
{
"incident": {
"type": "incident",
"title": "High Volume Negative Sentiment Logs",
"service": {
"id": "{{ctx.metadata.service_id}}",
"type": "service_reference"
},
"urgency": "high",
"body": {
"type": "incident_body",
"details": "{{ctx.payload.hits.total}} negative sentiment logs detected"
}
}
}
'''
}
}
}
}
try:
response = self.es.watcher.put_watch(
id='intelligent-log-alerting',
body=watcher_config
)
return {'watcher_created': True, 'response': response}
except Exception as e:
return {'error': str(e), 'watcher_created': False}
# 使用示例
if __name__ == "__main__":
nlp_manager = NLPPipelineManager()
# 示例日志消息
sample_logs = [
"ERROR: Database connection failed for user john.doe",
"INFO: User authentication successful for admin@example.com",
"WARN: High memory usage detected on server-01",
"ERROR: Payment processing failed for transaction ID 12345",
"INFO: Backup completed successfully"
]
# 分析情感
sentiment_results = nlp_manager.analyze_log_sentiment(sample_logs)
print("Sentiment Analysis Results:")
for result in sentiment_results:
print(f" {result['sentiment']} ({result['confidence']:.2f}): {result['message'][:50]}...")
# 提取实体
entity_results = nlp_manager.extract_entities(sample_logs)
print("\nEntity Extraction Results:")
for result in entity_results:
if 'entities' in result:
print(f" Message: {result['message'][:50]}...")
for entity in result['entities']:
print(f" - {entity['text']} ({entity['label']}, {entity['confidence']:.2f})")
# 总结日志
summary_result = nlp_manager.summarize_logs(sample_logs)
print(f"\nLog Summary: {summary_result}")
# 创建NLP摄取管道
pipeline_result = nlp_manager.create_nlp_ingest_pipeline('nlp-log-processing')
print(f"\nNLP Pipeline Creation: {pipeline_result}")
边缘计算集成
1. 边缘节点部署
轻量级ELK部署配置:
# edge-elk-deployment.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: edge-elasticsearch-config
namespace: edge-logging
data:
elasticsearch.yml: |
cluster.name: edge-cluster
node.name: edge-node-1
node.roles: [master, data, ingest]
# 边缘环境优化配置
bootstrap.memory_lock: false
discovery.type: single-node
# 资源限制
indices.memory.index_buffer_size: 20%
indices.fielddata.cache.size: 15%
indices.queries.cache.size: 10%
# 网络配置
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300
# 安全配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: true
# 数据保留策略
indices.lifecycle.poll_interval: 10m
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: edge-elasticsearch
namespace: edge-logging
spec:
replicas: 1
selector:
matchLabels:
app: edge-elasticsearch
template:
metadata:
labels:
app: edge-elasticsearch
spec:
containers:
- name: elasticsearch
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
resources:
limits:
memory: "2Gi"
cpu: "1000m"
requests:
memory: "1Gi"
cpu: "500m"
env:
- name: ES_JAVA_OPTS
value: "-Xms1g -Xmx1g"
- name: ELASTIC_PASSWORD
valueFrom:
secretKeyRef:
name: elastic-credentials
key: password
ports:
- containerPort: 9200
- containerPort: 9300
volumeMounts:
- name: config
mountPath: /usr/share/elasticsearch/config/elasticsearch.yml
subPath: elasticsearch.yml
- name: data
mountPath: /usr/share/elasticsearch/data
volumes:
- name: config
configMap:
name: edge-elasticsearch-config
- name: data
persistentVolumeClaim:
claimName: edge-elasticsearch-data
---
apiVersion: v1
kind: ConfigMap
metadata:
name: edge-logstash-config
namespace: edge-logging
data:
logstash.yml: |
http.host: "0.0.0.0"
path.config: /usr/share/logstash/pipeline
pipeline.workers: 2
pipeline.batch.size: 125
pipeline.batch.delay: 50
pipeline.conf: |
input {
beats {
port => 5044
}
# 本地文件输入
file {
path => "/var/log/edge/*.log"
start_position => "beginning"
sincedb_path => "/dev/null"
codec => "json"
}
# IoT设备数据输入
mqtt {
host => "mqtt-broker"
port => 1883
topic => "iot/+/logs"
codec => "json"
}
}
filter {
# 边缘设备标识
mutate {
add_field => { "edge_node" => "${EDGE_NODE_ID:unknown}" }
add_field => { "edge_location" => "${EDGE_LOCATION:unknown}" }
}
# 数据预处理
if [message] {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:content}" }
}
date {
match => [ "timestamp", "ISO8601" ]
}
}
# 边缘计算 - 本地数据聚合
if [sensor_data] {
ruby {
code => "
# 计算移动平均值
@moving_avg ||= []
@moving_avg << event.get('sensor_data').to_f
@moving_avg = @moving_avg.last(10) if @moving_avg.length > 10
avg = @moving_avg.sum / @moving_avg.length
event.set('moving_average', avg)
# 异常检测
if (event.get('sensor_data').to_f - avg).abs > (avg * 0.3)
event.set('anomaly_detected', true)
event.set('anomaly_score', (event.get('sensor_data').to_f - avg).abs / avg)
end
"
}
}
# 数据压缩和采样
if [level] == "DEBUG" {
# 只保留10%的DEBUG日志
ruby {
code => "event.cancel if rand > 0.1"
}
}
}
output {
# 本地Elasticsearch
elasticsearch {
hosts => ["edge-elasticsearch:9200"]
index => "edge-logs-%{+YYYY.MM.dd}"
user => "elastic"
password => "${ELASTIC_PASSWORD}"
ssl => true
ssl_certificate_verification => false
}
# 异常数据发送到中心
if [anomaly_detected] {
http {
url => "https://central-elk.example.com/api/alerts"
http_method => "post"
format => "json"
headers => {
"Authorization" => "Bearer ${CENTRAL_API_TOKEN}"
"Content-Type" => "application/json"
}
}
}
# 数据同步到中心(批量)
if [sync_to_central] {
elasticsearch {
hosts => ["https://central-elk.example.com:9200"]
index => "edge-sync-%{edge_node}-%{+YYYY.MM.dd}"
user => "edge_sync_user"
password => "${CENTRAL_SYNC_PASSWORD}"
ssl => true
}
}
}
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: edge-logstash
namespace: edge-logging
spec:
replicas: 1
selector:
matchLabels:
app: edge-logstash
template:
metadata:
labels:
app: edge-logstash
spec:
containers:
- name: logstash
image: docker.elastic.co/logstash/logstash:8.11.0
resources:
limits:
memory: "1Gi"
cpu: "500m"
requests:
memory: "512Mi"
cpu: "250m"
env:
- name: LS_JAVA_OPTS
value: "-Xms512m -Xmx512m"
- name: ELASTIC_PASSWORD
valueFrom:
secretKeyRef:
name: elastic-credentials
key: password
- name: EDGE_NODE_ID
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: EDGE_LOCATION
value: "factory-floor-1"
ports:
- containerPort: 5044
- containerPort: 9600
volumeMounts:
- name: config
mountPath: /usr/share/logstash/config/logstash.yml
subPath: logstash.yml
- name: pipeline
mountPath: /usr/share/logstash/pipeline/logstash.conf
subPath: pipeline.conf
- name: logs
mountPath: /var/log/edge
volumes:
- name: config
configMap:
name: edge-logstash-config
- name: pipeline
configMap:
name: edge-logstash-config
- name: logs
hostPath:
path: /var/log/edge
type: DirectoryOrCreate
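边缘节点磁盘有限,上面 elasticsearch.yml 中只配置了 ILM 轮询间隔,实际还需要一份生命周期策略来控制本地数据的保留时长。下面是一个简化的策略示意(策略名 edge-logs-policy、滚动阈值和 7 天保留期均为示例假设,可按边缘存储容量调整):
#!/usr/bin/env python3
# edge_ilm_policy.py - 边缘日志保留策略示意(仅为示例草图)
from elasticsearch import Elasticsearch

es = Elasticsearch("https://localhost:9200", basic_auth=("elastic", "changeme"), verify_certs=False)

# 热阶段按大小/时间滚动, 7 天后删除, 尽量压低边缘节点的磁盘占用
es.ilm.put_lifecycle(
    name="edge-logs-policy",
    policy={
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {"max_size": "5gb", "max_age": "1d"}
                }
            },
            "delete": {
                "min_age": "7d",
                "actions": {"delete": {}}
            },
        }
    },
)

# 通过索引模板把策略应用到 edge-logs-* 索引
es.indices.put_index_template(
    name="edge-logs-template",
    index_patterns=["edge-logs-*"],
    template={"settings": {"index.lifecycle.name": "edge-logs-policy"}},
)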
2. 边缘数据同步管理
边缘数据同步脚本:
#!/usr/bin/env python3
# edge_data_sync.py
import json
import time
import requests
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
from typing import Dict, List, Any
import logging
import threading
import queue
class EdgeDataSyncManager:
def __init__(self, edge_es_host='localhost:9200', central_es_host='central-elk.example.com:9200',
edge_credentials=('elastic', 'changeme'), central_credentials=('sync_user', 'sync_pass')):
# 边缘Elasticsearch连接
self.edge_es = Elasticsearch(
[edge_es_host],
http_auth=edge_credentials,
verify_certs=False
)
# 中心Elasticsearch连接
self.central_es = Elasticsearch(
[central_es_host],
http_auth=central_credentials,
verify_certs=True
)
self.sync_queue = queue.Queue(maxsize=1000)
self.sync_stats = {
'total_synced': 0,
'failed_syncs': 0,
'last_sync_time': None,
'sync_errors': []
}
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def get_edge_data_for_sync(self, hours_back: int = 1) -> List[Dict[str, Any]]:
"""获取需要同步的边缘数据"""
try:
# 查询最近的数据
query = {
'query': {
'bool': {
'must': [
{
'range': {
'@timestamp': {
'gte': f'now-{hours_back}h',
'lte': 'now'
}
}
}
],
'must_not': [
{
'exists': {
'field': 'synced_to_central'
}
}
]
}
},
'sort': [
{'@timestamp': {'order': 'asc'}}
],
'size': 1000
}
response = self.edge_es.search(
index='edge-logs-*',
body=query
)
documents = []
for hit in response['hits']['hits']:
doc = hit['_source']
doc['_id'] = hit['_id']
doc['_index'] = hit['_index']
documents.append(doc)
self.logger.info(f"Found {len(documents)} documents to sync")
return documents
except Exception as e:
self.logger.error(f"Error getting edge data: {str(e)}")
return []
def compress_and_aggregate_data(self, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""压缩和聚合数据以减少传输量"""
aggregated_data = {}
for doc in documents:
# 按小时聚合数据
timestamp = doc.get('@timestamp')
if timestamp:
hour_key = timestamp[:13] # YYYY-MM-DDTHH
if hour_key not in aggregated_data:
aggregated_data[hour_key] = {
'@timestamp': hour_key + ':00:00.000Z',
'edge_node': doc.get('edge_node'),
'edge_location': doc.get('edge_location'),
'log_count': 0,
'error_count': 0,
'warning_count': 0,
'info_count': 0,
'anomaly_count': 0,
'avg_response_time': 0,
'response_times': [],
'unique_users': set(),
'top_errors': {},
'sample_logs': []
}
agg = aggregated_data[hour_key]
agg['log_count'] += 1
# 统计日志级别
level = doc.get('level', '').lower()
if level == 'error':
agg['error_count'] += 1
elif level == 'warning' or level == 'warn':
agg['warning_count'] += 1
elif level == 'info':
agg['info_count'] += 1
# 异常检测统计
if doc.get('anomaly_detected'):
agg['anomaly_count'] += 1
# 响应时间统计
if 'response_time' in doc:
agg['response_times'].append(doc['response_time'])
# 用户统计
if 'user' in doc:
agg['unique_users'].add(doc['user'])
# 错误类型统计
if level == 'error' and 'message' in doc:
error_type = doc['message'][:50] # 取前50个字符作为错误类型
agg['top_errors'][error_type] = agg['top_errors'].get(error_type, 0) + 1
# 保存样本日志(最多5条)
if len(agg['sample_logs']) < 5:
agg['sample_logs'].append({
'timestamp': doc.get('@timestamp'),
'level': doc.get('level'),
'message': doc.get('message', '')[:100] # 截取前100个字符
})
# 计算平均响应时间并转换数据类型
result = []
for hour_key, agg in aggregated_data.items():
if agg['response_times']:
agg['avg_response_time'] = sum(agg['response_times']) / len(agg['response_times'])
agg['unique_user_count'] = len(agg['unique_users'])
agg['unique_users'] = list(agg['unique_users']) # 转换为列表
# 只保留前5个错误类型
agg['top_errors'] = dict(sorted(agg['top_errors'].items(), key=lambda x: x[1], reverse=True)[:5])
del agg['response_times'] # 删除原始响应时间数组
result.append(agg)
self.logger.info(f"Aggregated {len(documents)} documents into {len(result)} hourly summaries")
return result
def sync_to_central(self, documents: List[Dict[str, Any]], use_compression: bool = True) -> Dict[str, Any]:
"""同步数据到中心"""
if not documents:
return {'success': True, 'synced_count': 0}
try:
# 数据压缩和聚合
if use_compression:
sync_data = self.compress_and_aggregate_data(documents)
index_prefix = 'edge-aggregated'
else:
sync_data = documents
index_prefix = 'edge-raw'
# 批量索引到中心
actions = []
for doc in sync_data:
action = {
'_index': f"{index_prefix}-{datetime.now().strftime('%Y.%m.%d')}",
'_source': doc
}
actions.append(action)
# 执行批量索引
from elasticsearch.helpers import bulk
success_count, failed_items = bulk(
self.central_es,
actions,
chunk_size=100,
request_timeout=60
)
# 标记原始数据为已同步
if not use_compression:
for doc in documents:
self.edge_es.update(
index=doc['_index'],
id=doc['_id'],
body={
'doc': {
'synced_to_central': True,
'sync_timestamp': datetime.now().isoformat()
}
}
)
self.sync_stats['total_synced'] += success_count
self.sync_stats['last_sync_time'] = datetime.now().isoformat()
self.logger.info(f"Successfully synced {success_count} documents to central")
return {
'success': True,
'synced_count': success_count,
'failed_count': len(failed_items) if failed_items else 0
}
except Exception as e:
error_msg = f"Error syncing to central: {str(e)}"
self.logger.error(error_msg)
self.sync_stats['failed_syncs'] += 1
self.sync_stats['sync_errors'].append({
'timestamp': datetime.now().isoformat(),
'error': error_msg
})
return {'success': False, 'error': error_msg}
def start_continuous_sync(self, sync_interval: int = 300, compression: bool = True):
"""启动连续同步"""
def sync_worker():
while True:
try:
# 获取需要同步的数据
documents = self.get_edge_data_for_sync(hours_back=1)
if documents:
# 同步到中心
result = self.sync_to_central(documents, use_compression=compression)
if result['success']:
self.logger.info(f"Sync completed: {result['synced_count']} documents")
else:
self.logger.error(f"Sync failed: {result.get('error')}")
# 等待下次同步
time.sleep(sync_interval)
except Exception as e:
self.logger.error(f"Sync worker error: {str(e)}")
time.sleep(60) # 错误时等待1分钟后重试
# 启动同步线程
sync_thread = threading.Thread(target=sync_worker, daemon=True)
sync_thread.start()
self.logger.info(f"Started continuous sync with {sync_interval}s interval")
return sync_thread
def get_sync_statistics(self) -> Dict[str, Any]:
"""获取同步统计信息"""
# 获取边缘存储使用情况
try:
edge_stats = self.edge_es.cat.indices(index='edge-logs-*', format='json')
edge_storage = sum(int(idx.get('store.size', '0').replace('kb', '').replace('mb', '').replace('gb', '')) for idx in edge_stats)
except:
edge_storage = 0
# 获取待同步数据量
try:
pending_query = {
'query': {
'bool': {
'must_not': [
{'exists': {'field': 'synced_to_central'}}
]
}
}
}
pending_count = self.edge_es.count(index='edge-logs-*', body=pending_query)['count']
        except Exception:
pending_count = 0
return {
'sync_stats': self.sync_stats,
'edge_storage_mb': edge_storage,
'pending_sync_count': pending_count,
'edge_cluster_health': self.edge_es.cluster.health(),
'central_connectivity': self._test_central_connectivity()
}
def _test_central_connectivity(self) -> Dict[str, Any]:
"""测试中心连接"""
try:
start_time = time.time()
health = self.central_es.cluster.health()
latency = (time.time() - start_time) * 1000
return {
'connected': True,
'latency_ms': round(latency, 2),
'cluster_status': health['status']
}
except Exception as e:
return {
'connected': False,
'error': str(e)
}
# 使用示例
if __name__ == "__main__":
sync_manager = EdgeDataSyncManager(
edge_es_host='localhost:9200',
central_es_host='central-elk.example.com:9200',
edge_credentials=('elastic', 'changeme'),
central_credentials=('sync_user', 'sync_password')
)
# 获取同步统计
stats = sync_manager.get_sync_statistics()
print(f"Sync Statistics: {json.dumps(stats, indent=2)}")
# 执行一次性同步
documents = sync_manager.get_edge_data_for_sync(hours_back=2)
if documents:
result = sync_manager.sync_to_central(documents, use_compression=True)
print(f"Sync Result: {result}")
# 启动连续同步(每5分钟一次)
sync_thread = sync_manager.start_continuous_sync(sync_interval=300, compression=True)
# 保持程序运行
try:
while True:
time.sleep(60)
stats = sync_manager.get_sync_statistics()
print(f"Current stats: Synced={stats['sync_stats']['total_synced']}, Pending={stats['pending_sync_count']}")
except KeyboardInterrupt:
print("Stopping sync manager...")
总结
本章深入探讨了ELK Stack的未来发展方向和新特性,涵盖了以下核心内容:
技术演进亮点
Elastic Stack 8.x新特性
- 默认安全配置增强了系统安全性
- 向量搜索支持为AI应用奠定基础
- TSDB模式优化了时序数据处理
- 运行时字段提供了更灵活的数据处理能力
云原生部署策略
- Kubernetes Operator简化了集群管理
- 多云管理实现了跨平台部署
- 自动化运维提升了运维效率
- 弹性伸缩优化了资源利用
AI/ML集成应用
- 机器学习异常检测提供了智能监控
- 自然语言处理增强了日志分析能力
- 智能告警减少了误报和漏报
- 预测性分析支持了主动运维
边缘计算集成
- 边缘节点部署实现了分布式日志处理
- 数据压缩和聚合优化了网络传输
- 本地处理能力减少了延迟
- 智能同步策略平衡了实时性和效率
实践价值
- 技术前瞻性:掌握最新技术趋势,为未来架构升级做好准备
- 架构现代化:采用云原生和微服务架构,提升系统可扩展性
- 智能化运维:利用AI/ML技术,实现智能监控和预测性维护
- 边缘计算支持:适应IoT和边缘计算场景,扩展应用范围
发展趋势
- 技术融合:ELK Stack与AI、云原生、边缘计算等技术深度融合
- 智能化:从被动监控向主动预测和自动化处理发展
- 标准化:遵循云原生标准,提升互操作性
- 生态化:构建完整的可观测性生态系统
通过本章的学习,您已经掌握了ELK Stack的前沿技术和未来发展方向。这些知识将帮助您构建更加现代化、智能化和高效的日志分析系统,为企业的数字化转型提供强有力的技术支撑。
恭喜您完成了ELK Stack完整教程的学习!
从基础安装配置到高级应用集成,从传统部署到云原生架构,从基本日志分析到AI驱动的智能运维,您已经全面掌握了ELK Stack的核心技术和最佳实践。
希望这套教程能够帮助您在实际工作中构建高效、可靠、智能的日志分析系统,为企业的可观测性建设贡献力量。
继续学习,持续进步!