1. X-Pack Security Basics

1.1 Enabling Security Features

# elasticsearch.yml security settings
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.client_authentication: required
# Keystore paths are resolved relative to the Elasticsearch config directory
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

# HTTP SSL settings
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: certs/elastic-certificates.p12

# Audit log settings
xpack.security.audit.enabled: true
xpack.security.audit.logfile.events.include: access_denied, access_granted, anonymous_access_denied, authentication_failed, connection_denied, tampered_request, run_as_denied, run_as_granted
# Exclude access granted to the internal system user
xpack.security.audit.logfile.events.exclude: system_access_granted
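
The settings above depend on one another: enabling security without TLS on the transport layer, or enabling TLS without a keystore, leaves the node misconfigured. The following is a minimal sketch, assuming the configuration uses flat `key: value` lines as shown above, that flags such omissions. The helper names `parse_flat_settings` and `check_security_settings` are hypothetical and are not part of Elasticsearch; a node configured with PEM files instead of a keystore would need different rules.

```python
from typing import Dict, List


def parse_flat_settings(text: str) -> Dict[str, str]:
    """Parse flat `key: value` lines, skipping comments and blank lines."""
    settings: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, _, value = line.partition(":")
        settings[key.strip()] = value.strip()
    return settings


def check_security_settings(settings: Dict[str, str]) -> List[str]:
    """Return warnings for settings that look inconsistent (illustrative rules only)."""
    warnings: List[str] = []
    if settings.get("xpack.security.enabled") != "true":
        warnings.append("xpack.security.enabled is not true")
    for layer in ("transport", "http"):
        prefix = f"xpack.security.{layer}.ssl"
        if settings.get(f"{prefix}.enabled") != "true":
            warnings.append(f"{prefix}.enabled is not true")
        elif f"{prefix}.keystore.path" not in settings:
            warnings.append(f"{prefix}.keystore.path is missing")
    return warnings


# Sample input with the HTTP keystore deliberately left out
sample = """
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.http.ssl.enabled: true
"""

warnings = check_security_settings(parse_flat_settings(sample))
print(warnings)
```

A check like this could run before a node restart so that a missing keystore path is caught early rather than at startup.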

1.2 Generating SSL Certificates

#!/bin/bash
# generate_certificates.sh

set -e

ES_HOME="/opt/elasticsearch"
CERT_DIR="$ES_HOME/config/certs"

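# Create the certificate directory
mkdir -p "$CERT_DIR"
cd "$CERT_DIR"

# Generate the CA certificate (absolute output paths avoid depending on the working directory)
echo "Generating CA certificate..."
"$ES_HOME/bin/elasticsearch-certutil" ca --out "$CERT_DIR/elastic-stack-ca.p12" --pass ""

# Generate the node certificate, signed by the CA created above
echo "Generating node certificate..."
"$ES_HOME/bin/elasticsearch-certutil" cert --ca "$CERT_DIR/elastic-stack-ca.p12" --ca-pass "" \
  --out "$CERT_DIR/elastic-certificates.p12" --pass ""

# Generate the HTTP certificate (this command is interactive and prompts for its options)
echo "Generating HTTP certificate..."
"$ES_HOME/bin/elasticsearch-certutil" http

# Set ownership and permissions
chown -R elasticsearch:elasticsearch "$CERT_DIR"
chmod 660 "$CERT_DIR"/*.p12

echo "Certificate generation complete!"
echo "Certificate location: $CERT_DIR"
echo "Copy the certificates to the same location on every node"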
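
The script above restricts the keystores to mode 660. As a rough follow-up check after copying the files to other nodes, the sketch below lists any `.p12` file that is still readable or writable by other users. The function name `find_loose_permissions` is hypothetical, and the demonstration uses a temporary directory in place of the real certificate directory.

```python
import stat
import tempfile
from pathlib import Path
from typing import List


def find_loose_permissions(cert_dir: str) -> List[str]:
    """Return names of .p12 files in cert_dir that 'other' users can read or write."""
    loose: List[str] = []
    for path in sorted(Path(cert_dir).glob("*.p12")):
        mode = path.stat().st_mode
        if mode & (stat.S_IROTH | stat.S_IWOTH):
            loose.append(path.name)
    return loose


# Demonstration on a temporary directory standing in for the certificate directory
with tempfile.TemporaryDirectory() as tmp:
    strict_file = Path(tmp, "elastic-certificates.p12")
    strict_file.write_bytes(b"")
    strict_file.chmod(0o660)  # matches the mode set by the script

    loose_file = Path(tmp, "elastic-stack-ca.p12")
    loose_file.write_bytes(b"")
    loose_file.chmod(0o664)  # world-readable, so it should be reported

    loose = find_loose_permissions(tmp)

print(loose)
```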

1.3 Initializing Security Settings

#!/bin/bash
# setup_security.sh

set -e

ES_HOME="/opt/elasticsearch"
ES_HOST="localhost"
ES_PORT="9200"

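echo "Setting passwords for built-in users..."

# Generate passwords automatically
$ES_HOME/bin/elasticsearch-setup-passwords auto

# Or set the passwords interactively
# $ES_HOME/bin/elasticsearch-setup-passwords interactive

echo "Security setup complete!"
echo "Save the generated passwords and update your application configuration"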
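
Because the generated passwords are printed only once, it can help to capture the tool's output and extract the credentials programmatically. The sketch below assumes the output contains lines of the form `PASSWORD <user> = <password>`; that format is an assumption to verify against your Elasticsearch version, and `parse_generated_passwords` is a hypothetical helper. The sample text uses made-up placeholder values.

```python
import re
from typing import Dict

# Assumed line format: "PASSWORD <user> = <password>"
PASSWORD_LINE = re.compile(r"^PASSWORD\s+(\S+)\s+=\s+(\S+)$")


def parse_generated_passwords(output: str) -> Dict[str, str]:
    """Extract a user -> password mapping from captured tool output."""
    passwords: Dict[str, str] = {}
    for line in output.splitlines():
        match = PASSWORD_LINE.match(line.strip())
        if match:
            passwords[match.group(1)] = match.group(2)
    return passwords


# Placeholder output for illustration; real passwords are random strings
sample_output = """
Changed password for user kibana_system
PASSWORD kibana_system = example-kibana-secret

Changed password for user elastic
PASSWORD elastic = example-elastic-secret
"""

passwords = parse_generated_passwords(sample_output)
print(sorted(passwords))
```

The resulting mapping should go straight into a secrets store rather than a plain-text file.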

2. User and Role Management

2.1 User Management API

import requests
import json
from typing import Dict, List, Any, Optional
from requests.auth import HTTPBasicAuth

class ElasticsearchUserManager:
    def __init__(self, es_host: str, username: str, password: str, use_ssl: bool = True):
        self.es_host = es_host
        self.protocol = "https" if use_ssl else "http"
        self.es_url = f"{self.protocol}://{es_host}:9200"
        self.auth = HTTPBasicAuth(username, password)
        self.session = requests.Session()
        self.session.auth = self.auth
        self.session.verify = False  # Disables TLS certificate verification; in production, set this to True or to a CA bundle path
    
    def create_user(self, username: str, password: str, roles: List[str], 
                   full_name: Optional[str] = None, email: Optional[str] = None,
                   metadata: Optional[Dict] = None) -> bool:
        """Create a user, or replace an existing user with the same name."""
        user_data = {
            "password": password,
            "roles": roles
        }
        
        if full_name:
            user_data["full_name"] = full_name
        if email:
            user_data["email"] = email
        if metadata:
            user_data["metadata"] = metadata
        
        try:
            response = self.session.put(
                f"{self.es_url}/_security/user/{username}",
                json=user_data,
                timeout=30
            )
            
            if response.status_code in [200, 201]:
                print(f"User {username} created successfully")
                return True
            else:
                print(f"Failed to create user: {response.text}")
                return False
        
        except Exception as e:
            print(f"Error while creating user: {e}")
            return False
    
    def update_user(self, username: str, **kwargs) -> bool:
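        """Change a user's password (the only attribute this method updates)."""
        password = kwargs.get("password")
        if not password:
            print("A new password is required to update the user")
            return False
        
        try:
            response = self.session.post(
                f"{self.es_url}/_security/user/{username}/_password",
                json={"password": password},
                timeout=30
            )
            
            if response.status_code == 200:
                print(f"Password for user {username} updated successfully")
                return True
            else:
                print(f"Failed to update user: {response.text}")
                return False
        
        except Exception as e:
            print(f"Error while updating user: {e}")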
            return False
    
    def delete_user(self, username: str) -> bool:
        """Delete a user."""
        try:
            response = self.session.delete(
                f"{self.es_url}/_security/user/{username}",
                timeout=30
            )
            
            if response.status_code == 200:
                print(f"User {username} deleted successfully")
                return True
            else:
                print(f"Failed to delete user: {response.text}")
                return False
        
        except Exception as e:
            print(f"Error while deleting user: {e}")
            return False
    
    def get_user(self, username: str) -> Optional[Dict[str, Any]]:
        """Retrieve a single user's details."""
        try:
            response = self.session.get(
                f"{self.es_url}/_security/user/{username}",
                timeout=30
            )
            
            if response.status_code == 200:
                return response.json()[username]
            else:
                print(f"Failed to retrieve user: {response.text}")
                return None
        
        except Exception as e:
            print(f"Error while retrieving user: {e}")
            return None
    
    def list_users(self) -> Dict[str, Any]:
        """List all users."""
        try:
            response = self.session.get(
                f"{self.es_url}/_security/user",
                timeout=30
            )
            
            if response.status_code == 200:
                return response.json()
            else:
                print(f"Failed to list users: {response.text}")
                return {}
        
        except Exception as e:
            print(f"Error while listing users: {e}")
            return {}
    
    def create_role(self, role_name: str, cluster_privileges: List[str], 
                   index_privileges: List[Dict[str, Any]], 
                   application_privileges: Optional[List[Dict[str, Any]]] = None) -> bool:
        """Create a role, or replace an existing role with the same name."""
        role_data = {
            "cluster": cluster_privileges,
            "indices": index_privileges
        }
        
        if application_privileges:
            role_data["applications"] = application_privileges
        
        try:
            response = self.session.put(
                f"{self.es_url}/_security/role/{role_name}",
                json=role_data,
                timeout=30
            )
            
            if response.status_code in [200, 201]:
                print(f"Role {role_name} created successfully")
                return True
            else:
                print(f"Failed to create role: {response.text}")
                return False
        
        except Exception as e:
            print(f"Error while creating role: {e}")
            return False
    
    def delete_role(self, role_name: str) -> bool:
        """Delete a role."""
        try:
            response = self.session.delete(
                f"{self.es_url}/_security/role/{role_name}",
                timeout=30
            )
            
            if response.status_code == 200:
                print(f"Role {role_name} deleted successfully")
                return True
            else:
                print(f"Failed to delete role: {response.text}")
                return False
        
        except Exception as e:
            print(f"Error while deleting role: {e}")
            return False
    
    def get_role(self, role_name: str) -> Optional[Dict[str, Any]]:
        """Retrieve a single role's definition."""
        try:
            response = self.session.get(
                f"{self.es_url}/_security/role/{role_name}",
                timeout=30
            )
            
            if response.status_code == 200:
                return response.json()[role_name]
            else:
                print(f"Failed to retrieve role: {response.text}")
                return None
        
        except Exception as e:
            print(f"Error while retrieving role: {e}")
            return None
    
    def list_roles(self) -> Dict[str, Any]:
        """List all roles."""
        try:
            response = self.session.get(
                f"{self.es_url}/_security/role",
                timeout=30
            )
            
            if response.status_code == 200:
                return response.json()
            else:
                print(f"Failed to list roles: {response.text}")
                return {}
        
        except Exception as e:
            print(f"Error while listing roles: {e}")
            return {}

# Usage example
if __name__ == "__main__":
    # Create the user manager
    user_manager = ElasticsearchUserManager(
        es_host="localhost",
        username="elastic",
        password="your_password"
    )
    
    # Create custom roles
    print("=== Creating roles ===")
    
    # Read-only role
    readonly_role = user_manager.create_role(
        role_name="logs_readonly",
        cluster_privileges=["monitor"],
        index_privileges=[
            {
                "names": ["logs-*"],
                "privileges": ["read", "view_index_metadata"]
            }
        ]
    )
    
    # Data analyst role
    analyst_role = user_manager.create_role(
        role_name="data_analyst",
        cluster_privileges=["monitor", "manage_index_templates"],
        index_privileges=[
            {
                "names": ["logs-*", "metrics-*"],
                "privileges": ["read", "view_index_metadata", "create_index"]
            },
            {
                "names": ["reports-*"],
                "privileges": ["all"]
            }
        ]
    )
    
    # Developer role
    developer_role = user_manager.create_role(
        role_name="developer",
        cluster_privileges=["monitor", "manage_index_templates", "manage_ingest_pipelines"],
        index_privileges=[
            {
                "names": ["dev-*", "test-*"],
                "privileges": ["all"]
            },
            {
                "names": ["logs-*"],
                "privileges": ["read", "view_index_metadata"]
            }
        ]
    )
    
    # Create users
    print("\n=== Creating users ===")
    
    # Read-only user
    user_manager.create_user(
        username="readonly_user",
        password="readonly_pass123",
        roles=["logs_readonly"],
        full_name="Read Only User",
        email="readonly@company.com"
    )
    
    # Data analyst user
    user_manager.create_user(
        username="analyst_user",
        password="analyst_pass123",
        roles=["data_analyst"],
        full_name="Data Analyst",
        email="analyst@company.com"
    )
    
    # Developer user
    user_manager.create_user(
        username="dev_user",
        password="dev_pass123",
        roles=["developer"],
        full_name="Developer User",
        email="developer@company.com"
    )
    
    # List all users and roles
    print("\n=== Users ===")
    users = user_manager.list_users()
    for username, user_info in users.items():
        print(f"User: {username}, roles: {user_info.get('roles', [])}")
    
    print("\n=== Roles ===")
    roles = user_manager.list_roles()
    for role_name, role_info in roles.items():
        # Skip built-in roles, which Elasticsearch marks with the `_reserved` metadata flag
        if not role_info.get('metadata', {}).get('_reserved'):
            print(f"Role: {role_name}")
            print(f"  Cluster privileges: {role_info.get('cluster', [])}")
            print(f"  Index privileges: {len(role_info.get('indices', []))} index rule(s)")
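
When a user holds several roles, the privileges from every matching index rule are combined. The sketch below approximates that union locally, which can be handy for reviewing role definitions before creating them. It is a simplification: `effective_index_privileges` is a hypothetical helper, and it uses Python's `fnmatchcase` as a stand-in for Elasticsearch's index pattern matching, which it only approximates.

```python
from fnmatch import fnmatchcase
from typing import Any, Dict, List, Set


def effective_index_privileges(
    user_roles: List[str],
    role_definitions: Dict[str, Dict[str, Any]],
    index_name: str,
) -> Set[str]:
    """Union the privileges of every index rule whose pattern matches index_name."""
    granted: Set[str] = set()
    for role_name in user_roles:
        role = role_definitions.get(role_name, {})
        for rule in role.get("indices", []):
            if any(fnmatchcase(index_name, pattern) for pattern in rule["names"]):
                granted.update(rule["privileges"])
    return granted


# Role bodies mirroring the "developer" and "logs_readonly" roles created above
role_definitions = {
    "logs_readonly": {
        "indices": [{"names": ["logs-*"], "privileges": ["read", "view_index_metadata"]}]
    },
    "developer": {
        "indices": [
            {"names": ["dev-*", "test-*"], "privileges": ["all"]},
            {"names": ["logs-*"], "privileges": ["read", "view_index_metadata"]},
        ]
    },
}

print(effective_index_privileges(["developer"], role_definitions, "dev-orders"))
print(effective_index_privileges(["logs_readonly"], role_definitions, "dev-orders"))
```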

2.2 Attribute-Based Access Control (ABAC)

from typing import Dict, List, Any
import json

class ElasticsearchABACManager:
    def __init__(self, user_manager: ElasticsearchUserManager):
        self.user_manager = user_manager
    
    def create_department_role(self, department: str, access_level: str) -> bool:
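        """Create a role scoped to a department."""
        # Map each access level to the role-name suffix used by the tenant roles and users below
        suffixes = {"admin": "admin", "write": "user", "read": "readonly"}
        role_name = f"{department}_{suffixes.get(access_level, access_level)}"
        
        # Define privileges according to the access level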
        if access_level == "read":
            cluster_privileges = ["monitor"]
            index_privileges = [
                {
                    "names": [f"{department}-*"],
                    "privileges": ["read", "view_index_metadata"],
                    "query": {
                        "term": {
                            "department.keyword": department
                        }
                    }
                }
            ]
        elif access_level == "write":
            cluster_privileges = ["monitor", "manage_index_templates"]
            index_privileges = [
                {
                    "names": [f"{department}-*"],
                    "privileges": ["read", "write", "create_index", "view_index_metadata"],
                    "query": {
                        "term": {
                            "department.keyword": department
                        }
                    }
                }
            ]
        elif access_level == "admin":
            cluster_privileges = ["monitor", "manage_index_templates", "manage_ingest_pipelines"]
            index_privileges = [
                {
                    "names": [f"{department}-*"],
                    "privileges": ["all"]
                }
            ]
        else:
            print(f"Unknown access level: {access_level}")
            return False
        
        return self.user_manager.create_role(
            role_name=role_name,
            cluster_privileges=cluster_privileges,
            index_privileges=index_privileges
        )
    
    def create_time_based_role(self, role_name: str, start_time: str, end_time: str) -> bool:
        """Create a role restricted to documents within a time range."""
        index_privileges = [
            {
                "names": ["logs-*"],
                "privileges": ["read", "view_index_metadata"],
                "query": {
                    "range": {
                        "@timestamp": {
                            "gte": start_time,
                            "lte": end_time
                        }
                    }
                }
            }
        ]
        
        return self.user_manager.create_role(
            role_name=role_name,
            cluster_privileges=["monitor"],
            index_privileges=index_privileges
        )
    
    def create_field_level_security_role(self, role_name: str, allowed_fields: List[str], 
                                       denied_fields: List[str] = None) -> bool:
        """Create a role with field-level security."""
        field_security = {
            "grant": allowed_fields
        }
        
        if denied_fields:
            field_security["except"] = denied_fields
        
        index_privileges = [
            {
                "names": ["sensitive-*"],
                "privileges": ["read", "view_index_metadata"],
                "field_security": field_security
            }
        ]
        
        return self.user_manager.create_role(
            role_name=role_name,
            cluster_privileges=["monitor"],
            index_privileges=index_privileges
        )
    
    def setup_multi_tenant_security(self, tenants: List[str]) -> bool:
        """Set up roles for multiple tenants."""
        success = True
        
        for tenant in tenants:
            # Create one role per access level for each tenant
            roles = [
                (f"{tenant}_admin", "admin"),
                (f"{tenant}_user", "write"),
                (f"{tenant}_readonly", "read")
            ]
            
            for role_name, access_level in roles:
                if not self.create_department_role(tenant, access_level):
                    success = False
                    print(f"Failed to create role {role_name}")
        
        return success

# Usage example
if __name__ == "__main__":
    user_manager = ElasticsearchUserManager(
        es_host="localhost",
        username="elastic",
        password="your_password"
    )
    
    abac_manager = ElasticsearchABACManager(user_manager)
    
    # Set up multi-tenant security
    print("=== Setting up multi-tenant security ===")
    tenants = ["finance", "hr", "engineering", "marketing"]
    abac_manager.setup_multi_tenant_security(tenants)
    
    # Create a time-based role
    print("\n=== Creating time-based role ===")
    abac_manager.create_time_based_role(
        role_name="last_month_logs",
        start_time="2024-01-01",
        end_time="2024-01-31"
    )
    
    # Create a field-level security role
    print("\n=== Creating field-level security role ===")
    abac_manager.create_field_level_security_role(
        role_name="pii_restricted",
        allowed_fields=["@timestamp", "level", "message", "service"],
        denied_fields=["user.email", "user.phone", "credit_card"]
    )
    
    # Create the corresponding users
    print("\n=== Creating tenant users ===")
    
    # Finance department users
    user_manager.create_user(
        username="finance_admin",
        password="finance_admin_pass123",
        roles=["finance_admin"],
        full_name="Finance Administrator",
        email="finance.admin@company.com",
        metadata={"department": "finance", "level": "admin"}
    )
    
    user_manager.create_user(
        username="finance_user",
        password="finance_user_pass123",
        roles=["finance_user"],
        full_name="Finance User",
        email="finance.user@company.com",
        metadata={"department": "finance", "level": "user"}
    )
    
    # HR department user
    user_manager.create_user(
        username="hr_readonly",
        password="hr_readonly_pass123",
        roles=["hr_readonly", "pii_restricted"],
        full_name="HR Read Only User",
        email="hr.readonly@company.com",
        metadata={"department": "hr", "level": "readonly"}
    )
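
The roles above hard-code the department value into each role's query, so every department needs its own role. Elasticsearch also supports templated role queries, in which placeholders such as `{{_user.metadata.department}}` are filled in from the authenticated user's metadata at query time. The sketch below builds such a role body; it reuses the `department` metadata key set on the users above, while the function name `build_templated_department_role` and the `shared-*` index pattern are hypothetical.

```python
import json
from typing import Any, Dict, List


def build_templated_department_role(
    index_patterns: List[str], field: str = "department.keyword"
) -> Dict[str, Any]:
    """Build a read-only role body whose document filter follows the user's metadata."""
    return {
        "cluster": ["monitor"],
        "indices": [
            {
                "names": index_patterns,
                "privileges": ["read", "view_index_metadata"],
                "query": {
                    "template": {
                        # The placeholder is resolved per user when the query runs
                        "source": {
                            "term": {field: "{{_user.metadata.department}}"}
                        }
                    }
                },
            }
        ],
    }


role_body = build_templated_department_role(["shared-*"])
print(json.dumps(role_body, indent=2))
```

The `cluster` and `indices` entries could be passed to `create_role` as `cluster_privileges` and `index_privileges`. One role then serves every department, provided each user's metadata carries the right value.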

3. API Key Management

3.1 Creating and Managing API Keys

import base64
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional

class ElasticsearchAPIKeyManager:
    def __init__(self, user_manager: ElasticsearchUserManager):
        self.user_manager = user_manager
        self.session = user_manager.session
        self.es_url = user_manager.es_url
    
    def create_api_key(self, name: str, role_descriptors: Dict[str, Any] = None, 
                      expiration: str = None, metadata: Dict[str, Any] = None) -> Optional[Dict[str, Any]]:
        """Create an API key."""
        api_key_data = {
            "name": name
        }
        
        if role_descriptors:
            api_key_data["role_descriptors"] = role_descriptors
        
        if expiration:
            api_key_data["expiration"] = expiration
        
        if metadata:
            api_key_data["metadata"] = metadata
        
        try:
            response = self.session.post(
                f"{self.es_url}/_security/api_key",
                json=api_key_data,
                timeout=30
            )
            
            if response.status_code == 200:
                result = response.json()
                print(f"API key {name} created successfully")
                print(f"ID: {result['id']}")
                print(f"API Key: {result['api_key']}")
                
                # Build the Base64-encoded credential for the Authorization header
                auth_string = f"{result['id']}:{result['api_key']}"
                encoded_auth = base64.b64encode(auth_string.encode()).decode()
                print(f"Authorization Header: ApiKey {encoded_auth}")
                
                return result
            else:
                print(f"Failed to create API key: {response.text}")
                return None
        
        except Exception as e:
            print(f"Error while creating API key: {e}")
            return None
    
    def get_api_key_info(self, api_key_id: str = None, name: str = None) -> Optional[Dict[str, Any]]:
        """Retrieve API key information, optionally filtered by ID or name."""
        try:
            # Pass filters as query parameters so that requests URL-encodes them
            params = {}
            if api_key_id:
                params["id"] = api_key_id
            elif name:
                params["name"] = name
            
            response = self.session.get(
                f"{self.es_url}/_security/api_key", params=params, timeout=30
            )
            
            if response.status_code == 200:
                return response.json()
            else:
                print(f"Failed to retrieve API key information: {response.text}")
                return None
        
        except Exception as e:
            print(f"Error while retrieving API key information: {e}")
            return None
    
    def invalidate_api_key(self, api_key_id: str = None, name: str = None) -> bool:
        """Invalidate an API key by ID or by name."""
        invalidate_data = {}
        
        if api_key_id:
            invalidate_data["id"] = api_key_id
        elif name:
            invalidate_data["name"] = name
        else:
            print("An API key ID or name is required")
            return False
        
        try:
            response = self.session.delete(
                f"{self.es_url}/_security/api_key",
                json=invalidate_data,
                timeout=30
            )
            
            if response.status_code == 200:
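                result = response.json()
                # `invalidated_api_keys` is a list of key IDs, so report its length
                print(f"Invalidated {len(result['invalidated_api_keys'])} API key(s)")
                return True
            else:
                print(f"Failed to invalidate API key: {response.text}")
                return False
        
        except Exception as e:
            print(f"Error while invalidating API key: {e}")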
            return False
    
    def create_application_api_keys(self, app_name: str, environments: List[str]) -> Dict[str, Dict[str, Any]]:
        """Create one API key per environment for an application."""
        api_keys = {}
        
        for env in environments:
            # Define privileges according to the environment
            if env == "production":
                role_descriptors = {
                    f"{app_name}_prod": {
                        "cluster": ["monitor"],
                        "indices": [
                            {
                                "names": [f"{app_name}-prod-*"],
                                "privileges": ["read", "write", "create_index"]
                            }
                        ]
                    }
                }
                expiration = "365d"  # Production keys are valid for one year
            elif env == "staging":
                role_descriptors = {
                    f"{app_name}_staging": {
                        "cluster": ["monitor"],
                        "indices": [
                            {
                                "names": [f"{app_name}-staging-*"],
                                "privileges": ["all"]
                            }
                        ]
                    }
                }
                expiration = "90d"  # Staging keys are valid for 90 days
            elif env == "development":
                role_descriptors = {
                    f"{app_name}_dev": {
                        "cluster": ["monitor", "manage_index_templates"],
                        "indices": [
                            {
                                "names": [f"{app_name}-dev-*"],
                                "privileges": ["all"]
                            }
                        ]
                    }
                }
                expiration = "30d"  # Development keys are valid for 30 days
            else:
                continue
            
            api_key_name = f"{app_name}_{env}_api_key"
            metadata = {
                "application": app_name,
                "environment": env,
                "created_at": datetime.now().isoformat()
            }
            
            api_key = self.create_api_key(
                name=api_key_name,
                role_descriptors=role_descriptors,
                expiration=expiration,
                metadata=metadata
            )
            
            if api_key:
                api_keys[env] = api_key
        
        return api_keys
    
    def rotate_api_key(self, old_key_name: str, new_key_name: str = None) -> Optional[Dict[str, Any]]:
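        """Rotate an API key by creating a replacement with the same settings."""
        # Look up the old key
        old_key_info = self.get_api_key_info(name=old_key_name)
        if not old_key_info or not old_key_info.get('api_keys'):
            print(f"API key not found: {old_key_name}")
            return None
        
        old_key = old_key_info['api_keys'][0]
        
        # The get API reports creation and expiration as epoch milliseconds, while the
        # create API expects a duration, so reuse the old key's original lifetime
        expiration = None
        if old_key.get('expiration') and old_key.get('creation'):
            lifetime_seconds = (old_key['expiration'] - old_key['creation']) // 1000
            if lifetime_seconds > 0:
                expiration = f"{lifetime_seconds}s"
        
        # Create the new key; role descriptors are copied only if the cluster
        # includes them in the get API response
        new_name = new_key_name or f"{old_key_name}_rotated_{datetime.now().strftime('%Y%m%d')}"
        
        new_key = self.create_api_key(
            name=new_name,
            role_descriptors=old_key.get('role_descriptors', {}),
            expiration=expiration,
            metadata={
                **old_key.get('metadata', {}),
                'rotated_from': old_key['id'],
                'rotated_at': datetime.now().isoformat()
            }
        )
        
        if new_key:
            print("New API key created; update the application configuration before deleting the old key")
            print(f"Old key ID: {old_key['id']}")
            print(f"New key ID: {new_key['id']}")
            
            # Note: the old key is not invalidated automatically; do so manually after confirming the switch
            return new_key
        
        return None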
    
    def list_api_keys_by_application(self, app_name: str) -> List[Dict[str, Any]]:
        """List all API keys that belong to an application."""
        all_keys = self.get_api_key_info()
        if not all_keys:
            return []
        
        app_keys = []
        for key in all_keys.get('api_keys', []):
            metadata = key.get('metadata', {})
            if metadata.get('application') == app_name:
                app_keys.append(key)
        
        return app_keys
    
    def check_expiring_keys(self, days_threshold: int = 30) -> List[Dict[str, Any]]:
        """Find API keys that expire within the given number of days."""
        all_keys = self.get_api_key_info()
        if not all_keys:
            return []
        
        expiring_keys = []
        now = datetime.now()
        threshold_date = now + timedelta(days=days_threshold)
        
        for key in all_keys.get('api_keys', []):
            # Skip keys that never expire or are already invalidated
            if not key.get('expiration') or key.get('invalidated'):
                continue
            try:
                # The API reports expiration as milliseconds since the epoch
                expiration_date = datetime.fromtimestamp(key['expiration'] / 1000)
            except (TypeError, ValueError, OverflowError, OSError):
                continue
            if expiration_date <= threshold_date:
                expiring_keys.append({
                    'id': key['id'],
                    'name': key['name'],
                    'expiration': expiration_date.isoformat(),
                    'days_until_expiry': (expiration_date - now).days
                })
        
        return expiring_keys

# Usage example
if __name__ == "__main__":
    user_manager = ElasticsearchUserManager(
        es_host="localhost",
        username="elastic",
        password="your_password"
    )
    
    api_key_manager = ElasticsearchAPIKeyManager(user_manager)
    
    # Create API keys for an application
    print("=== Creating application API keys ===")
    app_keys = api_key_manager.create_application_api_keys(
        app_name="web_app",
        environments=["development", "staging", "production"]
    )
    
    for env, key_info in app_keys.items():
        print(f"\n{env.upper()} environment:")
        print(f"  Key ID: {key_info['id']}")
        print(f"  API Key: {key_info['api_key'][:20]}...")
    
    # Create a monitoring API key
    print("\n=== Creating monitoring API key ===")
    monitoring_key = api_key_manager.create_api_key(
        name="monitoring_readonly",
        role_descriptors={
            "monitoring": {
                "cluster": ["monitor"],
                "indices": [
                    {
                        "names": ["*"],
                        "privileges": ["read", "view_index_metadata"]
                    }
                ]
            }
        },
        expiration="180d",
        metadata={
            "purpose": "monitoring",
            "team": "ops"
        }
    )
    
    # Check for keys that are about to expire
    print("\n=== Checking for expiring keys ===")
    expiring_keys = api_key_manager.check_expiring_keys(days_threshold=30)
    
    if expiring_keys:
        print("API keys expiring soon:")
        for key in expiring_keys:
            print(f"  - {key['name']} (ID: {key['id']}) - expires in {key['days_until_expiry']} day(s)")
    else:
        print("No API keys are about to expire")
    
    # List all API keys
    print("\n=== API keys ===")
    all_keys = api_key_manager.get_api_key_info()
    if all_keys:
        for key in all_keys.get('api_keys', []):
            print(f"Key: {key['name']} (ID: {key['id']})")
            if key.get('metadata'):
                print(f"  Metadata: {key['metadata']}")
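
Once a key exists, a client authenticates by sending the Base64 encoding of `id:api_key` in an `Authorization: ApiKey ...` header, as printed by `create_api_key` above. A minimal sketch of building that header follows; the helper name `api_key_auth_header` and the credential values are placeholders for illustration.

```python
import base64
from typing import Dict


def api_key_auth_header(key_id: str, api_key: str) -> Dict[str, str]:
    """Return the HTTP header that authenticates a request with an API key."""
    token = base64.b64encode(f"{key_id}:{api_key}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"ApiKey {token}"}


# Placeholder credentials; real values come from the create API key response
headers = api_key_auth_header("example-id", "example-secret")
print(headers["Authorization"])
```

The resulting dictionary can be passed as the `headers` argument of a `requests` call in place of basic authentication.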

4. Audit Log Configuration

4.1 Audit Log Settings

# elasticsearch.yml audit settings
xpack.security.audit.enabled: true

# Events written to the audit log file (releases since 7.0 always write to the log file,
# so no separate output setting is needed)
xpack.security.audit.logfile.events.include: [
  access_denied,
  access_granted,
  anonymous_access_denied,
  authentication_failed,
  authentication_success,
  connection_denied,
  connection_granted,
  tampered_request,
  run_as_denied,
  run_as_granted,
  security_config_change
]

# Exclude access granted to the internal system user
xpack.security.audit.logfile.events.exclude: [
  system_access_granted
]

# Include the request body in audit events
xpack.security.audit.logfile.events.emit_request_body: true
# The audit log file name and rollover policy are configured in log4j2.properties,
# not in elasticsearch.yml

# Skip auditing for specific users
xpack.security.audit.logfile.events.ignore_filters:
  monitor_user:
    users: ["monitoring_user", "beats_system"]
    actions: ["indices:data/read/*"]
  
  kibana_system:
    users: ["kibana_system"]
    indices: [".kibana*"]
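
An ignore policy suppresses an event only when the event matches every attribute the policy specifies. The sketch below models that rule for the two policies above so that filters can be reasoned about before deployment. It is a simplification: `is_ignored` is a hypothetical helper, events are represented as plain dictionaries, `fnmatchcase` stands in for Elasticsearch's wildcard matching, and an event without indices is treated as not matching an `indices` condition.

```python
from fnmatch import fnmatchcase
from typing import Any, Dict, List


def _matches_any(value: str, patterns: List[str]) -> bool:
    """Return True if value matches at least one wildcard pattern."""
    return any(fnmatchcase(value, pattern) for pattern in patterns)


def is_ignored(event: Dict[str, Any], policy: Dict[str, List[str]]) -> bool:
    """An event is ignored only if it satisfies every condition in the policy."""
    if "users" in policy and not _matches_any(event.get("user", ""), policy["users"]):
        return False
    if "actions" in policy and not _matches_any(event.get("action", ""), policy["actions"]):
        return False
    if "indices" in policy:
        indices = event.get("indices", [])
        # Every index touched by the event must be covered by the policy
        if not indices or not all(_matches_any(i, policy["indices"]) for i in indices):
            return False
    return True


# The two policies from the configuration above
monitor_policy = {
    "users": ["monitoring_user", "beats_system"],
    "actions": ["indices:data/read/*"],
}
kibana_policy = {"users": ["kibana_system"], "indices": [".kibana*"]}

read_event = {"user": "beats_system", "action": "indices:data/read/search"}
write_event = {"user": "beats_system", "action": "indices:data/write/index"}
print(is_ignored(read_event, monitor_policy), is_ignored(write_event, monitor_policy))
```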

4.2 Audit Log Analysis Tool

import json
import re
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from collections import defaultdict, Counter
import pandas as pd

class ElasticsearchAuditAnalyzer:
    def __init__(self, audit_log_path: str):
        self.audit_log_path = audit_log_path
        self.events = []
        self.load_audit_logs()
    
    def load_audit_logs(self):
        """Load audit events from a file with one JSON object per line."""
        try:
            with open(self.audit_log_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        try:
                            event = json.loads(line)
                            self.events.append(event)
                        except json.JSONDecodeError:
                            continue
            
            print(f"Loaded {len(self.events)} audit events")
        
        except FileNotFoundError:
            print(f"Audit log file not found: {self.audit_log_path}")
        except Exception as e:
            print(f"Failed to load audit log: {e}")
    
    def analyze_failed_authentications(self, time_window_hours: int = 24) -> Dict[str, Any]:
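        """Analyze authentication failures within the given time window."""
        # Use a timezone-aware cutoff so it can be compared with aware event timestamps
        cutoff_time = datetime.now().astimezone() - timedelta(hours=time_window_hours)
        
        failed_auths = []
        for event in self.events:
            if event.get('event_type') != 'authentication_failed':
                continue
            try:
                event_time = datetime.fromisoformat(str(event.get('@timestamp') or '').replace('Z', '+00:00'))
            except ValueError:
                continue  # Skip events with a missing or unparseable timestamp
            if event_time.tzinfo is None:
                event_time = event_time.astimezone()  # Treat naive timestamps as local time
            if event_time > cutoff_time:
                failed_auths.append(event)
        
        # Count failures per user name and per source address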
        user_failures = Counter()
        ip_failures = Counter()
        
        for event in failed_auths:
            user = event.get('user', {}).get('name', 'unknown')
            ip = event.get('origin', {}).get('address', 'unknown')
            
            user_failures[user] += 1
            ip_failures[ip] += 1
        
        # Flag possible brute-force attacks
        brute_force_threshold = 10
        potential_attacks = {
            'users': {user: count for user, count in user_failures.items() if count >= brute_force_threshold},
            'ips': {ip: count for ip, count in ip_failures.items() if count >= brute_force_threshold}
        }
        
        return {
            'total_failures': len(failed_auths),
            'unique_users': len(user_failures),
            'unique_ips': len(ip_failures),
            'top_failed_users': user_failures.most_common(10),
            'top_failed_ips': ip_failures.most_common(10),
            'potential_brute_force': potential_attacks
        }
    
    def analyze_access_patterns(self) -> Dict[str, Any]:
        """Analyze access patterns."""
        access_events = []
        for event in self.events:
            if event.get('event_type') in ['access_granted', 'access_denied']:
                access_events.append(event)
        
        # Count granted and denied events per user, index, and action
        user_access = defaultdict(lambda: {'granted': 0, 'denied': 0})
        index_access = defaultdict(lambda: {'granted': 0, 'denied': 0})
        action_access = defaultdict(lambda: {'granted': 0, 'denied': 0})
        
        for event in access_events:
            user = event.get('user', {}).get('name', 'unknown')
            indices = event.get('indices', [])
            action = event.get('action', 'unknown')
            event_type = event.get('event_type')
            
            status = 'granted' if event_type == 'access_granted' else 'denied'
            
            user_access[user][status] += 1
            action_access[action][status] += 1
            
            for index in indices:
                index_access[index][status] += 1
        
        return {
            'user_access_summary': dict(user_access),
            'index_access_summary': dict(index_access),
            'action_access_summary': dict(action_access),
            'total_access_events': len(access_events)
        }
    
    def detect_anomalies(self) -> List[Dict[str, Any]]:
        """Detect anomalous behavior."""
        anomalies = []
        
        # Detect access at unusual hours
        for event in self.events:
            timestamp = event.get('@timestamp', '')
            if timestamp:
                try:
                    dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                    hour = dt.hour
                    
                    # Off-hours access: 22:00 up to (not including) 06:00,
                    # in the UTC offset carried by the timestamp
                    if hour >= 22 or hour < 6:
                        if event.get('event_type') == 'access_granted':
                            anomalies.append({
                                'type': 'off_hours_access',
                                'timestamp': timestamp,
                                'user': event.get('user', {}).get('name'),
                                'action': event.get('action'),
                                'indices': event.get('indices', [])
                            })
                except ValueError:
                    continue
        
        # Detect possible privilege-escalation attempts (denied administrative actions)
        for event in self.events:
            if event.get('event_type') == 'access_denied':
                action = event.get('action', '')
                if any(admin_action in action for admin_action in 
                      ['cluster:admin', 'indices:admin', 'cluster:manage']):
                    anomalies.append({
                        'type': 'privilege_escalation_attempt',
                        'timestamp': event.get('@timestamp'),
                        'user': event.get('user', {}).get('name'),
                        'action': action,
                        'indices': event.get('indices', [])
                    })
        
        # Detect users seen from many different IP addresses
        user_ips = defaultdict(set)
        for event in self.events:
            user = event.get('user', {}).get('name')
            ip = event.get('origin', {}).get('address')
            if user and ip:
                user_ips[user].add(ip)
        
        for user, ips in user_ips.items():
            if len(ips) > 5:  # user seen from more than 5 distinct IPs
                anomalies.append({
                    'type': 'multiple_ip_access',
                    'user': user,
                    'ip_count': len(ips),
                    'ips': list(ips)
                })
        
        return anomalies
    
    def generate_security_report(self) -> str:
        """Generate a plain-text security report."""
        auth_analysis = self.analyze_failed_authentications()
        access_analysis = self.analyze_access_patterns()
        anomalies = self.detect_anomalies()
        
        report = "=== Elasticsearch Security Audit Report ===\n\n"
        
        # Failed-authentication analysis
        report += "## Failed Authentication Analysis\n"
        report += f"Total failures: {auth_analysis['total_failures']}\n"
        report += f"Users involved: {auth_analysis['unique_users']}\n"
        report += f"IPs involved: {auth_analysis['unique_ips']}\n\n"
        
        if auth_analysis['top_failed_users']:
            report += "Users with the most failures:\n"
            for user, count in auth_analysis['top_failed_users'][:5]:
                report += f"  - {user}: {count} failures\n"
            report += "\n"
        
        if auth_analysis['potential_brute_force']['users']:
            report += "⚠️ Possible brute-force attacks:\n"
            for user, count in auth_analysis['potential_brute_force']['users'].items():
                report += f"  - User {user}: {count} failures\n"
            report += "\n"
        
        # Access-pattern analysis
        report += "## Access Pattern Analysis\n"
        report += f"Total access events: {access_analysis['total_access_events']}\n\n"
        
        # Anomaly detection
        report += "## Anomaly Detection\n"
        if anomalies:
            anomaly_types = Counter(anomaly['type'] for anomaly in anomalies)
            for anomaly_type, count in anomaly_types.items():
                report += f"- {anomaly_type}: {count} occurrences\n"
            
            report += "\nAnomalous events in detail:\n"
            for anomaly in anomalies[:10]:  # show only the first 10
                report += f"  Type: {anomaly['type']}\n"
                if 'user' in anomaly:
                    report += f"  User: {anomaly['user']}\n"
                if 'timestamp' in anomaly:
                    report += f"  Time: {anomaly['timestamp']}\n"
                report += "  ---\n"
        else:
            report += "✅ No anomalous behavior detected\n"
        
        return report
    
    def export_to_csv(self, output_file: str):
        """Export the audit log to CSV."""
        if not self.events:
            print("No audit log data to export")
            return
        
        # Extract the key fields
        csv_data = []
        for event in self.events:
            csv_data.append({
                'timestamp': event.get('@timestamp', ''),
                'event_type': event.get('event_type', ''),
                'user': event.get('user', {}).get('name', ''),
                'origin_ip': event.get('origin', {}).get('address', ''),
                'action': event.get('action', ''),
                'indices': ','.join(event.get('indices', [])),
                'request_id': event.get('request', {}).get('id', '')
            })
        
        df = pd.DataFrame(csv_data)
        df.to_csv(output_file, index=False)
        print(f"Audit log exported to: {output_file}")

# Usage example
if __name__ == "__main__":
    # Load and analyze the audit log
    analyzer = ElasticsearchAuditAnalyzer('/var/log/elasticsearch/audit.log')
    
    # Generate the security report
    print("=== Generating security report ===")
    report = analyzer.generate_security_report()
    print(report)
    
    # Export to CSV
    print("\n=== Exporting audit log ===")
    analyzer.export_to_csv('/tmp/elasticsearch_audit.csv')
    
    # Detailed analysis
    print("\n=== Detailed analysis ===")
    auth_analysis = analyzer.analyze_failed_authentications()
    print(f"Failed authentication analysis: {json.dumps(auth_analysis, indent=2, ensure_ascii=False)}")
    
    anomalies = analyzer.detect_anomalies()
    print(f"\nDetected {len(anomalies)} anomalous events")
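
The brute-force check in analyze_failed_authentications counts failures over the whole log, so a long-lived log can flag a user whose failures are spread over weeks. A time-windowed variant might look like the sketch below. The function name find_bursts, the 5-minute window, and the (key, timestamp) input shape are assumptions made for illustration and are not part of the analyzer above. All timestamps are assumed to be consistently timezone-aware or consistently naive.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta
from typing import Deque, Dict, Iterable, Tuple


def find_bursts(
    failures: Iterable[Tuple[str, datetime]],
    threshold: int = 10,
    window: timedelta = timedelta(minutes=5),
) -> Dict[str, int]:
    """Return {key: peak count} for keys with at least `threshold`
    failures inside any single time window of length `window`.

    `failures` holds (key, timestamp) pairs, where the key is a user
    name or a source IP (hypothetical input shape).
    """
    recent: Dict[str, Deque[datetime]] = defaultdict(deque)
    peaks: Dict[str, int] = {}

    # Process events in chronological order
    for key, ts in sorted(failures, key=lambda item: item[1]):
        queue = recent[key]
        queue.append(ts)
        # Drop failures that fall outside the window ending at ts
        while ts - queue[0] > window:
            queue.popleft()
        if len(queue) >= threshold:
            peaks[key] = max(peaks.get(key, 0), len(queue))

    return peaks
```

To use it with the analyzer, one could build the (key, timestamp) pairs from the authentication_failed events, once keyed by user name and once by source IP, and compare the result with the whole-log counts.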

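5. Chapter Summary

This chapter covered security configuration and permission management in Elasticsearch. The main topics were:

5.1 Key Concepts

  1. X-Pack Security basics

    • SSL/TLS certificate configuration
    • Enabling security features
    • Initializing the built-in users
  2. User and role management

    • User CRUD operations
    • Role and privilege configuration
    • Attribute-based access control (ABAC)
    • Multi-tenant security architecture
  3. API key management

    • API key creation and rotation
    • Application key management
    • Key expiration monitoring
  4. Audit logging

    • Audit log configuration
    • Security event analysis
    • Anomaly detection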

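5.2 Best Practices

  1. Principle of least privilege

    • Grant only the minimum privileges required
    • Review and clean up privileges regularly
    • Assign privileges through roles rather than directly
  2. Key management

    • Rotate API keys regularly
    • Use different keys for different environments
    • Monitor key usage
  3. Audit monitoring

    • Enable comprehensive audit logging
    • Analyze security events regularly
    • Put anomaly detection in place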
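
The key-management practices above can be backed by a periodic check. The sketch below is a minimal example that assumes key metadata shaped like the entries returned by the Elasticsearch Get API key API (`id`, `name`, `invalidated`, and `creation` / `expiration` as epoch milliseconds). The function name keys_needing_rotation, the 90-day maximum age, and the 7-day warning period are illustrative assumptions, not recommendations from this chapter.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional


def keys_needing_rotation(
    api_keys: List[Dict[str, Any]],
    max_age: timedelta = timedelta(days=90),       # assumed rotation policy
    expiry_warning: timedelta = timedelta(days=7),  # assumed warning period
    now: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
    """Return the keys that are too old, expired, or about to expire."""
    now = now or datetime.now(timezone.utc)
    flagged = []

    for key in api_keys:
        # Invalidated keys can no longer be used, so skip them
        if key.get("invalidated"):
            continue

        reasons = []
        created = datetime.fromtimestamp(key["creation"] / 1000, tz=timezone.utc)
        if now - created > max_age:
            reasons.append("older than max_age")

        # Keys created without an expiration have no "expiration" field
        expiration = key.get("expiration")
        if expiration is not None:
            expires = datetime.fromtimestamp(expiration / 1000, tz=timezone.utc)
            if expires - now <= expiry_warning:
                reasons.append("expires soon" if expires > now else "expired")

        if reasons:
            flagged.append(
                {"id": key.get("id"), "name": key.get("name"), "reasons": reasons}
            )

    return flagged
```

Passing `now` explicitly keeps the function deterministic for testing. In a scheduled job it can be left unset so that the current UTC time is used.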

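5.3 Exercises

  1. Design a security architecture for a multi-tenant Elasticsearch cluster
  2. Implement an automatic API key rotation system
  3. Configure access control based on time and geographic location
  4. Build a security event monitoring and alerting system
  5. Implement field-level and document-level security controls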
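
As a possible starting point for exercise 5, the sketch below builds a role definition that combines field-level security (`field_security.grant`) with document-level security (`query`) for a set of index patterns. The resulting dictionary is meant as the body of a create-or-update role request (PUT /_security/role/<role_name>). The function name build_restricted_role and the `tenant_id` field used in the usage note are hypothetical.

```python
import json
from typing import Any, Dict, List


def build_restricted_role(
    index_patterns: List[str],
    granted_fields: List[str],
    dls_query: Dict[str, Any],
) -> Dict[str, Any]:
    """Build a read-only role body with field- and document-level security."""
    return {
        "indices": [
            {
                "names": index_patterns,
                "privileges": ["read"],
                # Field-level security: only these fields are visible
                "field_security": {"grant": granted_fields},
                # Document-level security: only matching documents are
                # visible; the query is passed as a JSON string
                "query": json.dumps(dls_query),
            }
        ]
    }
```

For example, build_restricted_role(["logs-*"], ["@timestamp", "message"], {"term": {"tenant_id": "acme"}}) describes a role that can read only two fields of the documents belonging to one tenant.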

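After working through this chapter, you should be able to configure and manage the security features of an Elasticsearch cluster end to end, protecting its data and controlling access to it.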