概述

在企业级环境中,ELK Stack的安全性至关重要。本章将详细介绍如何配置Elasticsearch、Kibana和Logstash的安全功能,包括身份认证、授权管理、网络安全、数据加密等方面,确保日志分析系统的安全性和合规性。

Elasticsearch安全配置

1. 启用X-Pack Security

基础安全配置

elasticsearch.yml安全配置:

# 集群配置
cluster.name: secure-elk-cluster
node.name: es-node-1
network.host: 0.0.0.0  # 生产环境建议绑定具体的内网地址,而非监听所有网卡
http.port: 9200
transport.port: 9300

# 安全配置
xpack.security.enabled: true
xpack.security.enrollment.enabled: true

# 传输层安全
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.client_authentication: required
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

# HTTP层安全
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.truststore.path: certs/elastic-certificates.p12

# 审计日志
xpack.security.audit.enabled: true
xpack.security.audit.logfile.events.include:
  - access_denied
  - access_granted
  - anonymous_access_denied
  - authentication_failed
  - connection_denied
  - tampered_request
  - run_as_denied
  - run_as_granted

# 密码策略
xpack.security.authc.password_hashing.algorithm: bcrypt

# API密钥设置
xpack.security.authc.api_key.enabled: true
xpack.security.authc.api_key.hashing.algorithm: pbkdf2_stretch

# 令牌服务
xpack.security.authc.token.enabled: true
xpack.security.authc.token.timeout: 20m

# 会话超时属于 Kibana 配置(kibana.yml 中的 xpack.security.session.idleTimeout),
# elasticsearch.yml 中没有对应的 session 设置
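
上述配置项较多,重启集群前可以先做一次静态检查。下面是一个最小示意脚本(仅处理顶层 `key: value` 行,并非完整的 YAML 解析器;检查的键名即上文配置中的安全开关),确认关键安全功能已打开:

```python
# check_es_security.py -- 静态检查 elasticsearch.yml 中的关键安全开关(最小示意)
REQUIRED_SETTINGS = {
    "xpack.security.enabled": "true",
    "xpack.security.transport.ssl.enabled": "true",
    "xpack.security.http.ssl.enabled": "true",
    "xpack.security.audit.enabled": "true",
}

def parse_flat_yaml(text):
    """只解析顶层 'key: value' 行,足以覆盖上述扁平键名;注释与列表项会被跳过"""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ": " not in line:
            continue
        key, _, value = line.partition(": ")
        settings[key.strip()] = value.strip()
    return settings

def check_security(text):
    """返回未开启(缺失或取值不为期望值)的安全开关列表"""
    settings = parse_flat_yaml(text)
    return [k for k, v in REQUIRED_SETTINGS.items() if settings.get(k) != v]

sample_config = """xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: false"""
for key in check_security(sample_config):
    print(f"[WARN] {key} 未开启或未设置")
```

实际使用时把 `sample_config` 换成读入的 `/etc/elasticsearch/elasticsearch.yml` 内容即可。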

SSL证书生成和配置

证书生成脚本:

#!/bin/bash
# generate-certificates.sh

CERT_DIR="/etc/elasticsearch/certs"
INSTANCE_FILE="instances.yml"

# 创建证书目录
mkdir -p "$CERT_DIR"
cd "$CERT_DIR"

# 创建实例配置文件
cat > "$INSTANCE_FILE" << EOF
instances:
  - name: es-node-1
    dns:
      - es-node-1.local
      - localhost
    ip:
      - 127.0.0.1
      - 192.168.1.10
  - name: es-node-2
    dns:
      - es-node-2.local
    ip:
      - 192.168.1.11
  - name: es-node-3
    dns:
      - es-node-3.local
    ip:
      - 192.168.1.12
  - name: kibana
    dns:
      - kibana.local
      - localhost
    ip:
      - 127.0.0.1
      - 192.168.1.20
  - name: logstash
    dns:
      - logstash.local
    ip:
      - 192.168.1.30
EOF

# 生成CA证书
echo "Generating CA certificate..."
/usr/share/elasticsearch/bin/elasticsearch-certutil ca --out elastic-stack-ca.p12 --pass ""

# 生成节点证书
echo "Generating node certificates..."
/usr/share/elasticsearch/bin/elasticsearch-certutil cert \
  --ca elastic-stack-ca.p12 \
  --ca-pass "" \
  --out elastic-certificates.p12 \
  --pass "" \
  --in "$INSTANCE_FILE"

# 生成HTTP证书
echo "Generating HTTP certificates..."
/usr/share/elasticsearch/bin/elasticsearch-certutil http

# 设置权限(证书文件只需读权限,不应授予组写权限)
chown -R elasticsearch:elasticsearch "$CERT_DIR"
chmod 640 "$CERT_DIR"/*.p12

echo "Certificate generation completed."
echo "Certificates location: $CERT_DIR"
echo "Please distribute certificates to all nodes."

# 验证证书
echo -e "\nVerifying certificates..."
openssl pkcs12 -in elastic-certificates.p12 -nodes -passin pass: | openssl x509 -noout -text | grep -E '(Subject:|DNS:|IP Address:)'

2. 用户和角色管理

内置角色配置

角色定义脚本:

#!/bin/bash
# setup-security.sh

ES_HOST="https://localhost:9200"
ES_USER="elastic"
# 建议通过环境变量传入密码,避免脚本中出现明文;使用自签名证书时需为 curl 追加 --cacert 参数
ES_PASS="${ES_PASS:-your_elastic_password}"

# 创建自定义角色
echo "Creating custom roles..."

# 日志分析师角色
curl -X POST "$ES_HOST/_security/role/log_analyst" \
  -u "$ES_USER:$ES_PASS" \
  -H "Content-Type: application/json" \
  -d '{
    "cluster": ["monitor"],
    "indices": [
      {
        "names": ["logs-*", "metrics-*"],
        "privileges": ["read", "view_index_metadata"]
      }
    ],
    "applications": [
      {
        "application": "kibana-.kibana",
        "privileges": ["read"],
        "resources": ["*"]
      }
    ]
  }'

# 开发者角色
curl -X POST "$ES_HOST/_security/role/developer" \
  -u "$ES_USER:$ES_PASS" \
  -H "Content-Type: application/json" \
  -d '{
    "cluster": ["monitor", "manage_index_templates"],
    "indices": [
      {
        "names": ["dev-*", "test-*"],
        "privileges": ["all"]
      },
      {
        "names": ["logs-*"],
        "privileges": ["read", "view_index_metadata"]
      }
    ],
    "applications": [
      {
        "application": "kibana-.kibana",
        "privileges": ["all"],
        "resources": ["space:dev"]
      }
    ]
  }'

# 运维角色
curl -X POST "$ES_HOST/_security/role/ops_admin" \
  -u "$ES_USER:$ES_PASS" \
  -H "Content-Type: application/json" \
  -d '{
    "cluster": [
      "monitor", 
      "manage_index_templates", 
      "manage_ingest_pipelines",
      "manage_ilm",
      "manage_slm"
    ],
    "indices": [
      {
        "names": ["*"],
        "privileges": ["all"]
      }
    ],
    "applications": [
      {
        "application": "kibana-.kibana",
        "privileges": ["all"],
        "resources": ["*"]
      }
    ]
  }'

# 只读角色
curl -X POST "$ES_HOST/_security/role/readonly" \
  -u "$ES_USER:$ES_PASS" \
  -H "Content-Type: application/json" \
  -d '{
    "cluster": ["monitor"],
    "indices": [
      {
        "names": ["logs-*"],
        "privileges": ["read"]
      }
    ],
    "applications": [
      {
        "application": "kibana-.kibana",
        "privileges": ["read"],
        "resources": ["*"]
      }
    ]
  }'

echo "Custom roles created successfully."
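
上面的角色中,ops_admin 对所有索引拥有 all 权限,这类高权限定义应定期审计。下面是一个最小示意脚本(假设已通过 `GET /_security/role` 拿到角色定义的 JSON),标记对 `*` 索引授予 `all` 权限的角色:

```python
# audit_roles.py -- 标记权限过大的角色定义(最小示意,角色结构同上文脚本中的 JSON)
def find_overbroad_roles(roles):
    """roles: {角色名: 角色定义} 映射;返回对 '*' 索引授予 'all' 权限的角色名列表"""
    flagged = []
    for name, role in roles.items():
        for index_perm in role.get("indices", []):
            if "*" in index_perm.get("names", []) and "all" in index_perm.get("privileges", []):
                flagged.append(name)
                break
    return flagged

# 示例数据,对应上文创建的两个角色
roles = {
    "ops_admin": {"indices": [{"names": ["*"], "privileges": ["all"]}]},
    "readonly": {"indices": [{"names": ["logs-*"], "privileges": ["read"]}]},
}
print(find_overbroad_roles(roles))  # 输出: ['ops_admin']
```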

用户管理

用户创建和管理脚本:

# user_management.py
import requests
import json
import getpass
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

class ElasticsearchUserManager:
    def __init__(self, es_host, admin_user, admin_password):
        self.es_host = es_host
        self.auth = (admin_user, admin_password)
        self.session = requests.Session()
        self.session.verify = False  # 仅用于测试环境
    
    def create_user(self, username, password, email, full_name, roles):
        """创建用户"""
        user_data = {
            "password": password,
            "email": email,
            "full_name": full_name,
            "roles": roles,
            "enabled": True
        }
        
        try:
            response = self.session.post(
                f"{self.es_host}/_security/user/{username}",
                auth=self.auth,
                json=user_data
            )
            
            if response.status_code == 200:
                print(f"User '{username}' created successfully.")
                return True
            else:
                print(f"Failed to create user '{username}': {response.text}")
                return False
        except Exception as e:
            print(f"Error creating user '{username}': {e}")
            return False
    
    def update_user(self, username, **kwargs):
        """更新用户信息"""
        try:
            response = self.session.post(
                f"{self.es_host}/_security/user/{username}",
                auth=self.auth,
                json=kwargs
            )
            
            if response.status_code == 200:
                print(f"User '{username}' updated successfully.")
                return True
            else:
                print(f"Failed to update user '{username}': {response.text}")
                return False
        except Exception as e:
            print(f"Error updating user '{username}': {e}")
            return False
    
    def delete_user(self, username):
        """删除用户"""
        try:
            response = self.session.delete(
                f"{self.es_host}/_security/user/{username}",
                auth=self.auth
            )
            
            if response.status_code == 200:
                print(f"User '{username}' deleted successfully.")
                return True
            else:
                print(f"Failed to delete user '{username}': {response.text}")
                return False
        except Exception as e:
            print(f"Error deleting user '{username}': {e}")
            return False
    
    def list_users(self):
        """列出所有用户"""
        try:
            response = self.session.get(
                f"{self.es_host}/_security/user",
                auth=self.auth
            )
            
            if response.status_code == 200:
                users = response.json()
                print("\n=== Elasticsearch Users ===")
                for username, user_info in users.items():
                    print(f"Username: {username}")
                    print(f"  Full Name: {user_info.get('full_name', 'N/A')}")
                    print(f"  Email: {user_info.get('email', 'N/A')}")
                    print(f"  Roles: {', '.join(user_info.get('roles', []))}")
                    print(f"  Enabled: {user_info.get('enabled', False)}")
                    print()
                return users
            else:
                print(f"Failed to list users: {response.text}")
                return None
        except Exception as e:
            print(f"Error listing users: {e}")
            return None
    
    def change_password(self, username, new_password):
        """修改用户密码"""
        try:
            response = self.session.post(
                f"{self.es_host}/_security/user/{username}/_password",
                auth=self.auth,
                json={"password": new_password}
            )
            
            if response.status_code == 200:
                print(f"Password for user '{username}' changed successfully.")
                return True
            else:
                print(f"Failed to change password for user '{username}': {response.text}")
                return False
        except Exception as e:
            print(f"Error changing password for user '{username}': {e}")
            return False
    
    def enable_user(self, username):
        """启用用户"""
        return self.update_user(username, enabled=True)
    
    def disable_user(self, username):
        """禁用用户"""
        return self.update_user(username, enabled=False)
    
    def get_user_privileges(self, username):
        """获取用户的角色及各角色的权限定义
        (Elasticsearch 没有按用户名查询权限的接口,
        这里先取用户的角色列表,再查询各角色的定义)"""
        try:
            user_resp = self.session.get(
                f"{self.es_host}/_security/user/{username}",
                auth=self.auth
            )
            if user_resp.status_code != 200:
                print(f"Failed to get user '{username}': {user_resp.text}")
                return None
            
            roles = user_resp.json().get(username, {}).get('roles', [])
            if not roles:
                print(f"User '{username}' has no roles.")
                return {}
            
            role_resp = self.session.get(
                f"{self.es_host}/_security/role/{','.join(roles)}",
                auth=self.auth
            )
            if role_resp.status_code == 200:
                privileges = role_resp.json()
                print(f"\n=== Privileges for user '{username}' ===")
                print(json.dumps(privileges, indent=2))
                return privileges
            else:
                print(f"Failed to get roles for user '{username}': {role_resp.text}")
                return None
        except Exception as e:
            print(f"Error getting privileges for user '{username}': {e}")
            return None
    
    def bulk_create_users(self, users_config):
        """批量创建用户"""
        success_count = 0
        total_count = len(users_config)
        
        for user_config in users_config:
            if self.create_user(**user_config):
                success_count += 1
        
        print(f"\nBulk user creation completed: {success_count}/{total_count} users created successfully.")
        return success_count == total_count

# 使用示例
if __name__ == "__main__":
    # 初始化用户管理器
    user_manager = ElasticsearchUserManager(
        es_host="https://localhost:9200",
        admin_user="elastic",
        admin_password="your_elastic_password"
    )
    
    # 批量创建用户
    users_to_create = [
        {
            "username": "log_analyst_1",
            "password": "SecurePass123!",
            "email": "analyst1@company.com",
            "full_name": "Log Analyst 1",
            "roles": ["log_analyst"]
        },
        {
            "username": "developer_1",
            "password": "DevPass123!",
            "email": "dev1@company.com",
            "full_name": "Developer 1",
            "roles": ["developer"]
        },
        {
            "username": "ops_admin_1",
            "password": "OpsPass123!",
            "email": "ops1@company.com",
            "full_name": "Operations Admin 1",
            "roles": ["ops_admin"]
        },
        {
            "username": "readonly_user",
            "password": "ReadPass123!",
            "email": "readonly@company.com",
            "full_name": "Read Only User",
            "roles": ["readonly"]
        }
    ]
    
    # 执行批量创建
    user_manager.bulk_create_users(users_to_create)
    
    # 列出所有用户
    user_manager.list_users()
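
elasticsearch.yml 中配置的 bcrypt 只负责口令的哈希存储,口令本身的强度仍应在创建用户前检查。下面是一个最小的本地强度检查示意(长度与字符类别阈值为示例值,应按企业口令规范调整):

```python
# password_policy.py -- 本地口令强度检查(最小示意,阈值为示例值)
import re

def check_password_strength(password, min_length=12):
    """返回不满足的策略项列表;列表为空表示通过检查"""
    problems = []
    if len(password) < min_length:
        problems.append(f"长度不足{min_length}位")
    if not re.search(r"[A-Z]", password):
        problems.append("缺少大写字母")
    if not re.search(r"[a-z]", password):
        problems.append("缺少小写字母")
    if not re.search(r"\d", password):
        problems.append("缺少数字")
    if not re.search(r"[^A-Za-z0-9]", password):
        problems.append("缺少特殊字符")
    return problems

# 在调用 bulk_create_users 之前过滤弱口令
for cfg in [{"username": "demo", "password": "SecurePass123!"}]:
    issues = check_password_strength(cfg["password"])
    if issues:
        print(f"{cfg['username']}: {'; '.join(issues)}")
```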

3. API密钥管理

API密钥创建和管理

API密钥管理脚本:

# api_key_management.py
import requests
import json
import base64
from datetime import datetime, timedelta
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

class ElasticsearchAPIKeyManager:
    def __init__(self, es_host, username, password):
        self.es_host = es_host
        self.auth = (username, password)
        self.session = requests.Session()
        self.session.verify = False
    
    def create_api_key(self, name, role_descriptors=None, expiration=None, metadata=None):
        """创建API密钥"""
        api_key_data = {
            "name": name
        }
        
        if role_descriptors:
            api_key_data["role_descriptors"] = role_descriptors
        
        if expiration:
            api_key_data["expiration"] = expiration
        
        if metadata:
            api_key_data["metadata"] = metadata
        
        try:
            response = self.session.post(
                f"{self.es_host}/_security/api_key",
                auth=self.auth,
                json=api_key_data
            )
            
            if response.status_code == 200:
                result = response.json()
                api_key_credentials = base64.b64encode(
                    f"{result['id']}:{result['api_key']}".encode()
                ).decode()
                
                print(f"API Key '{name}' created successfully.")
                print(f"ID: {result['id']}")
                print(f"API Key: {result['api_key']}")
                print(f"Encoded: {api_key_credentials}")
                
                return {
                    "id": result['id'],
                    "api_key": result['api_key'],
                    "encoded": api_key_credentials
                }
            else:
                print(f"Failed to create API key '{name}': {response.text}")
                return None
        except Exception as e:
            print(f"Error creating API key '{name}': {e}")
            return None
    
    def list_api_keys(self, username=None):
        """列出API密钥"""
        url = f"{self.es_host}/_security/api_key"
        if username:
            url += f"?username={username}"
        
        try:
            response = self.session.get(url, auth=self.auth)
            
            if response.status_code == 200:
                api_keys = response.json()
                print("\n=== API Keys ===")
                for key in api_keys.get('api_keys', []):
                    print(f"ID: {key['id']}")
                    print(f"Name: {key['name']}")
                    print(f"Username: {key['username']}")
                    print(f"Creation: {key['creation']}")
                    if 'expiration' in key:
                        print(f"Expiration: {key['expiration']}")
                    print(f"Invalidated: {key['invalidated']}")
                    print()
                return api_keys
            else:
                print(f"Failed to list API keys: {response.text}")
                return None
        except Exception as e:
            print(f"Error listing API keys: {e}")
            return None
    
    def invalidate_api_key(self, key_ids=None, key_names=None, username=None):
        """使API密钥失效"""
        invalidate_data = {}
        
        if key_ids:
            invalidate_data["ids"] = key_ids if isinstance(key_ids, list) else [key_ids]
        
        if key_names:
            invalidate_data["names"] = key_names if isinstance(key_names, list) else [key_names]
        
        if username:
            invalidate_data["username"] = username
        
        try:
            response = self.session.delete(
                f"{self.es_host}/_security/api_key",
                auth=self.auth,
                json=invalidate_data
            )
            
            if response.status_code == 200:
                result = response.json()
                print(f"Invalidated {len(result['invalidated_api_keys'])} API keys.")
                for key_id in result['invalidated_api_keys']:
                    print(f"  - {key_id}")
                
                if result.get('error_details'):
                    print("Errors:")
                    for error in result['error_details']:
                        print(f"  - {error}")
                
                return result
            else:
                print(f"Failed to invalidate API keys: {response.text}")
                return None
        except Exception as e:
            print(f"Error invalidating API keys: {e}")
            return None
    
    def get_api_key_info(self, api_key_id):
        """获取API密钥信息"""
        try:
            response = self.session.get(
                f"{self.es_host}/_security/api_key?id={api_key_id}",
                auth=self.auth
            )
            
            if response.status_code == 200:
                result = response.json()
                if result['api_keys']:
                    key_info = result['api_keys'][0]
                    print(f"\n=== API Key Info ===")
                    print(f"ID: {key_info['id']}")
                    print(f"Name: {key_info['name']}")
                    print(f"Username: {key_info['username']}")
                    print(f"Creation: {key_info['creation']}")
                    if 'expiration' in key_info:
                        print(f"Expiration: {key_info['expiration']}")
                    print(f"Invalidated: {key_info['invalidated']}")
                    if 'metadata' in key_info:
                        print(f"Metadata: {json.dumps(key_info['metadata'], indent=2)}")
                    return key_info
                else:
                    print(f"API key '{api_key_id}' not found.")
                    return None
            else:
                print(f"Failed to get API key info: {response.text}")
                return None
        except Exception as e:
            print(f"Error getting API key info: {e}")
            return None
    
    def create_service_api_keys(self):
        """创建服务专用API密钥"""
        service_keys = []
        
        # Logstash API密钥
        logstash_role = {
            "logstash_writer": {
                "cluster": ["monitor", "manage_index_templates", "manage_ingest_pipelines"],
                "indices": [
                    {
                        "names": ["logs-*", "metrics-*"],
                        "privileges": ["create_index", "write", "create"]
                    }
                ]
            }
        }
        
        logstash_key = self.create_api_key(
            name="logstash-service",
            role_descriptors=logstash_role,
            expiration="365d",
            metadata={"service": "logstash", "environment": "production"}
        )
        
        if logstash_key:
            service_keys.append({"service": "logstash", "key": logstash_key})
        
        # Beats API密钥
        beats_role = {
            "beats_writer": {
                "cluster": ["monitor", "manage_ingest_pipelines"],
                "indices": [
                    {
                        "names": ["logs-*", "metrics-*", "filebeat-*", "metricbeat-*"],
                        "privileges": ["create_index", "write", "create"]
                    }
                ]
            }
        }
        
        beats_key = self.create_api_key(
            name="beats-service",
            role_descriptors=beats_role,
            expiration="365d",
            metadata={"service": "beats", "environment": "production"}
        )
        
        if beats_key:
            service_keys.append({"service": "beats", "key": beats_key})
        
        # 监控API密钥
        monitoring_role = {
            "monitoring_reader": {
                "cluster": ["monitor"],
                "indices": [
                    {
                        "names": ["*"],
                        "privileges": ["read", "view_index_metadata"]
                    }
                ]
            }
        }
        
        monitoring_key = self.create_api_key(
            name="monitoring-service",
            role_descriptors=monitoring_role,
            expiration="90d",
            metadata={"service": "monitoring", "environment": "production"}
        )
        
        if monitoring_key:
            service_keys.append({"service": "monitoring", "key": monitoring_key})
        
        return service_keys
    
    def rotate_api_keys(self, service_name):
        """轮换API密钥"""
        print(f"Rotating API keys for service: {service_name}")
        
        # 获取现有密钥
        existing_keys = self.list_api_keys()
        service_keys = []
        
        if existing_keys:
            for key in existing_keys.get('api_keys', []):
                if key['name'].startswith(service_name):
                    service_keys.append(key)
        
        # 创建新密钥
        # 注意:未指定 role_descriptors 时,新密钥会继承创建者的全部权限,
        # 生产环境应显式传入与旧密钥一致的角色定义
        new_key = self.create_api_key(
            name=f"{service_name}-{datetime.now().strftime('%Y%m%d')}",
            expiration="365d",
            metadata={"service": service_name, "rotation_date": datetime.now().isoformat()}
        )
        
        if new_key:
            print(f"New API key created for {service_name}")
            
            # 可选:使旧密钥失效(建议在确认新密钥工作正常后执行)
            # old_key_ids = [key['id'] for key in service_keys]
            # if old_key_ids:
            #     self.invalidate_api_key(key_ids=old_key_ids)
            
            return new_key
        
        return None

# 使用示例
if __name__ == "__main__":
    # 初始化API密钥管理器
    api_manager = ElasticsearchAPIKeyManager(
        es_host="https://localhost:9200",
        username="elastic",
        password="your_elastic_password"
    )
    
    # 创建服务API密钥
    service_keys = api_manager.create_service_api_keys()
    
    # 列出所有API密钥
    api_manager.list_api_keys()
    
    # 保存API密钥到配置文件(密钥为明文,应收紧文件权限;生产环境建议改用密钥管理服务)
    with open('api_keys.json', 'w') as f:
        json.dump(service_keys, f, indent=2)
    
    print("\nAPI keys saved to api_keys.json")
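
客户端使用密钥时,需要把创建响应中的 `id` 和 `api_key` 拼成 `id:api_key` 后做 Base64 编码,放入 `Authorization: ApiKey <编码值>` 请求头(与响应中的 encoded 字段等价)。最小示意(仅标准库;示例 id 与密钥为占位值):

```python
# api_key_auth.py -- 构造 Elasticsearch ApiKey 认证头(最小示意)
import base64

def api_key_header(key_id, api_key):
    """返回可直接用于 HTTP 客户端的认证头字典"""
    token = base64.b64encode(f"{key_id}:{api_key}".encode()).decode()
    return {"Authorization": f"ApiKey {token}"}

headers = api_key_header("VuaCfGcBCdbkQm-e5aOx", "ui2lp2axTNmsyakw9tvNnw")
# 随后可用于任意客户端,例如:
# requests.get("https://localhost:9200/_cluster/health",
#              headers=headers, verify="/path/to/ca.crt")
```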

Kibana安全配置

1. Kibana安全设置

kibana.yml安全配置:

# 服务器配置
server.host: "0.0.0.0"
server.port: 5601
server.name: "kibana-secure"

# Elasticsearch连接
elasticsearch.hosts: ["https://es-node-1:9200", "https://es-node-2:9200"]
elasticsearch.username: "kibana_system"
elasticsearch.password: "kibana_system_password"

# SSL配置
server.ssl.enabled: true
server.ssl.certificate: "/etc/kibana/certs/kibana.crt"
server.ssl.key: "/etc/kibana/certs/kibana.key"

# Elasticsearch SSL
elasticsearch.ssl.certificateAuthorities: ["/etc/kibana/certs/ca.crt"]
elasticsearch.ssl.verificationMode: certificate

# 安全配置
xpack.security.enabled: true
xpack.security.encryptionKey: "your-32-character-encryption-key-here"
xpack.security.session.idleTimeout: "8h"
xpack.security.session.lifespan: "24h"

# 审计日志
xpack.security.audit.enabled: true
xpack.security.audit.appender.type: file
xpack.security.audit.appender.fileName: /var/log/kibana/kibana_audit.log
xpack.security.audit.appender.layout.type: json

# 空间配置
xpack.spaces.enabled: true
xpack.spaces.maxSpaces: 1000

# 报告配置
xpack.reporting.enabled: true
xpack.reporting.encryptionKey: "your-32-character-reporting-key-here"
xpack.reporting.kibanaServer.hostname: "kibana.local"
xpack.reporting.kibanaServer.port: 5601
xpack.reporting.kibanaServer.protocol: "https"

# 监控配置
xpack.monitoring.enabled: true
xpack.monitoring.kibana.collection.enabled: true
xpack.monitoring.ui.container.elasticsearch.enabled: true

# 告警配置
xpack.actions.enabled: true
xpack.alerting.enabled: true
xpack.alerting.healthCheck.interval: "60s"

# 机器学习
xpack.ml.enabled: true

# 图形配置
xpack.graph.enabled: true

# 日志配置
logging.appenders.file.type: file
logging.appenders.file.fileName: /var/log/kibana/kibana.log
logging.appenders.file.layout.type: json
logging.root.level: info
logging.root.appenders: [file]

# 安全头配置
server.securityResponseHeaders.strictTransportSecurity: "max-age=31536000; includeSubDomains"
server.securityResponseHeaders.xContentTypeOptions: "nosniff"
server.securityResponseHeaders.xFrameOptions: "DENY"
server.securityResponseHeaders.contentSecurityPolicy: "default-src 'self'; script-src 'self' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data: blob:; font-src 'self' data:; connect-src 'self';"
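
配置中的两个 encryptionKey 占位符必须替换为至少 32 个字符的随机字符串,且多实例部署时各 Kibana 节点需保持一致,否则会话与报告数据无法解密。可以用标准库生成(最小示意):

```python
# generate_kibana_keys.py -- 生成 Kibana 加密密钥(最小示意)
import secrets

def generate_encryption_key(length=32):
    """生成 length 个十六进制字符的随机密钥(Kibana 要求至少32个字符)"""
    return secrets.token_hex(length // 2)

print(f'xpack.security.encryptionKey: "{generate_encryption_key()}"')
print(f'xpack.reporting.encryptionKey: "{generate_encryption_key()}"')
```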

2. 空间和权限管理

空间管理脚本:

# kibana_space_management.py
import requests
import json
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

class KibanaSpaceManager:
    def __init__(self, kibana_host, username, password):
        self.kibana_host = kibana_host
        self.auth = (username, password)
        self.session = requests.Session()
        self.session.verify = False
        self.session.headers.update({
            'Content-Type': 'application/json',
            'kbn-xsrf': 'true'
        })
    
    def create_space(self, space_id, name, description=None, color=None, initials=None, disabled_features=None):
        """创建空间"""
        space_data = {
            "id": space_id,
            "name": name
        }
        
        if description:
            space_data["description"] = description
        
        if color:
            space_data["color"] = color
        
        if initials:
            space_data["initials"] = initials
        
        if disabled_features:
            space_data["disabledFeatures"] = disabled_features
        
        try:
            response = self.session.post(
                f"{self.kibana_host}/api/spaces/space",
                auth=self.auth,
                json=space_data
            )
            
            if response.status_code == 200:
                print(f"Space '{name}' created successfully.")
                return response.json()
            else:
                print(f"Failed to create space '{name}': {response.text}")
                return None
        except Exception as e:
            print(f"Error creating space '{name}': {e}")
            return None
    
    def list_spaces(self):
        """列出所有空间"""
        try:
            response = self.session.get(
                f"{self.kibana_host}/api/spaces/space",
                auth=self.auth
            )
            
            if response.status_code == 200:
                spaces = response.json()
                print("\n=== Kibana Spaces ===")
                for space in spaces:
                    print(f"ID: {space['id']}")
                    print(f"Name: {space['name']}")
                    print(f"Description: {space.get('description', 'N/A')}")
                    print(f"Color: {space.get('color', 'N/A')}")
                    print(f"Disabled Features: {space.get('disabledFeatures', [])}")
                    print()
                return spaces
            else:
                print(f"Failed to list spaces: {response.text}")
                return None
        except Exception as e:
            print(f"Error listing spaces: {e}")
            return None
    
    def update_space(self, space_id, **kwargs):
        """更新空间"""
        try:
            response = self.session.put(
                f"{self.kibana_host}/api/spaces/space/{space_id}",
                auth=self.auth,
                json=kwargs
            )
            
            if response.status_code == 200:
                print(f"Space '{space_id}' updated successfully.")
                return response.json()
            else:
                print(f"Failed to update space '{space_id}': {response.text}")
                return None
        except Exception as e:
            print(f"Error updating space '{space_id}': {e}")
            return None
    
    def delete_space(self, space_id):
        """删除空间"""
        try:
            response = self.session.delete(
                f"{self.kibana_host}/api/spaces/space/{space_id}",
                auth=self.auth
            )
            
            if response.status_code == 204:
                print(f"Space '{space_id}' deleted successfully.")
                return True
            else:
                print(f"Failed to delete space '{space_id}': {response.text}")
                return False
        except Exception as e:
            print(f"Error deleting space '{space_id}': {e}")
            return False
    
    def copy_saved_objects(self, source_space, target_space, objects=None, include_references=True):
        """复制保存的对象到其他空间
        objects: [{"type": ..., "id": ...}] 列表,需要显式指定对象ID"""
        copy_data = {
            "spaces": [target_space],
            "includeReferences": include_references
        }
        
        if objects:
            # _copy_saved_objects 要求显式的 {"type", "id"} 列表,不支持 "*" 通配符
            copy_data["objects"] = objects
        
        try:
            response = self.session.post(
                f"{self.kibana_host}/s/{source_space}/api/spaces/_copy_saved_objects",
                auth=self.auth,
                json=copy_data
            )
            
            if response.status_code == 200:
                result = response.json()
                print(f"Copied saved objects from '{source_space}' to '{target_space}'.")
                print(f"Success: {result.get('successCount', 0)} objects")
                if result.get('errors'):
                    print(f"Errors: {len(result['errors'])} objects")
                return result
            else:
                print(f"Failed to copy saved objects: {response.text}")
                return None
        except Exception as e:
            print(f"Error copying saved objects: {e}")
            return None
    
    def setup_department_spaces(self):
        """设置部门空间"""
        departments = [
            {
                "id": "development",
                "name": "Development",
                "description": "Development team workspace",
                "color": "#1BA9F5",
                "initials": "DEV",
                "disabled_features": ["ml", "graph"]
            },
            {
                "id": "operations",
                "name": "Operations",
                "description": "Operations team workspace",
                "color": "#FF6B47",
                "initials": "OPS",
                "disabled_features": []
            },
            {
                "id": "security",
                "name": "Security",
                "description": "Security team workspace",
                "color": "#E74C3C",
                "initials": "SEC",
                "disabled_features": ["graph"]
            },
            {
                "id": "analytics",
                "name": "Analytics",
                "description": "Data analytics workspace",
                "color": "#9C27B0",
                "initials": "ANA",
                "disabled_features": []
            }
        ]
        
        created_spaces = []
        for dept in departments:
            space = self.create_space(**dept)
            if space:
                created_spaces.append(space)
        
        return created_spaces

# 使用示例
if __name__ == "__main__":
    # 初始化空间管理器
    space_manager = KibanaSpaceManager(
        kibana_host="https://localhost:5601",
        username="elastic",
        password="your_elastic_password"
    )
    
    # 设置部门空间
    spaces = space_manager.setup_department_spaces()
    
    # 列出所有空间
    space_manager.list_spaces()
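
空间建好后,还需要在 Elasticsearch 侧创建只作用于某个空间的角色,才能把用户限制在对应空间内。角色体结构与前文 setup-security.sh 中的 developer 角色相同,资源写成 `space:<空间ID>`(最小示意):

```python
# space_role.py -- 生成仅作用于指定 Kibana 空间的角色体(最小示意)
import json

def space_scoped_role(space_id, privileges=("read",)):
    """返回可 POST 到 /_security/role/<角色名> 的角色体"""
    return {
        "applications": [
            {
                "application": "kibana-.kibana",
                "privileges": list(privileges),
                "resources": [f"space:{space_id}"],
            }
        ]
    }

role = space_scoped_role("development", privileges=("all",))
print(json.dumps(role, indent=2, ensure_ascii=False))
```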

Logstash安全配置

1. Logstash安全设置

logstash.yml安全配置:

# 节点配置
node.name: logstash-secure
path.data: /var/lib/logstash
path.logs: /var/log/logstash
path.settings: /etc/logstash

# 管道配置
pipeline.workers: 4
pipeline.batch.size: 1000
pipeline.batch.delay: 50

# 监控配置
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["https://es-node-1:9200", "https://es-node-2:9200"]
xpack.monitoring.elasticsearch.username: "logstash_system"
xpack.monitoring.elasticsearch.password: "logstash_system_password"
xpack.monitoring.elasticsearch.ssl.certificate_authority: "/etc/logstash/certs/ca.crt"
xpack.monitoring.elasticsearch.ssl.verification_mode: certificate

# 管理API配置
api.enabled: true
api.host: "127.0.0.1"
api.port: 9600
api.ssl.enabled: true
api.ssl.certificate: "/etc/logstash/certs/logstash.crt"
api.ssl.key: "/etc/logstash/certs/logstash.key"
api.auth.type: basic
api.auth.basic.username: "logstash_admin"
api.auth.basic.password: "logstash_admin_password"

# 日志配置
log.level: info
log.format: json

# 配置重载(自动重载意味着配置文件改动无需审查即可生效,生产环境可按需关闭)
config.reload.automatic: true
config.reload.interval: 3s
config.test_and_exit: false

# 死信队列
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb

# 持久队列
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024
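上面的 logstash.yml 把监控账号和 API 账号的密码以明文写入配置文件。生产环境建议改用 Logstash keystore 保存敏感值,配置中通过 `${变量名}` 占位符引用(示意片段,密钥名 MONITORING_ES_PWD、API_ADMIN_PWD 为假设值):

```yaml
# 先在命令行创建 keystore 并写入密钥:
#   bin/logstash-keystore create
#   bin/logstash-keystore add MONITORING_ES_PWD
#   bin/logstash-keystore add API_ADMIN_PWD
# 然后在 logstash.yml 中用占位符代替明文密码:
xpack.monitoring.elasticsearch.password: "${MONITORING_ES_PWD}"
api.auth.basic.password: "${API_ADMIN_PWD}"
```

Logstash 启动时会先在 keystore 中解析占位符,找不到时回退到同名环境变量。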

2. 安全管道配置

安全的Logstash管道配置:

# secure-pipeline.conf
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
    ssl_peer_metadata => true
  }
  
  http {
    port => 8080
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_verify_mode => "none"
    
    # 基础认证
    user => "logstash_http"
    password => "secure_password"
    
    # 限制访问
    host => "127.0.0.1"
    
    # 请求体编解码器
    additional_codecs => {
      "application/json" => "json"
    }
  }
  
  kafka {
    bootstrap_servers => "kafka1:9093,kafka2:9093,kafka3:9093"
    topics => ["secure-logs"]
    group_id => "logstash-secure-group"
    
    # SSL配置
    security_protocol => "SSL"
    ssl_truststore_location => "/etc/logstash/certs/kafka.truststore.jks"
    ssl_truststore_password => "truststore_password"
    ssl_keystore_location => "/etc/logstash/certs/kafka.keystore.jks"
    ssl_keystore_password => "keystore_password"
    ssl_key_password => "key_password"
    
    # SASL配置(可选)
    # sasl_mechanism => "PLAIN"
    # sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='logstash' password='logstash_password';"
  }
}

filter {
  # 数据清理和验证
  if ![host] or [host] == "" {
    mutate {
      add_field => { "parse_error" => "missing_host" }
      add_tag => ["_validation_error"]
    }
  }
  
  # 敏感数据脱敏
  mutate {
    # 移除或脱敏敏感字段
    remove_field => ["password", "credit_card", "ssn"]
    
    # IP地址脱敏
    gsub => [
      "client_ip", "\.(\d{1,3})$", ".xxx"
    ]
  }
  
  # 数据验证
  if [message] {
    # 检查消息长度
    if [message] =~ /.{10000,}/ {
      mutate {
        add_tag => ["_oversized_message"]
        replace => { "message" => "[OVERSIZED MESSAGE TRUNCATED]" }
      }
    }
    
    # 检查恶意内容
    if [message] =~ /(?i)(script|javascript|vbscript|onload|onerror)/ {
      mutate {
        add_tag => ["_potential_xss"]
        add_field => { "security_alert" => "potential_xss_detected" }
      }
    }
  }
  
  # 添加安全元数据
  mutate {
    add_field => {
      "[@metadata][security_processed]" => "true"
      "[@metadata][processing_timestamp]" => "%{+YYYY-MM-dd'T'HH:mm:ss.SSSZ}"
    }
  }
  
  # 错误处理
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => { "parse_error" => "grok_failure" }
      add_tag => ["_parse_error"]
    }
  }
}

output {
  # 正常日志输出
  if "_validation_error" not in [tags] and "_parse_error" not in [tags] {
    elasticsearch {
      hosts => ["https://es-node-1:9200", "https://es-node-2:9200", "https://es-node-3:9200"]
      
      # 认证配置
      user => "logstash_writer"
      password => "logstash_writer_password"
      
      # SSL配置
      ssl => true
      cacert => "/etc/logstash/certs/ca.crt"
      ssl_certificate_verification => true
      
      # 索引配置
      index => "secure-logs-%{+YYYY.MM.dd}"
      
      # 重试配置(flush_size、idle_flush_time、retry_max_times 等旧参数已从新版插件移除)
      retry_max_interval => 5
      
      # 模板配置
      template_name => "secure-logs"
      template_pattern => "secure-logs-*"
      template => "/etc/logstash/templates/secure-logs-template.json"
      template_overwrite => true
    }
  }
  
  # 错误日志单独处理
  if "_validation_error" in [tags] or "_parse_error" in [tags] {
    elasticsearch {
      hosts => ["https://es-node-1:9200", "https://es-node-2:9200", "https://es-node-3:9200"]
      user => "logstash_writer"
      password => "logstash_writer_password"
      ssl => true
      cacert => "/etc/logstash/certs/ca.crt"
      index => "logstash-errors-%{+YYYY.MM.dd}"
    }
  }
  
  # 安全告警
  if "_potential_xss" in [tags] {
    http {
      url => "https://security-webhook.company.com/alerts"
      http_method => "post"
      headers => {
        "Authorization" => "Bearer your_webhook_token"
        "Content-Type" => "application/json"
      }
      format => "json"
      mapping => {
        "alert_type" => "potential_xss"
        "timestamp" => "%{@timestamp}"
        "host" => "%{host}"
        "message" => "%{message}"
      }
    }
  }
  
  # 调试输出(仅开发环境)
  if [@metadata][debug] == "true" {
    stdout {
      codec => rubydebug {
        metadata => true
      }
    }
  }
}
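管道上线前,可以先在 Python 中验证 filter 里 client_ip 脱敏所用的正则,与上面 gsub 使用的模式一致(一个最小示意,IP 均为示例数据):

```python
import re

# 与管道中 gsub => ["client_ip", "\.(\d{1,3})$", ".xxx"] 等价:
# 把 IPv4 地址的最后一个八位组替换为 "xxx"
def mask_last_octet(ip: str) -> str:
    return re.sub(r"\.(\d{1,3})$", ".xxx", ip)

print(mask_last_octet("203.0.113.87"))   # 203.0.113.xxx
print(mask_last_octet("192.168.1.10"))   # 192.168.1.xxx
```

确认正则行为符合预期后,再将同样的模式写入管道配置,可以避免上线后才发现脱敏不完整。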

网络安全配置

1. 防火墙配置

防火墙规则脚本:

#!/bin/bash
# elk-firewall-setup.sh

echo "Setting up ELK Stack firewall rules..."

# 清除现有规则
iptables -F
iptables -X
iptables -t nat -F
iptables -t nat -X
iptables -t mangle -F
iptables -t mangle -X

# 设置默认策略
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT

# 允许本地回环
iptables -A INPUT -i lo -j ACCEPT
iptables -A OUTPUT -o lo -j ACCEPT

# 允许已建立的连接
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT

# SSH访问(限制源IP)
iptables -A INPUT -p tcp --dport 22 -s 192.168.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 22 -s 10.0.0.0/8 -j ACCEPT

# Elasticsearch集群通信(仅内部网络)
iptables -A INPUT -p tcp --dport 9200 -s 192.168.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 9300 -s 192.168.1.0/24 -j ACCEPT

# Kibana访问(限制源IP)
iptables -A INPUT -p tcp --dport 5601 -s 192.168.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 5601 -s 10.0.0.0/8 -j ACCEPT

# Logstash输入端口
iptables -A INPUT -p tcp --dport 5044 -s 192.168.1.0/24 -j ACCEPT  # Beats
iptables -A INPUT -p tcp --dport 8080 -s 192.168.1.0/24 -j ACCEPT  # HTTP
iptables -A INPUT -p tcp --dport 9600 -s 127.0.0.1 -j ACCEPT       # API

# 监控端口(仅本地)
iptables -A INPUT -p tcp --dport 9114 -s 127.0.0.1 -j ACCEPT  # Node Exporter
iptables -A INPUT -p tcp --dport 9108 -s 127.0.0.1 -j ACCEPT  # ES Exporter

# ICMP(ping)
iptables -A INPUT -p icmp --icmp-type echo-request -j ACCEPT

# 日志记录被拒绝的连接
iptables -A INPUT -m limit --limit 5/min -j LOG --log-prefix "iptables denied: " --log-level 7

# 保存规则
if command -v iptables-save > /dev/null; then
    iptables-save > /etc/iptables/rules.v4
fi

echo "Firewall rules applied successfully."

# 显示当前规则
echo -e "\nCurrent firewall rules:"
iptables -L -n -v

2. 网络监控

网络安全监控脚本:

# network_security_monitor.py
import subprocess
import re
import time
import json
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class NetworkSecurityMonitor:
    def __init__(self, config_file="network_monitor_config.json"):
        self.config = self.load_config(config_file)
        self.alerts = []
    
    def load_config(self, config_file):
        """加载配置文件"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # 默认配置
            return {
                "monitoring": {
                    "ports": [9200, 5601, 5044, 9600],
                    "max_connections_per_ip": 100,
                    "suspicious_patterns": [
                        r".*\.(exe|bat|cmd|ps1)$",
                        r".*[<>\"'&].*",
                        r".*union.*select.*",
                        r".*script.*alert.*"
                    ]
                },
                "alerting": {
                    "email": {
                        "enabled": True,
                        "smtp_server": "smtp.company.com",
                        "smtp_port": 587,
                        "username": "alerts@company.com",
                        "password": "alert_password",
                        "recipients": ["security@company.com"]
                    }
                }
            }
    
    def check_port_security(self):
        """检查端口安全性"""
        alerts = []
        
        for port in self.config['monitoring']['ports']:
            try:
                # 检查端口监听状态
                result = subprocess.run(
                    ['netstat', '-tlnp'],
                    capture_output=True,
                    text=True
                )
                
                port_lines = [line for line in result.stdout.split('\n') if f':{port}' in line]
                
                for line in port_lines:
                    if '0.0.0.0' in line:
                        alerts.append({
                            'type': 'security_warning',
                            'message': f'Port {port} is listening on all interfaces (0.0.0.0)',
                            'severity': 'medium',
                            'timestamp': datetime.now().isoformat()
                        })
                
            except Exception as e:
                alerts.append({
                    'type': 'monitoring_error',
                    'message': f'Failed to check port {port}: {e}',
                    'severity': 'low',
                    'timestamp': datetime.now().isoformat()
                })
        
        return alerts
    
    def check_connection_limits(self):
        """检查连接数限制"""
        alerts = []
        
        try:
            # 获取当前连接统计
            result = subprocess.run(
                ['netstat', '-tn'],
                capture_output=True,
                text=True
            )
            
            # 统计每个IP的连接数
            ip_connections = {}
            for line in result.stdout.split('\n'):
                if 'ESTABLISHED' in line:
                    parts = line.split()
                    if len(parts) >= 5:
                        foreign_addr = parts[4]
                        ip = foreign_addr.split(':')[0]
                        
                        if ip not in ip_connections:
                            ip_connections[ip] = 0
                        ip_connections[ip] += 1
            
            # 检查是否超过限制
            max_connections = self.config['monitoring']['max_connections_per_ip']
            for ip, count in ip_connections.items():
                if count > max_connections:
                    alerts.append({
                        'type': 'connection_limit_exceeded',
                        'message': f'IP {ip} has {count} connections (limit: {max_connections})',
                        'severity': 'high',
                        'timestamp': datetime.now().isoformat(),
                        'ip': ip,
                        'connection_count': count
                    })
        
        except Exception as e:
            alerts.append({
                'type': 'monitoring_error',
                'message': f'Failed to check connection limits: {e}',
                'severity': 'low',
                'timestamp': datetime.now().isoformat()
            })
        
        return alerts
    
    def check_log_patterns(self):
        """检查日志中的可疑模式"""
        alerts = []
        log_files = [
            '/var/log/elasticsearch/elasticsearch.log',
            '/var/log/kibana/kibana.log',
            '/var/log/logstash/logstash.log',
            '/var/log/nginx/access.log'
        ]
        
        suspicious_patterns = self.config['monitoring']['suspicious_patterns']
        
        for log_file in log_files:
            try:
                # 读取最近的日志条目
                result = subprocess.run(
                    ['tail', '-n', '1000', log_file],
                    capture_output=True,
                    text=True
                )
                
                for line_num, line in enumerate(result.stdout.split('\n'), 1):
                    for pattern in suspicious_patterns:
                        if re.search(pattern, line, re.IGNORECASE):
                            alerts.append({
                                'type': 'suspicious_pattern_detected',
                                'message': f'Suspicious pattern found in {log_file}:{line_num}',
                                'severity': 'high',
                                'timestamp': datetime.now().isoformat(),
                                'log_file': log_file,
                                'line_number': line_num,
                                'pattern': pattern,
                                'log_line': line[:200]  # 限制长度
                            })
            
            except Exception as e:
                alerts.append({
                    'type': 'monitoring_error',
                    'message': f'Failed to check log file {log_file}: {e}',
                    'severity': 'low',
                    'timestamp': datetime.now().isoformat()
                })
        
        return alerts
    
    def send_alert_email(self, alerts):
        """发送告警邮件"""
        if not self.config['alerting']['email']['enabled'] or not alerts:
            return
        
        try:
            smtp_config = self.config['alerting']['email']
            
            msg = MIMEMultipart()
            msg['From'] = smtp_config['username']
            msg['To'] = ', '.join(smtp_config['recipients'])
            msg['Subject'] = f'ELK Stack Security Alert - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
            
            # 构建邮件内容
            body = "ELK Stack Security Monitoring Alert\n\n"
            body += f"Alert Time: {datetime.now().isoformat()}\n"
            body += f"Total Alerts: {len(alerts)}\n\n"
            
            for alert in alerts:
                body += f"Type: {alert['type']}\n"
                body += f"Severity: {alert['severity']}\n"
                body += f"Message: {alert['message']}\n"
                body += f"Timestamp: {alert['timestamp']}\n"
                body += "-" * 50 + "\n"
            
            msg.attach(MIMEText(body, 'plain'))
            
            # 发送邮件
            server = smtplib.SMTP(smtp_config['smtp_server'], smtp_config['smtp_port'])
            server.starttls()
            server.login(smtp_config['username'], smtp_config['password'])
            server.send_message(msg)
            server.quit()
            
            print(f"Alert email sent to {smtp_config['recipients']}")
        
        except Exception as e:
            print(f"Failed to send alert email: {e}")
    
    def monitor(self, duration_minutes=60, interval_seconds=300):
        """持续监控网络安全"""
        print(f"Starting network security monitoring for {duration_minutes} minutes...")
        
        end_time = time.time() + (duration_minutes * 60)
        
        while time.time() < end_time:
            all_alerts = []
            
            # 执行各项检查
            all_alerts.extend(self.check_port_security())
            all_alerts.extend(self.check_connection_limits())
            all_alerts.extend(self.check_log_patterns())
            
            if all_alerts:
                print(f"\n[{datetime.now()}] Security alerts detected:")
                for alert in all_alerts:
                    print(f"  {alert['severity'].upper()}: {alert['message']}")
                
                # 发送告警
                self.send_alert_email(all_alerts)
                self.alerts.extend(all_alerts)
            else:
                print(f"[{datetime.now()}] No security issues detected")
            
            time.sleep(interval_seconds)
        
        print(f"\nMonitoring completed. Total alerts: {len(self.alerts)}")
        return self.alerts

# 使用示例
if __name__ == "__main__":
    monitor = NetworkSecurityMonitor()
    monitor.monitor(duration_minutes=60, interval_seconds=300)
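NetworkSecurityMonitor 默认从 network_monitor_config.json 读取配置,文件不存在时才使用内置默认值。一个与默认结构一致的外部配置文件示例(主机名、邮箱、密码均为占位值):

```json
{
  "monitoring": {
    "ports": [9200, 5601, 5044, 9600],
    "max_connections_per_ip": 100,
    "suspicious_patterns": [".*union.*select.*", ".*script.*alert.*"]
  },
  "alerting": {
    "email": {
      "enabled": true,
      "smtp_server": "smtp.company.com",
      "smtp_port": 587,
      "username": "alerts@company.com",
      "password": "alert_password",
      "recipients": ["security@company.com"]
    }
  }
}
```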

数据加密

1. 传输加密

TLS配置最佳实践:

# elasticsearch.yml - 高级TLS配置
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: full
xpack.security.transport.ssl.client_authentication: required
xpack.security.transport.ssl.supported_protocols: ["TLSv1.2", "TLSv1.3"]
xpack.security.transport.ssl.cipher_suites:
  - "TLS_AES_256_GCM_SHA384"
  - "TLS_AES_128_GCM_SHA256"
  - "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"
  - "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"

xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.supported_protocols: ["TLSv1.2", "TLSv1.3"]
xpack.security.http.ssl.cipher_suites:
  - "TLS_AES_256_GCM_SHA384"
  - "TLS_AES_128_GCM_SHA256"
  - "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"
  - "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
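在客户端一侧,可以用 Python 标准库构建与上述配置同样约束(仅 TLS 1.2/1.3、限定 ECDHE-GCM 套件)的 TLS 上下文,接入前自查策略是否生效(示意代码,CA 证书路径为假设值):

```python
import ssl

def build_es_client_context(ca_path=None):
    """构建只允许 TLS 1.2/1.3 的客户端上下文,与 elasticsearch.yml 的约束对应"""
    # create_default_context 默认开启证书与主机名校验
    ctx = ssl.create_default_context(cafile=ca_path)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    # TLS 1.3 套件由 OpenSSL 内置策略管理;这里只限制 TLS 1.2 的 ECDHE-GCM 套件
    ctx.set_ciphers("ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-GCM-SHA256")
    return ctx

ctx = build_es_client_context()
print(ctx.minimum_version == ssl.TLSVersion.TLSv1_2)  # True
print(ctx.verify_mode == ssl.CERT_REQUIRED)           # True
```

若握手失败,优先检查两端允许的协议版本与套件是否有交集,而不是直接放宽校验。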

2. 静态数据加密

文件系统加密脚本:

#!/bin/bash
# setup-encryption.sh

DATA_DIR="/var/lib/elasticsearch"
ENCRYPTED_DIR="/encrypted/elasticsearch"
KEY_FILE="/etc/elasticsearch/encryption.key"

echo "Setting up filesystem encryption for Elasticsearch data..."

# 生成加密密钥
if [ ! -f "$KEY_FILE" ]; then
    echo "Generating encryption key..."
    openssl rand -base64 32 > "$KEY_FILE"
    chmod 600 "$KEY_FILE"
    chown elasticsearch:elasticsearch "$KEY_FILE"
fi

# 创建加密分区
echo "Creating encrypted partition..."
cryptsetup luksFormat --batch-mode /dev/sdb1 --key-file="$KEY_FILE"
cryptsetup luksOpen /dev/sdb1 elasticsearch-data --key-file="$KEY_FILE"

# 格式化并挂载
mkfs.ext4 /dev/mapper/elasticsearch-data
mkdir -p "$ENCRYPTED_DIR"
mount /dev/mapper/elasticsearch-data "$ENCRYPTED_DIR"

# 设置权限
chown -R elasticsearch:elasticsearch "$ENCRYPTED_DIR"
chmod 750 "$ENCRYPTED_DIR"

# 更新fstab
echo "/dev/mapper/elasticsearch-data $ENCRYPTED_DIR ext4 defaults,nofail 0 2" >> /etc/fstab

# 创建自动解锁脚本
cat > /etc/systemd/system/elasticsearch-decrypt.service << EOF
[Unit]
Description=Decrypt Elasticsearch Data Partition
Before=elasticsearch.service

[Service]
Type=oneshot
ExecStart=/usr/sbin/cryptsetup luksOpen /dev/sdb1 elasticsearch-data --key-file=$KEY_FILE
RemainAfterExit=yes

[Install]
WantedBy=multi-user.target
EOF

systemctl enable elasticsearch-decrypt.service

echo "Filesystem encryption setup completed."

3. 数据脱敏

Logstash数据脱敏配置:

# logstash-data-masking.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # 脱敏信用卡号
  if [message] =~ /\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}/ {
    mutate {
      gsub => [
        "message", "\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}", "****-****-****-****"
      ]
    }
  }
  
  # 脱敏身份证号
  if [message] =~ /\d{17}[\dXx]/ {
    mutate {
      gsub => [
        "message", "(\d{6})\d{8}(\d{3}[\dXx])", "\1********\2"
      ]
    }
  }
  
  # 脱敏手机号
  if [message] =~ /1[3-9]\d{9}/ {
    mutate {
      gsub => [
        "message", "(1[3-9]\d)\d{4}(\d{4})", "\1****\2"
      ]
    }
  }
  
  # 脱敏邮箱地址
  if [message] =~ /[\w.-]+@[\w.-]+\.[a-zA-Z]{2,}/ {
    mutate {
      gsub => [
        "message", "([\w.-]{1,3})[\w.-]*@([\w.-]+\.[a-zA-Z]{2,})", "\1***@\2"
      ]
    }
  }
  
  # 脱敏IP地址(保留网段)
  if [message] =~ /\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b/ {
    mutate {
      gsub => [
        "message", "\b(\d{1,3}\.\d{1,3}\.)\d{1,3}\.\d{1,3}\b", "\1***.***"
      ]
    }
  }
  
  # 添加脱敏标记
  mutate {
    add_field => { "data_masked" => "true" }
    add_field => { "masking_timestamp" => "%{@timestamp}" }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "masked-logs-%{+YYYY.MM.dd}"
    user => "logstash_writer"
    password => "${LOGSTASH_PASSWORD}"
  }
}

Python数据脱敏工具:

#!/usr/bin/env python3
# data_masking_tool.py

import re
import json
import hashlib
from typing import Dict, List, Any
from datetime import datetime

class DataMaskingTool:
    def __init__(self):
        self.masking_rules = {
            'credit_card': {
                'pattern': r'\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}',
                'replacement': '****-****-****-****',
                'description': 'Credit card number'
            },
            'ssn': {
                'pattern': r'\d{3}-\d{2}-\d{4}',
                'replacement': '***-**-****',
                'description': 'Social Security Number'
            },
            'phone': {
                'pattern': r'1[3-9]\d{9}',
                'replacement': lambda m: m.group()[:3] + '****' + m.group()[-4:],
                'description': 'Phone number'
            },
            'email': {
                'pattern': r'[\w.-]+@[\w.-]+\.[a-zA-Z]{2,}',
                'replacement': lambda m: m.group().split('@')[0][:3] + '***@' + m.group().split('@')[1],
                'description': 'Email address'
            },
            'ip_address': {
                'pattern': r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
                'replacement': lambda m: '.'.join(m.group().split('.')[:2]) + '.***.***',
                'description': 'IP address'
            }
        }
        
        self.masking_stats = {
            'total_processed': 0,
            'total_masked': 0,
            'rules_applied': {}
        }
    
    def mask_text(self, text: str, rules: List[str] = None) -> Dict[str, Any]:
        """对文本进行脱敏处理"""
        if rules is None:
            rules = list(self.masking_rules.keys())
        
        original_text = text
        masked_text = text
        applied_rules = []
        
        for rule_name in rules:
            if rule_name not in self.masking_rules:
                continue
            
            rule = self.masking_rules[rule_name]
            pattern = rule['pattern']
            replacement = rule['replacement']
            
            matches = re.findall(pattern, masked_text)
            if matches:
                # re.sub 可同时接受字符串或可调用对象作为替换,无需分支处理
                masked_text = re.sub(pattern, replacement, masked_text)
                
                applied_rules.append({
                    'rule': rule_name,
                    'description': rule['description'],
                    'matches_count': len(matches)
                })
                
                # 更新统计
                if rule_name not in self.masking_stats['rules_applied']:
                    self.masking_stats['rules_applied'][rule_name] = 0
                self.masking_stats['rules_applied'][rule_name] += len(matches)
        
        self.masking_stats['total_processed'] += 1
        if applied_rules:
            self.masking_stats['total_masked'] += 1
        
        return {
            'original_text': original_text,
            'masked_text': masked_text,
            'applied_rules': applied_rules,
            'is_masked': len(applied_rules) > 0,
            'processing_timestamp': datetime.now().isoformat()
        }
    
    def mask_json_data(self, data: Dict[str, Any], sensitive_fields: List[str] = None) -> Dict[str, Any]:
        """对JSON数据进行脱敏处理"""
        if sensitive_fields is None:
            sensitive_fields = ['message', 'log', 'content', 'description']
        
        masked_data = data.copy()
        masking_info = []
        
        for field in sensitive_fields:
            if field in masked_data and isinstance(masked_data[field], str):
                result = self.mask_text(masked_data[field])
                masked_data[field] = result['masked_text']
                
                if result['is_masked']:
                    masking_info.extend(result['applied_rules'])
        
        # 添加脱敏元数据
        masked_data['_masking_info'] = {
            'is_masked': len(masking_info) > 0,
            'applied_rules': masking_info,
            'masking_timestamp': datetime.now().isoformat(),
            'original_hash': hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()[:16]
        }
        
        return masked_data
    
    def get_statistics(self) -> Dict[str, Any]:
        """获取脱敏统计信息"""
        return {
            'masking_stats': self.masking_stats,
            'masking_rate': self.masking_stats['total_masked'] / max(self.masking_stats['total_processed'], 1),
            'available_rules': list(self.masking_rules.keys()),
            'timestamp': datetime.now().isoformat()
        }
    
    def export_masked_data(self, data_list: List[Dict], output_file: str):
        """导出脱敏后的数据"""
        masked_data_list = []
        
        for data in data_list:
            masked_data = self.mask_json_data(data)
            masked_data_list.append(masked_data)
        
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump({
                'masked_data': masked_data_list,
                'statistics': self.get_statistics(),
                'export_timestamp': datetime.now().isoformat()
            }, f, indent=2, ensure_ascii=False)
        
        print(f"Masked data exported to {output_file}")
        print(f"Statistics: {self.get_statistics()}")

# 使用示例
if __name__ == "__main__":
    masker = DataMaskingTool()
    
    # 测试文本脱敏
    test_text = "用户张三的手机号是13812345678,邮箱是zhangsan@example.com,信用卡号是1234-5678-9012-3456"
    result = masker.mask_text(test_text)
    print(f"原文: {result['original_text']}")
    print(f"脱敏后: {result['masked_text']}")
    print(f"应用规则: {result['applied_rules']}")

合规性检查

1. GDPR合规检查

GDPR合规检查脚本:

#!/usr/bin/env python3
# gdpr_compliance_checker.py

import json
import re
from datetime import datetime, timedelta
from typing import Dict, List, Any
from elasticsearch import Elasticsearch

class GDPRComplianceChecker:
    def __init__(self, es_host='https://localhost:9200', es_user=None, es_password=None):
        if es_user and es_password:
            self.es = Elasticsearch(
                [es_host],
                http_auth=(es_user, es_password),
                verify_certs=True
            )
        else:
            self.es = Elasticsearch([es_host])
        
        self.compliance_rules = {
            'data_retention': {
                'max_retention_days': 365,
                'description': 'Data should not be retained longer than necessary'
            },
            'personal_data_identification': {
                'patterns': [
                    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # Email
                    r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
                    r'\b1[3-9]\d{9}\b',  # Phone
                    r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'  # Credit Card
                ],
                'description': 'Personal data should be identified and protected'
            },
            'consent_tracking': {
                'required_fields': ['user_consent', 'consent_timestamp', 'consent_version'],
                'description': 'User consent should be tracked and documented'
            },
            'data_minimization': {
                'max_field_count': 50,
                'description': 'Only necessary data should be collected'
            }
        }
        
        self.compliance_report = {
            'check_timestamp': datetime.now().isoformat(),
            'violations': [],
            'recommendations': [],
            'summary': {}
        }
    
    def check_data_retention(self, index_pattern='*'):
        """检查数据保留期限"""
        violations = []
        
        try:
            # 获取所有索引
            indices = self.es.cat.indices(index=index_pattern, format='json')
            
            # 使用带时区的当前时间,避免与文档中带时区的时间戳相减时抛出 TypeError
            retention_limit = datetime.now().astimezone() - timedelta(
                days=self.compliance_rules['data_retention']['max_retention_days']
            )
            
            for index in indices:
                index_name = index['index']
                
                # 检查索引中最旧的文档
                query = {
                    'size': 1,
                    'sort': [{'@timestamp': {'order': 'asc'}}],
                    'query': {'match_all': {}}
                }
                
                result = self.es.search(index=index_name, body=query)
                
                if result['hits']['hits']:
                    oldest_doc = result['hits']['hits'][0]
                    doc_timestamp = datetime.fromisoformat(
                        oldest_doc['_source']['@timestamp'].replace('Z', '+00:00')
                    )
                    
                    if doc_timestamp < retention_limit:
                        violations.append({
                            'type': 'data_retention_violation',
                            'index': index_name,
                            'oldest_document': doc_timestamp.isoformat(),
                            'retention_limit': retention_limit.isoformat(),
                            'days_over_limit': (retention_limit - doc_timestamp).days,
                            'severity': 'high'
                        })
        
        except Exception as e:
            violations.append({
                'type': 'data_retention_check_error',
                'error': str(e),
                'severity': 'medium'
            })
        
        return violations
    
    def check_personal_data_protection(self, index_pattern='*', sample_size=100):
        """检查个人数据保护"""
        violations = []
        
        try:
            query = {
                'size': sample_size,
                'query': {'match_all': {}}
            }
            
            result = self.es.search(index=index_pattern, body=query)
            
            for hit in result['hits']['hits']:
                doc = hit['_source']
                doc_id = hit['_id']
                index_name = hit['_index']
                
                # 检查是否包含个人数据
                personal_data_found = []
                
                for field, value in doc.items():
                    if isinstance(value, str):
                        for pattern in self.compliance_rules['personal_data_identification']['patterns']:
                            if re.search(pattern, value):
                                personal_data_found.append({
                                    'field': field,
                                    'pattern_type': self._get_pattern_type(pattern),
                                    'masked': self._is_data_masked(value)
                                })
                
                # 检查是否有未脱敏的个人数据
                unmasked_data = [item for item in personal_data_found if not item['masked']]
                
                if unmasked_data:
                    violations.append({
                        'type': 'unmasked_personal_data',
                        'index': index_name,
                        'document_id': doc_id,
                        'unmasked_fields': unmasked_data,
                        'severity': 'high'
                    })
                
                # 检查同意跟踪
                consent_fields = self.compliance_rules['consent_tracking']['required_fields']
                missing_consent_fields = [field for field in consent_fields if field not in doc]
                
                if missing_consent_fields and personal_data_found:
                    violations.append({
                        'type': 'missing_consent_tracking',
                        'index': index_name,
                        'document_id': doc_id,
                        'missing_fields': missing_consent_fields,
                        'severity': 'medium'
                    })
        
        except Exception as e:
            violations.append({
                'type': 'personal_data_check_error',
                'error': str(e),
                'severity': 'medium'
            })
        
        return violations
    
    def check_data_minimization(self, index_pattern='*', sample_size=50):
        """检查数据最小化原则"""
        violations = []
        
        try:
            query = {
                'size': sample_size,
                'query': {'match_all': {}}
            }
            
            result = self.es.search(index=index_pattern, body=query)
            
            for hit in result['hits']['hits']:
                doc = hit['_source']
                field_count = len(doc)
                
                if field_count > self.compliance_rules['data_minimization']['max_field_count']:
                    violations.append({
                        'type': 'data_minimization_violation',
                        'index': hit['_index'],
                        'document_id': hit['_id'],
                        'field_count': field_count,
                        'max_allowed': self.compliance_rules['data_minimization']['max_field_count'],
                        'severity': 'low'
                    })
        
        except Exception as e:
            violations.append({
                'type': 'data_minimization_check_error',
                'error': str(e),
                'severity': 'medium'
            })
        
        return violations
    
    def _get_pattern_type(self, pattern):
        """获取模式类型"""
        pattern_types = {
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b': 'email',
            r'\b\d{3}-\d{2}-\d{4}\b': 'ssn',
            r'\b1[3-9]\d{9}\b': 'phone',
            r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b': 'credit_card'
        }
        return pattern_types.get(pattern, 'unknown')
    
    def _is_data_masked(self, value):
        """检查数据是否已脱敏"""
        masked_patterns = ['***', '****', 'MASKED', 'REDACTED']
        return any(pattern in value for pattern in masked_patterns)
    
    def generate_compliance_report(self, index_pattern='*'):
        """生成合规性报告"""
        print("Starting GDPR compliance check...")
        
        # Run each check
        retention_violations = self.check_data_retention(index_pattern)
        personal_data_violations = self.check_personal_data_protection(index_pattern)
        minimization_violations = self.check_data_minimization(index_pattern)
        
        # Aggregate all violations
        all_violations = retention_violations + personal_data_violations + minimization_violations
        
        # Generate recommendations
        recommendations = self._generate_recommendations(all_violations)
        
        # Update the report
        self.compliance_report.update({
            'violations': all_violations,
            'recommendations': recommendations,
            'summary': {
                'total_violations': len(all_violations),
                'high_severity': len([v for v in all_violations if v.get('severity') == 'high']),
                'medium_severity': len([v for v in all_violations if v.get('severity') == 'medium']),
                'low_severity': len([v for v in all_violations if v.get('severity') == 'low']),
                'compliance_score': max(0, 100 - len(all_violations) * 5)
            }
        })
        
        return self.compliance_report
    
    def _generate_recommendations(self, violations):
        """生成改进建议"""
        recommendations = []
        
        violation_types = set(v['type'] for v in violations)
        
        if 'data_retention_violation' in violation_types:
            recommendations.append({
                'type': 'data_retention',
                'priority': 'high',
                'action': 'Implement automated data deletion policies',
                'description': 'Set up Index Lifecycle Management (ILM) to automatically delete old data'
            })
        
        if 'unmasked_personal_data' in violation_types:
            recommendations.append({
                'type': 'data_masking',
                'priority': 'high',
                'action': 'Implement data masking in Logstash pipelines',
                'description': 'Configure Logstash filters to mask personal data before indexing'
            })
        
        if 'missing_consent_tracking' in violation_types:
            recommendations.append({
                'type': 'consent_management',
                'priority': 'medium',
                'action': 'Implement consent tracking system',
                'description': 'Add consent metadata to all documents containing personal data'
            })
        
        if 'data_minimization_violation' in violation_types:
            recommendations.append({
                'type': 'data_minimization',
                'priority': 'low',
                'action': 'Review data collection practices',
                'description': 'Reduce the number of fields collected to only what is necessary'
            })
        
        return recommendations
    
    def export_report(self, output_file='gdpr_compliance_report.json'):
        """导出合规性报告"""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.compliance_report, f, indent=2, ensure_ascii=False)
        
        print(f"GDPR compliance report exported to {output_file}")
        print(f"Compliance Score: {self.compliance_report['summary']['compliance_score']}/100")
        print(f"Total Violations: {self.compliance_report['summary']['total_violations']}")

# Usage example
if __name__ == "__main__":
    checker = GDPRComplianceChecker(
        es_host='localhost:9200',
        es_user='elastic',
        es_password='your_password'
    )
    
    report = checker.generate_compliance_report('logs-*')
    checker.export_report()
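The report's "data_retention" recommendation (setting up ILM to delete old data automatically) can be acted on programmatically. Below is a minimal sketch that builds an ILM policy body deleting indices after a given retention period; the policy name, rollover settings, and 90-day retention are illustrative assumptions, and actually applying the policy requires the official `elasticsearch` Python client and a running cluster.

```python
# Sketch: turn the "data_retention" recommendation into an ILM policy.
# The retention period and rollover thresholds below are illustrative assumptions.

def build_retention_policy(retention_days):
    """Build an ILM policy body that deletes indices after `retention_days` days."""
    return {
        'policy': {
            'phases': {
                'hot': {
                    'actions': {
                        'rollover': {'max_age': '1d', 'max_primary_shard_size': '50gb'}
                    }
                },
                'delete': {
                    'min_age': f'{retention_days}d',
                    'actions': {'delete': {}}
                }
            }
        }
    }

# Applying it with the official Python client (requires a live cluster;
# the policy name 'logs-retention' is a hypothetical example):
# es.ilm.put_lifecycle(name='logs-retention',
#                      policy=build_retention_policy(90)['policy'])
```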

Summary

This chapter covered security configuration and permission management for the ELK Stack, including the following core topics:

🔐 Core Security Features

  • X-Pack Security configuration: SSL/TLS encryption, user authentication, role-based permissions
  • User and role management: fine-grained access control, API key management
  • Network security: firewall configuration, port hardening, connection monitoring
  • Data encryption: encryption in transit, encryption at rest, key management

🛡️ Data Protection Strategies

  • Data masking: automatic detection and masking of sensitive information
  • Access control: role-based access control (RBAC)
  • Audit logging: complete operational auditing and security event records
  • Compliance checks: automated checks against GDPR and similar regulations
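The data-masking strategy above can be illustrated with a small Python sketch that masks email addresses and phone numbers before indexing. The regexes mirror the email and phone patterns used by the compliance checker; the replacement format (keeping the first character of an email and the outer digits of a phone number) is an assumption:

```python
import re

# Illustrative masking rules; the patterns mirror the compliance checker's
# email and phone regexes, and the replacement format is an assumption.
MASKING_RULES = [
    # Keep the first character and the domain of email addresses
    (re.compile(r'\b([A-Za-z0-9._%+-])[A-Za-z0-9._%+-]*@([A-Za-z0-9.-]+\.[A-Za-z]{2,})\b'),
     r'\1***@\2'),
    # Mask the middle four digits of Chinese mobile numbers
    (re.compile(r'\b(1[3-9]\d)\d{4}(\d{4})\b'), r'\1****\2'),
]

def mask_personal_data(text):
    """Apply all masking rules to a string before it is indexed."""
    for pattern, replacement in MASKING_RULES:
        text = pattern.sub(replacement, text)
    return text

# Example:
# mask_personal_data('contact alice@example.com or 13812345678')
# -> 'contact a***@example.com or 138****5678'
```

In practice this kind of masking is usually done inside the ingestion pipeline (for example in a Logstash filter or an ingest pipeline), so that unmasked data never reaches the index.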

📊 Monitoring and Alerting

  • Security monitoring: real-time threat detection and response
  • Performance monitoring: tracking the performance impact of security configuration
  • Alerting: multi-channel security alerts and notifications
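As a concrete example of security monitoring, the audit events enabled earlier in this chapter (such as `authentication_failed`) can be aggregated to flag brute-force attempts. The sketch below only builds the search body; it assumes the audit log has been shipped into an Elasticsearch index (for example via Filebeat), and the field names and index pattern are assumptions to adjust for your setup.

```python
# Sketch: a query that aggregates recent authentication failures by user.
# Assumes audit events are indexed with 'event.action', 'user.name' and
# '@timestamp' fields; adjust field names and index pattern to your setup.

def build_failed_auth_query(minutes=15, threshold=5):
    """Build a search body flagging users with >= `threshold` failures recently."""
    return {
        'size': 0,
        'query': {
            'bool': {
                'filter': [
                    {'term': {'event.action': 'authentication_failed'}},
                    {'range': {'@timestamp': {'gte': f'now-{minutes}m'}}}
                ]
            }
        },
        'aggs': {
            'by_user': {
                'terms': {'field': 'user.name', 'min_doc_count': threshold}
            }
        }
    }

# Example (requires a live cluster and an indexed audit log, e.g.:
# result = es.search(index='audit-*', body=build_failed_auth_query()))
```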

🎯 Technical Highlights

  • Automation scripts: Python and shell scripts that automate security configuration
  • Best practices: enterprise-grade security configuration templates and standards
  • Practical cases: security configuration examples from real-world scenarios

💡 Practical Value

  • Enterprise-grade security: meets enterprise security and compliance requirements
  • Operational efficiency: automated security configuration and monitoring
  • Risk control: effective identification and mitigation of security risks
  • Compliance assurance: ensures data processing complies with applicable regulations

After completing this chapter, you should be able to:

  • Configure a complete ELK Stack security architecture
  • Implement fine-grained user permission management
  • Establish effective data protection mechanisms
  • Meet enterprise-level security and compliance requirements

In the next chapter we will turn to ELK Stack best practices and case studies, using real-world examples to explore how the ELK Stack is applied and optimized in different scenarios.