Overview
In enterprise environments, the security of the ELK Stack is critical. This chapter walks through configuring the security features of Elasticsearch, Kibana, and Logstash, covering authentication, authorization, network security, and data encryption, so that the log analysis platform stays secure and compliant.
Elasticsearch Security Configuration
1. Enabling X-Pack Security
Basic security configuration
Security settings in elasticsearch.yml:
# Cluster settings
cluster.name: secure-elk-cluster
node.name: es-node-1
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300
# Security
xpack.security.enabled: true
xpack.security.enrollment.enabled: true
# Transport-layer TLS (node-to-node traffic)
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.client_authentication: required
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12
# HTTP-layer TLS (REST clients)
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.truststore.path: certs/elastic-certificates.p12
# Audit logging
xpack.security.audit.enabled: true
xpack.security.audit.logfile.events.include:
  - access_denied
  - access_granted
  - anonymous_access_denied
  - authentication_failed
  - connection_denied
  - tampered_request
  - run_as_denied
  - run_as_granted
# Password hashing
xpack.security.authc.password_hashing.algorithm: bcrypt
# API keys
xpack.security.authc.api_key.enabled: true
xpack.security.authc.api_key.hashing.algorithm: pbkdf2_stretch
# Token service
xpack.security.authc.token.enabled: true
xpack.security.authc.token.timeout: 20m
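The hashing settings above choose bcrypt for stored passwords and stretched PBKDF2 for API keys. Both are deliberately slow, salted constructions. As a rough illustration of the key-stretching idea (not Elasticsearch's internal implementation), the Python standard library can compute a PBKDF2 digest:

```python
import hashlib
import os

def stretch_secret(secret: str, salt: bytes, iterations: int = 210_000) -> str:
    """Derive a slow, salted hash from a secret with PBKDF2-HMAC-SHA256.

    Illustrative only: Elasticsearch does its own hashing internally. The
    point is that each brute-force guess costs `iterations` HMAC rounds.
    """
    dk = hashlib.pbkdf2_hmac("sha256", secret.encode(), salt, iterations)
    return dk.hex()

salt = os.urandom(16)           # a fresh random salt per secret
digest = stretch_secret("S3cure-Pass!", salt)
print(len(digest))              # 64 hex characters = 32 bytes
```

The same secret with the same salt always yields the same digest, which is what makes verification possible without storing the secret itself.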
SSL Certificate Generation and Configuration
Certificate generation script:
#!/bin/bash
# generate-certificates.sh
CERT_DIR="/etc/elasticsearch/certs"
INSTANCE_FILE="instances.yml"

# Create the certificate directory
mkdir -p "$CERT_DIR"
cd "$CERT_DIR"

# Write the instance definition file consumed by elasticsearch-certutil
cat > "$INSTANCE_FILE" << EOF
instances:
  - name: es-node-1
    dns:
      - es-node-1.local
      - localhost
    ip:
      - 127.0.0.1
      - 192.168.1.10
  - name: es-node-2
    dns:
      - es-node-2.local
    ip:
      - 192.168.1.11
  - name: es-node-3
    dns:
      - es-node-3.local
    ip:
      - 192.168.1.12
  - name: kibana
    dns:
      - kibana.local
      - localhost
    ip:
      - 127.0.0.1
      - 192.168.1.20
  - name: logstash
    dns:
      - logstash.local
    ip:
      - 192.168.1.30
EOF

# Generate the CA certificate
echo "Generating CA certificate..."
/usr/share/elasticsearch/bin/elasticsearch-certutil ca --out elastic-stack-ca.p12 --pass ""

# Generate node certificates signed by that CA
echo "Generating node certificates..."
/usr/share/elasticsearch/bin/elasticsearch-certutil cert \
  --ca elastic-stack-ca.p12 \
  --ca-pass "" \
  --out elastic-certificates.p12 \
  --pass "" \
  --in "$INSTANCE_FILE"

# Generate HTTP-layer certificates (interactive wizard)
echo "Generating HTTP certificates..."
/usr/share/elasticsearch/bin/elasticsearch-certutil http

# Restrict ownership and permissions
chown -R elasticsearch:elasticsearch "$CERT_DIR"
chmod 660 "$CERT_DIR"/*.p12

echo "Certificate generation completed."
echo "Certificates location: $CERT_DIR"
echo "Please distribute certificates to all nodes."

# Verify the generated certificates
echo -e "\nVerifying certificates..."
openssl pkcs12 -in elastic-certificates.p12 -nodes -passin pass: | openssl x509 -noout -text | grep -E '(Subject:|DNS:|IP Address:)'
2. User and Role Management
Custom role configuration
Role definition script:
#!/bin/bash
# setup-security.sh
ES_HOST="https://localhost:9200"
ES_USER="elastic"
ES_PASS="your_elastic_password"
# Create custom roles
echo "Creating custom roles..."
# Log analyst role: read-only access to log and metric indices
curl -X POST "$ES_HOST/_security/role/log_analyst" \
-u "$ES_USER:$ES_PASS" \
-H "Content-Type: application/json" \
-d '{
"cluster": ["monitor"],
"indices": [
{
"names": ["logs-*", "metrics-*"],
"privileges": ["read", "view_index_metadata"]
}
],
"applications": [
{
"application": "kibana-.kibana",
"privileges": ["read"],
"resources": ["*"]
}
]
}'
# Developer role: full control of dev/test indices, read access to logs
curl -X POST "$ES_HOST/_security/role/developer" \
-u "$ES_USER:$ES_PASS" \
-H "Content-Type: application/json" \
-d '{
"cluster": ["monitor", "manage_index_templates"],
"indices": [
{
"names": ["dev-*", "test-*"],
"privileges": ["all"]
},
{
"names": ["logs-*"],
"privileges": ["read", "view_index_metadata"]
}
],
"applications": [
{
"application": "kibana-.kibana",
"privileges": ["all"],
"resources": ["space:dev"]
}
]
}'
# Operations admin role: cluster management plus full index access
curl -X POST "$ES_HOST/_security/role/ops_admin" \
-u "$ES_USER:$ES_PASS" \
-H "Content-Type: application/json" \
-d '{
"cluster": [
"monitor",
"manage_index_templates",
"manage_ingest_pipelines",
"manage_ilm",
"manage_slm"
],
"indices": [
{
"names": ["*"],
"privileges": ["all"]
}
],
"applications": [
{
"application": "kibana-.kibana",
"privileges": ["all"],
"resources": ["*"]
}
]
}'
# Read-only role
curl -X POST "$ES_HOST/_security/role/readonly" \
-u "$ES_USER:$ES_PASS" \
-H "Content-Type: application/json" \
-d '{
"cluster": ["monitor"],
"indices": [
{
"names": ["logs-*"],
"privileges": ["read"]
}
],
"applications": [
{
"application": "kibana-.kibana",
"privileges": ["read"],
"resources": ["*"]
}
]
}'
echo "Custom roles created successfully."
User Management
User creation and management script:
# user_management.py
import requests
import json
import getpass
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class ElasticsearchUserManager:
def __init__(self, es_host, admin_user, admin_password):
self.es_host = es_host
self.auth = (admin_user, admin_password)
self.session = requests.Session()
self.session.verify = False  # disable TLS verification: test environments only
def create_user(self, username, password, email, full_name, roles):
"""创建用户"""
user_data = {
"password": password,
"email": email,
"full_name": full_name,
"roles": roles,
"enabled": True
}
try:
response = self.session.post(
f"{self.es_host}/_security/user/{username}",
auth=self.auth,
json=user_data
)
if response.status_code == 200:
print(f"User '{username}' created successfully.")
return True
else:
print(f"Failed to create user '{username}': {response.text}")
return False
except Exception as e:
print(f"Error creating user '{username}': {e}")
return False
def update_user(self, username, **kwargs):
"""更新用户信息"""
try:
response = self.session.post(
f"{self.es_host}/_security/user/{username}",
auth=self.auth,
json=kwargs
)
if response.status_code == 200:
print(f"User '{username}' updated successfully.")
return True
else:
print(f"Failed to update user '{username}': {response.text}")
return False
except Exception as e:
print(f"Error updating user '{username}': {e}")
return False
def delete_user(self, username):
"""删除用户"""
try:
response = self.session.delete(
f"{self.es_host}/_security/user/{username}",
auth=self.auth
)
if response.status_code == 200:
print(f"User '{username}' deleted successfully.")
return True
else:
print(f"Failed to delete user '{username}': {response.text}")
return False
except Exception as e:
print(f"Error deleting user '{username}': {e}")
return False
def list_users(self):
"""列出所有用户"""
try:
response = self.session.get(
f"{self.es_host}/_security/user",
auth=self.auth
)
if response.status_code == 200:
users = response.json()
print("\n=== Elasticsearch Users ===")
for username, user_info in users.items():
print(f"Username: {username}")
print(f" Full Name: {user_info.get('full_name', 'N/A')}")
print(f" Email: {user_info.get('email', 'N/A')}")
print(f" Roles: {', '.join(user_info.get('roles', []))}")
print(f" Enabled: {user_info.get('enabled', False)}")
print()
return users
else:
print(f"Failed to list users: {response.text}")
return None
except Exception as e:
print(f"Error listing users: {e}")
return None
def change_password(self, username, new_password):
"""修改用户密码"""
try:
response = self.session.post(
f"{self.es_host}/_security/user/{username}/_password",
auth=self.auth,
json={"password": new_password}
)
if response.status_code == 200:
print(f"Password for user '{username}' changed successfully.")
return True
else:
print(f"Failed to change password for user '{username}': {response.text}")
return False
except Exception as e:
print(f"Error changing password for user '{username}': {e}")
return False
def enable_user(self, username):
"""启用用户"""
return self.update_user(username, enabled=True)
def disable_user(self, username):
"""禁用用户"""
return self.update_user(username, enabled=False)
def get_user_privileges(self, username):
"""获取用户权限"""
try:
response = self.session.get(
f"{self.es_host}/_security/user/{username}/_privileges",
auth=self.auth
)
if response.status_code == 200:
privileges = response.json()
print(f"\n=== Privileges for user '{username}' ===")
print(json.dumps(privileges, indent=2))
return privileges
else:
print(f"Failed to get privileges for user '{username}': {response.text}")
return None
except Exception as e:
print(f"Error getting privileges for user '{username}': {e}")
return None
def bulk_create_users(self, users_config):
"""批量创建用户"""
success_count = 0
total_count = len(users_config)
for user_config in users_config:
if self.create_user(**user_config):
success_count += 1
print(f"\nBulk user creation completed: {success_count}/{total_count} users created successfully.")
return success_count == total_count
# Usage example
if __name__ == "__main__":
# Initialize the user manager
user_manager = ElasticsearchUserManager(
es_host="https://localhost:9200",
admin_user="elastic",
admin_password="your_elastic_password"
)
# Users to create in bulk
users_to_create = [
{
"username": "log_analyst_1",
"password": "SecurePass123!",
"email": "analyst1@company.com",
"full_name": "Log Analyst 1",
"roles": ["log_analyst"]
},
{
"username": "developer_1",
"password": "DevPass123!",
"email": "dev1@company.com",
"full_name": "Developer 1",
"roles": ["developer"]
},
{
"username": "ops_admin_1",
"password": "OpsPass123!",
"email": "ops1@company.com",
"full_name": "Operations Admin 1",
"roles": ["ops_admin"]
},
{
"username": "readonly_user",
"password": "ReadPass123!",
"email": "readonly@company.com",
"full_name": "Read Only User",
"roles": ["readonly"]
}
]
# Run the bulk creation
user_manager.bulk_create_users(users_to_create)
# List all users
user_manager.list_users()
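The demo accounts above use hard-coded sample passwords. In practice it is worth enforcing a local password policy before calling the `_security/user` API, since Elasticsearch itself only requires a short minimum length by default. A minimal sketch (the thresholds and rules below are illustrative, not an Elasticsearch requirement):

```python
import re

def meets_password_policy(password: str, min_length: int = 12) -> bool:
    """Local pre-check before user creation: length plus character classes.

    Illustrative policy only; adjust to your organization's rules.
    """
    checks = [
        len(password) >= min_length,
        re.search(r"[A-Z]", password) is not None,    # upper-case letter
        re.search(r"[a-z]", password) is not None,    # lower-case letter
        re.search(r"\d", password) is not None,       # digit
        re.search(r"[^\w\s]", password) is not None,  # punctuation
    ]
    return all(checks)

print(meets_password_policy("SecurePass123!"))  # True
print(meets_password_policy("short"))           # False
```

A caller such as `bulk_create_users` could skip (or reject) any entry whose password fails this check before sending it to the cluster.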
3. API Key Management
API Key Creation and Management
API key management script:
# api_key_management.py
import requests
import json
import base64
from datetime import datetime, timedelta
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class ElasticsearchAPIKeyManager:
def __init__(self, es_host, username, password):
self.es_host = es_host
self.auth = (username, password)
self.session = requests.Session()
self.session.verify = False
def create_api_key(self, name, role_descriptors=None, expiration=None, metadata=None):
"""创建API密钥"""
api_key_data = {
"name": name
}
if role_descriptors:
api_key_data["role_descriptors"] = role_descriptors
if expiration:
api_key_data["expiration"] = expiration
if metadata:
api_key_data["metadata"] = metadata
try:
response = self.session.post(
f"{self.es_host}/_security/api_key",
auth=self.auth,
json=api_key_data
)
if response.status_code == 200:
result = response.json()
api_key_credentials = base64.b64encode(
f"{result['id']}:{result['api_key']}".encode()
).decode()
print(f"API Key '{name}' created successfully.")
print(f"ID: {result['id']}")
print(f"API Key: {result['api_key']}")
print(f"Encoded: {api_key_credentials}")
return {
"id": result['id'],
"api_key": result['api_key'],
"encoded": api_key_credentials
}
else:
print(f"Failed to create API key '{name}': {response.text}")
return None
except Exception as e:
print(f"Error creating API key '{name}': {e}")
return None
def list_api_keys(self, username=None):
"""列出API密钥"""
url = f"{self.es_host}/_security/api_key"
if username:
url += f"?username={username}"
try:
response = self.session.get(url, auth=self.auth)
if response.status_code == 200:
api_keys = response.json()
print("\n=== API Keys ===")
for key in api_keys.get('api_keys', []):
print(f"ID: {key['id']}")
print(f"Name: {key['name']}")
print(f"Username: {key['username']}")
print(f"Creation: {key['creation']}")
if 'expiration' in key:
print(f"Expiration: {key['expiration']}")
print(f"Invalidated: {key['invalidated']}")
print()
return api_keys
else:
print(f"Failed to list API keys: {response.text}")
return None
except Exception as e:
print(f"Error listing API keys: {e}")
return None
def invalidate_api_key(self, key_ids=None, key_names=None, username=None):
"""使API密钥失效"""
invalidate_data = {}
if key_ids:
invalidate_data["ids"] = key_ids if isinstance(key_ids, list) else [key_ids]
if key_names:
invalidate_data["names"] = key_names if isinstance(key_names, list) else [key_names]
if username:
invalidate_data["username"] = username
try:
response = self.session.delete(
f"{self.es_host}/_security/api_key",
auth=self.auth,
json=invalidate_data
)
if response.status_code == 200:
result = response.json()
print(f"Invalidated {len(result['invalidated_api_keys'])} API keys.")
for key_id in result['invalidated_api_keys']:
print(f" - {key_id}")
if result['error_details']:
print("Errors:")
for error in result['error_details']:
print(f" - {error}")
return result
else:
print(f"Failed to invalidate API keys: {response.text}")
return None
except Exception as e:
print(f"Error invalidating API keys: {e}")
return None
def get_api_key_info(self, api_key_id):
"""获取API密钥信息"""
try:
response = self.session.get(
f"{self.es_host}/_security/api_key?id={api_key_id}",
auth=self.auth
)
if response.status_code == 200:
result = response.json()
if result['api_keys']:
key_info = result['api_keys'][0]
print(f"\n=== API Key Info ===")
print(f"ID: {key_info['id']}")
print(f"Name: {key_info['name']}")
print(f"Username: {key_info['username']}")
print(f"Creation: {key_info['creation']}")
if 'expiration' in key_info:
print(f"Expiration: {key_info['expiration']}")
print(f"Invalidated: {key_info['invalidated']}")
if 'metadata' in key_info:
print(f"Metadata: {json.dumps(key_info['metadata'], indent=2)}")
return key_info
else:
print(f"API key '{api_key_id}' not found.")
return None
else:
print(f"Failed to get API key info: {response.text}")
return None
except Exception as e:
print(f"Error getting API key info: {e}")
return None
def create_service_api_keys(self):
"""创建服务专用API密钥"""
service_keys = []
# Logstash API密钥
logstash_role = {
"logstash_writer": {
"cluster": ["monitor", "manage_index_templates", "manage_ingest_pipelines"],
"indices": [
{
"names": ["logs-*", "metrics-*"],
"privileges": ["create_index", "write", "create"]
}
]
}
}
logstash_key = self.create_api_key(
name="logstash-service",
role_descriptors=logstash_role,
expiration="365d",
metadata={"service": "logstash", "environment": "production"}
)
if logstash_key:
service_keys.append({"service": "logstash", "key": logstash_key})
# Beats API key
beats_role = {
"beats_writer": {
"cluster": ["monitor", "manage_ingest_pipelines"],
"indices": [
{
"names": ["logs-*", "metrics-*", "filebeat-*", "metricbeat-*"],
"privileges": ["create_index", "write", "create"]
}
]
}
}
beats_key = self.create_api_key(
name="beats-service",
role_descriptors=beats_role,
expiration="365d",
metadata={"service": "beats", "environment": "production"}
)
if beats_key:
service_keys.append({"service": "beats", "key": beats_key})
# Monitoring API key
monitoring_role = {
"monitoring_reader": {
"cluster": ["monitor"],
"indices": [
{
"names": ["*"],
"privileges": ["read", "view_index_metadata"]
}
]
}
}
monitoring_key = self.create_api_key(
name="monitoring-service",
role_descriptors=monitoring_role,
expiration="90d",
metadata={"service": "monitoring", "environment": "production"}
)
if monitoring_key:
service_keys.append({"service": "monitoring", "key": monitoring_key})
return service_keys
def rotate_api_keys(self, service_name):
"""轮换API密钥"""
print(f"Rotating API keys for service: {service_name}")
# 获取现有密钥
existing_keys = self.list_api_keys()
service_keys = []
if existing_keys:
for key in existing_keys.get('api_keys', []):
if key['name'].startswith(service_name):
service_keys.append(key)
# Create the replacement key
new_key = self.create_api_key(
name=f"{service_name}-{datetime.now().strftime('%Y%m%d')}",
expiration="365d",
metadata={"service": service_name, "rotation_date": datetime.now().isoformat()}
)
if new_key:
print(f"New API key created for {service_name}")
# Optional: invalidate the old keys (recommended only after the new key is confirmed working)
# old_key_ids = [key['id'] for key in service_keys]
# if old_key_ids:
# self.invalidate_api_key(key_ids=old_key_ids)
return new_key
return None
# Usage example
if __name__ == "__main__":
# Initialize the API key manager
api_manager = ElasticsearchAPIKeyManager(
es_host="https://localhost:9200",
username="elastic",
password="your_elastic_password"
)
# Create the service API keys
service_keys = api_manager.create_service_api_keys()
# List all API keys
api_manager.list_api_keys()
# Persist the API keys to a file (protect this file; it contains live credentials)
with open('api_keys.json', 'w') as f:
json.dump(service_keys, f, indent=2)
print("\nAPI keys saved to api_keys.json")
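The `encoded` value printed by `create_api_key` is what clients actually send: Elasticsearch expects an `Authorization: ApiKey <base64(id:api_key)>` header. A minimal sketch of building that header (the id and key below are made-up placeholders):

```python
import base64

def api_key_header(key_id: str, api_key: str) -> dict:
    """Build the Authorization header for Elasticsearch API-key auth:
    the 'ApiKey' scheme followed by base64 of "<id>:<api_key>"."""
    encoded = base64.b64encode(f"{key_id}:{api_key}".encode()).decode()
    return {"Authorization": f"ApiKey {encoded}"}

# Placeholder credentials, not real ones
headers = api_key_header("VuaCfGcBCdbkQm-e5aOx", "ui2lp2axTNmsyakw9tvNnw")
print(headers["Authorization"])
```

With `requests`, pass this dict via `headers=` instead of `auth=`, so no username or password ever appears in the script.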
Kibana Security Configuration
1. Kibana Security Settings
Security settings in kibana.yml:
# Server settings
server.host: "0.0.0.0"
server.port: 5601
server.name: "kibana-secure"
# Elasticsearch connection
elasticsearch.hosts: ["https://es-node-1:9200", "https://es-node-2:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "kibana_system_password"
# TLS for the Kibana server itself
server.ssl.enabled: true
server.ssl.certificate: "/etc/kibana/certs/kibana.crt"
server.ssl.key: "/etc/kibana/certs/kibana.key"
# TLS toward Elasticsearch
elasticsearch.ssl.certificateAuthorities: ["/etc/kibana/certs/ca.crt"]
elasticsearch.ssl.verificationMode: certificate
# Security settings
xpack.security.enabled: true
xpack.security.encryptionKey: "your-32-character-encryption-key-here"
xpack.security.session.idleTimeout: "8h"
xpack.security.session.lifespan: "24h"
# Audit logging
xpack.security.audit.enabled: true
xpack.security.audit.appender.type: file
xpack.security.audit.appender.fileName: /var/log/kibana/kibana_audit.log
xpack.security.audit.appender.layout.type: json
# Spaces
xpack.spaces.enabled: true
xpack.spaces.maxSpaces: 1000
# Reporting
xpack.reporting.enabled: true
xpack.reporting.encryptionKey: "your-32-character-reporting-key-here"
xpack.reporting.kibanaServer.hostname: "kibana.local"
xpack.reporting.kibanaServer.port: 5601
xpack.reporting.kibanaServer.protocol: "https"
# Monitoring
xpack.monitoring.enabled: true
xpack.monitoring.kibana.collection.enabled: true
xpack.monitoring.ui.container.elasticsearch.enabled: true
# Actions and alerting
xpack.actions.enabled: true
xpack.alerting.enabled: true
xpack.alerting.healthCheck.interval: "60s"
# Machine learning
xpack.ml.enabled: true
# Graph
xpack.graph.enabled: true
# Logging
logging.appenders.file.type: file
logging.appenders.file.fileName: /var/log/kibana/kibana.log
logging.appenders.file.layout.type: json
logging.root.level: info
logging.root.appenders: [file]
# Security response headers
server.securityResponseHeaders.strictTransportSecurity: "max-age=31536000; includeSubDomains"
server.securityResponseHeaders.xContentTypeOptions: "nosniff"
server.securityResponseHeaders.xFrameOptions: "DENY"
server.securityResponseHeaders.contentSecurityPolicy: "default-src 'self'; script-src 'self' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data: blob:; font-src 'self' data:; connect-src 'self';"
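The placeholder `encryptionKey` values above must be replaced with random strings of at least 32 characters. Recent Kibana versions ship a helper for this (`bin/kibana-encryption-keys generate`); the same can be done from the Python standard library. `xpack.encryptedSavedObjects.encryptionKey` is a related setting not shown in the file above:

```python
import secrets

# One random 32-hex-character key per setting that needs one.
keys = {
    "xpack.security.encryptionKey": secrets.token_hex(16),
    "xpack.reporting.encryptionKey": secrets.token_hex(16),
    "xpack.encryptedSavedObjects.encryptionKey": secrets.token_hex(16),
}
for setting, key in keys.items():
    assert len(key) == 32          # token_hex(16) -> 32 hex characters
    print(f"{setting}: {key}")
```

Keep the generated keys identical across all Kibana instances behind the same load balancer, otherwise sessions and encrypted saved objects cannot be shared.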
2. Space and Permission Management
Space management script:
# kibana_space_management.py
import requests
import json
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class KibanaSpaceManager:
def __init__(self, kibana_host, username, password):
self.kibana_host = kibana_host
self.auth = (username, password)
self.session = requests.Session()
self.session.verify = False
self.session.headers.update({
'Content-Type': 'application/json',
'kbn-xsrf': 'true'
})
def create_space(self, space_id, name, description=None, color=None, initials=None, disabled_features=None):
"""创建空间"""
space_data = {
"id": space_id,
"name": name
}
if description:
space_data["description"] = description
if color:
space_data["color"] = color
if initials:
space_data["initials"] = initials
if disabled_features:
space_data["disabledFeatures"] = disabled_features
try:
response = self.session.post(
f"{self.kibana_host}/api/spaces/space",
auth=self.auth,
json=space_data
)
if response.status_code == 200:
print(f"Space '{name}' created successfully.")
return response.json()
else:
print(f"Failed to create space '{name}': {response.text}")
return None
except Exception as e:
print(f"Error creating space '{name}': {e}")
return None
def list_spaces(self):
"""列出所有空间"""
try:
response = self.session.get(
f"{self.kibana_host}/api/spaces/space",
auth=self.auth
)
if response.status_code == 200:
spaces = response.json()
print("\n=== Kibana Spaces ===")
for space in spaces:
print(f"ID: {space['id']}")
print(f"Name: {space['name']}")
print(f"Description: {space.get('description', 'N/A')}")
print(f"Color: {space.get('color', 'N/A')}")
print(f"Disabled Features: {space.get('disabledFeatures', [])}")
print()
return spaces
else:
print(f"Failed to list spaces: {response.text}")
return None
except Exception as e:
print(f"Error listing spaces: {e}")
return None
def update_space(self, space_id, **kwargs):
"""更新空间"""
try:
response = self.session.put(
f"{self.kibana_host}/api/spaces/space/{space_id}",
auth=self.auth,
json=kwargs
)
if response.status_code == 200:
print(f"Space '{space_id}' updated successfully.")
return response.json()
else:
print(f"Failed to update space '{space_id}': {response.text}")
return None
except Exception as e:
print(f"Error updating space '{space_id}': {e}")
return None
def delete_space(self, space_id):
"""删除空间"""
try:
response = self.session.delete(
f"{self.kibana_host}/api/spaces/space/{space_id}",
auth=self.auth
)
if response.status_code == 204:
print(f"Space '{space_id}' deleted successfully.")
return True
else:
print(f"Failed to delete space '{space_id}': {response.text}")
return False
except Exception as e:
print(f"Error deleting space '{space_id}': {e}")
return False
def copy_saved_objects(self, source_space, target_space, object_types=None, include_references=True):
"""复制保存的对象到其他空间"""
copy_data = {
"spaces": [target_space],
"includeReferences": include_references
}
if object_types:
copy_data["objects"] = [
{"type": obj_type, "id": "*"} for obj_type in object_types
]
try:
response = self.session.post(
f"{self.kibana_host}/s/{source_space}/api/spaces/_copy_saved_objects",
auth=self.auth,
json=copy_data
)
if response.status_code == 200:
result = response.json()
print(f"Copied saved objects from '{source_space}' to '{target_space}'.")
print(f"Success: {result.get('successCount', 0)} objects")
if result.get('errors'):
print(f"Errors: {len(result['errors'])} objects")
return result
else:
print(f"Failed to copy saved objects: {response.text}")
return None
except Exception as e:
print(f"Error copying saved objects: {e}")
return None
def setup_department_spaces(self):
"""设置部门空间"""
departments = [
{
"id": "development",
"name": "Development",
"description": "Development team workspace",
"color": "#1BA9F5",
"initials": "DEV",
"disabled_features": ["ml", "graph"]
},
{
"id": "operations",
"name": "Operations",
"description": "Operations team workspace",
"color": "#FF6B47",
"initials": "OPS",
"disabled_features": []
},
{
"id": "security",
"name": "Security",
"description": "Security team workspace",
"color": "#E74C3C",
"initials": "SEC",
"disabled_features": ["graph"]
},
{
"id": "analytics",
"name": "Analytics",
"description": "Data analytics workspace",
"color": "#9C27B0",
"initials": "ANA",
"disabled_features": []
}
]
created_spaces = []
for dept in departments:
space = self.create_space(**dept)
if space:
created_spaces.append(space)
return created_spaces
# Usage example
if __name__ == "__main__":
# Initialize the space manager
space_manager = KibanaSpaceManager(
kibana_host="https://localhost:5601",
username="elastic",
password="your_elastic_password"
)
# Create the department spaces
spaces = space_manager.setup_department_spaces()
# List all spaces
space_manager.list_spaces()
Logstash Security Configuration
1. Logstash Security Settings
Security settings in logstash.yml:
# Node settings
node.name: logstash-secure
path.data: /var/lib/logstash
path.logs: /var/log/logstash
path.settings: /etc/logstash
# Pipeline settings
pipeline.workers: 4
pipeline.batch.size: 1000
pipeline.batch.delay: 50
# Monitoring
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["https://es-node-1:9200", "https://es-node-2:9200"]
xpack.monitoring.elasticsearch.username: "logstash_system"
xpack.monitoring.elasticsearch.password: "logstash_system_password"
xpack.monitoring.elasticsearch.ssl.certificate_authority: "/etc/logstash/certs/ca.crt"
xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
# Management API
api.enabled: true
api.host: "127.0.0.1"
api.port: 9600
api.ssl.enabled: true
api.ssl.certificate: "/etc/logstash/certs/logstash.crt"
api.ssl.key: "/etc/logstash/certs/logstash.key"
api.auth.type: basic
api.auth.basic.username: "logstash_admin"
api.auth.basic.password: "logstash_admin_password"
# Logging
log.level: info
log.format: json
# Configuration reloading
config.reload.automatic: true
config.reload.interval: 3s
config.test_and_exit: false
# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
# Persistent queue
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024
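The file above embeds plaintext passwords. Logstash can substitute `${VAR}` references in its settings and pipeline configs from environment variables, or from its secret keystore (`bin/logstash-keystore`), so secrets stay out of version control. The same pattern applies to the Python management scripts in this chapter; a minimal sketch (`ES_PWD` is a hypothetical variable name):

```python
import os

def get_credential(name, default=None):
    """Read a secret from the environment instead of hard-coding it.

    Mirrors Logstash's ${VAR} substitution: the config references a name,
    while the value lives outside the file (shell profile, CI secret, etc.).
    """
    value = os.environ.get(name, default)
    if value is None:
        raise RuntimeError(f"credential {name!r} is not set")
    return value

os.environ["ES_PWD"] = "example-only"   # normally set by the shell or CI, not in code
print(get_credential("ES_PWD"))         # example-only
```

A pipeline could then use `password => "${ES_PWD}"` rather than a literal string.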
2. Secure Pipeline Configuration
A hardened Logstash pipeline configuration:
# secure-pipeline.conf
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
    ssl_peer_metadata => true
  }
  http {
    port => 8080
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_verify_mode => "none"
    # Basic authentication
    user => "logstash_http"
    password => "secure_password"
    # Restrict the listener to localhost
    host => "127.0.0.1"
    # Codec selection per content type
    additional_codecs => {
      "application/json" => "json"
    }
  }
  kafka {
    bootstrap_servers => "kafka1:9093,kafka2:9093,kafka3:9093"
    topics => ["secure-logs"]
    group_id => "logstash-secure-group"
    # SSL settings
    security_protocol => "SSL"
    ssl_truststore_location => "/etc/logstash/certs/kafka.truststore.jks"
    ssl_truststore_password => "truststore_password"
    ssl_keystore_location => "/etc/logstash/certs/kafka.keystore.jks"
    ssl_keystore_password => "keystore_password"
    ssl_key_password => "key_password"
    # SASL settings (optional)
    # sasl_mechanism => "PLAIN"
    # sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='logstash' password='logstash_password';"
  }
}
filter {
  # Data validation
  if ![host] or [host] == "" {
    mutate {
      add_field => { "parse_error" => "missing_host" }
      add_tag => ["_validation_error"]
    }
  }
  # Redact sensitive data
  mutate {
    # Drop sensitive fields outright
    remove_field => ["password", "credit_card", "ssn"]
    # Mask the last octet of the client IP
    gsub => [
      "client_ip", "\.(\d{1,3})$", ".xxx"
    ]
  }
  if [message] {
    # Flag and truncate oversized messages
    if [message] =~ /.{10000,}/ {
      mutate {
        add_tag => ["_oversized_message"]
        replace => { "message" => "[OVERSIZED MESSAGE TRUNCATED]" }
      }
    }
    # Flag potentially malicious content
    if [message] =~ /(?i)(script|javascript|vbscript|onload|onerror)/ {
      mutate {
        add_tag => ["_potential_xss"]
        add_field => { "security_alert" => "potential_xss_detected" }
      }
    }
  }
  # Attach security-processing metadata
  mutate {
    add_field => {
      "[@metadata][security_processed]" => "true"
      "[@metadata][processing_timestamp]" => "%{+YYYY-MM-dd'T'HH:mm:ss.SSSZ}"
    }
  }
  # Error handling
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => { "parse_error" => "grok_failure" }
      add_tag => ["_parse_error"]
    }
  }
}
output {
  # Clean events
  if "_validation_error" not in [tags] and "_parse_error" not in [tags] {
    elasticsearch {
      hosts => ["https://es-node-1:9200", "https://es-node-2:9200", "https://es-node-3:9200"]
      # Authentication
      user => "logstash_writer"
      password => "logstash_writer_password"
      # SSL settings
      ssl => true
      cacert => "/etc/logstash/certs/ca.crt"
      ssl_certificate_verification => true
      # Index settings
      index => "secure-logs-%{+YYYY.MM.dd}"
      # Retry settings
      retry_max_interval => 5
      # Template settings
      template_name => "secure-logs"
      template => "/etc/logstash/templates/secure-logs-template.json"
      template_overwrite => true
    }
  }
  # Events that failed validation or parsing go to a separate index
  if "_validation_error" in [tags] or "_parse_error" in [tags] {
    elasticsearch {
      hosts => ["https://es-node-1:9200", "https://es-node-2:9200", "https://es-node-3:9200"]
      user => "logstash_writer"
      password => "logstash_writer_password"
      ssl => true
      cacert => "/etc/logstash/certs/ca.crt"
      index => "logstash-errors-%{+YYYY.MM.dd}"
    }
  }
  # Security alerting
  if "_potential_xss" in [tags] {
    http {
      url => "https://security-webhook.company.com/alerts"
      http_method => "post"
      headers => {
        "Authorization" => "Bearer your_webhook_token"
        "Content-Type" => "application/json"
      }
      format => "json"
      mapping => {
        "alert_type" => "potential_xss"
        "timestamp" => "%{@timestamp}"
        "host" => "%{host}"
        "message" => "%{message}"
      }
    }
  }
  # Debug output (development only)
  if [@metadata][debug] == "true" {
    stdout {
      codec => rubydebug {
        metadata => true
      }
    }
  }
}
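The `gsub` rule in the filter block masks the last octet of `client_ip`. Because the pattern is a plain regular expression, it can be unit-tested outside Logstash; Python's `re` behaves the same way for this particular expression:

```python
import re

def mask_last_octet(ip: str) -> str:
    """Replace the final octet of an IPv4 address with 'xxx',
    matching the gsub pattern used in the filter above."""
    return re.sub(r"\.(\d{1,3})$", ".xxx", ip)

print(mask_last_octet("192.168.1.45"))   # 192.168.1.xxx
print(mask_last_octet("10.0.0.7"))       # 10.0.0.xxx
```

Testing masking rules like this before deployment avoids silently shipping unredacted addresses when the pattern does not match.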
Network Security Configuration
1. Firewall Configuration
Firewall rules script:
#!/bin/bash
# elk-firewall-setup.sh
echo "Setting up ELK Stack firewall rules..."

# Flush existing rules
iptables -F
iptables -X
iptables -t nat -F
iptables -t nat -X
iptables -t mangle -F
iptables -t mangle -X

# Default policies: drop inbound and forwarded traffic
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT

# Allow loopback traffic
iptables -A INPUT -i lo -j ACCEPT
iptables -A OUTPUT -o lo -j ACCEPT

# Allow established and related connections
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT

# SSH access (restricted to trusted source networks)
iptables -A INPUT -p tcp --dport 22 -s 192.168.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 22 -s 10.0.0.0/8 -j ACCEPT

# Elasticsearch HTTP and transport (internal network only)
iptables -A INPUT -p tcp --dport 9200 -s 192.168.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 9300 -s 192.168.1.0/24 -j ACCEPT

# Kibana access (restricted to trusted source networks)
iptables -A INPUT -p tcp --dport 5601 -s 192.168.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 5601 -s 10.0.0.0/8 -j ACCEPT

# Logstash input ports
iptables -A INPUT -p tcp --dport 5044 -s 192.168.1.0/24 -j ACCEPT  # Beats
iptables -A INPUT -p tcp --dport 8080 -s 192.168.1.0/24 -j ACCEPT  # HTTP
iptables -A INPUT -p tcp --dport 9600 -s 127.0.0.1 -j ACCEPT       # API

# Monitoring exporters (localhost only)
iptables -A INPUT -p tcp --dport 9114 -s 127.0.0.1 -j ACCEPT  # Elasticsearch Exporter
iptables -A INPUT -p tcp --dport 9108 -s 127.0.0.1 -j ACCEPT  # ES Exporter

# ICMP echo (ping)
iptables -A INPUT -p icmp --icmp-type echo-request -j ACCEPT

# Log rejected connections (rate limited)
iptables -A INPUT -m limit --limit 5/min -j LOG --log-prefix "iptables denied: " --log-level 7

# Persist the rules
if command -v iptables-save > /dev/null; then
  iptables-save > /etc/iptables/rules.v4
fi

echo "Firewall rules applied successfully."

# Show the active rules
echo -e "\nCurrent firewall rules:"
iptables -L -n -v
2. Network Monitoring
Network security monitoring script:
# network_security_monitor.py
import subprocess
import re
import time
import json
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class NetworkSecurityMonitor:
def __init__(self, config_file="network_monitor_config.json"):
self.config = self.load_config(config_file)
self.alerts = []
def load_config(self, config_file):
"""加载配置文件"""
try:
with open(config_file, 'r') as f:
return json.load(f)
except FileNotFoundError:
# Fall back to a default configuration
return {
"monitoring": {
"ports": [9200, 5601, 5044, 9600],
"max_connections_per_ip": 100,
"suspicious_patterns": [
r".*\.(exe|bat|cmd|ps1)$",
r".*[<>\"'&].*",
r".*union.*select.*",
r".*script.*alert.*"
]
},
"alerting": {
"email": {
"enabled": True,
"smtp_server": "smtp.company.com",
"smtp_port": 587,
"username": "alerts@company.com",
"password": "alert_password",
"recipients": ["security@company.com"]
}
}
}
def check_port_security(self):
"""检查端口安全性"""
alerts = []
for port in self.config['monitoring']['ports']:
try:
# Check which addresses each port is listening on
result = subprocess.run(
['netstat', '-tlnp'],
capture_output=True,
text=True
)
port_lines = [line for line in result.stdout.split('\n') if f':{port}' in line]
for line in port_lines:
if '0.0.0.0' in line:
alerts.append({
'type': 'security_warning',
'message': f'Port {port} is listening on all interfaces (0.0.0.0)',
'severity': 'medium',
'timestamp': datetime.now().isoformat()
})
except Exception as e:
alerts.append({
'type': 'monitoring_error',
'message': f'Failed to check port {port}: {e}',
'severity': 'low',
'timestamp': datetime.now().isoformat()
})
return alerts
def check_connection_limits(self):
"""检查连接数限制"""
alerts = []
try:
# Gather current connection statistics
result = subprocess.run(
['netstat', '-tn'],
capture_output=True,
text=True
)
# Count connections per remote IP
ip_connections = {}
for line in result.stdout.split('\n'):
if 'ESTABLISHED' in line:
parts = line.split()
if len(parts) >= 5:
foreign_addr = parts[4]
ip = foreign_addr.split(':')[0]
if ip not in ip_connections:
ip_connections[ip] = 0
ip_connections[ip] += 1
# Flag IPs over the limit
max_connections = self.config['monitoring']['max_connections_per_ip']
for ip, count in ip_connections.items():
if count > max_connections:
alerts.append({
'type': 'connection_limit_exceeded',
'message': f'IP {ip} has {count} connections (limit: {max_connections})',
'severity': 'high',
'timestamp': datetime.now().isoformat(),
'ip': ip,
'connection_count': count
})
except Exception as e:
alerts.append({
'type': 'monitoring_error',
'message': f'Failed to check connection limits: {e}',
'severity': 'low',
'timestamp': datetime.now().isoformat()
})
return alerts
def check_log_patterns(self):
"""检查日志中的可疑模式"""
alerts = []
log_files = [
'/var/log/elasticsearch/elasticsearch.log',
'/var/log/kibana/kibana.log',
'/var/log/logstash/logstash.log',
'/var/log/nginx/access.log'
]
suspicious_patterns = self.config['monitoring']['suspicious_patterns']
for log_file in log_files:
try:
# 读取最近的日志条目
result = subprocess.run(
['tail', '-n', '1000', log_file],
capture_output=True,
text=True
)
for line_num, line in enumerate(result.stdout.split('\n'), 1):
for pattern in suspicious_patterns:
if re.search(pattern, line, re.IGNORECASE):
alerts.append({
'type': 'suspicious_pattern_detected',
'message': f'Suspicious pattern found in {log_file}:{line_num}',
'severity': 'high',
'timestamp': datetime.now().isoformat(),
'log_file': log_file,
'line_number': line_num,
'pattern': pattern,
'log_line': line[:200] # 限制长度
})
except Exception as e:
alerts.append({
'type': 'monitoring_error',
'message': f'Failed to check log file {log_file}: {e}',
'severity': 'low',
'timestamp': datetime.now().isoformat()
})
return alerts
def send_alert_email(self, alerts):
"""发送告警邮件"""
if not self.config['alerting']['email']['enabled'] or not alerts:
return
try:
smtp_config = self.config['alerting']['email']
msg = MimeMultipart()
msg['From'] = smtp_config['username']
msg['To'] = ', '.join(smtp_config['recipients'])
msg['Subject'] = f'ELK Stack Security Alert - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
# 构建邮件内容
body = "ELK Stack Security Monitoring Alert\n\n"
body += f"Alert Time: {datetime.now().isoformat()}\n"
body += f"Total Alerts: {len(alerts)}\n\n"
for alert in alerts:
body += f"Type: {alert['type']}\n"
body += f"Severity: {alert['severity']}\n"
body += f"Message: {alert['message']}\n"
body += f"Timestamp: {alert['timestamp']}\n"
body += "-" * 50 + "\n"
msg.attach(MimeText(body, 'plain'))
# 发送邮件
server = smtplib.SMTP(smtp_config['smtp_server'], smtp_config['smtp_port'])
server.starttls()
server.login(smtp_config['username'], smtp_config['password'])
server.send_message(msg)
server.quit()
print(f"Alert email sent to {smtp_config['recipients']}")
except Exception as e:
print(f"Failed to send alert email: {e}")
def monitor(self, duration_minutes=60, interval_seconds=300):
"""持续监控网络安全"""
print(f"Starting network security monitoring for {duration_minutes} minutes...")
end_time = time.time() + (duration_minutes * 60)
while time.time() < end_time:
all_alerts = []
# 执行各项检查
all_alerts.extend(self.check_port_security())
all_alerts.extend(self.check_connection_limits())
all_alerts.extend(self.check_log_patterns())
if all_alerts:
print(f"\n[{datetime.now()}] Security alerts detected:")
for alert in all_alerts:
print(f" {alert['severity'].upper()}: {alert['message']}")
# 发送告警
self.send_alert_email(all_alerts)
self.alerts.extend(all_alerts)
else:
print(f"[{datetime.now()}] No security issues detected")
time.sleep(interval_seconds)
print(f"\nMonitoring completed. Total alerts: {len(self.alerts)}")
return self.alerts
# 使用示例
if __name__ == "__main__":
monitor = NetworkSecurityMonitor()
monitor.monitor(duration_minutes=60, interval_seconds=300)
数据加密
1. 传输加密
TLS配置最佳实践:
# elasticsearch.yml - 高级TLS配置
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: full
xpack.security.transport.ssl.client_authentication: required
xpack.security.transport.ssl.supported_protocols: ["TLSv1.2", "TLSv1.3"]
xpack.security.transport.ssl.cipher_suites:
- "TLS_AES_256_GCM_SHA384"
- "TLS_AES_128_GCM_SHA256"
- "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"
- "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.supported_protocols: ["TLSv1.2", "TLSv1.3"]
xpack.security.http.ssl.cipher_suites:
- "TLS_AES_256_GCM_SHA384"
- "TLS_AES_128_GCM_SHA256"
- "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"
- "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
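配置下发前,可以先确认本机OpenSSL是否支持所列密码套件。下面是一个基于Python标准库ssl模块的小工具(示意:Elasticsearch配置中使用的是JVM/IANA命名,如TLS_ECDHE_RSA_WITH_...,而OpenSSL对TLS 1.2套件使用不同的短名,只有TLS 1.3套件两边命名一致,因此该检查主要适用于TLS 1.3套件):

```python
import ssl

def locally_supported(candidates):
    """返回本机OpenSSL能提供的候选密码套件子集(按ssl模块报告的名称匹配)。"""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # 与配置中的supported_protocols对齐:只允许TLS 1.2及以上
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    available = {c["name"] for c in ctx.get_ciphers()}
    return [name for name in candidates if name in available]

if __name__ == "__main__":
    print(locally_supported(["TLS_AES_256_GCM_SHA384", "TLS_AES_128_GCM_SHA256"]))
```

若输出为空列表,说明本机OpenSSL版本过旧(TLS 1.3套件需要OpenSSL 1.1.1+),应先升级再套用上述配置。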
2. 静态数据加密
文件系统加密脚本:
#!/bin/bash
# setup-encryption.sh
DATA_DIR="/var/lib/elasticsearch"
ENCRYPTED_DIR="/encrypted/elasticsearch"
KEY_FILE="/etc/elasticsearch/encryption.key"
echo "Setting up filesystem encryption for Elasticsearch data..."
# 生成加密密钥
if [ ! -f "$KEY_FILE" ]; then
    echo "Generating encryption key..."
    openssl rand -base64 32 > "$KEY_FILE"
    chmod 600 "$KEY_FILE"
    chown elasticsearch:elasticsearch "$KEY_FILE"
fi
# 创建加密分区
echo "Creating encrypted partition..."
cryptsetup luksFormat /dev/sdb1 --key-file="$KEY_FILE"
cryptsetup luksOpen /dev/sdb1 elasticsearch-data --key-file="$KEY_FILE"
# 格式化并挂载
mkfs.ext4 /dev/mapper/elasticsearch-data
mkdir -p "$ENCRYPTED_DIR"
mount /dev/mapper/elasticsearch-data "$ENCRYPTED_DIR"
# 设置权限
chown -R elasticsearch:elasticsearch "$ENCRYPTED_DIR"
chmod 750 "$ENCRYPTED_DIR"
# 更新fstab
echo "/dev/mapper/elasticsearch-data $ENCRYPTED_DIR ext4 defaults 0 2" >> /etc/fstab
# 创建自动解锁脚本
cat > /etc/systemd/system/elasticsearch-decrypt.service << EOF
[Unit]
Description=Decrypt Elasticsearch Data Partition
Before=elasticsearch.service
[Service]
Type=oneshot
ExecStart=/usr/sbin/cryptsetup luksOpen /dev/sdb1 elasticsearch-data --key-file=$KEY_FILE
RemainAfterExit=yes
[Install]
WantedBy=multi-user.target
EOF
systemctl enable elasticsearch-decrypt.service
echo "Filesystem encryption setup completed."
3. 数据脱敏
Logstash数据脱敏配置:
# logstash-data-masking.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # 脱敏信用卡号
  if [message] =~ /\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}/ {
    mutate {
      gsub => [
        "message", "\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}", "****-****-****-****"
      ]
    }
  }
  # 脱敏身份证号
  if [message] =~ /\d{17}[\dXx]/ {
    mutate {
      gsub => [
        "message", "(\d{6})\d{8}(\d{3}[\dXx])", "\1********\2"
      ]
    }
  }
  # 脱敏手机号
  if [message] =~ /1[3-9]\d{9}/ {
    mutate {
      gsub => [
        "message", "(1[3-9]\d)\d{4}(\d{4})", "\1****\2"
      ]
    }
  }
  # 脱敏邮箱地址
  if [message] =~ /[\w.-]+@[\w.-]+\.[a-zA-Z]{2,}/ {
    mutate {
      gsub => [
        "message", "([\w.-]{1,3})[\w.-]*@([\w.-]+\.[a-zA-Z]{2,})", "\1***@\2"
      ]
    }
  }
  # 脱敏IP地址(保留网段)
  if [message] =~ /\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b/ {
    mutate {
      gsub => [
        "message", "\b(\d{1,3}\.\d{1,3}\.)\d{1,3}\.\d{1,3}\b", "\1***.***"
      ]
    }
  }
  # 添加脱敏标记
  mutate {
    add_field => { "data_masked" => "true" }
    add_field => { "masking_timestamp" => "%{@timestamp}" }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "masked-logs-%{+YYYY.MM.dd}"
    user => "logstash_writer"
    password => "${LOGSTASH_PASSWORD}"
  }
}
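上述gsub规则在部署前可以先脱离Logstash、用Python的re模块快速验证(示意:这几条模式在两种正则方言下行为一致):

```python
import re

# 与上方Logstash过滤器对应的(模式, 替换)对
MASK_RULES = [
    (r"\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}", "****-****-****-****"),  # 信用卡号
    (r"(1[3-9]\d)\d{4}(\d{4})", r"\1****\2"),                            # 手机号
    (r"([\w.-]{1,3})[\w.-]*@([\w.-]+\.[a-zA-Z]{2,})", r"\1***@\2"),      # 邮箱
]

def mask_line(line: str) -> str:
    """按顺序对一行日志应用全部脱敏规则。"""
    for pattern, repl in MASK_RULES:
        line = re.sub(pattern, repl, line)
    return line

if __name__ == "__main__":
    # 预期输出: card=****-****-****-**** tel=138****5678
    print(mask_line("card=1234-5678-9012-3456 tel=13812345678"))
```

注意规则顺序:信用卡模式要求连续16位数字,不会误伤11位的手机号,因此两条规则可以共存。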
Python数据脱敏工具:
#!/usr/bin/env python3
# data_masking_tool.py
import re
import json
import hashlib
from typing import Dict, List, Any
from datetime import datetime

class DataMaskingTool:
    def __init__(self):
        self.masking_rules = {
            'credit_card': {
                'pattern': r'\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}',
                'replacement': '****-****-****-****',
                'description': 'Credit card number'
            },
            'ssn': {
                'pattern': r'\d{3}-\d{2}-\d{4}',
                'replacement': '***-**-****',
                'description': 'Social Security Number'
            },
            'phone': {
                'pattern': r'1[3-9]\d{9}',
                'replacement': lambda m: m.group()[:3] + '****' + m.group()[-4:],
                'description': 'Phone number'
            },
            'email': {
                'pattern': r'[\w.-]+@[\w.-]+\.[a-zA-Z]{2,}',
                'replacement': lambda m: m.group().split('@')[0][:3] + '***@' + m.group().split('@')[1],
                'description': 'Email address'
            },
            'ip_address': {
                'pattern': r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
                # 保留前两段网段,脱敏后两段
                'replacement': lambda m: '.'.join(m.group().split('.')[:2]) + '.***.***',
                'description': 'IP address'
            }
        }
        self.masking_stats = {
            'total_processed': 0,
            'total_masked': 0,
            'rules_applied': {}
        }

    def mask_text(self, text: str, rules: List[str] = None) -> Dict[str, Any]:
        """对文本进行脱敏处理"""
        if rules is None:
            rules = list(self.masking_rules.keys())
        original_text = text
        masked_text = text
        applied_rules = []
        for rule_name in rules:
            if rule_name not in self.masking_rules:
                continue
            rule = self.masking_rules[rule_name]
            pattern = rule['pattern']
            replacement = rule['replacement']
            matches = re.findall(pattern, masked_text)
            if matches:
                # re.sub同时接受字符串和可调用对象作为替换值
                masked_text = re.sub(pattern, replacement, masked_text)
                applied_rules.append({
                    'rule': rule_name,
                    'description': rule['description'],
                    'matches_count': len(matches)
                })
                # 更新统计
                if rule_name not in self.masking_stats['rules_applied']:
                    self.masking_stats['rules_applied'][rule_name] = 0
                self.masking_stats['rules_applied'][rule_name] += len(matches)
        self.masking_stats['total_processed'] += 1
        if applied_rules:
            self.masking_stats['total_masked'] += 1
        return {
            'original_text': original_text,
            'masked_text': masked_text,
            'applied_rules': applied_rules,
            'is_masked': len(applied_rules) > 0,
            'processing_timestamp': datetime.now().isoformat()
        }

    def mask_json_data(self, data: Dict[str, Any], sensitive_fields: List[str] = None) -> Dict[str, Any]:
        """对JSON数据进行脱敏处理"""
        if sensitive_fields is None:
            sensitive_fields = ['message', 'log', 'content', 'description']
        masked_data = data.copy()
        masking_info = []
        for field in sensitive_fields:
            if field in masked_data and isinstance(masked_data[field], str):
                result = self.mask_text(masked_data[field])
                masked_data[field] = result['masked_text']
                if result['is_masked']:
                    masking_info.extend(result['applied_rules'])
        # 添加脱敏元数据
        masked_data['_masking_info'] = {
            'is_masked': len(masking_info) > 0,
            'applied_rules': masking_info,
            'masking_timestamp': datetime.now().isoformat(),
            'original_hash': hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()[:16]
        }
        return masked_data

    def get_statistics(self) -> Dict[str, Any]:
        """获取脱敏统计信息"""
        return {
            'masking_stats': self.masking_stats,
            'masking_rate': self.masking_stats['total_masked'] / max(self.masking_stats['total_processed'], 1),
            'available_rules': list(self.masking_rules.keys()),
            'timestamp': datetime.now().isoformat()
        }

    def export_masked_data(self, data_list: List[Dict], output_file: str):
        """导出脱敏后的数据"""
        masked_data_list = []
        for data in data_list:
            masked_data = self.mask_json_data(data)
            masked_data_list.append(masked_data)
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump({
                'masked_data': masked_data_list,
                'statistics': self.get_statistics(),
                'export_timestamp': datetime.now().isoformat()
            }, f, indent=2, ensure_ascii=False)
        print(f"Masked data exported to {output_file}")
        print(f"Statistics: {self.get_statistics()}")

# 使用示例
if __name__ == "__main__":
    masker = DataMaskingTool()
    # 测试文本脱敏
    test_text = "用户张三的手机号是13812345678,邮箱是zhangsan@example.com,信用卡号是1234-5678-9012-3456"
    result = masker.mask_text(test_text)
    print(f"原文: {result['original_text']}")
    print(f"脱敏后: {result['masked_text']}")
    print(f"应用规则: {result['applied_rules']}")
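除了上面这种不可逆的掩码替换,GDPR语境下常用的另一种脱敏手段是假名化(pseudonymization):用密钥化哈希把同一敏感值映射成同一令牌,既能跨文档做关联统计,又不暴露原值。下面是一个基于HMAC-SHA256的最小示意(SECRET_KEY为假设的占位值,实际密钥应由密钥管理系统下发并定期轮换):

```python
import hashlib
import hmac

SECRET_KEY = b"replace-with-managed-key"  # 假设值:生产环境应从KMS获取

def pseudonymize(value: str, prefix: str = "pid") -> str:
    """确定性假名化:同一输入总是得到同一令牌,便于聚合分析。"""
    digest = hmac.new(SECRET_KEY, value.encode("utf-8"), hashlib.sha256).hexdigest()
    return f"{prefix}_{digest[:16]}"

if __name__ == "__main__":
    # 同一手机号两次假名化得到相同令牌
    print(pseudonymize("13812345678"))
    print(pseudonymize("13812345678"))
```

与简单哈希相比,HMAC引入了密钥,攻击者无法通过枚举手机号等小值域数据反推原值,除非同时拿到密钥。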
合规性检查
1. GDPR合规检查
GDPR合规检查脚本:
#!/usr/bin/env python3
# gdpr_compliance_checker.py
import json
import re
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any
from elasticsearch import Elasticsearch

class GDPRComplianceChecker:
    def __init__(self, es_host='localhost:9200', es_user=None, es_password=None):
        if es_user and es_password:
            self.es = Elasticsearch(
                [es_host],
                http_auth=(es_user, es_password),
                verify_certs=True
            )
        else:
            self.es = Elasticsearch([es_host])
        self.compliance_rules = {
            'data_retention': {
                'max_retention_days': 365,
                'description': 'Data should not be retained longer than necessary'
            },
            'personal_data_identification': {
                'patterns': [
                    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # Email
                    r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
                    r'\b1[3-9]\d{9}\b',  # Phone
                    r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'  # Credit Card
                ],
                'description': 'Personal data should be identified and protected'
            },
            'consent_tracking': {
                'required_fields': ['user_consent', 'consent_timestamp', 'consent_version'],
                'description': 'User consent should be tracked and documented'
            },
            'data_minimization': {
                'max_field_count': 50,
                'description': 'Only necessary data should be collected'
            }
        }
        self.compliance_report = {
            'check_timestamp': datetime.now().isoformat(),
            'violations': [],
            'recommendations': [],
            'summary': {}
        }

    def check_data_retention(self, index_pattern='*'):
        """检查数据保留期限"""
        violations = []
        try:
            # 获取所有索引
            indices = self.es.cat.indices(index=index_pattern, format='json')
            # 使用带时区的UTC时间,避免与带时区的文档时间戳比较时抛出TypeError
            retention_limit = datetime.now(timezone.utc) - timedelta(
                days=self.compliance_rules['data_retention']['max_retention_days']
            )
            for index in indices:
                index_name = index['index']
                # 检查索引中最旧的文档
                query = {
                    'size': 1,
                    'sort': [{'@timestamp': {'order': 'asc'}}],
                    'query': {'match_all': {}}
                }
                result = self.es.search(index=index_name, body=query)
                if result['hits']['hits']:
                    oldest_doc = result['hits']['hits'][0]
                    doc_timestamp = datetime.fromisoformat(
                        oldest_doc['_source']['@timestamp'].replace('Z', '+00:00')
                    )
                    if doc_timestamp < retention_limit:
                        violations.append({
                            'type': 'data_retention_violation',
                            'index': index_name,
                            'oldest_document': doc_timestamp.isoformat(),
                            'retention_limit': retention_limit.isoformat(),
                            'days_over_limit': (retention_limit - doc_timestamp).days,
                            'severity': 'high'
                        })
        except Exception as e:
            violations.append({
                'type': 'data_retention_check_error',
                'error': str(e),
                'severity': 'medium'
            })
        return violations

    def check_personal_data_protection(self, index_pattern='*', sample_size=100):
        """检查个人数据保护"""
        violations = []
        try:
            query = {
                'size': sample_size,
                'query': {'match_all': {}}
            }
            result = self.es.search(index=index_pattern, body=query)
            for hit in result['hits']['hits']:
                doc = hit['_source']
                doc_id = hit['_id']
                index_name = hit['_index']
                # 检查是否包含个人数据
                personal_data_found = []
                for field, value in doc.items():
                    if isinstance(value, str):
                        for pattern in self.compliance_rules['personal_data_identification']['patterns']:
                            if re.search(pattern, value):
                                personal_data_found.append({
                                    'field': field,
                                    'pattern_type': self._get_pattern_type(pattern),
                                    'masked': self._is_data_masked(value)
                                })
                # 检查是否有未脱敏的个人数据
                unmasked_data = [item for item in personal_data_found if not item['masked']]
                if unmasked_data:
                    violations.append({
                        'type': 'unmasked_personal_data',
                        'index': index_name,
                        'document_id': doc_id,
                        'unmasked_fields': unmasked_data,
                        'severity': 'high'
                    })
                # 检查同意跟踪
                consent_fields = self.compliance_rules['consent_tracking']['required_fields']
                missing_consent_fields = [field for field in consent_fields if field not in doc]
                if missing_consent_fields and personal_data_found:
                    violations.append({
                        'type': 'missing_consent_tracking',
                        'index': index_name,
                        'document_id': doc_id,
                        'missing_fields': missing_consent_fields,
                        'severity': 'medium'
                    })
        except Exception as e:
            violations.append({
                'type': 'personal_data_check_error',
                'error': str(e),
                'severity': 'medium'
            })
        return violations

    def check_data_minimization(self, index_pattern='*', sample_size=50):
        """检查数据最小化原则"""
        violations = []
        try:
            query = {
                'size': sample_size,
                'query': {'match_all': {}}
            }
            result = self.es.search(index=index_pattern, body=query)
            for hit in result['hits']['hits']:
                doc = hit['_source']
                field_count = len(doc)
                if field_count > self.compliance_rules['data_minimization']['max_field_count']:
                    violations.append({
                        'type': 'data_minimization_violation',
                        'index': hit['_index'],
                        'document_id': hit['_id'],
                        'field_count': field_count,
                        'max_allowed': self.compliance_rules['data_minimization']['max_field_count'],
                        'severity': 'low'
                    })
        except Exception as e:
            violations.append({
                'type': 'data_minimization_check_error',
                'error': str(e),
                'severity': 'medium'
            })
        return violations

    def _get_pattern_type(self, pattern):
        """获取模式类型"""
        pattern_types = {
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b': 'email',
            r'\b\d{3}-\d{2}-\d{4}\b': 'ssn',
            r'\b1[3-9]\d{9}\b': 'phone',
            r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b': 'credit_card'
        }
        return pattern_types.get(pattern, 'unknown')

    def _is_data_masked(self, value):
        """检查数据是否已脱敏"""
        masked_patterns = ['***', '****', 'MASKED', 'REDACTED']
        return any(pattern in value for pattern in masked_patterns)

    def generate_compliance_report(self, index_pattern='*'):
        """生成合规性报告"""
        print("Starting GDPR compliance check...")
        # 执行各项检查
        retention_violations = self.check_data_retention(index_pattern)
        personal_data_violations = self.check_personal_data_protection(index_pattern)
        minimization_violations = self.check_data_minimization(index_pattern)
        # 汇总违规情况
        all_violations = retention_violations + personal_data_violations + minimization_violations
        # 生成建议
        recommendations = self._generate_recommendations(all_violations)
        # 更新报告
        self.compliance_report.update({
            'violations': all_violations,
            'recommendations': recommendations,
            'summary': {
                'total_violations': len(all_violations),
                'high_severity': len([v for v in all_violations if v.get('severity') == 'high']),
                'medium_severity': len([v for v in all_violations if v.get('severity') == 'medium']),
                'low_severity': len([v for v in all_violations if v.get('severity') == 'low']),
                'compliance_score': max(0, 100 - len(all_violations) * 5)
            }
        })
        return self.compliance_report

    def _generate_recommendations(self, violations):
        """生成改进建议"""
        recommendations = []
        violation_types = set(v['type'] for v in violations)
        if 'data_retention_violation' in violation_types:
            recommendations.append({
                'type': 'data_retention',
                'priority': 'high',
                'action': 'Implement automated data deletion policies',
                'description': 'Set up Index Lifecycle Management (ILM) to automatically delete old data'
            })
        if 'unmasked_personal_data' in violation_types:
            recommendations.append({
                'type': 'data_masking',
                'priority': 'high',
                'action': 'Implement data masking in Logstash pipelines',
                'description': 'Configure Logstash filters to mask personal data before indexing'
            })
        if 'missing_consent_tracking' in violation_types:
            recommendations.append({
                'type': 'consent_management',
                'priority': 'medium',
                'action': 'Implement consent tracking system',
                'description': 'Add consent metadata to all documents containing personal data'
            })
        if 'data_minimization_violation' in violation_types:
            recommendations.append({
                'type': 'data_minimization',
                'priority': 'low',
                'action': 'Review data collection practices',
                'description': 'Reduce the number of fields collected to only what is necessary'
            })
        return recommendations

    def export_report(self, output_file='gdpr_compliance_report.json'):
        """导出合规性报告"""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.compliance_report, f, indent=2, ensure_ascii=False)
        print(f"GDPR compliance report exported to {output_file}")
        print(f"Compliance Score: {self.compliance_report['summary']['compliance_score']}/100")
        print(f"Total Violations: {self.compliance_report['summary']['total_violations']}")

# 使用示例
if __name__ == "__main__":
    checker = GDPRComplianceChecker(
        es_host='localhost:9200',
        es_user='elastic',
        es_password='your_password'
    )
    report = checker.generate_compliance_report('logs-*')
    checker.export_report()
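对报告中最常见的data_retention_violation,上面建议中提到的ILM可以用一份"到期自动删除"策略落地。下面的草稿构造该策略的JSON(字段名遵循Elasticsearch ILM API;策略内的滚动阈值为示意值,应按实际写入量调整),生成后可通过 PUT _ilm/policy/<策略名> 提交到集群:

```python
def build_delete_policy(max_age_days: int) -> dict:
    """构造一份"热阶段滚动 + 到期删除"的ILM策略(示意)。"""
    return {
        "policy": {
            "phases": {
                "hot": {
                    "min_age": "0ms",
                    "actions": {
                        # 滚动阈值为示意值
                        "rollover": {"max_age": "30d", "max_primary_shard_size": "50gb"}
                    }
                },
                "delete": {
                    # 与GDPR检查器中的max_retention_days保持一致
                    "min_age": f"{max_age_days}d",
                    "actions": {"delete": {}}
                }
            }
        }
    }

if __name__ == "__main__":
    import json
    print(json.dumps(build_delete_policy(365), ensure_ascii=False, indent=2))
```

把删除阈值与合规检查器的max_retention_days绑定在同一处配置,可以避免"检查按365天、删除按其他天数"这类不一致。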
总结
本章详细介绍了ELK Stack的安全配置与权限管理,涵盖了以下核心内容:
🔐 核心安全功能
- X-Pack Security配置:SSL/TLS加密、用户认证、角色权限
- 用户和角色管理:细粒度权限控制、API密钥管理
- 网络安全:防火墙配置、端口安全、连接监控
- 数据加密:传输加密、静态数据加密、密钥管理
🛡️ 数据保护策略
- 数据脱敏:敏感信息自动识别和脱敏处理
- 访问控制:基于角色的访问控制(RBAC)
- 审计日志:完整的操作审计和安全事件记录
- 合规性检查:GDPR等法规的自动化合规检查
📊 监控和告警
- 安全监控:实时安全威胁检测和响应
- 性能监控:安全配置对性能的影响监控
- 告警机制:多渠道安全告警和通知
🎯 技术亮点
- 自动化脚本:Python和Shell脚本实现安全配置自动化
- 最佳实践:企业级安全配置模板和规范
- 实战案例:真实场景下的安全配置示例
💡 实践价值
- 企业级安全:满足企业级安全要求和合规需求
- 运维效率:自动化安全配置和监控,提升运维效率
- 风险控制:有效识别和控制安全风险
- 合规保障:确保数据处理符合相关法规要求
通过本章的学习,您将能够:
- 配置完整的ELK Stack安全体系
- 实现细粒度的用户权限管理
- 建立有效的数据保护机制
- 满足企业级安全和合规要求
下一章我们将学习ELK Stack的最佳实践与案例分析,通过实际案例深入了解ELK Stack在不同场景下的应用和优化策略。