## 6.1 事件通知概述

### 6.1.1 事件通知的概念和应用场景

MinIO事件通知是一种实时监控对象存储操作的机制,当存储桶中发生特定事件时,MinIO会自动发送通知到配置的目标。

主要应用场景:

- 实时数据处理:文件上传后自动触发处理流程
- 内容审核:新上传的图片或视频自动进行审核
- 数据同步:跨系统的数据同步和备份
- 监控告警:异常操作的实时监控和告警
- 工作流触发:基于文件操作的自动化工作流

from minio import Minio
from minio.error import S3Error
from datetime import datetime, timedelta
import json
import requests
import threading
import time
from typing import Dict, List, Any, Optional, Callable
from flask import Flask, request, jsonify
import uuid
import hashlib
import hmac
import base64
from urllib.parse import urlparse

class MinIOEventNotification:
    """MinIO事件通知管理器"""
    
    def __init__(self, client: Minio):
        self.client = client
        self.event_handlers = {}
        self.webhook_endpoints = {}
        self.notification_history = []
        self.lock = threading.Lock()
    
    def get_supported_events(self) -> Dict[str, Any]:
        """获取支持的事件类型"""
        return {
            'success': True,
            'supported_events': {
                'object_events': {
                    's3:ObjectCreated:*': '对象创建(所有类型)',
                    's3:ObjectCreated:Put': '通过PUT创建对象',
                    's3:ObjectCreated:Post': '通过POST创建对象',
                    's3:ObjectCreated:Copy': '通过复制创建对象',
                    's3:ObjectCreated:CompleteMultipartUpload': '多部分上传完成',
                    's3:ObjectRemoved:*': '对象删除(所有类型)',
                    's3:ObjectRemoved:Delete': '对象删除',
                    's3:ObjectRemoved:DeleteMarkerCreated': '删除标记创建',
                    's3:ObjectAccessed:*': '对象访问(所有类型)',
                    's3:ObjectAccessed:Get': '对象读取',
                    's3:ObjectAccessed:Head': '对象元数据访问'
                },
                'bucket_events': {
                    's3:BucketCreated': '存储桶创建',
                    's3:BucketRemoved': '存储桶删除'
                }
            },
            'notification_targets': {
                'webhook': 'HTTP Webhook',
                'amqp': 'AMQP消息队列',
                'redis': 'Redis发布/订阅',
                'nats': 'NATS消息系统',
                'postgresql': 'PostgreSQL数据库',
                'mysql': 'MySQL数据库',
                'kafka': 'Apache Kafka',
                'elasticsearch': 'Elasticsearch'
            }
        }
    
    def configure_webhook_notification(self, bucket_name: str, webhook_url: str,
                                     events: List[str], prefix: str = "",
                                     suffix: str = "") -> Dict[str, Any]:
        """配置Webhook通知"""
        try:
            # 验证存储桶是否存在
            if not self.client.bucket_exists(bucket_name):
                return {
                    'success': False,
                    'error': f"存储桶不存在: {bucket_name}",
                    'bucket_name': bucket_name
                }
            
            # 验证Webhook URL
            parsed_url = urlparse(webhook_url)
            if not parsed_url.scheme or not parsed_url.netloc:
                return {
                    'success': False,
                    'error': f"无效的Webhook URL: {webhook_url}",
                    'webhook_url': webhook_url
                }
            
            # 生成配置ID
            config_id = str(uuid.uuid4())
            
            # 构建通知配置
            notification_config = {
                'webhook_configs': [{
                    'id': config_id,
                    'webhook_url': webhook_url,
                    'events': events,
                    'filter': {
                        'key': {
                            'filter_rules': []
                        }
                    }
                }]
            }
            
            # 添加前缀过滤
            if prefix:
                notification_config['webhook_configs'][0]['filter']['key']['filter_rules'].append({
                    'name': 'prefix',
                    'value': prefix
                })
            
            # 添加后缀过滤
            if suffix:
                notification_config['webhook_configs'][0]['filter']['key']['filter_rules'].append({
                    'name': 'suffix',
                    'value': suffix
                })
            
            # 设置存储桶通知配置
            self.client.set_bucket_notification(bucket_name, notification_config)
            
            # 记录配置
            with self.lock:
                if bucket_name not in self.webhook_endpoints:
                    self.webhook_endpoints[bucket_name] = []
                
                self.webhook_endpoints[bucket_name].append({
                    'config_id': config_id,
                    'webhook_url': webhook_url,
                    'events': events,
                    'prefix': prefix,
                    'suffix': suffix,
                    'created_at': datetime.now().isoformat(),
                    'status': 'active'
                })
            
            return {
                'success': True,
                'message': f"Webhook通知配置成功: {bucket_name}",
                'bucket_name': bucket_name,
                'config_id': config_id,
                'webhook_url': webhook_url,
                'events': events,
                'prefix': prefix,
                'suffix': suffix,
                'created_at': datetime.now().isoformat()
            }
            
        except S3Error as e:
            return {
                'success': False,
                'error': f"配置Webhook通知失败: {e}",
                'bucket_name': bucket_name,
                'webhook_url': webhook_url
            }
    
    def get_bucket_notification(self, bucket_name: str) -> Dict[str, Any]:
        """获取存储桶通知配置"""
        try:
            # 获取通知配置
            notification_config = self.client.get_bucket_notification(bucket_name)
            
            # 解析配置
            webhook_configs = []
            if hasattr(notification_config, 'webhook_configs'):
                for config in notification_config.webhook_configs:
                    webhook_configs.append({
                        'id': getattr(config, 'id', 'unknown'),
                        'webhook_url': getattr(config, 'webhook_url', ''),
                        'events': getattr(config, 'events', []),
                        'filter_rules': getattr(config.filter.key, 'filter_rules', []) if hasattr(config, 'filter') else []
                    })
            
            return {
                'success': True,
                'bucket_name': bucket_name,
                'webhook_configs': webhook_configs,
                'total_configs': len(webhook_configs),
                'retrieved_at': datetime.now().isoformat()
            }
            
        except S3Error as e:
            return {
                'success': False,
                'error': f"获取通知配置失败: {e}",
                'bucket_name': bucket_name
            }
    
    def remove_bucket_notification(self, bucket_name: str) -> Dict[str, Any]:
        """移除存储桶通知配置"""
        try:
            # 移除通知配置
            self.client.remove_bucket_notification(bucket_name)
            
            # 清理本地记录
            with self.lock:
                if bucket_name in self.webhook_endpoints:
                    del self.webhook_endpoints[bucket_name]
            
            return {
                'success': True,
                'message': f"通知配置已移除: {bucket_name}",
                'bucket_name': bucket_name,
                'removed_at': datetime.now().isoformat()
            }
            
        except S3Error as e:
            return {
                'success': False,
                'error': f"移除通知配置失败: {e}",
                'bucket_name': bucket_name
            }
    
    def test_webhook_endpoint(self, webhook_url: str, test_payload: Dict = None) -> Dict[str, Any]:
        """POST a sample event to a webhook URL and report how it responded.

        Args:
            webhook_url: Endpoint to probe.
            test_payload: JSON body to send. When None, a synthetic
                ``s3:ObjectCreated:Put`` event for ``test-bucket`` /
                ``test-object.txt`` is sent instead.

        Returns:
            When a response arrives: status code, round-trip time in
            milliseconds, response headers, the first 500 characters of the
            body and ``is_reachable`` (True for status codes below 400).
            When ``requests`` raises ``RequestException``: ``success`` False,
            an ``error`` message and ``is_reachable`` False.
        """
        try:
            if test_payload is None:
                sample_event = 's3:ObjectCreated:Put'
                sample_key = 'test-object.txt'
                sample_record = {
                    'eventVersion': '2.0',
                    'eventSource': 'minio:s3',
                    'eventTime': datetime.now().isoformat(),
                    'eventName': sample_event,
                    's3': {
                        'bucket': {'name': 'test-bucket'},
                        'object': {
                            'key': sample_key,
                            'size': 1024,
                            'eTag': 'test-etag'
                        }
                    }
                }
                test_payload = {
                    'EventName': sample_event,
                    'Key': sample_key,
                    'Records': [sample_record]
                }

            request_headers = {
                'Content-Type': 'application/json',
                'User-Agent': 'MinIO/test'
            }

            # Time the full request/response round trip.
            sent_at = time.time()
            response = requests.post(
                webhook_url,
                json=test_payload,
                headers=request_headers,
                timeout=10
            )
            elapsed_seconds = time.time() - sent_at

            status = response.status_code
            return {
                'success': True,
                'webhook_url': webhook_url,
                'status_code': status,
                'response_time_ms': elapsed_seconds * 1000,
                'response_headers': dict(response.headers),
                # Keep the echoed body short.
                'response_body': response.text[:500],
                'is_reachable': status < 400,
                'tested_at': datetime.now().isoformat()
            }

        except requests.exceptions.RequestException as exc:
            return {
                'success': False,
                'error': f"Webhook测试失败: {exc}",
                'webhook_url': webhook_url,
                'is_reachable': False,
                'tested_at': datetime.now().isoformat()
            }
    
    def register_event_handler(self, event_type: str, handler_func: Callable) -> Dict[str, Any]:
        """注册事件处理器"""
        try:
            handler_id = str(uuid.uuid4())
            
            with self.lock:
                if event_type not in self.event_handlers:
                    self.event_handlers[event_type] = {}
                
                self.event_handlers[event_type][handler_id] = {
                    'handler_func': handler_func,
                    'registered_at': datetime.now().isoformat(),
                    'call_count': 0,
                    'last_called': None,
                    'error_count': 0
                }
            
            return {
                'success': True,
                'message': f"事件处理器注册成功: {event_type}",
                'handler_id': handler_id,
                'event_type': event_type,
                'registered_at': datetime.now().isoformat()
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': f"注册事件处理器失败: {e}",
                'event_type': event_type
            }
    
    def process_webhook_event(self, event_data: Dict) -> Dict[str, Any]:
        """处理Webhook事件"""
        try:
            # 解析事件数据
            if 'Records' not in event_data:
                return {
                    'success': False,
                    'error': '无效的事件数据格式',
                    'event_data': event_data
                }
            
            processed_events = []
            
            for record in event_data['Records']:
                event_name = record.get('eventName', 'unknown')
                bucket_name = record.get('s3', {}).get('bucket', {}).get('name', 'unknown')
                object_key = record.get('s3', {}).get('object', {}).get('key', 'unknown')
                object_size = record.get('s3', {}).get('object', {}).get('size', 0)
                event_time = record.get('eventTime', datetime.now().isoformat())
                
                # 记录事件
                event_info = {
                    'event_id': str(uuid.uuid4()),
                    'event_name': event_name,
                    'bucket_name': bucket_name,
                    'object_key': object_key,
                    'object_size': object_size,
                    'event_time': event_time,
                    'processed_at': datetime.now().isoformat(),
                    'handlers_called': []
                }
                
                # 调用注册的处理器
                with self.lock:
                    # 查找匹配的处理器
                    matching_handlers = []
                    
                    # 精确匹配
                    if event_name in self.event_handlers:
                        matching_handlers.extend(self.event_handlers[event_name].items())
                    
                    # 通配符匹配
                    wildcard_event = event_name.split(':')[0] + ':*'
                    if wildcard_event in self.event_handlers:
                        matching_handlers.extend(self.event_handlers[wildcard_event].items())
                    
                    # 调用处理器
                    for handler_id, handler_info in matching_handlers:
                        try:
                            handler_func = handler_info['handler_func']
                            handler_result = handler_func(record)
                            
                            # 更新处理器统计
                            handler_info['call_count'] += 1
                            handler_info['last_called'] = datetime.now().isoformat()
                            
                            event_info['handlers_called'].append({
                                'handler_id': handler_id,
                                'success': True,
                                'result': handler_result
                            })
                            
                        except Exception as handler_error:
                            handler_info['error_count'] += 1
                            
                            event_info['handlers_called'].append({
                                'handler_id': handler_id,
                                'success': False,
                                'error': str(handler_error)
                            })
                
                processed_events.append(event_info)
                
                # 添加到历史记录
                with self.lock:
                    self.notification_history.append(event_info)
                    
                    # 保持历史记录数量限制
                    if len(self.notification_history) > 1000:
                        self.notification_history = self.notification_history[-1000:]
            
            return {
                'success': True,
                'message': f"处理了 {len(processed_events)} 个事件",
                'processed_events': processed_events,
                'processed_at': datetime.now().isoformat()
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': f"处理Webhook事件失败: {e}",
                'event_data': event_data
            }
    
    def get_notification_history(self, limit: int = 50, event_type: str = None,
                               bucket_name: str = None) -> Dict[str, Any]:
        """获取通知历史"""
        try:
            with self.lock:
                history = self.notification_history.copy()
            
            # 过滤条件
            if event_type:
                history = [event for event in history if event['event_name'] == event_type]
            
            if bucket_name:
                history = [event for event in history if event['bucket_name'] == bucket_name]
            
            # 按时间倒序排列
            history.sort(key=lambda x: x['processed_at'], reverse=True)
            
            # 限制数量
            history = history[:limit]
            
            return {
                'success': True,
                'total_events': len(history),
                'events': history,
                'filters': {
                    'event_type': event_type,
                    'bucket_name': bucket_name,
                    'limit': limit
                },
                'retrieved_at': datetime.now().isoformat()
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': f"获取通知历史失败: {e}"
            }
    
    def get_handler_statistics(self) -> Dict[str, Any]:
        """获取处理器统计"""
        try:
            with self.lock:
                handler_stats = {}
                
                for event_type, handlers in self.event_handlers.items():
                    handler_stats[event_type] = []
                    
                    for handler_id, handler_info in handlers.items():
                        handler_stats[event_type].append({
                            'handler_id': handler_id,
                            'registered_at': handler_info['registered_at'],
                            'call_count': handler_info['call_count'],
                            'last_called': handler_info['last_called'],
                            'error_count': handler_info['error_count'],
                            'success_rate': (
                                (handler_info['call_count'] - handler_info['error_count']) / 
                                handler_info['call_count'] * 100
                            ) if handler_info['call_count'] > 0 else 0
                        })
            
            return {
                'success': True,
                'handler_statistics': handler_stats,
                'total_event_types': len(handler_stats),
                'total_handlers': sum(len(handlers) for handlers in handler_stats.values()),
                'generated_at': datetime.now().isoformat()
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': f"获取处理器统计失败: {e}"
            }

## 6.2 Webhook服务器

### 6.2.1 Flask Webhook接收器

```python
class WebhookServer:
    """Webhook服务器"""
    
    def __init__(self, host: str = '0.0.0.0', port: int = 5000, secret_key: str = None):
        """Create the Flask application and register the webhook routes.

        Args:
            host: Interface the server binds to when started.
            port: TCP port the server listens on when started.
            secret_key: Shared secret for signature verification; when it is
                falsy, incoming requests are accepted without verification.
        """
        self.host, self.port = host, port
        self.secret_key = secret_key

        # Set later through set_notification_manager(); until then deliveries
        # are recorded but not dispatched.
        self.notification_manager = None

        # Received deliveries, guarded by the lock below.
        self.lock = threading.Lock()
        self.received_webhooks = list()

        self.app = Flask(__name__)
        self._setup_routes()
    
    def _setup_routes(self):
        """设置路由"""
        
        @self.app.route('/webhook', methods=['POST'])
        def handle_webhook():
            """处理Webhook请求"""
            try:
                # 验证请求
                if not self._verify_request(request):
                    return jsonify({
                        'success': False,
                        'error': 'Invalid signature'
                    }), 401
                
                # 获取事件数据
                event_data = request.get_json()
                
                if not event_data:
                    return jsonify({
                        'success': False,
                        'error': 'No JSON data provided'
                    }), 400
                
                # 记录接收到的Webhook
                webhook_info = {
                    'webhook_id': str(uuid.uuid4()),
                    'received_at': datetime.now().isoformat(),
                    'source_ip': request.remote_addr,
                    'user_agent': request.headers.get('User-Agent', ''),
                    'content_type': request.headers.get('Content-Type', ''),
                    'event_data': event_data,
                    'processed': False
                }
                
                with self.lock:
                    self.received_webhooks.append(webhook_info)
                    
                    # 保持记录数量限制
                    if len(self.received_webhooks) > 500:
                        self.received_webhooks = self.received_webhooks[-500:]
                
                # 处理事件
                if self.notification_manager:
                    result = self.notification_manager.process_webhook_event(event_data)
                    webhook_info['processed'] = result['success']
                    webhook_info['process_result'] = result
                else:
                    webhook_info['processed'] = True
                    webhook_info['process_result'] = {'success': True, 'message': 'No notification manager configured'}
                
                return jsonify({
                    'success': True,
                    'message': 'Webhook processed successfully',
                    'webhook_id': webhook_info['webhook_id'],
                    'processed_at': webhook_info['received_at']
                }), 200
                
            except Exception as e:
                return jsonify({
                    'success': False,
                    'error': f'Webhook processing failed: {e}'
                }), 500
        
        @self.app.route('/webhook/test', methods=['GET'])
        def test_endpoint():
            """测试端点"""
            return jsonify({
                'success': True,
                'message': 'Webhook endpoint is working',
                'server_time': datetime.now().isoformat(),
                'endpoint': '/webhook'
            }), 200
        
        @self.app.route('/webhook/status', methods=['GET'])
        def get_status():
            """获取服务器状态"""
            with self.lock:
                total_webhooks = len(self.received_webhooks)
                processed_webhooks = sum(1 for w in self.received_webhooks if w['processed'])
            
            return jsonify({
                'success': True,
                'server_status': {
                    'host': self.host,
                    'port': self.port,
                    'total_webhooks_received': total_webhooks,
                    'processed_webhooks': processed_webhooks,
                    'processing_rate': (processed_webhooks / total_webhooks * 100) if total_webhooks > 0 else 0,
                    'has_notification_manager': self.notification_manager is not None,
                    'uptime': datetime.now().isoformat()
                }
            }), 200
        
        @self.app.route('/webhook/history', methods=['GET'])
        def get_webhook_history():
            """获取Webhook历史"""
            limit = request.args.get('limit', 50, type=int)
            
            with self.lock:
                history = self.received_webhooks.copy()
            
            # 按时间倒序排列
            history.sort(key=lambda x: x['received_at'], reverse=True)
            history = history[:limit]
            
            return jsonify({
                'success': True,
                'total_webhooks': len(history),
                'webhooks': history,
                'limit': limit
            }), 200
    
    def _verify_request(self, request) -> bool:
        """验证请求签名"""
        if not self.secret_key:
            return True  # 没有配置密钥时跳过验证
        
        signature = request.headers.get('X-Minio-Signature')
        if not signature:
            return False
        
        # 计算期望的签名
        body = request.get_data()
        expected_signature = hmac.new(
            self.secret_key.encode('utf-8'),
            body,
            hashlib.sha256
        ).hexdigest()
        
        return hmac.compare_digest(signature, expected_signature)
    
    def set_notification_manager(self, notification_manager: MinIOEventNotification):
        """设置通知管理器"""
        self.notification_manager = notification_manager
    
    def start_server(self, debug: bool = False, threaded: bool = True):
        """启动服务器"""
        print(f"启动Webhook服务器: http://{self.host}:{self.port}")
        print(f"Webhook端点: http://{self.host}:{self.port}/webhook")
        print(f"测试端点: http://{self.host}:{self.port}/webhook/test")
        print(f"状态端点: http://{self.host}:{self.port}/webhook/status")
        
        self.app.run(
            host=self.host,
            port=self.port,
            debug=debug,
            threaded=threaded
        )
    
    def get_received_webhooks(self, limit: int = 50) -> List[Dict]:
        """获取接收到的Webhook"""
        with self.lock:
            webhooks = self.received_webhooks.copy()
        
        # 按时间倒序排列
        webhooks.sort(key=lambda x: x['received_at'], reverse=True)
        return webhooks[:limit]

## 6.3 事件处理器

### 6.3.1 常用事件处理器实现

```python
class EventHandlers:
    """事件处理器集合"""
    
    def __init__(self, client: Minio):
        self.client = client
        self.processed_objects = set()
        self.processing_stats = {
            'total_processed': 0,
            'successful_processed': 0,
            'failed_processed': 0,
            'start_time': datetime.now().isoformat()
        }
    
    def image_processing_handler(self, event_record: Dict) -> Dict[str, Any]:
        """图片处理事件处理器"""
        try:
            # 解析事件信息
            bucket_name = event_record['s3']['bucket']['name']
            object_key = event_record['s3']['object']['key']
            object_size = event_record['s3']['object']['size']
            
            # 检查是否为图片文件
            image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']
            if not any(object_key.lower().endswith(ext) for ext in image_extensions):
                return {
                    'success': True,
                    'message': '非图片文件,跳过处理',
                    'object_key': object_key
                }
            
            print(f"处理图片: {bucket_name}/{object_key} ({object_size} bytes)")
            
            # 模拟图片处理(实际应用中可以集成图片处理库)
            processing_tasks = [
                '生成缩略图',
                '提取EXIF信息',
                '图片格式转换',
                '质量压缩'
            ]
            
            results = []
            for task in processing_tasks:
                # 模拟处理时间
                time.sleep(0.1)
                
                results.append({
                    'task': task,
                    'status': 'completed',
                    'processed_at': datetime.now().isoformat()
                })
            
            # 更新统计
            self.processing_stats['total_processed'] += 1
            self.processing_stats['successful_processed'] += 1
            
            return {
                'success': True,
                'message': f'图片处理完成: {object_key}',
                'bucket_name': bucket_name,
                'object_key': object_key,
                'object_size': object_size,
                'processing_results': results,
                'processed_at': datetime.now().isoformat()
            }
            
        except Exception as e:
            self.processing_stats['total_processed'] += 1
            self.processing_stats['failed_processed'] += 1
            
            return {
                'success': False,
                'error': f'图片处理失败: {e}',
                'event_record': event_record
            }
    
    def backup_handler(self, event_record: Dict) -> Dict[str, Any]:
        """备份事件处理器"""
        try:
            # 解析事件信息
            bucket_name = event_record['s3']['bucket']['name']
            object_key = event_record['s3']['object']['key']
            
            # 跳过已处理的对象
            object_id = f"{bucket_name}/{object_key}"
            if object_id in self.processed_objects:
                return {
                    'success': True,
                    'message': '对象已备份,跳过',
                    'object_key': object_key
                }
            
            print(f"备份对象: {bucket_name}/{object_key}")
            
            # 构建备份存储桶名称
            backup_bucket = f"{bucket_name}-backup"
            backup_object_key = f"backup/{datetime.now().strftime('%Y/%m/%d')}/{object_key}"
            
            # 确保备份存储桶存在
            if not self.client.bucket_exists(backup_bucket):
                self.client.make_bucket(backup_bucket)
                print(f"创建备份存储桶: {backup_bucket}")
            
            # 复制对象到备份存储桶
            copy_source = {
                'Bucket': bucket_name,
                'Key': object_key
            }
            
            self.client.copy_object(
                bucket_name=backup_bucket,
                object_name=backup_object_key,
                object_source=f"{bucket_name}/{object_key}"
            )
            
            # 记录已处理的对象
            self.processed_objects.add(object_id)
            
            return {
                'success': True,
                'message': f'对象备份完成: {object_key}',
                'source_bucket': bucket_name,
                'source_object': object_key,
                'backup_bucket': backup_bucket,
                'backup_object': backup_object_key,
                'backed_up_at': datetime.now().isoformat()
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': f'备份失败: {e}',
                'event_record': event_record
            }
    
    def audit_log_handler(self, event_record: Dict) -> Dict[str, Any]:
        """审计日志事件处理器"""
        try:
            # 解析事件信息
            event_name = event_record['eventName']
            bucket_name = event_record['s3']['bucket']['name']
            object_key = event_record['s3']['object']['key']
            event_time = event_record['eventTime']
            
            # 构建审计日志
            audit_log = {
                'audit_id': str(uuid.uuid4()),
                'event_name': event_name,
                'bucket_name': bucket_name,
                'object_key': object_key,
                'event_time': event_time,
                'source_ip': event_record.get('sourceIPAddress', 'unknown'),
                'user_identity': event_record.get('userIdentity', {}),
                'request_parameters': event_record.get('requestParameters', {}),
                'response_elements': event_record.get('responseElements', {}),
                'logged_at': datetime.now().isoformat()
            }
            
            # 这里可以将审计日志写入数据库、文件或发送到日志系统
            print(f"审计日志: {event_name} - {bucket_name}/{object_key}")
            
            # 模拟写入日志文件
            log_filename = f"audit_log_{datetime.now().strftime('%Y%m%d')}.json"
            
            try:
                with open(log_filename, 'a', encoding='utf-8') as f:
                    f.write(json.dumps(audit_log, ensure_ascii=False) + '\n')
            except Exception as log_error:
                print(f"写入审计日志文件失败: {log_error}")
            
            return {
                'success': True,
                'message': f'审计日志记录完成: {event_name}',
                'audit_id': audit_log['audit_id'],
                'event_name': event_name,
                'bucket_name': bucket_name,
                'object_key': object_key,
                'log_file': log_filename,
                'logged_at': audit_log['logged_at']
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': f'审计日志记录失败: {e}',
                'event_record': event_record
            }
    
    def content_analysis_handler(self, event_record: Dict) -> Dict[str, Any]:
        """Run a simulated content analysis for the object named in an event.

        The object's content type is looked up with ``self.client.stat_object``
        and selects a text / image / video analysis stub. A simulated security
        scan result is always attached.

        Args:
            event_record: One S3-style event record. It must provide
                ``['s3']['bucket']['name']``, ``['s3']['object']['key']`` and
                ``['s3']['object']['size']``.

        Returns:
            On success, a dict with ``success=True``, the bucket name, the
            object key exactly as it appeared in the event, and
            ``analysis_results``. On any failure, a dict with
            ``success=False``, an ``error`` message and the original
            ``event_record``; no exception propagates to the caller.
        """
        from urllib.parse import unquote_plus

        try:
            # Parse the event.
            bucket_name = event_record['s3']['bucket']['name']
            object_key = event_record['s3']['object']['key']
            object_size = event_record['s3']['object']['size']

            print(f"分析内容: {bucket_name}/{object_key}")

            # S3-style event notifications deliver the object key URL-encoded
            # (spaces as '+'), so decode it before using it as a real key.
            # The reported key below stays exactly as received.
            if isinstance(object_key, str):
                lookup_key = unquote_plus(object_key)
            else:
                lookup_key = object_key

            # Best effort: a failed lookup must not fail the whole analysis.
            try:
                obj_stat = self.client.stat_object(bucket_name, lookup_key)
                # The content type may be missing; never let None reach
                # the .startswith() checks below.
                content_type = obj_stat.content_type or 'unknown'
            except Exception:
                content_type = 'unknown'

            # Simulated content analysis.
            analysis_results = {
                'file_type': content_type,
                'file_size': object_size,
                'analysis_timestamp': datetime.now().isoformat()
            }

            # Type-specific analysis stubs.
            if content_type.startswith('text/'):
                analysis_results['text_analysis'] = {
                    # Rough word-count estimate: size in bytes divided by 6.
                    'estimated_word_count': object_size // 6,
                    'language': 'auto-detected',
                    'encoding': 'utf-8'
                }
            elif content_type.startswith('image/'):
                analysis_results['image_analysis'] = {
                    'estimated_dimensions': 'unknown',
                    'color_profile': 'sRGB',
                    'has_metadata': True
                }
            elif content_type.startswith('video/'):
                analysis_results['video_analysis'] = {
                    # Very rough estimate: one unit per 1,000,000 bytes.
                    'estimated_duration': object_size // 1000000,
                    'codec': 'unknown',
                    'resolution': 'unknown'
                }

            # Simulated security scan.
            analysis_results['security_scan'] = {
                'virus_scan': 'clean',
                'malware_scan': 'clean',
                'scan_timestamp': datetime.now().isoformat()
            }

            return {
                'success': True,
                'message': f'内容分析完成: {object_key}',
                'bucket_name': bucket_name,
                'object_key': object_key,
                'analysis_results': analysis_results,
                'analyzed_at': datetime.now().isoformat()
            }

        except Exception as e:
            return {
                'success': False,
                'error': f'内容分析失败: {e}',
                'event_record': event_record
            }
    
    def notification_handler(self, event_record: Dict) -> Dict[str, Any]:
        """Compose a notification for one storage event and simulate sending it.

        The message and priority depend on the event name: names containing
        ``ObjectCreated`` are ``normal``, names containing ``ObjectRemoved``
        are ``high``, anything else is ``low``. Delivery is simulated for the
        ``email``, ``slack`` and ``webhook`` channels with a short sleep each.

        Args:
            event_record: One S3-style event record providing ``eventName``,
                ``['s3']['bucket']['name']`` and ``['s3']['object']['key']``.

        Returns:
            A dict with ``success=True`` and the delivery details, or a dict
            with ``success=False``, an ``error`` message and the original
            ``event_record`` when anything goes wrong.
        """
        try:
            # Pull the fields we need out of the event.
            event_name = event_record['eventName']
            s3_section = event_record['s3']
            bucket_name = s3_section['bucket']['name']
            object_key = s3_section['object']['key']

            # (marker in event name, priority, message); first match wins.
            rules = (
                ('ObjectCreated', 'normal',
                 f"新文件上传: {object_key} 到存储桶 {bucket_name}"),
                ('ObjectRemoved', 'high',
                 f"文件删除: {object_key} 从存储桶 {bucket_name}"),
            )
            # Fallback for every other kind of event.
            priority = 'low'
            message = f"对象操作: {event_name} - {bucket_name}/{object_key}"
            for marker, level, text in rules:
                if marker in event_name:
                    priority, message = level, text
                    break

            def _deliver(channel: str) -> Dict[str, Any]:
                # Stand-in for a real e-mail / chat / HTTP integration.
                time.sleep(0.05)
                return {
                    'channel': channel,
                    'status': 'sent',
                    'sent_at': datetime.now().isoformat()
                }

            sent_notifications = [
                _deliver(channel) for channel in ('email', 'slack', 'webhook')
            ]

            return {
                'success': True,
                'message': f'通知发送完成: {event_name}',
                'event_name': event_name,
                'bucket_name': bucket_name,
                'object_key': object_key,
                'notification_message': message,
                'priority': priority,
                'sent_notifications': sent_notifications,
                'notified_at': datetime.now().isoformat()
            }

        except Exception as e:
            return {
                'success': False,
                'error': f'通知发送失败: {e}',
                'event_record': event_record
            }
    
    def get_processing_statistics(self) -> Dict[str, Any]:
        """Summarise the counters kept in ``self.processing_stats``.

        Uptime is measured from the ISO-format ``start_time`` stored in the
        stats dict. The success rate is a percentage and the processing rate
        is expressed per hour; both are reported as 0 when their denominator
        would be zero.

        Returns:
            A dict with ``success=True``, a ``processing_statistics`` mapping
            and the ``generated_at`` timestamp.
        """
        now = datetime.now()
        counters = self.processing_stats
        started_at = counters['start_time']
        elapsed = (now - datetime.fromisoformat(started_at)).total_seconds()

        total = counters['total_processed']
        succeeded = counters['successful_processed']
        failed = counters['failed_processed']

        # Share of successful runs, as a percentage.
        if total > 0:
            success_rate = succeeded / total * 100
        else:
            success_rate = 0

        # Throughput extrapolated to one hour.
        if elapsed > 0:
            hourly_rate = total / (elapsed / 3600)
        else:
            hourly_rate = 0

        summary = {
            'total_processed': total,
            'successful_processed': succeeded,
            'failed_processed': failed,
            'success_rate': success_rate,
            'start_time': started_at,
            'uptime_seconds': elapsed,
            'processing_rate_per_hour': hourly_rate,
            'unique_objects_processed': len(self.processed_objects)
        }

        return {
            'success': True,
            'processing_statistics': summary,
            'generated_at': now.isoformat()
        }

## 6.4 实际应用示例

### 6.4.1 完整的事件通知系统

```python
class MinIOEventSystem:
    """Facade that wires the event notification components together.

    It owns a ``MinIOEventNotification`` manager, a ``WebhookServer`` and an
    ``EventHandlers`` instance, registers a default set of handlers on the
    manager, and offers helpers to configure bucket notifications, start the
    webhook server, run an end-to-end smoke test and report status.
    """

    # Bind-only wildcard addresses: valid to listen on, but not usable as the
    # destination of a callback URL.
    _WILDCARD_BIND_HOSTS = ('0.0.0.0', '::')

    def __init__(self, client: Minio, webhook_host: str = '0.0.0.0', webhook_port: int = 5000):
        """Create the components and register the default handlers.

        Args:
            client: MinIO client shared by the manager and the handlers.
            webhook_host: Address the webhook server is created with.
            webhook_port: Port the webhook server is created with.
        """
        self.client = client
        self.notification_manager = MinIOEventNotification(client)
        self.webhook_server = WebhookServer(webhook_host, webhook_port)
        self.event_handlers = EventHandlers(client)

        # Hand the notification manager to the webhook server.
        self.webhook_server.set_notification_manager(self.notification_manager)

        # Register the default event handlers.
        self._register_default_handlers()

    def _register_default_handlers(self):
        """Register the built-in handlers on the notification manager.

        Registration order is kept stable: image processing, backup, audit
        log (created + removed), content analysis, notification (created +
        removed).
        """
        handlers = self.event_handlers
        registrations = (
            ('s3:ObjectCreated:*', handlers.image_processing_handler),
            ('s3:ObjectCreated:Put', handlers.backup_handler),
            ('s3:ObjectCreated:*', handlers.audit_log_handler),
            ('s3:ObjectRemoved:*', handlers.audit_log_handler),
            ('s3:ObjectCreated:*', handlers.content_analysis_handler),
            ('s3:ObjectCreated:*', handlers.notification_handler),
            ('s3:ObjectRemoved:*', handlers.notification_handler),
        )
        for event_pattern, handler in registrations:
            self.notification_manager.register_event_handler(event_pattern, handler)

    def _default_webhook_url(self) -> str:
        """Return the callback URL of the local webhook server.

        A wildcard bind address (the default ``0.0.0.0``) is replaced by the
        loopback address, because a wildcard is not a valid destination.
        """
        host = self.webhook_server.host
        if host in self._WILDCARD_BIND_HOSTS:
            host = '127.0.0.1'
        return f"http://{host}:{self.webhook_server.port}/webhook"

    @staticmethod
    def _remove_local_test_file(path: str) -> None:
        """Best-effort removal of the temporary file created by the self-test."""
        import os

        try:
            os.remove(path)
            print(f"✅ 清理本地测试文件: {path}")
        except OSError as cleanup_error:
            # Cleanup must never fail the test run; just report it.
            print(f"⚠️ 清理本地测试文件失败: {cleanup_error}")

    def setup_bucket_notifications(self, bucket_name: str, webhook_url: Optional[str] = None) -> Dict[str, Any]:
        """Configure webhook notifications for object create/remove events.

        Args:
            bucket_name: Bucket to configure.
            webhook_url: Target URL; defaults to the local webhook server.

        Returns:
            The result dict of ``configure_webhook_notification``, or a dict
            with ``success=False`` and an ``error`` message if an exception
            was raised.
        """
        try:
            # Fall back to the local webhook server.
            if webhook_url is None:
                webhook_url = self._default_webhook_url()

            # Events to subscribe to.
            events = [
                's3:ObjectCreated:*',
                's3:ObjectRemoved:*'
            ]

            result = self.notification_manager.configure_webhook_notification(
                bucket_name=bucket_name,
                webhook_url=webhook_url,
                events=events
            )

            if result['success']:
                print(f"✅ 存储桶通知配置成功: {bucket_name}")
                print(f"   Webhook URL: {webhook_url}")
                print(f"   监听事件: {', '.join(events)}")

            return result

        except Exception as e:
            return {
                'success': False,
                'error': f'设置存储桶通知失败: {e}',
                'bucket_name': bucket_name
            }

    def start_webhook_server(self, background: bool = False):
        """Start the webhook server.

        Args:
            background: When true, run the server in a daemon thread and
                return immediately; otherwise run it in the calling thread.

        Returns:
            The started ``threading.Thread`` when ``background`` is true,
            otherwise ``None`` once ``start_server`` returns.
        """
        if background:
            # Daemon thread, so it does not keep the interpreter alive.
            server_thread = threading.Thread(
                target=self.webhook_server.start_server,
                kwargs={'debug': False, 'threaded': True}
            )
            server_thread.daemon = True
            server_thread.start()

            print(f"Webhook服务器已在后台启动: http://{self.webhook_server.host}:{self.webhook_server.port}")
            return server_thread
        else:
            # Run the server in the calling thread.
            self.webhook_server.start_server(debug=False, threaded=True)

    def test_notification_system(
        self,
        bucket_name: str,
        test_file_path: Optional[str] = None,
        upload_wait_seconds: float = 3,
        delete_wait_seconds: float = 2,
    ) -> Dict[str, Any]:
        """Run an end-to-end smoke test of the notification pipeline.

        Steps: probe the webhook endpoint, upload a test object, wait, report
        notification history and statistics, delete the object, wait again.

        Args:
            bucket_name: Bucket used for the upload/delete round trip.
            test_file_path: Local file to upload. When omitted, a temporary
                ``test_notification_file.txt`` is created in the working
                directory and removed afterwards. A caller-supplied file is
                never deleted.
            upload_wait_seconds: Pause after the upload before results are
                inspected.
            delete_wait_seconds: Pause after the object has been deleted.

        Returns:
            A dict with ``success=True`` and the collected ``test_results``;
            the webhook probe result if the endpoint is not reachable; or a
            dict with ``success=False`` and an ``error`` message if an
            exception was raised.
        """
        # Only a file created here may be deleted during cleanup.
        owns_test_file = False

        try:
            print(f"\n=== 测试MinIO事件通知系统 ===")

            # 1. Probe the webhook endpoint.
            webhook_url = self._default_webhook_url()
            print(f"\n1. 测试Webhook端点: {webhook_url}")

            test_result = self.notification_manager.test_webhook_endpoint(webhook_url)
            if test_result['success'] and test_result['is_reachable']:
                print(f"✅ Webhook端点测试通过")
            else:
                print(f"❌ Webhook端点测试失败: {test_result.get('error', 'Unknown error')}")
                return test_result

            # 2. Create a test file unless the caller supplied one.
            if test_file_path is None:
                test_file_path = 'test_notification_file.txt'
                with open(test_file_path, 'w', encoding='utf-8') as f:
                    owns_test_file = True
                    f.write(f"测试文件内容\n创建时间: {datetime.now().isoformat()}\n")
                print(f"✅ 创建测试文件: {test_file_path}")

            # 3. Upload the test file to trigger an event.
            print(f"\n2. 上传测试文件触发事件...")

            object_name = f"test/{datetime.now().strftime('%Y%m%d_%H%M%S')}/notification_test.txt"

            self.client.fput_object(
                bucket_name=bucket_name,
                object_name=object_name,
                file_path=test_file_path,
                content_type='text/plain'
            )

            print(f"✅ 文件上传完成: {bucket_name}/{object_name}")

            # 4. Give the event time to arrive and be processed.
            print(f"\n3. 等待事件处理...")
            time.sleep(upload_wait_seconds)

            # 5. Inspect the results.
            print(f"\n4. 检查处理结果...")

            history = self.notification_manager.get_notification_history(limit=10)
            if history['success'] and history['total_events'] > 0:
                latest_event = history['events'][0]
                print(f"✅ 检测到事件: {latest_event['event_name']}")
                print(f"   对象: {latest_event['bucket_name']}/{latest_event['object_key']}")
                print(f"   处理器调用: {len(latest_event['handlers_called'])} 个")
            else:
                print(f"⚠️ 未检测到事件")

            handler_stats = self.notification_manager.get_handler_statistics()
            if handler_stats['success']:
                print(f"\n处理器统计:")
                print(f"  - 事件类型: {handler_stats['total_event_types']} 个")
                print(f"  - 处理器总数: {handler_stats['total_handlers']} 个")

            processing_stats = self.event_handlers.get_processing_statistics()
            if processing_stats['success']:
                stats = processing_stats['processing_statistics']
                print(f"\n处理统计:")
                print(f"  - 总处理数: {stats['total_processed']}")
                print(f"  - 成功率: {stats['success_rate']:.1f}%")
                print(f"  - 处理速率: {stats['processing_rate_per_hour']:.1f} 个/小时")

            # 6. Delete the uploaded object to trigger a removal event.
            print(f"\n5. 删除测试文件触发删除事件...")

            self.client.remove_object(bucket_name, object_name)
            print(f"✅ 文件删除完成: {bucket_name}/{object_name}")

            time.sleep(delete_wait_seconds)

            # 7. Remove the local file, but only if it was created here.
            if owns_test_file:
                self._remove_local_test_file(test_file_path)
                owns_test_file = False

            print(f"\n=== 通知系统测试完成 ===")

            return {
                'success': True,
                'message': '通知系统测试完成',
                'test_results': {
                    'webhook_test': test_result,
                    'notification_history': history,
                    'handler_statistics': handler_stats,
                    'processing_statistics': processing_stats
                }
            }

        except Exception as e:
            return {
                'success': False,
                'error': f'通知系统测试失败: {e}',
                'bucket_name': bucket_name
            }
        finally:
            # Failure path: do not leave the generated file behind.
            if owns_test_file:
                self._remove_local_test_file(test_file_path)

    def get_system_status(self) -> Dict[str, Any]:
        """Collect a status snapshot from all components.

        Returns:
            A dict with ``success=True`` and a ``system_status`` mapping, or a
            dict with ``success=False`` and an ``error`` message if an
            exception was raised.
        """
        try:
            notification_stats = self.notification_manager.get_handler_statistics()
            processing_stats = self.event_handlers.get_processing_statistics()
            webhook_history = self.webhook_server.get_received_webhooks(limit=10)

            return {
                'success': True,
                'system_status': {
                    'webhook_server': {
                        'host': self.webhook_server.host,
                        'port': self.webhook_server.port,
                        # NOTE(review): assumes get_received_webhooks returns a
                        # sequence of requests; confirm against WebhookServer.
                        'recent_webhooks': len(webhook_history)
                    },
                    'notification_manager': notification_stats,
                    'event_handlers': processing_stats,
                    # Despite the key name, this is the snapshot timestamp,
                    # not a duration.
                    'system_uptime': datetime.now().isoformat()
                }
            }

        except Exception as e:
            return {
                'success': False,
                'error': f'获取系统状态失败: {e}'
            }

# Usage example
def event_notification_example():
    """Run the event notification demo against a local MinIO server.

    Connects to MinIO, makes sure the demo bucket exists, starts the webhook
    server in the background, configures bucket notifications, runs the
    built-in self-test and then polls the system status every 10 seconds
    until interrupted with Ctrl+C. Any exception is reported on stdout rather
    than raised.
    """

    print("=== MinIO事件通知示例 ===")

    # Single source of truth for the webhook address used below.
    webhook_host = 'localhost'
    webhook_port = 5001

    try:
        # Connect to MinIO. These are demo credentials for a local server;
        # do not use them outside of a local test setup.
        client = Minio(
            endpoint="localhost:9000",
            access_key="minioadmin",
            secret_key="minioadmin",
            secure=False
        )

        bucket_name = "event-test"

        # Create the bucket on first run.
        if not client.bucket_exists(bucket_name):
            client.make_bucket(bucket_name)
            print(f"✅ 创建存储桶: {bucket_name}")

        # Build the event system.
        event_system = MinIOEventSystem(client, webhook_host=webhook_host, webhook_port=webhook_port)

        # Start the webhook server in the background.
        event_system.start_webhook_server(background=True)

        # Give the server a moment to come up.
        time.sleep(2)

        # Configure bucket notifications.
        setup_result = event_system.setup_bucket_notifications(bucket_name)

        if setup_result['success']:
            print(f"✅ 通知配置成功")

            # Run the end-to-end self-test.
            test_result = event_system.test_notification_system(bucket_name)

            if test_result['success']:
                print(f"✅ 通知系统测试通过")
            else:
                # The self-test may hand back the webhook probe result, which
                # does not necessarily carry an 'error' key.
                print(f"❌ 通知系统测试失败: {test_result.get('error', 'Unknown error')}")

            # Report the system status once.
            status = event_system.get_system_status()
            if status['success']:
                print(f"\n系统状态: 正常运行")
        else:
            print(f"❌ 通知配置失败: {setup_result.get('error', 'Unknown error')}")

        print(f"\n事件通知系统运行中...")
        print(f"可以上传文件到存储桶 '{bucket_name}' 来触发事件")
        print(f"Webhook服务器: http://{webhook_host}:{webhook_port}")
        print(f"按 Ctrl+C 停止")

        # Keep running until interrupted.
        try:
            while True:
                time.sleep(10)

                # Periodically report received webhooks.
                status = event_system.get_system_status()
                if status['success']:
                    webhook_count = status['system_status']['webhook_server']['recent_webhooks']
                    if webhook_count > 0:
                        print(f"最近收到 {webhook_count} 个Webhook请求")

        except KeyboardInterrupt:
            print(f"\n停止事件通知系统...")

        print("\n=== 事件通知示例完成 ===")

    except Exception as e:
        print(f"❌ 示例执行失败: {e}")

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    event_notification_example()

```

## 6.5 总结

本章详细介绍了MinIO的事件通知和Webhook集成,主要内容包括:

### 6.5.1 核心功能

  1. 事件通知管理

    • 支持的事件类型和通知目标
    • Webhook通知配置和管理
    • 通知历史记录和统计
  2. Webhook服务器

    • 基于Flask的Webhook接收器
    • 请求验证和安全机制
    • 状态监控和历史记录
  3. 事件处理器

    • 图片处理自动化
    • 数据备份和同步
    • 审计日志记录
    • 内容分析和安全扫描
    • 实时通知发送
  4. 完整事件系统

    • 组件集成和协调
    • 系统状态监控
    • 自动化测试和验证

### 6.5.2 技术特点

  • 实时性: 事件即时触发和处理
  • 可扩展: 支持自定义事件处理器
  • 可靠性: 错误处理和重试机制
  • 可观测性: 完整的状态监控和统计
  • 安全性: 请求验证和签名机制

### 6.5.3 应用场景

  1. 自动化工作流: 文件上传后的自动处理
  2. 实时监控: 存储操作的实时监控和告警
  3. 数据同步: 跨系统的数据同步和备份
  4. 内容管理: 自动化的内容分析和处理
  5. 审计合规: 完整的操作审计和日志记录

下一章我们将介绍MinIO的集群部署和高可用配置,学习如何构建生产级的MinIO存储集群。