11.1 Deployment Environment Configuration

11.1.1 Production Environment Requirements

# config/production.py
import os
import redis
from datetime import timedelta

class ProductionConfig:
    """Production environment configuration."""
    
    # Core settings. Do not fall back to a hard-coded key in production;
    # fail loudly instead (the env_manager script below validates this).
    SECRET_KEY = os.environ.get('SECRET_KEY')
    DEBUG = False
    TESTING = False
    
    # Database settings
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'postgresql://user:password@localhost/flask_app'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ENGINE_OPTIONS = {
        'pool_size': 20,
        'pool_recycle': 3600,
        'pool_pre_ping': True,
        'max_overflow': 30
    }
    
    # Redis settings
    REDIS_URL = os.environ.get('REDIS_URL') or 'redis://localhost:6379/0'
    
    # Cache settings
    CACHE_TYPE = 'redis'
    CACHE_REDIS_URL = REDIS_URL
    CACHE_DEFAULT_TIMEOUT = 300
    
    # Session settings (Flask-Session expects a Redis client instance here,
    # not a URL string)
    SESSION_TYPE = 'redis'
    SESSION_REDIS = redis.from_url(REDIS_URL)
    SESSION_PERMANENT = False
    SESSION_USE_SIGNER = True
    SESSION_KEY_PREFIX = 'flask_app:'
    PERMANENT_SESSION_LIFETIME = timedelta(hours=24)
    
    # Mail settings
    MAIL_SERVER = os.environ.get('MAIL_SERVER')
    MAIL_PORT = int(os.environ.get('MAIL_PORT') or 587)
    MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
    MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
    MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
    MAIL_DEFAULT_SENDER = os.environ.get('MAIL_DEFAULT_SENDER')
    
    # Celery settings
    CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') or REDIS_URL
    CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') or REDIS_URL
    
    # File upload settings
    UPLOAD_FOLDER = os.environ.get('UPLOAD_FOLDER') or '/var/uploads'
    MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16MB
    
    # Security settings
    WTF_CSRF_ENABLED = True
    WTF_CSRF_TIME_LIMIT = 3600
    
    # Logging settings
    LOG_LEVEL = os.environ.get('LOG_LEVEL') or 'INFO'
    LOG_FILE = os.environ.get('LOG_FILE') or '/var/log/flask_app.log'
    
    # Monitoring settings
    SENTRY_DSN = os.environ.get('SENTRY_DSN')
    
    # Performance settings
    SEND_FILE_MAX_AGE_DEFAULT = timedelta(hours=12)
    
    @staticmethod
    def init_app(app):
        """Initialize production-specific behavior (logging)."""
        import logging
        from logging.handlers import RotatingFileHandler, SysLogHandler
        
        # File logging: make sure the log directory exists
        os.makedirs(os.path.dirname(ProductionConfig.LOG_FILE), exist_ok=True)
        
        file_handler = RotatingFileHandler(
            ProductionConfig.LOG_FILE,
            maxBytes=10 * 1024 * 1024,  # 10MB
            backupCount=10
        )
        
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
        ))
        
        file_handler.setLevel(getattr(logging, ProductionConfig.LOG_LEVEL))
        app.logger.addHandler(file_handler)
        
        # System log: warnings and above also go to syslog
        syslog_handler = SysLogHandler()
        syslog_handler.setLevel(logging.WARNING)
        app.logger.addHandler(syslog_handler)
        
        app.logger.setLevel(getattr(logging, ProductionConfig.LOG_LEVEL))
        app.logger.info('Flask application startup')
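
A minimal sketch of wiring this configuration into an application factory (the names app.py and create_app are illustrative; adjust them to your project layout):

# app.py (sketch, assuming the application-factory pattern)
from flask import Flask

from config.production import ProductionConfig

def create_app(config_class=ProductionConfig):
    app = Flask(__name__)
    app.config.from_object(config_class)
    config_class.init_app(app)  # attach the production logging handlers
    return app

app = create_app()  # the "app:app" entry point that gunicorn loads later in this chapter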

11.1.2 Environment Variable Management

# .env.production
# Database
DATABASE_URL=postgresql://username:password@localhost:5432/flask_app

# Redis
REDIS_URL=redis://localhost:6379/0

# Application
SECRET_KEY=your-super-secret-key-here
FLASK_ENV=production
FLASK_APP=app.py

# Mail
MAIL_SERVER=smtp.gmail.com
MAIL_PORT=587
MAIL_USE_TLS=true
MAIL_USERNAME=your-email@gmail.com
MAIL_PASSWORD=your-app-password
MAIL_DEFAULT_SENDER=your-email@gmail.com

# Celery
CELERY_BROKER_URL=redis://localhost:6379/1
CELERY_RESULT_BACKEND=redis://localhost:6379/1

# Monitoring
SENTRY_DSN=https://your-sentry-dsn@sentry.io/project-id

# File uploads
UPLOAD_FOLDER=/var/uploads

# Logging
LOG_LEVEL=INFO
LOG_FILE=/var/log/flask_app.log

# scripts/env_manager.py
import os
import sys
from pathlib import Path

class EnvironmentManager:
    """环境变量管理器"""
    
    def __init__(self, env_file=None):
        self.env_file = env_file or '.env'
        self.required_vars = [
            'SECRET_KEY',
            'DATABASE_URL',
            'REDIS_URL'
        ]
        self.optional_vars = {
            'MAIL_SERVER': None,
            'SENTRY_DSN': None,
            'LOG_LEVEL': 'INFO',
            'UPLOAD_FOLDER': '/tmp/uploads'
        }
    
    def load_env(self):
        """Load variables from the env file into os.environ."""
        if os.path.exists(self.env_file):
            with open(self.env_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    # Skip blank lines, comments, and malformed lines without '='
                    if not line or line.startswith('#') or '=' not in line:
                        continue
                    key, value = line.split('=', 1)
                    os.environ[key] = value
    
    def validate_env(self):
        """Validate that all required environment variables are present."""
        missing_vars = []
        
        for var in self.required_vars:
            if not os.environ.get(var):
                missing_vars.append(var)
        
        if missing_vars:
            print(f"Error: missing required environment variables: {', '.join(missing_vars)}")
            sys.exit(1)
        
        # Apply defaults for optional variables
        for var, default in self.optional_vars.items():
            if not os.environ.get(var) and default:
                os.environ[var] = default
    
    def generate_env_template(self, output_file='.env.template'):
        """Generate an environment variable template file."""
        template_content = [
            '# Flask application environment variables',
            '# Copy this file to .env and fill in real values',
            '',
            '# Required variables'
        ]
        
        for var in self.required_vars:
            template_content.append(f'{var}=your-{var.lower().replace("_", "-")}-here')
        
        template_content.extend([
            '',
            '# Optional variables'
        ])
        
        for var, default in self.optional_vars.items():
            value = default or f'your-{var.lower().replace("_", "-")}-here'
            template_content.append(f'{var}={value}')
        
        with open(output_file, 'w') as f:
            f.write('\n'.join(template_content))
        
        print(f"环境变量模板已生成: {output_file}")
    
    def check_security(self):
        """Run basic security checks on the loaded configuration."""
        warnings = []
        
        # SECRET_KEY checks
        secret_key = os.environ.get('SECRET_KEY')
        if not secret_key or len(secret_key) < 32:
            warnings.append('SECRET_KEY should be at least 32 characters long')
        
        if secret_key and secret_key in ['dev', 'development', 'test']:
            warnings.append('SECRET_KEY must not be a well-known default value')
        
        # Database URL check
        db_url = os.environ.get('DATABASE_URL')
        if db_url and 'localhost' in db_url:
            warnings.append('Production should not use a localhost database')
        
        # Debug mode check
        if os.environ.get('FLASK_ENV') == 'development':
            warnings.append('Debug mode must not be enabled in production')
        
        if warnings:
            print('Security warnings:')
            for warning in warnings:
                print(f'  - {warning}')
        else:
            print('Security checks passed')
        
        return len(warnings) == 0

if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='Environment variable management tool')
    parser.add_argument('--validate', action='store_true', help='Validate environment variables')
    parser.add_argument('--template', action='store_true', help='Generate an environment variable template')
    parser.add_argument('--security', action='store_true', help='Run security checks')
    parser.add_argument('--env-file', default='.env', help='Path to the env file')
    
    args = parser.parse_args()
    
    manager = EnvironmentManager(args.env_file)
    
    if args.template:
        manager.generate_env_template()
    
    if args.validate or args.security:
        manager.load_env()
        
        if args.validate:
            manager.validate_env()
            print('Environment validation passed')
        
        if args.security:
            manager.check_security()
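
check_security insists on a SECRET_KEY of at least 32 characters. A suitable key can be generated with Python's standard secrets module; a minimal sketch:

# Generate a URL-safe random key for SECRET_KEY (sketch)
import secrets

print(secrets.token_urlsafe(48))  # 64 characters, comfortably above the 32-character minimum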

11.1.3 Installing System Dependencies

#!/bin/bash
# scripts/install_system_deps.sh
set -e  # abort on the first failing command

# Update system packages
sudo apt-get update
sudo apt-get upgrade -y

# Install base dependencies
sudo apt-get install -y \
    python3 \
    python3-pip \
    python3-venv \
    python3-dev \
    build-essential \
    libpq-dev \
    libssl-dev \
    libffi-dev \
    nginx \
    supervisor \
    redis-server \
    postgresql \
    postgresql-contrib \
    git \
    curl \
    wget \
    unzip

# Install Node.js (for frontend builds)
curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash -
sudo apt-get install -y nodejs

# Configure PostgreSQL
sudo -u postgres createuser --interactive
sudo -u postgres createdb flask_app

# Configure Redis
sudo systemctl enable redis-server
sudo systemctl start redis-server

# Configure the firewall
sudo ufw allow 22
sudo ufw allow 80
sudo ufw allow 443
sudo ufw --force enable

echo "System dependencies installed"

# scripts/setup_app.py
import os
import subprocess
import sys
from pathlib import Path

class AppSetup:
    """应用部署设置"""
    
    def __init__(self, app_dir='/var/www/flask_app'):
        self.app_dir = Path(app_dir)
        self.venv_dir = self.app_dir / 'venv'
        self.user = 'www-data'
        self.group = 'www-data'
    
    def create_directories(self):
        """Create the required directories."""
        directories = [
            self.app_dir,
            self.app_dir / 'logs',
            self.app_dir / 'uploads',
            self.app_dir / 'static',
            self.app_dir / 'instance',
            Path('/var/log/flask_app')
        ]
        
        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            print(f"创建目录: {directory}")
    
    def setup_virtual_environment(self):
        """Create the virtualenv and install dependencies."""
        if not self.venv_dir.exists():
            subprocess.run([
                sys.executable, '-m', 'venv', str(self.venv_dir)
            ], check=True)
            print(f"创建虚拟环境: {self.venv_dir}")
        
        # Upgrade pip
        pip_path = self.venv_dir / 'bin' / 'pip'
        subprocess.run([
            str(pip_path), 'install', '--upgrade', 'pip'
        ], check=True)
        
        # Install dependencies
        if (self.app_dir / 'requirements.txt').exists():
            subprocess.run([
                str(pip_path), 'install', '-r', 
                str(self.app_dir / 'requirements.txt')
            ], check=True)
            print("安装Python依赖完成")
    
    def set_permissions(self):
        """Set file ownership and permissions."""
        # Hand the app directory to the service user
        subprocess.run([
            'sudo', 'chown', '-R', f'{self.user}:{self.group}', 
            str(self.app_dir)
        ], check=True)
        
        # Log directory ownership
        subprocess.run([
            'sudo', 'chown', '-R', f'{self.user}:{self.group}', 
            '/var/log/flask_app'
        ], check=True)
        
        # Upload directory permissions
        subprocess.run([
            'sudo', 'chmod', '755', str(self.app_dir / 'uploads')
        ], check=True)
        
        print("文件权限设置完成")
    
    def create_systemd_service(self):
        """Create the systemd service file (requires root)."""
        service_content = f"""[Unit]
Description=Flask App
After=network.target

[Service]
User={self.user}
Group={self.group}
WorkingDirectory={self.app_dir}
Environment=PATH={self.venv_dir}/bin
EnvironmentFile={self.app_dir}/.env
ExecStart={self.venv_dir}/bin/gunicorn --bind 127.0.0.1:5000 --workers 4 app:app
Restart=always

[Install]
WantedBy=multi-user.target
"""
        
        service_file = Path('/etc/systemd/system/flask-app.service')
        with open(service_file, 'w') as f:
            f.write(service_content)
        
        # Reload systemd and enable the service
        subprocess.run(['sudo', 'systemctl', 'daemon-reload'], check=True)
        subprocess.run(['sudo', 'systemctl', 'enable', 'flask-app'], check=True)
        
        print("systemd service created")
    
    def setup_nginx(self):
        """Configure Nginx as a reverse proxy (requires root)."""
        nginx_config = f"""server {{
    listen 80;
    server_name your-domain.com;
    
    location / {{
        proxy_pass http://127.0.0.1:5000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }}
    
    location /static {{
        alias {self.app_dir}/static;
        expires 1y;
        add_header Cache-Control "public, immutable";
    }}
    
    location /uploads {{
        alias {self.app_dir}/uploads;
        expires 1d;
    }}
}}
"""
        
        nginx_file = Path('/etc/nginx/sites-available/flask-app')
        with open(nginx_file, 'w') as f:
            f.write(nginx_config)
        
        # Enable the site
        sites_enabled = Path('/etc/nginx/sites-enabled/flask-app')
        if not sites_enabled.exists():
            sites_enabled.symlink_to(nginx_file)
        
        # Test the configuration and restart Nginx
        subprocess.run(['sudo', 'nginx', '-t'], check=True)
        subprocess.run(['sudo', 'systemctl', 'restart', 'nginx'], check=True)
        
        print("Nginx configured")
    
    def run_setup(self):
        """Run the full setup."""
        print("Starting application deployment setup...")
        
        self.create_directories()
        self.setup_virtual_environment()
        self.set_permissions()
        self.create_systemd_service()
        self.setup_nginx()
        
        print("Application deployment setup complete!")
        print(f"App directory: {self.app_dir}")
        print("Start the service: sudo systemctl start flask-app")
        print("Check its status:  sudo systemctl status flask-app")

if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='Flask application deployment setup')
    parser.add_argument('--app-dir', default='/var/www/flask_app', 
                        help='Application directory path')
    
    args = parser.parse_args()
    
    setup = AppSetup(args.app_dir)
    setup.run_setup()
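
The systemd unit above passes its gunicorn flags inline. As the flag list grows, it is usually cleaner to keep the settings in a config file and start gunicorn with -c gunicorn.conf.py. A minimal sketch (the worker formula is a common rule of thumb, not a requirement):

# gunicorn.conf.py (sketch)
import multiprocessing

bind = '127.0.0.1:5000'
workers = multiprocessing.cpu_count() * 2 + 1  # common sizing rule of thumb
timeout = 120
accesslog = '/var/log/flask_app/access.log'
errorlog = '/var/log/flask_app/error.log'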

11.2 Docker Containerization

11.2.1 Dockerfile Configuration

# Dockerfile
FROM python:3.11-slim

# Working directory
WORKDIR /app

# Environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# System dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Create a non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Copy the requirements file first so the dependency layer caches well
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the required directories
RUN mkdir -p /app/logs /app/uploads /app/instance && \
    chown -R appuser:appuser /app

# Switch to the non-root user
USER appuser

# Expose the port
EXPOSE 5000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:5000/health || exit 1

# Start command
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120", "app:app"]

# Dockerfile.multi-stage
# Multi-stage build Dockerfile

# Build stage
FROM python:3.11-slim AS builder

WORKDIR /app

# Build dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install dependencies into the user site
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# Runtime stage
FROM python:3.11-slim

WORKDIR /app

# Runtime dependencies only (libpq5 instead of the full libpq-dev)
RUN apt-get update && apt-get install -y \
    libpq5 \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Create the user
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Copy the installed Python packages from the build stage
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local

# Copy the application code
COPY --chown=appuser:appuser . .

# Create directories
RUN mkdir -p logs uploads instance && \
    chown -R appuser:appuser /app

# Put the user-site scripts on PATH
ENV PATH=/home/appuser/.local/bin:$PATH

USER appuser

EXPOSE 5000

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:5000/health || exit 1

CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "app:app"]

11.2.2 Docker Compose Configuration

# docker-compose.yml
version: '3.8'

services:
  web:
    build: .
    ports:
      - "5000:5000"
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://postgres:password@db:5432/flask_app
      - REDIS_URL=redis://redis:6379/0
      - CELERY_BROKER_URL=redis://redis:6379/1
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    volumes:
      - ./uploads:/app/uploads
      - ./logs:/app/logs
    depends_on:
      - db
      - redis
    restart: unless-stopped
    networks:
      - app-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  worker:
    build: .
    command: celery -A app.celery worker --loglevel=info
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://postgres:password@db:5432/flask_app
      - REDIS_URL=redis://redis:6379/0
      - CELERY_BROKER_URL=redis://redis:6379/1
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    volumes:
      - ./uploads:/app/uploads
      - ./logs:/app/logs
    depends_on:
      - db
      - redis
    restart: unless-stopped
    networks:
      - app-network

  beat:
    build: .
    command: celery -A app.celery beat --loglevel=info
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://postgres:password@db:5432/flask_app
      - REDIS_URL=redis://redis:6379/0
      - CELERY_BROKER_URL=redis://redis:6379/1
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    volumes:
      - ./logs:/app/logs
    depends_on:
      - db
      - redis
    restart: unless-stopped
    networks:
      - app-network

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=flask_app
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    restart: unless-stopped
    networks:
      - app-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped
    networks:
      - app-network
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
      - ./uploads:/var/www/uploads
    depends_on:
      - web
    restart: unless-stopped
    networks:
      - app-network

volumes:
  postgres_data:
  redis_data:

networks:
  app-network:
    driver: bridge

# docker-compose.prod.yml
# Production Docker Compose configuration.
# Note: the deploy: keys (replicas, resources, restart_policy) and the
# overlay network only take effect under Docker Swarm (docker stack deploy).
version: '3.8'

services:
  web:
    build:
      context: .
      dockerfile: Dockerfile.multi-stage
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://postgres:${POSTGRES_PASSWORD}@db:5432/flask_app
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
      - SENTRY_DSN=${SENTRY_DSN}
    env_file:
      - .env.production
    volumes:
      - uploads_data:/app/uploads
      - logs_data:/app/logs
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
        reservations:
          cpus: '0.25'
          memory: 256M
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
    depends_on:
      - db
      - redis
    networks:
      - app-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  worker:
    build:
      context: .
      dockerfile: Dockerfile.multi-stage
    command: celery -A app.celery worker --loglevel=info --concurrency=4
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://postgres:${POSTGRES_PASSWORD}@db:5432/flask_app
      - REDIS_URL=redis://redis:6379/0
    env_file:
      - .env.production
    volumes:
      - uploads_data:/app/uploads
      - logs_data:/app/logs
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
    depends_on:
      - db
      - redis
    networks:
      - app-network

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=flask_app
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./backups:/backups
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 1G
    networks:
      - app-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:6-alpine
    command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes:
      - redis_data:/data
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
    networks:
      - app-network
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/ssl:/etc/nginx/ssl
      - uploads_data:/var/www/uploads
    deploy:
      resources:
        limits:
          cpus: '0.25'
          memory: 128M
    depends_on:
      - web
    networks:
      - app-network

volumes:
  postgres_data:
  redis_data:
  uploads_data:
  logs_data:

networks:
  app-network:
    driver: overlay
    attachable: true

11.2.3 Container Management Scripts

# scripts/docker_manager.py
import subprocess
import sys
import time
import json
from pathlib import Path

class DockerManager:
    """Docker容器管理器"""
    
    def __init__(self, compose_file='docker-compose.yml'):
        self.compose_file = compose_file
        self.project_name = 'flask-app'
    
    def run_command(self, command, check=True):
        """执行命令"""
        print(f"执行命令: {' '.join(command)}")
        result = subprocess.run(command, capture_output=True, text=True)
        
        if check and result.returncode != 0:
            print(f"命令执行失败: {result.stderr}")
            sys.exit(1)
        
        return result
    
    def build(self, no_cache=False):
        """构建镜像"""
        command = ['docker-compose', '-f', self.compose_file, 'build']
        if no_cache:
            command.append('--no-cache')
        
        self.run_command(command)
        print("镜像构建完成")
    
    def up(self, detach=True, scale=None):
        """启动服务"""
        command = ['docker-compose', '-f', self.compose_file, 'up']
        
        if detach:
            command.append('-d')
        
        if scale:
            for service, count in scale.items():
                command.extend(['--scale', f'{service}={count}'])
        
        self.run_command(command)
        print("服务启动完成")
    
    def down(self, volumes=False):
        """停止服务"""
        command = ['docker-compose', '-f', self.compose_file, 'down']
        
        if volumes:
            command.append('-v')
        
        self.run_command(command)
        print("服务停止完成")
    
    def restart(self, service=None):
        """重启服务"""
        command = ['docker-compose', '-f', self.compose_file, 'restart']
        
        if service:
            command.append(service)
        
        self.run_command(command)
        print(f"服务重启完成: {service or '所有服务'}")
    
    def logs(self, service=None, follow=False, tail=100):
        """查看日志"""
        command = ['docker-compose', '-f', self.compose_file, 'logs']
        
        if follow:
            command.append('-f')
        
        if tail:
            command.extend(['--tail', str(tail)])
        
        if service:
            command.append(service)
        
        self.run_command(command, check=False)
    
    def exec_command(self, service, command):
        """在容器中执行命令"""
        exec_cmd = ['docker-compose', '-f', self.compose_file, 'exec', service] + command
        self.run_command(exec_cmd, check=False)
    
    def _ps_json(self):
        """Parse `ps --format json` output. Newer Compose versions emit one
        JSON object per line rather than a single JSON array."""
        result = self.run_command([
            'docker-compose', '-f', self.compose_file, 'ps', '--format', 'json'
        ])
        stdout = result.stdout.strip()
        if not stdout:
            return []
        try:
            parsed = json.loads(stdout)
            return parsed if isinstance(parsed, list) else [parsed]
        except json.JSONDecodeError:
            return [json.loads(line) for line in stdout.splitlines() if line.strip()]
    
    def status(self):
        """Show service status."""
        try:
            services = self._ps_json()
            print("Service status:")
            for service in services:
                print(f"  {service['Service']}: {service['State']} ({service['Status']})")
        except json.JSONDecodeError:
            print("Could not parse service status")
    
    def health_check(self):
        """Check container health from the Status column."""
        try:
            services = self._ps_json()
            healthy_services = []
            unhealthy_services = []
            
            for service in services:
                status = service['Status'].lower()
                # Check "unhealthy" first: the substring "healthy" matches it too
                if 'unhealthy' in status:
                    unhealthy_services.append(service['Service'])
                elif 'healthy' in status:
                    healthy_services.append(service['Service'])
            
            print(f"Healthy services: {', '.join(healthy_services) or 'none'}")
            print(f"Unhealthy services: {', '.join(unhealthy_services) or 'none'}")
            
            return len(unhealthy_services) == 0
        except json.JSONDecodeError:
            print("Could not determine service health")
            return False
    
    def backup_data(self, backup_dir='./backups'):
        """Back up the database and uploaded files."""
        backup_path = Path(backup_dir)
        backup_path.mkdir(exist_ok=True)
        
        timestamp = time.strftime('%Y%m%d_%H%M%S')
        
        # Database backup: capture the pg_dump output and write it to a file
        db_backup = backup_path / f'db_backup_{timestamp}.sql'
        result = self.run_command([
            'docker-compose', '-f', self.compose_file, 'exec', '-T', 'db',
            'pg_dump', '-U', 'postgres', 'flask_app'
        ])
        db_backup.write_text(result.stdout)
        
        # Uploads backup: tar the named volume through a throwaway container
        self.run_command([
            'docker', 'run', '--rm', '-v', 'flask-app_uploads_data:/data',
            '-v', f'{backup_path.absolute()}:/backup',
            'alpine', 'tar', 'czf', f'/backup/uploads_backup_{timestamp}.tar.gz', '/data'
        ])
        
        print(f"Backups written to: {backup_path}")
    
    def deploy(self, environment='production'):
        """Deploy the application."""
        print(f"Deploying to the {environment} environment...")
        
        # Build the images
        self.build(no_cache=True)
        
        # Stop the old services
        self.down()
        
        # Start the new services
        if environment == 'production':
            self.up(scale={'web': 3, 'worker': 2})
        else:
            self.up()
        
        # Give the services time to start
        print("Waiting for services to start...")
        time.sleep(30)
        
        # Health check
        if self.health_check():
            print("Deployment succeeded!")
        else:
            print("Deployment may have problems; check the service status")
            self.logs(tail=50)

if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='Docker container management tool')
    parser.add_argument('--compose-file', '-f', default='docker-compose.yml',
                        help='Path to the Docker Compose file')
    
    subparsers = parser.add_subparsers(dest='command', help='Available commands')
    
    # build
    build_parser = subparsers.add_parser('build', help='Build images')
    build_parser.add_argument('--no-cache', action='store_true', help='Build without cache')
    
    # up
    up_parser = subparsers.add_parser('up', help='Start services')
    up_parser.add_argument('--no-detach', action='store_true', help='Run in the foreground')
    
    # down
    down_parser = subparsers.add_parser('down', help='Stop services')
    down_parser.add_argument('--volumes', action='store_true', help='Also remove volumes')
    
    # restart
    restart_parser = subparsers.add_parser('restart', help='Restart services')
    restart_parser.add_argument('service', nargs='?', help='Service name')
    
    # logs
    logs_parser = subparsers.add_parser('logs', help='Show logs')
    logs_parser.add_argument('service', nargs='?', help='Service name')
    logs_parser.add_argument('-f', '--follow', action='store_true', help='Follow the logs')
    logs_parser.add_argument('--tail', type=int, default=100, help='Show the last N lines')
    
    # exec
    exec_parser = subparsers.add_parser('exec', help='Run a command in a container')
    exec_parser.add_argument('service', help='Service name')
    exec_parser.add_argument('command', nargs='+', help='Command to run')
    
    # status
    subparsers.add_parser('status', help='Show status')
    
    # health
    subparsers.add_parser('health', help='Health check')
    
    # backup
    backup_parser = subparsers.add_parser('backup', help='Back up data')
    backup_parser.add_argument('--dir', default='./backups', help='Backup directory')
    
    # deploy
    deploy_parser = subparsers.add_parser('deploy', help='Deploy the application')
    deploy_parser.add_argument('--env', default='production', help='Target environment')
    
    args = parser.parse_args()
    
    if not args.command:
        parser.print_help()
        sys.exit(1)
    
    manager = DockerManager(args.compose_file)
    
    if args.command == 'build':
        manager.build(no_cache=args.no_cache)
    elif args.command == 'up':
        manager.up(detach=not args.no_detach)
    elif args.command == 'down':
        manager.down(volumes=args.volumes)
    elif args.command == 'restart':
        manager.restart(args.service)
    elif args.command == 'logs':
        manager.logs(args.service, follow=args.follow, tail=args.tail)
    elif args.command == 'exec':
        manager.exec_command(args.service, args.command)
    elif args.command == 'status':
        manager.status()
    elif args.command == 'health':
        manager.health_check()
    elif args.command == 'backup':
        manager.backup_data(args.dir)
    elif args.command == 'deploy':
        manager.deploy(args.env)
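
The same manager can be driven programmatically, for example from a CI job; a short usage sketch (the import path is hypothetical):

# Deploy from another script using DockerManager (sketch)
from docker_manager import DockerManager  # hypothetical import path

manager = DockerManager('docker-compose.prod.yml')
manager.build(no_cache=True)
manager.up(scale={'web': 3, 'worker': 2})
if not manager.health_check():
    manager.logs(tail=50)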

11.3 Cloud Platform Deployment

11.3.1 AWS Deployment

# scripts/aws_deploy.py
import boto3
import json

class AWSDeployer:
    """AWS部署管理器"""
    
    def __init__(self, region='us-west-2'):
        self.region = region
        self.ec2 = boto3.client('ec2', region_name=region)
        self.ecs = boto3.client('ecs', region_name=region)
        self.ecr = boto3.client('ecr', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.elasticache = boto3.client('elasticache', region_name=region)
        self.elbv2 = boto3.client('elbv2', region_name=region)
    
    def create_vpc_infrastructure(self):
        """创建VPC基础设施"""
        # 创建VPC
        vpc_response = self.ec2.create_vpc(
            CidrBlock='10.0.0.0/16',
            TagSpecifications=[
                {
                    'ResourceType': 'vpc',
                    'Tags': [
                        {'Key': 'Name', 'Value': 'flask-app-vpc'}
                    ]
                }
            ]
        )
        vpc_id = vpc_response['Vpc']['VpcId']
        
        # Create subnets in two different availability zones
        public_subnet = self.ec2.create_subnet(
            VpcId=vpc_id,
            CidrBlock='10.0.1.0/24',
            AvailabilityZone=f'{self.region}a'
        )
        
        private_subnet = self.ec2.create_subnet(
            VpcId=vpc_id,
            CidrBlock='10.0.2.0/24',
            AvailabilityZone=f'{self.region}b'
        )
        
        # Create an internet gateway
        igw_response = self.ec2.create_internet_gateway()
        igw_id = igw_response['InternetGateway']['InternetGatewayId']
        
        # Attach it to the VPC
        self.ec2.attach_internet_gateway(
            InternetGatewayId=igw_id,
            VpcId=vpc_id
        )
        
        print(f"VPC基础设施创建完成: {vpc_id}")
        return {
            'vpc_id': vpc_id,
            'public_subnet_id': public_subnet['Subnet']['SubnetId'],
            'private_subnet_id': private_subnet['Subnet']['SubnetId'],
            'igw_id': igw_id
        }
    
    def create_rds_instance(self, vpc_id, subnet_ids):
        """创建RDS数据库实例"""
        # 创建数据库子网组
        subnet_group_response = self.rds.create_db_subnet_group(
            DBSubnetGroupName='flask-app-subnet-group',
            DBSubnetGroupDescription='Flask App Database Subnet Group',
            SubnetIds=subnet_ids
        )
        
        # Create a security group
        security_group = self.ec2.create_security_group(
            GroupName='flask-app-db-sg',
            Description='Flask App Database Security Group',
            VpcId=vpc_id
        )
        
        sg_id = security_group['GroupId']
        
        # Allow PostgreSQL traffic from inside the VPC
        self.ec2.authorize_security_group_ingress(
            GroupId=sg_id,
            IpPermissions=[
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 5432,
                    'ToPort': 5432,
                    'IpRanges': [{'CidrIp': '10.0.0.0/16'}]
                }
            ]
        )
        
        # Create the RDS instance
        db_response = self.rds.create_db_instance(
            DBInstanceIdentifier='flask-app-db',
            DBInstanceClass='db.t3.micro',
            Engine='postgres',
            MasterUsername='postgres',
            MasterUserPassword='your-secure-password',
            AllocatedStorage=20,
            DBSubnetGroupName='flask-app-subnet-group',
            VpcSecurityGroupIds=[sg_id],
            BackupRetentionPeriod=7,
            MultiAZ=False,
            StorageEncrypted=True
        )
        
        print("RDS实例创建中...")
        return db_response['DBInstance']['DBInstanceIdentifier']
    
    def create_elasticache_cluster(self, vpc_id, subnet_ids):
        """创建ElastiCache Redis集群"""
        # 创建缓存子网组
        subnet_group_response = self.elasticache.create_cache_subnet_group(
            CacheSubnetGroupName='flask-app-cache-subnet-group',
            CacheSubnetGroupDescription='Flask App Cache Subnet Group',
            SubnetIds=subnet_ids
        )
        
        # Create a security group
        security_group = self.ec2.create_security_group(
            GroupName='flask-app-cache-sg',
            Description='Flask App Cache Security Group',
            VpcId=vpc_id
        )
        
        sg_id = security_group['GroupId']
        
        # Allow Redis traffic from inside the VPC
        self.ec2.authorize_security_group_ingress(
            GroupId=sg_id,
            IpPermissions=[
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 6379,
                    'ToPort': 6379,
                    'IpRanges': [{'CidrIp': '10.0.0.0/16'}]
                }
            ]
        )
        
        # Create the Redis cluster
        cache_response = self.elasticache.create_cache_cluster(
            CacheClusterId='flask-app-redis',
            CacheNodeType='cache.t3.micro',
            Engine='redis',
            NumCacheNodes=1,
            CacheSubnetGroupName='flask-app-cache-subnet-group',
            SecurityGroupIds=[sg_id]
        )
        
        print("ElastiCache集群创建中...")
        return cache_response['CacheCluster']['CacheClusterId']
    
    def create_ecs_cluster(self):
        """创建ECS集群"""
        cluster_response = self.ecs.create_cluster(
            clusterName='flask-app-cluster',
            capacityProviders=['FARGATE'],
            defaultCapacityProviderStrategy=[
                {
                    'capacityProvider': 'FARGATE',
                    'weight': 1
                }
            ]
        )
        
        print(f"ECS集群创建完成: {cluster_response['cluster']['clusterName']}")
        return cluster_response['cluster']['clusterArn']
    
    def create_ecr_repository(self):
        """创建ECR仓库"""
        try:
            repo_response = self.ecr.create_repository(
                repositoryName='flask-app',
                imageScanningConfiguration={'scanOnPush': True},
                encryptionConfiguration={'encryptionType': 'AES256'}
            )
            
            repo_uri = repo_response['repository']['repositoryUri']
            print(f"ECR仓库创建完成: {repo_uri}")
            return repo_uri
        except self.ecr.exceptions.RepositoryAlreadyExistsException:
            # The repository already exists; look up its URI
            repo_response = self.ecr.describe_repositories(
                repositoryNames=['flask-app']
            )
            repo_uri = repo_response['repositories'][0]['repositoryUri']
            print(f"使用现有ECR仓库: {repo_uri}")
            return repo_uri
    
    def create_task_definition(self, image_uri, db_endpoint, cache_endpoint):
        """创建ECS任务定义"""
        task_definition = {
            'family': 'flask-app-task',
            'networkMode': 'awsvpc',
            'requiresCompatibilities': ['FARGATE'],
            'cpu': '256',
            'memory': '512',
            'executionRoleArn': 'arn:aws:iam::YOUR_ACCOUNT:role/ecsTaskExecutionRole',
            'containerDefinitions': [
                {
                    'name': 'flask-app',
                    'image': image_uri,
                    'portMappings': [
                        {
                            'containerPort': 5000,
                            'protocol': 'tcp'
                        }
                    ],
                    'environment': [
                        {'name': 'FLASK_ENV', 'value': 'production'},
                        {'name': 'DATABASE_URL', 'value': f'postgresql://postgres:password@{db_endpoint}:5432/flask_app'},
                        {'name': 'REDIS_URL', 'value': f'redis://{cache_endpoint}:6379/0'}
                    ],
                    'logConfiguration': {
                        'logDriver': 'awslogs',
                        'options': {
                            'awslogs-group': '/ecs/flask-app',
                            'awslogs-region': self.region,
                            'awslogs-stream-prefix': 'ecs'
                        }
                    },
                    'healthCheck': {
                        'command': ['CMD-SHELL', 'curl -f http://localhost:5000/health || exit 1'],
                        'interval': 30,
                        'timeout': 5,
                        'retries': 3,
                        'startPeriod': 60
                    }
                }
            ]
        }
        
        response = self.ecs.register_task_definition(**task_definition)
        print(f"任务定义创建完成: {response['taskDefinition']['taskDefinitionArn']}")
        return response['taskDefinition']['taskDefinitionArn']
    
    def create_load_balancer(self, vpc_id, subnet_ids):
        """创建应用负载均衡器"""
        # 创建安全组
        security_group = self.ec2.create_security_group(
            GroupName='flask-app-alb-sg',
            Description='Flask App ALB Security Group',
            VpcId=vpc_id
        )
        
        sg_id = security_group['GroupId']
        
        # Allow HTTP and HTTPS from anywhere
        self.ec2.authorize_security_group_ingress(
            GroupId=sg_id,
            IpPermissions=[
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 80,
                    'ToPort': 80,
                    'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
                },
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 443,
                    'ToPort': 443,
                    'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
                }
            ]
        )
        
        # Create the load balancer
        alb_response = self.elbv2.create_load_balancer(
            Name='flask-app-alb',
            Subnets=subnet_ids,
            SecurityGroups=[sg_id],
            Scheme='internet-facing',
            Type='application',
            IpAddressType='ipv4'
        )
        
        alb_arn = alb_response['LoadBalancers'][0]['LoadBalancerArn']
        alb_dns = alb_response['LoadBalancers'][0]['DNSName']
        
        print(f"负载均衡器创建完成: {alb_dns}")
        return alb_arn, alb_dns
    
    def deploy_full_stack(self):
        """Deploy the full application stack."""
        print("Starting the full AWS deployment...")
        
        # 1. VPC infrastructure
        vpc_info = self.create_vpc_infrastructure()
        
        # 2. ECR repository
        ecr_uri = self.create_ecr_repository()
        
        # 3. RDS instance. The DB subnet group needs subnets in two AZs;
        #    a real setup should pass two private subnets here.
        db_id = self.create_rds_instance(
            vpc_info['vpc_id'],
            [vpc_info['public_subnet_id'], vpc_info['private_subnet_id']]
        )
        
        # 4. ElastiCache cluster
        cache_id = self.create_elasticache_cluster(
            vpc_info['vpc_id'],
            [vpc_info['private_subnet_id']]
        )
        
        # 5. ECS cluster
        cluster_arn = self.create_ecs_cluster()
        
        # 6. Load balancer (an internet-facing ALB likewise needs public
        #    subnets in at least two AZs; one is shown here for brevity)
        alb_arn, alb_dns = self.create_load_balancer(
            vpc_info['vpc_id'],
            [vpc_info['public_subnet_id']]
        )
        
        print("\nDeployment info:")
        print(f"VPC ID: {vpc_info['vpc_id']}")
        print(f"ECR URI: {ecr_uri}")
        print(f"Database ID: {db_id}")
        print(f"Cache ID: {cache_id}")
        print(f"ECS cluster: {cluster_arn}")
        print(f"Load balancer: {alb_dns}")
        
        return {
            'vpc_info': vpc_info,
            'ecr_uri': ecr_uri,
            'db_id': db_id,
            'cache_id': cache_id,
            'cluster_arn': cluster_arn,
            'alb_arn': alb_arn,
            'alb_dns': alb_dns
        }

if __name__ == '__main__':
    deployer = AWSDeployer()
    deployment_info = deployer.deploy_full_stack()
    
    # Save the deployment info
    with open('aws_deployment_info.json', 'w') as f:
        json.dump(deployment_info, f, indent=2, default=str)
    
    print("\nDeployment info saved to aws_deployment_info.json")

11.3.2 Kubernetes Deployment

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: flask-app
  labels:
    name: flask-app
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flask-app-config
  namespace: flask-app
data:
  FLASK_ENV: "production"
  REDIS_URL: "redis://redis-service:6379/0"
  CELERY_BROKER_URL: "redis://redis-service:6379/1"
  CELERY_RESULT_BACKEND: "redis://redis-service:6379/1"
---
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: flask-app-secret
  namespace: flask-app
type: Opaque
data:
  SECRET_KEY: eW91ci1zdXBlci1zZWNyZXQta2V5LWhlcmU=  # base64-encoded
  DATABASE_URL: cG9zdGdyZXNxbDovL3Bvc3RncmVzOnBhc3N3b3JkQHBvc3RncmVzLXNlcnZpY2U6NTQzMi9mbGFza19hcHA=
  MAIL_PASSWORD: eW91ci1tYWlsLXBhc3N3b3JkLWhlcmU=
---
# k8s/postgres.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
  namespace: flask-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:13
        env:
        - name: POSTGRES_DB
          value: "flask_app"
        - name: POSTGRES_USER
          value: "postgres"
        - name: POSTGRES_PASSWORD
          value: "password"
        ports:
        - containerPort: 5432
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
      volumes:
      - name: postgres-storage
        persistentVolumeClaim:
          claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: postgres-service
  namespace: flask-app
spec:
  selector:
    app: postgres
  ports:
  - port: 5432
    targetPort: 5432
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pvc
  namespace: flask-app
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
---
# k8s/redis.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  namespace: flask-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:6-alpine
        command: ["redis-server"]
        args: ["--appendonly", "yes", "--maxmemory", "256mb", "--maxmemory-policy", "allkeys-lru"]
        ports:
        - containerPort: 6379
        volumeMounts:
        - name: redis-storage
          mountPath: /data
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "256Mi"
            cpu: "200m"
      volumes:
      - name: redis-storage
        persistentVolumeClaim:
          claimName: redis-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
  namespace: flask-app
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: redis-pvc
  namespace: flask-app
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
---
# k8s/flask-app.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flask-app
  namespace: flask-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flask-app
  template:
    metadata:
      labels:
        app: flask-app
    spec:
      containers:
      - name: flask-app
        image: your-registry/flask-app:latest
        ports:
        - containerPort: 5000
        env:
        - name: SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: flask-app-secret
              key: SECRET_KEY
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: flask-app-secret
              key: DATABASE_URL
        envFrom:
        - configMapRef:
            name: flask-app-config
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        volumeMounts:
        - name: uploads-storage
          mountPath: /app/uploads
      volumes:
      - name: uploads-storage
        persistentVolumeClaim:
          claimName: uploads-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: flask-app-service
  namespace: flask-app
spec:
  selector:
    app: flask-app
  ports:
  - port: 80
    targetPort: 5000
  type: ClusterIP
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: uploads-pvc
  namespace: flask-app
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 20Gi
---
# k8s/celery-worker.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker
  namespace: flask-app
spec:
  replicas: 2
  selector:
    matchLabels:
      app: celery-worker
  template:
    metadata:
      labels:
        app: celery-worker
    spec:
      containers:
      - name: celery-worker
        image: your-registry/flask-app:latest
        command: ["celery"]
        args: ["-A", "app.celery", "worker", "--loglevel=info", "--concurrency=4"]
        env:
        - name: SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: flask-app-secret
              key: SECRET_KEY
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: flask-app-secret
              key: DATABASE_URL
        envFrom:
        - configMapRef:
            name: flask-app-config
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: flask-app-ingress
  namespace: flask-app
  annotations:
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/proxy-body-size: "16m"
spec:
  ingressClassName: nginx
  tls:
  - hosts:
    - your-domain.com
    secretName: flask-app-tls
  rules:
  - host: your-domain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: flask-app-service
            port:
              number: 80
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: flask-app-hpa
  namespace: flask-app
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flask-app
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
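
The values in secret.yaml above are base64-encoded; note that base64 is encoding, not encryption, so the manifest itself must still be kept out of version control. The encoded values can be produced with Python's base64 module (or with kubectl create secret); a sketch:

# Encode values for k8s/secret.yaml (sketch)
import base64

def b64(value: str) -> str:
    return base64.b64encode(value.encode()).decode()

print('SECRET_KEY:', b64('your-super-secret-key-here'))
print('DATABASE_URL:', b64('postgresql://postgres:password@postgres-service:5432/flask_app'))
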
# scripts/k8s_deploy.py
import subprocess
import sys
from pathlib import Path

class KubernetesDeployer:
    """Kubernetes部署管理器"""
    
    def __init__(self, namespace='flask-app', kubeconfig=None):
        self.namespace = namespace
        self.kubeconfig = kubeconfig
        self.k8s_dir = Path('k8s')
    
    def run_kubectl(self, command, check=True):
        """执行kubectl命令"""
        cmd = ['kubectl'] + command
        if self.kubeconfig:
            cmd.extend(['--kubeconfig', self.kubeconfig])
        
        print(f"执行命令: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if check and result.returncode != 0:
            print(f"命令执行失败: {result.stderr}")
            raise subprocess.CalledProcessError(result.returncode, cmd)
        
        return result
    
    def create_namespace(self):
        """创建命名空间"""
        try:
            self.run_kubectl(['create', 'namespace', self.namespace])
            print(f"命名空间 {self.namespace} 创建成功")
        except subprocess.CalledProcessError:
            print(f"命名空间 {self.namespace} 已存在")
    
    def apply_manifests(self, manifest_files=None):
        """应用Kubernetes清单文件"""
        if manifest_files is None:
            manifest_files = [
                'namespace.yaml',
                'configmap.yaml',
                'secret.yaml',
                'postgres.yaml',
                'redis.yaml',
                'flask-app.yaml',
                'celery-worker.yaml',
                'ingress.yaml',
                'hpa.yaml'
            ]
        
        for manifest_file in manifest_files:
            manifest_path = self.k8s_dir / manifest_file
            if manifest_path.exists():
                self.run_kubectl(['apply', '-f', str(manifest_path)])
                print(f"应用清单文件: {manifest_file}")
            else:
                print(f"清单文件不存在: {manifest_file}")
    
    def wait_for_deployment(self, deployment_name, timeout=300):
        """等待部署完成"""
        print(f"等待部署 {deployment_name} 完成...")
        self.run_kubectl([
            'wait', '--for=condition=available',
            f'deployment/{deployment_name}',
            f'--namespace={self.namespace}',
            f'--timeout={timeout}s'
        ])
        print(f"部署 {deployment_name} 完成")
    
    def get_pod_status(self):
        """获取Pod状态"""
        result = self.run_kubectl([
            'get', 'pods', f'--namespace={self.namespace}', '-o', 'wide'
        ])
        print("Pod状态:")
        print(result.stdout)
    
    def get_service_info(self):
        """获取服务信息"""
        result = self.run_kubectl([
            'get', 'services', f'--namespace={self.namespace}', '-o', 'wide'
        ])
        print("服务信息:")
        print(result.stdout)
    
    def get_ingress_info(self):
        """获取Ingress信息"""
        result = self.run_kubectl([
            'get', 'ingress', f'--namespace={self.namespace}', '-o', 'wide'
        ])
        print("Ingress信息:")
        print(result.stdout)
    
    def scale_deployment(self, deployment_name, replicas):
        """扩缩容部署"""
        self.run_kubectl([
            'scale', f'deployment/{deployment_name}',
            f'--replicas={replicas}',
            f'--namespace={self.namespace}'
        ])
        print(f"部署 {deployment_name} 扩缩容到 {replicas} 个副本")
    
    def rolling_update(self, deployment_name, image):
        """滚动更新"""
        self.run_kubectl([
            'set', 'image',
            f'deployment/{deployment_name}',
            f'{deployment_name}={image}',
            f'--namespace={self.namespace}'
        ])
        print(f"开始滚动更新 {deployment_name} 到镜像 {image}")
        
        # 等待滚动更新完成
        self.wait_for_deployment(deployment_name)
    
    def rollback_deployment(self, deployment_name, revision=None):
        """回滚部署"""
        cmd = ['rollout', 'undo', f'deployment/{deployment_name}', f'--namespace={self.namespace}']
        if revision:
            cmd.extend(['--to-revision', str(revision)])
        
        self.run_kubectl(cmd)
        print(f"回滚部署 {deployment_name}")
        
        # 等待回滚完成
        self.wait_for_deployment(deployment_name)
    
    def get_logs(self, pod_name=None, container=None, follow=False, tail=100):
        """获取日志"""
        if pod_name is None:
            # 获取第一个flask-app pod
            result = self.run_kubectl([
                'get', 'pods', f'--namespace={self.namespace}',
                '-l', 'app=flask-app', '-o', 'jsonpath={.items[0].metadata.name}'
            ])
            pod_name = result.stdout.strip()
        
        cmd = ['logs', pod_name, f'--namespace={self.namespace}']
        
        if container:
            cmd.extend(['-c', container])
        if follow:
            cmd.append('-f')
        if tail:
            cmd.extend(['--tail', str(tail)])
        
        self.run_kubectl(cmd, check=False)
    
    def exec_pod(self, pod_name, command):
        """在Pod中执行命令"""
        cmd = ['exec', '-it', pod_name, f'--namespace={self.namespace}', '--'] + command
        self.run_kubectl(cmd, check=False)
    
    def port_forward(self, service_name, local_port, remote_port):
        """端口转发"""
        cmd = [
            'port-forward', f'service/{service_name}',
            f'{local_port}:{remote_port}',
            f'--namespace={self.namespace}'
        ]
        print(f"端口转发: localhost:{local_port} -> {service_name}:{remote_port}")
        self.run_kubectl(cmd, check=False)
    
    def deploy_full_stack(self):
        """Deploy the full application stack."""
        print("Starting the full Kubernetes deployment...")
        
        # 1. Create the namespace
        self.create_namespace()
        
        # 2. Apply all manifests
        self.apply_manifests()
        
        # 3. Wait for the key deployments
        deployments = ['postgres', 'redis', 'flask-app']
        for deployment in deployments:
            try:
                self.wait_for_deployment(deployment)
            except subprocess.CalledProcessError:
                print(f"Deployment {deployment} may have problems")
        
        # 4. Show the final state
        print("\n=== Deployment status ===")
        self.get_pod_status()
        print("\n=== Services ===")
        self.get_service_info()
        print("\n=== Ingress ===")
        self.get_ingress_info()
        
        print("\nKubernetes deployment complete!")

if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='Kubernetes deployment management tool')
    parser.add_argument('--namespace', '-n', default='flask-app', help='Namespace')
    parser.add_argument('--kubeconfig', help='Path to a kubeconfig file')
    
    subparsers = parser.add_subparsers(dest='command', help='Available commands')
    
    # deploy
    subparsers.add_parser('deploy', help='Deploy the application')
    
    # status
    subparsers.add_parser('status', help='Show status')
    
    # scale
    scale_parser = subparsers.add_parser('scale', help='Scale a deployment')
    scale_parser.add_argument('deployment', help='Deployment name')
    scale_parser.add_argument('replicas', type=int, help='Replica count')
    
    # update
    update_parser = subparsers.add_parser('update', help='Rolling update')
    update_parser.add_argument('deployment', help='Deployment name')
    update_parser.add_argument('image', help='New image')
    
    # rollback
    rollback_parser = subparsers.add_parser('rollback', help='Roll back a deployment')
    rollback_parser.add_argument('deployment', help='Deployment name')
    rollback_parser.add_argument('--revision', type=int, help='Revision to roll back to')
    
    # logs
    logs_parser = subparsers.add_parser('logs', help='Show logs')
    logs_parser.add_argument('--pod', help='Pod name')
    logs_parser.add_argument('-f', '--follow', action='store_true', help='Follow the logs')
    logs_parser.add_argument('--tail', type=int, default=100, help='Show the last N lines')
    
    # port-forward
    pf_parser = subparsers.add_parser('port-forward', help='Port forwarding')
    pf_parser.add_argument('service', help='Service name')
    pf_parser.add_argument('local_port', type=int, help='Local port')
    pf_parser.add_argument('remote_port', type=int, help='Remote port')
    
    args = parser.parse_args()
    
    if not args.command:
        parser.print_help()
        sys.exit(1)
    
    deployer = KubernetesDeployer(args.namespace, args.kubeconfig)
    
    if args.command == 'deploy':
        deployer.deploy_full_stack()
    elif args.command == 'status':
        deployer.get_pod_status()
        deployer.get_service_info()
        deployer.get_ingress_info()
    elif args.command == 'scale':
        deployer.scale_deployment(args.deployment, args.replicas)
    elif args.command == 'update':
        deployer.rolling_update(args.deployment, args.image)
    elif args.command == 'rollback':
        deployer.rollback_deployment(args.deployment, args.revision)
    elif args.command == 'logs':
        deployer.get_logs(args.pod, follow=args.follow, tail=args.tail)
    elif args.command == 'port-forward':
        deployer.port_forward(args.service, args.local_port, args.remote_port)

11.4 Monitoring and Logging

11.4.1 Application Monitoring

# monitoring/prometheus_metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from flask import Response
import time
import psutil
import threading

class PrometheusMetrics:
    """Prometheus指标收集器"""
    
    def __init__(self, app=None):
        self.app = app
        
        # 请求指标
        self.request_count = Counter(
            'flask_requests_total',
            'Total number of requests',
            ['method', 'endpoint', 'status']
        )
        
        self.request_duration = Histogram(
            'flask_request_duration_seconds',
            'Request duration in seconds',
            ['method', 'endpoint']
        )
        
        # 业务指标
        self.user_registrations = Counter(
            'flask_user_registrations_total',
            'Total number of user registrations'
        )
        
        self.active_users = Gauge(
            'flask_active_users',
            'Number of active users'
        )
        
        self.database_connections = Gauge(
            'flask_database_connections',
            'Number of database connections'
        )
        
        # 系统指标
        self.cpu_usage = Gauge(
            'flask_cpu_usage_percent',
            'CPU usage percentage'
        )
        
        self.memory_usage = Gauge(
            'flask_memory_usage_bytes',
            'Memory usage in bytes'
        )
        
        self.disk_usage = Gauge(
            'flask_disk_usage_percent',
            'Disk usage percentage'
        )
        
        if app:
            self.init_app(app)
    
    def init_app(self, app):
        """初始化Flask应用"""
        self.app = app
        
        # 注册请求钩子
        app.before_request(self._before_request)
        app.after_request(self._after_request)
        
        # 注册指标端点
        app.add_url_rule('/metrics', 'metrics', self.metrics_endpoint)
        
        # 启动系统指标收集线程
        self._start_system_metrics_collection()
    
    def _before_request(self):
        """请求开始前的钩子"""
        from flask import g
        g.start_time = time.time()
    
    def _after_request(self, response):
        """请求结束后的钩子"""
        from flask import request, g
        
        # 记录请求计数
        self.request_count.labels(
            method=request.method,
            endpoint=request.endpoint or 'unknown',
            status=response.status_code
        ).inc()
        
        # 记录请求持续时间
        if hasattr(g, 'start_time'):
            duration = time.time() - g.start_time
            self.request_duration.labels(
                method=request.method,
                endpoint=request.endpoint or 'unknown'
            ).observe(duration)
        
        return response
    
    def _start_system_metrics_collection(self):
        """启动系统指标收集"""
        def collect_system_metrics():
            while True:
                try:
                    # CPU使用率
                    cpu_percent = psutil.cpu_percent(interval=1)
                    self.cpu_usage.set(cpu_percent)
                    
                    # 内存使用
                    memory = psutil.virtual_memory()
                    self.memory_usage.set(memory.used)
                    
                    # 磁盘使用率
                    disk = psutil.disk_usage('/')
                    disk_percent = (disk.used / disk.total) * 100
                    self.disk_usage.set(disk_percent)
                    
                except Exception as e:
                    print(f"系统指标收集错误: {e}")
                
                time.sleep(30)  # 每30秒收集一次
        
        thread = threading.Thread(target=collect_system_metrics, daemon=True)
        thread.start()
    
    def metrics_endpoint(self):
        """Expose metrics in the Prometheus text exposition format."""
        return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)
    
    def record_user_registration(self):
        """记录用户注册"""
        self.user_registrations.inc()
    
    def update_active_users(self, count):
        """更新活跃用户数"""
        self.active_users.set(count)
    
    def update_database_connections(self, count):
        """更新数据库连接数"""
        self.database_connections.set(count)
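
The collector above is typically wired in once at application start-up. A minimal usage sketch (the create_app factory and import path here are assumptions for illustration, not part of the class):

# example: wiring PrometheusMetrics into an app factory
from flask import Flask
from monitoring.prometheus_metrics import PrometheusMetrics

metrics = PrometheusMetrics()

def create_app():
    app = Flask(__name__)
    metrics.init_app(app)  # registers request hooks and the /metrics endpoint
    return app

# business code can then record domain events, e.g. after a signup:
# metrics.record_user_registration()
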
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

scrape_configs:
  - job_name: 'flask-app'
    static_configs:
      - targets: ['flask-app:5000']
    metrics_path: '/metrics'
    scrape_interval: 30s
    
  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']
    
  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']
    
  - job_name: 'nginx'
    static_configs:
      - targets: ['nginx-exporter:9113']

# monitoring/alert_rules.yml
groups:
  - name: flask-app-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(flask_requests_total{status=~"5.."}[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors per second"
      
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, rate(flask_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High response time detected"
          description: "95th percentile response time is {{ $value }} seconds"
      
      - alert: HighCPUUsage
        expr: flask_cpu_usage_percent > 80
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High CPU usage"
          description: "CPU usage is {{ $value }}%"
      
      - alert: HighMemoryUsage
        expr: flask_memory_usage_bytes / (1024*1024*1024) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is {{ $value }}GB"
      
      - alert: DatabaseConnectionsHigh
        expr: flask_database_connections > 50
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High database connections"
          description: "Database connections: {{ $value }}"
# monitoring/health_check.py
from flask import Blueprint, jsonify, current_app
from sqlalchemy import text
import redis
import time

health_bp = Blueprint('health', __name__)

class HealthChecker:
    """健康检查器"""
    
    def __init__(self, app=None):
        self.app = app
        if app:
            self.init_app(app)
    
    def init_app(self, app):
        """初始化Flask应用"""
        self.app = app
        app.register_blueprint(health_bp)
    
    @staticmethod
    def check_database():
        """Check database connectivity and measure the response time."""
        try:
            from app import db
            start_time = time.time()
            db.session.execute(text('SELECT 1'))
            return {'status': 'healthy', 'response_time': time.time() - start_time}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
    
    @staticmethod
    def check_redis():
        """检查Redis连接"""
        try:
            r = redis.Redis.from_url(current_app.config.get('REDIS_URL'))
            start_time = time.time()
            r.ping()
            response_time = time.time() - start_time
            return {'status': 'healthy', 'response_time': response_time}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
    
    @staticmethod
    def check_disk_space():
        """检查磁盘空间"""
        try:
            import shutil
            total, used, free = shutil.disk_usage('/')
            usage_percent = (used / total) * 100
            
            status = 'healthy'
            if usage_percent > 90:
                status = 'critical'
            elif usage_percent > 80:
                status = 'warning'
            
            return {
                'status': status,
                'usage_percent': usage_percent,
                'free_gb': free // (1024**3)
            }
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
    
    @staticmethod
    def check_memory():
        """检查内存使用"""
        try:
            import psutil
            memory = psutil.virtual_memory()
            
            status = 'healthy'
            if memory.percent > 90:
                status = 'critical'
            elif memory.percent > 80:
                status = 'warning'
            
            return {
                'status': status,
                'usage_percent': memory.percent,
                'available_gb': memory.available // (1024**3)
            }
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}

@health_bp.route('/health')
def health_check():
    """基础健康检查"""
    return jsonify({
        'status': 'healthy',
        'timestamp': time.time(),
        'version': current_app.config.get('VERSION', '1.0.0')
    })

@health_bp.route('/health/detailed')
def detailed_health_check():
    """详细健康检查"""
    checker = HealthChecker()
    
    checks = {
        'database': checker.check_database(),
        'redis': checker.check_redis(),
        'disk': checker.check_disk_space(),
        'memory': checker.check_memory()
    }
    
    # Determine the overall status; a failed (unhealthy) dependency is treated as critical
    overall_status = 'healthy'
    for check_name, check_result in checks.items():
        if check_result['status'] in ('critical', 'unhealthy'):
            overall_status = 'critical'
            break
        elif check_result['status'] == 'warning':
            overall_status = 'warning'
    
    return jsonify({
        'status': overall_status,
        'timestamp': time.time(),
        'checks': checks
    })

@health_bp.route('/health/ready')
def readiness_check():
    """就绪检查"""
    checker = HealthChecker()
    
    # 检查关键依赖
    db_check = checker.check_database()
    redis_check = checker.check_redis()
    
    if db_check['status'] == 'healthy' and redis_check['status'] == 'healthy':
        return jsonify({'status': 'ready'})
    else:
        return jsonify({
            'status': 'not_ready',
            'database': db_check,
            'redis': redis_check
        }), 503

@health_bp.route('/health/live')
def liveness_check():
    """存活检查"""
    return jsonify({'status': 'alive'})
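
Together these endpoints map naturally onto Kubernetes probes: /health/live for the liveness probe and /health/ready for the readiness probe. For a quick smoke test, Flask's test client can hit them directly (a sketch; the create_app factory name is an assumption):

# example: exercising the health endpoints with Flask's test client
from app import create_app  # assumed application factory

app = create_app()
with app.test_client() as client:
    assert client.get('/health/live').status_code == 200
    ready = client.get('/health/ready')
    print('ready' if ready.status_code == 200 else 'not ready', ready.get_json())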

11.4.2 Log Management

# logging_config.py
import logging
import logging.handlers
import os
from datetime import datetime, timezone
import json

class JSONFormatter(logging.Formatter):
    """JSON格式日志格式化器"""
    
    def format(self, record):
        log_entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        
        # 添加异常信息
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        
        # 添加额外字段
        if hasattr(record, 'user_id'):
            log_entry['user_id'] = record.user_id
        
        if hasattr(record, 'request_id'):
            log_entry['request_id'] = record.request_id
        
        if hasattr(record, 'ip_address'):
            log_entry['ip_address'] = record.ip_address
        
        return json.dumps(log_entry)

class LoggingConfig:
    """日志配置管理器"""
    
    def __init__(self, app=None):
        self.app = app
        if app:
            self.init_app(app)
    
    def init_app(self, app):
        """初始化Flask应用日志"""
        self.app = app
        
        # 创建日志目录
        log_dir = app.config.get('LOG_DIR', 'logs')
        os.makedirs(log_dir, exist_ok=True)
        
        # 配置根日志器
        self._configure_root_logger(app, log_dir)
        
        # 配置应用日志器
        self._configure_app_logger(app, log_dir)
        
        # 配置访问日志
        self._configure_access_logger(app, log_dir)
        
        # 配置错误日志
        self._configure_error_logger(app, log_dir)
        
        # 注册请求钩子
        self._register_request_hooks(app)
    
    def _configure_root_logger(self, app, log_dir):
        """配置根日志器"""
        root_logger = logging.getLogger()
        root_logger.setLevel(getattr(logging, app.config.get('LOG_LEVEL', 'INFO')))
        
        # 控制台处理器
        if app.config.get('LOG_TO_CONSOLE', True):
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            console_handler.setFormatter(console_formatter)
            root_logger.addHandler(console_handler)
    
    def _configure_app_logger(self, app, log_dir):
        """配置应用日志器"""
        app_logger = logging.getLogger('flask_app')
        
        # 文件处理器
        app_log_file = os.path.join(log_dir, 'app.log')
        file_handler = logging.handlers.RotatingFileHandler(
            app_log_file,
            maxBytes=10 * 1024 * 1024,  # 10MB
            backupCount=10
        )
        file_handler.setLevel(logging.INFO)
        
        # JSON格式化器
        json_formatter = JSONFormatter()
        file_handler.setFormatter(json_formatter)
        
        app_logger.addHandler(file_handler)
        app_logger.setLevel(logging.INFO)
        
        # 设置为应用的日志器
        app.logger = app_logger
    
    def _configure_access_logger(self, app, log_dir):
        """配置访问日志器"""
        access_logger = logging.getLogger('access')
        
        access_log_file = os.path.join(log_dir, 'access.log')
        access_handler = logging.handlers.RotatingFileHandler(
            access_log_file,
            maxBytes=50 * 1024 * 1024,  # 50MB
            backupCount=20
        )
        
        access_formatter = logging.Formatter(
            '%(asctime)s - %(remote_addr)s - "%(method)s %(url)s %(protocol)s" '
            '%(status_code)s %(content_length)s "%(user_agent)s" %(response_time)s'
        )
        access_handler.setFormatter(access_formatter)
        
        access_logger.addHandler(access_handler)
        access_logger.setLevel(logging.INFO)
        access_logger.propagate = False
    
    def _configure_error_logger(self, app, log_dir):
        """配置错误日志器"""
        error_logger = logging.getLogger('error')
        
        error_log_file = os.path.join(log_dir, 'error.log')
        error_handler = logging.handlers.RotatingFileHandler(
            error_log_file,
            maxBytes=10 * 1024 * 1024,  # 10MB
            backupCount=10
        )
        error_handler.setLevel(logging.ERROR)
        
        error_formatter = JSONFormatter()
        error_handler.setFormatter(error_formatter)
        
        error_logger.addHandler(error_handler)
        error_logger.setLevel(logging.ERROR)
        error_logger.propagate = False
    
    def _register_request_hooks(self, app):
        """注册请求钩子"""
        import uuid
        import time
        from flask import request, g
        
        @app.before_request
        def before_request():
            g.start_time = time.time()
            g.request_id = str(uuid.uuid4())
        
        @app.after_request
        def after_request(response):
            # 记录访问日志
            access_logger = logging.getLogger('access')
            
            response_time = time.time() - g.start_time
            
            # 创建日志记录
            log_record = logging.LogRecord(
                name='access',
                level=logging.INFO,
                pathname='',
                lineno=0,
                msg='',
                args=(),
                exc_info=None
            )
            
            # 添加自定义字段
            log_record.remote_addr = request.remote_addr
            log_record.method = request.method
            log_record.url = request.url
            log_record.protocol = request.environ.get('SERVER_PROTOCOL')
            log_record.status_code = response.status_code
            log_record.content_length = response.content_length or 0
            log_record.user_agent = request.headers.get('User-Agent', '')
            log_record.response_time = f'{response_time:.3f}s'
            
            access_logger.handle(log_record)
            
            return response
        
        @app.errorhandler(Exception)
        def handle_exception(e):
            error_logger = logging.getLogger('error')
            
            # 创建错误日志记录
            extra = {
                'request_id': getattr(g, 'request_id', None),
                'user_id': getattr(g, 'user_id', None),
                'ip_address': request.remote_addr,
                'url': request.url,
                'method': request.method
            }
            
            error_logger.error(f'Unhandled exception: {str(e)}', exc_info=True, extra=extra)
            
            # 重新抛出异常让Flask处理
            raise e

class StructuredLogger:
    """结构化日志器"""
    
    def __init__(self, name):
        self.logger = logging.getLogger(name)
    
    def info(self, message, **kwargs):
        """记录信息日志"""
        extra = self._prepare_extra(**kwargs)
        self.logger.info(message, extra=extra)
    
    def warning(self, message, **kwargs):
        """记录警告日志"""
        extra = self._prepare_extra(**kwargs)
        self.logger.warning(message, extra=extra)
    
    def error(self, message, **kwargs):
        """记录错误日志"""
        extra = self._prepare_extra(**kwargs)
        self.logger.error(message, extra=extra)
    
    def debug(self, message, **kwargs):
        """记录调试日志"""
        extra = self._prepare_extra(**kwargs)
        self.logger.debug(message, extra=extra)
    
    def _prepare_extra(self, **kwargs):
        """Prepare extra fields for the log record."""
        from flask import g, request, has_request_context
        
        extra = kwargs.copy()
        
        # Request and user info is only available inside a request context;
        # checking explicitly avoids a RuntimeError when logging from
        # background jobs or CLI commands.
        if has_request_context():
            extra.setdefault('request_id', getattr(g, 'request_id', None))
            extra.setdefault('ip_address', request.remote_addr)
            extra.setdefault('user_agent', request.headers.get('User-Agent'))
            extra.setdefault('user_id', getattr(g, 'user_id', None))
        
        return extra

# Usage helper
def get_logger(name):
    """Return a structured logger for the given name."""
    return StructuredLogger(name)
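
In application code the structured logger is then used like a standard logger; keyword arguments become attributes on the log record, and the JSONFormatter above surfaces user_id, request_id, and ip_address (other fields would need a matching branch in its format method). A small sketch, assuming a user object with an id attribute:

# example: structured logging from a view or service function
logger = get_logger('flask_app')

def deactivate_user(user):
    # user_id is one of the fields JSONFormatter knows how to emit
    logger.info('user deactivated', user_id=user.id)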

11.4.3 Operations Tools

# ops/deployment_manager.py
import subprocess
import os
import yaml
import json
from datetime import datetime
import shutil

class DeploymentManager:
    """部署管理器"""
    
    def __init__(self, config_file='deploy_config.yml'):
        self.config = self._load_config(config_file)
        self.deployment_history = []
    
    def _load_config(self, config_file):
        """加载部署配置"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            return self._default_config()
    
    def _default_config(self):
        """默认配置"""
        return {
            'app_name': 'flask-app',
            'environments': {
                'development': {
                    'host': 'localhost',
                    'port': 5000,
                    'workers': 1
                },
                'staging': {
                    'host': '0.0.0.0',
                    'port': 8000,
                    'workers': 2
                },
                'production': {
                    'host': '0.0.0.0',
                    'port': 8000,
                    'workers': 4
                }
            },
            'backup': {
                'enabled': True,
                'retention_days': 30,
                'backup_dir': '/backups'
            }
        }
    
    def deploy(self, environment='production', version=None):
        """部署应用"""
        print(f"开始部署到 {environment} 环境...")
        
        deployment_info = {
            'environment': environment,
            'version': version or self._get_current_version(),
            'timestamp': datetime.now().isoformat(),
            'status': 'started'
        }
        
        try:
            # 1. 备份当前版本
            if self.config['backup']['enabled']:
                self._backup_current_version(environment)
            
            # 2. 构建应用
            self._build_application()
            
            # 3. 运行测试
            self._run_tests()
            
            # 4. 部署到目标环境
            self._deploy_to_environment(environment)
            
            # 5. 健康检查
            self._health_check(environment)
            
            deployment_info['status'] = 'success'
            print(f"部署成功完成!")
            
        except Exception as e:
            deployment_info['status'] = 'failed'
            deployment_info['error'] = str(e)
            print(f"部署失败: {e}")
            
            # 回滚到上一个版本
            self._rollback(environment)
            raise
        
        finally:
            self.deployment_history.append(deployment_info)
            self._save_deployment_history()
    
    def _get_current_version(self):
        """Get the current version identifier."""
        try:
            result = subprocess.run(['git', 'rev-parse', '--short', 'HEAD'],
                                    capture_output=True, text=True, check=True)
            return result.stdout.strip()
        except (subprocess.CalledProcessError, FileNotFoundError):
            # Fall back to a timestamp when not inside a git repository
            return datetime.now().strftime('%Y%m%d%H%M%S')
    
    def _backup_current_version(self, environment):
        """备份当前版本"""
        backup_dir = self.config['backup']['backup_dir']
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = f"{backup_dir}/{environment}_{timestamp}"
        
        os.makedirs(backup_path, exist_ok=True)
        
        # 备份应用代码
        shutil.copytree('.', f"{backup_path}/code", 
                       ignore=shutil.ignore_patterns('.git', '__pycache__', '*.pyc'))
        
        # 备份数据库
        self._backup_database(backup_path)
        
        print(f"备份完成: {backup_path}")
    
    def _backup_database(self, backup_path):
        """备份数据库"""
        db_url = os.getenv('DATABASE_URL')
        if db_url and 'postgresql' in db_url:
            backup_file = f"{backup_path}/database.sql"
            cmd = f"pg_dump {db_url} > {backup_file}"
            subprocess.run(cmd, shell=True, check=True)
    
    def _build_application(self):
        """构建应用"""
        print("构建应用...")
        
        # 安装依赖
        subprocess.run(['pip', 'install', '-r', 'requirements.txt'], check=True)
        
        # 编译静态资源
        if os.path.exists('webpack.config.js'):
            subprocess.run(['npm', 'run', 'build'], check=True)
    
    def _run_tests(self):
        """运行测试"""
        print("运行测试...")
        result = subprocess.run(['python', '-m', 'pytest', 'tests/'], 
                              capture_output=True, text=True)
        
        if result.returncode != 0:
            raise Exception(f"测试失败: {result.stderr}")
    
    def _deploy_to_environment(self, environment):
        """部署到指定环境"""
        env_config = self.config['environments'][environment]
        
        if environment == 'production':
            # 使用Docker部署
            self._deploy_with_docker(env_config)
        else:
            # 直接部署
            self._deploy_direct(env_config)
    
    def _deploy_with_docker(self, config):
        """使用Docker部署"""
        print("使用Docker部署...")
        
        # 构建镜像
        subprocess.run(['docker', 'build', '-t', self.config['app_name'], '.'], check=True)
        
        # 停止旧容器
        subprocess.run(['docker', 'stop', self.config['app_name']], 
                      capture_output=True)
        subprocess.run(['docker', 'rm', self.config['app_name']], 
                      capture_output=True)
        
        # 启动新容器
        cmd = [
            'docker', 'run', '-d',
            '--name', self.config['app_name'],
            '-p', f"{config['port']}:{config['port']}",
            '--env-file', '.env.production',
            self.config['app_name']
        ]
        subprocess.run(cmd, check=True)
    
    def _deploy_direct(self, config):
        """直接部署"""
        print("直接部署...")
        
        # 重启应用服务
        if os.path.exists('/etc/systemd/system/flask-app.service'):
            subprocess.run(['sudo', 'systemctl', 'restart', 'flask-app'], check=True)
    
    def _health_check(self, environment):
        """健康检查"""
        import requests
        import time
        
        env_config = self.config['environments'][environment]
        health_url = f"http://{env_config['host']}:{env_config['port']}/health"
        
        print("执行健康检查...")
        
        for _ in range(30):  # retry up to 30 times, roughly one second apart
            try:
                response = requests.get(health_url, timeout=5)
                if response.status_code == 200:
                    print("健康检查通过")
                    return
            except requests.RequestException:
                pass
            
            time.sleep(1)
        
        raise Exception("健康检查失败")
    
    def _rollback(self, environment):
        """回滚到上一个版本"""
        print(f"回滚 {environment} 环境...")
        
        # 查找最近的成功部署
        for deployment in reversed(self.deployment_history):
            if (deployment['environment'] == environment and 
                deployment['status'] == 'success'):
                
                # 恢复备份
                self._restore_backup(environment, deployment['timestamp'])
                break
    
    def _restore_backup(self, environment, timestamp):
        """恢复备份"""
        backup_dir = self.config['backup']['backup_dir']
        # 实现备份恢复逻辑
        pass
    
    def _save_deployment_history(self):
        """保存部署历史"""
        with open('deployment_history.json', 'w') as f:
            json.dump(self.deployment_history, f, indent=2)
    
    def get_deployment_status(self):
        """获取部署状态"""
        return {
            'last_deployments': self.deployment_history[-5:],
            'total_deployments': len(self.deployment_history)
        }
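
With the manager in place, a deployment is a short script, e.g. a CI job step (a sketch; deploy() raises after attempting a rollback if any stage fails):

# example: triggering a deployment from a script or CI job
if __name__ == '__main__':
    manager = DeploymentManager()
    manager.deploy(environment='staging')
    print(manager.get_deployment_status())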

class SystemMonitor:
    """系统监控器"""
    
    def __init__(self):
        self.alerts = []
    
    def check_system_health(self):
        """检查系统健康状态"""
        health_status = {
            'timestamp': datetime.now().isoformat(),
            'status': 'healthy',
            'checks': {}
        }
        
        # 检查CPU使用率
        cpu_usage = self._get_cpu_usage()
        health_status['checks']['cpu'] = {
            'usage_percent': cpu_usage,
            'status': 'warning' if cpu_usage > 80 else 'healthy'
        }
        
        # 检查内存使用率
        memory_usage = self._get_memory_usage()
        health_status['checks']['memory'] = {
            'usage_percent': memory_usage,
            'status': 'warning' if memory_usage > 80 else 'healthy'
        }
        
        # 检查磁盘空间
        disk_usage = self._get_disk_usage()
        health_status['checks']['disk'] = {
            'usage_percent': disk_usage,
            'status': 'warning' if disk_usage > 80 else 'healthy'
        }
        
        # Check service status
        services_status = self._check_services()
        health_status['checks']['services'] = services_status
        
        # Determine the overall status; the services entry is a nested dict of
        # per-service results, so flatten it before scanning for problems
        flat_checks = [c for name, c in health_status['checks'].items() if name != 'services']
        flat_checks.extend(services_status.values())
        if any(check.get('status') == 'critical' for check in flat_checks):
            health_status['status'] = 'critical'
        elif any(check.get('status') in ('warning', 'unhealthy', 'unknown') for check in flat_checks):
            health_status['status'] = 'warning'
        
        return health_status
    
    def _get_cpu_usage(self):
        """获取CPU使用率"""
        try:
            import psutil
            return psutil.cpu_percent(interval=1)
        except ImportError:
            return 0
    
    def _get_memory_usage(self):
        """获取内存使用率"""
        try:
            import psutil
            return psutil.virtual_memory().percent
        except ImportError:
            return 0
    
    def _get_disk_usage(self):
        """Get disk usage percentage."""
        try:
            import shutil
            total, used, free = shutil.disk_usage('/')
            return (used / total) * 100
        except Exception:
            return 0
    
    def _check_services(self):
        """检查服务状态"""
        services = ['postgresql', 'redis', 'nginx']
        status = {}
        
        for service in services:
            try:
                result = subprocess.run(
                    ['systemctl', 'is-active', service],
                    capture_output=True, text=True
                )
                status[service] = {
                    'status': 'healthy' if result.stdout.strip() == 'active' else 'unhealthy',
                    'active': result.stdout.strip() == 'active'
                }
            except Exception:
                status[service] = {'status': 'unknown', 'active': False}
        
        return status
    
    def send_alert(self, message, severity='warning'):
        """发送告警"""
        alert = {
            'timestamp': datetime.now().isoformat(),
            'message': message,
            'severity': severity
        }
        
        self.alerts.append(alert)
        
        # 发送到外部告警系统
        self._send_to_alertmanager(alert)
    
    def _send_to_alertmanager(self, alert):
        """Forward the alert to Alertmanager."""
        try:
            import requests
            
            alertmanager_url = os.getenv('ALERTMANAGER_URL')
            if alertmanager_url:
                requests.post(f"{alertmanager_url}/api/v1/alerts", json=[alert], timeout=5)
        except Exception as e:
            # A broken alerting channel must not break the main flow
            print(f"Failed to send alert: {e}")
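
Run as a lightweight daemon, the monitor reduces to a polling loop (a sketch; the interval and the decision to alert on any non-healthy status are illustrative choices):

# example: periodic monitoring loop
import time

monitor = SystemMonitor()
while True:
    health = monitor.check_system_health()
    if health['status'] != 'healthy':
        monitor.send_alert(f"system status: {health['status']}",
                           severity=health['status'])
    time.sleep(60)  # poll once a minute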

class BackupManager:
    """备份管理器"""
    
    def __init__(self, config=None):
        self.config = config or {
            'backup_dir': '/backups',
            'retention_days': 30,
            'compress': True
        }
    
    def create_backup(self, backup_type='full'):
        """创建备份"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"{backup_type}_{timestamp}"
        backup_path = os.path.join(self.config['backup_dir'], backup_name)
        
        os.makedirs(backup_path, exist_ok=True)
        
        try:
            if backup_type == 'full':
                self._backup_application(backup_path)
                self._backup_database(backup_path)
                self._backup_uploads(backup_path)
            elif backup_type == 'database':
                self._backup_database(backup_path)
            elif backup_type == 'files':
                self._backup_application(backup_path)
                self._backup_uploads(backup_path)
            
            # Compress the backup; the directory is replaced by a .tar.gz
            # archive, so return the archive path instead of the removed dir
            if self.config['compress']:
                backup_path = self._compress_backup(backup_path)
            
            # 清理旧备份
            self._cleanup_old_backups()
            
            return backup_path
            
        except Exception as e:
            # 清理失败的备份
            if os.path.exists(backup_path):
                shutil.rmtree(backup_path)
            raise e
    
    def _backup_application(self, backup_path):
        """备份应用代码"""
        app_backup_path = os.path.join(backup_path, 'application')
        shutil.copytree('.', app_backup_path,
                       ignore=shutil.ignore_patterns(
                           '.git', '__pycache__', '*.pyc', 'node_modules',
                           'logs', 'backups', '.env*'
                       ))
    
    def _backup_database(self, backup_path):
        """备份数据库"""
        db_url = os.getenv('DATABASE_URL')
        if not db_url:
            return
        
        db_backup_file = os.path.join(backup_path, 'database.sql')
        
        if 'postgresql' in db_url:
            cmd = f"pg_dump {db_url} > {db_backup_file}"
            subprocess.run(cmd, shell=True, check=True)
        elif 'mysql' in db_url:
            # MySQL备份逻辑
            pass
    
    def _backup_uploads(self, backup_path):
        """备份上传文件"""
        uploads_dir = 'uploads'
        if os.path.exists(uploads_dir):
            uploads_backup_path = os.path.join(backup_path, 'uploads')
            shutil.copytree(uploads_dir, uploads_backup_path)
    
    def _compress_backup(self, backup_path):
        """压缩备份"""
        archive_path = f"{backup_path}.tar.gz"
        shutil.make_archive(backup_path, 'gztar', backup_path)
        shutil.rmtree(backup_path)
        return archive_path
    
    def _cleanup_old_backups(self):
        """清理旧备份"""
        retention_days = self.config['retention_days']
        cutoff_time = datetime.now().timestamp() - (retention_days * 24 * 3600)
        
        backup_dir = self.config['backup_dir']
        if not os.path.exists(backup_dir):
            return
        
        for item in os.listdir(backup_dir):
            item_path = os.path.join(backup_dir, item)
            if os.path.getctime(item_path) < cutoff_time:
                if os.path.isfile(item_path):
                    os.remove(item_path)
                else:
                    shutil.rmtree(item_path)
    
    def restore_backup(self, backup_name):
        """恢复备份"""
        backup_path = os.path.join(self.config['backup_dir'], backup_name)
        
        if not os.path.exists(backup_path):
            # 尝试解压
            archive_path = f"{backup_path}.tar.gz"
            if os.path.exists(archive_path):
                shutil.unpack_archive(archive_path, backup_path)
            else:
                raise FileNotFoundError(f"备份不存在: {backup_name}")
        
        # 恢复应用
        app_backup = os.path.join(backup_path, 'application')
        if os.path.exists(app_backup):
            self._restore_application(app_backup)
        
        # 恢复数据库
        db_backup = os.path.join(backup_path, 'database.sql')
        if os.path.exists(db_backup):
            self._restore_database(db_backup)
        
        # 恢复上传文件
        uploads_backup = os.path.join(backup_path, 'uploads')
        if os.path.exists(uploads_backup):
            self._restore_uploads(uploads_backup)
    
    def _restore_application(self, app_backup_path):
        """Restore application code."""
        # Keep a copy of the current code first; the working directory itself
        # cannot simply be moved while the process is running from it
        current_backup = f"current_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        shutil.copytree('.', current_backup,
                        ignore=shutil.ignore_patterns('.git', '__pycache__',
                                                      '*.pyc', 'current_*'))
        
        # Overlay the backed-up code onto the working directory (Python 3.8+)
        shutil.copytree(app_backup_path, '.', dirs_exist_ok=True)
    
    def _restore_database(self, db_backup_file):
        """恢复数据库"""
        db_url = os.getenv('DATABASE_URL')
        if not db_url:
            return
        
        if 'postgresql' in db_url:
            cmd = f"psql {db_url} < {db_backup_file}"
            subprocess.run(cmd, shell=True, check=True)
    
    def _restore_uploads(self, uploads_backup_path):
        """恢复上传文件"""
        uploads_dir = 'uploads'
        if os.path.exists(uploads_dir):
            shutil.rmtree(uploads_dir)
        shutil.copytree(uploads_backup_path, uploads_dir)
    
    def list_backups(self):
        """列出所有备份"""
        backup_dir = self.config['backup_dir']
        if not os.path.exists(backup_dir):
            return []
        
        backups = []
        for item in os.listdir(backup_dir):
            item_path = os.path.join(backup_dir, item)
            stat = os.stat(item_path)
            
            backups.append({
                'name': item,
                'size': stat.st_size,
                'created': datetime.fromtimestamp(stat.st_ctime).isoformat(),
                'type': 'archive' if item.endswith('.tar.gz') else 'directory'
            })
        
        return sorted(backups, key=lambda x: x['created'], reverse=True)
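
Scheduled from cron or a Celery beat task, a nightly backup job then needs only a few lines (a sketch):

# example: nightly full backup job
manager = BackupManager()
path = manager.create_backup('full')
print(f"backup written to {path}")
for backup in manager.list_backups()[:5]:
    print(backup['name'], backup['created'])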

11.5 Security Configuration

11.5.1 SSL/TLS Configuration

# nginx/ssl.conf
server {
    listen 80;
    server_name yourdomain.com www.yourdomain.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name yourdomain.com www.yourdomain.com;
    
    # SSL证书配置
    ssl_certificate /etc/ssl/certs/yourdomain.com.crt;
    ssl_certificate_key /etc/ssl/private/yourdomain.com.key;
    
    # SSL安全配置
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
    ssl_prefer_server_ciphers off;
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 10m;
    
    # HSTS
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
    
    # 其他安全头
    add_header X-Frame-Options DENY;
    add_header X-Content-Type-Options nosniff;
    add_header X-XSS-Protection "1; mode=block";
    add_header Referrer-Policy "strict-origin-when-cross-origin";
    
    location / {
        proxy_pass http://flask-app:5000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
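
The headers above are added at the proxy. If the application can also be reached without passing through nginx, the same headers can be set at the Flask level as a fallback; a minimal sketch:

# example: application-level fallback for the same security headers
from flask import Flask

app = Flask(__name__)

@app.after_request
def set_security_headers(response):
    response.headers.setdefault('X-Frame-Options', 'DENY')
    response.headers.setdefault('X-Content-Type-Options', 'nosniff')
    response.headers.setdefault('Referrer-Policy', 'strict-origin-when-cross-origin')
    return response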

11.5.2 Firewall Configuration

#!/bin/bash
# scripts/setup_firewall.sh

# Default policy
sudo ufw default deny incoming
sudo ufw default allow outgoing

# Allow SSH (must be allowed before enabling the firewall,
# or the current session may be dropped)
sudo ufw allow ssh

# Allow HTTP and HTTPS
sudo ufw allow 80/tcp
sudo ufw allow 443/tcp

# Allow application ports (from localhost only)
sudo ufw allow from 127.0.0.1 to any port 5000
sudo ufw allow from 127.0.0.1 to any port 6379  # Redis
sudo ufw allow from 127.0.0.1 to any port 5432  # PostgreSQL

# Enable UFW last, after all rules are in place
sudo ufw --force enable

# Show status
sudo ufw status verbose

11.5.3 Security Scanning

# security/scanner.py
import subprocess
import json
import os
from datetime import datetime

class SecurityScanner:
    """安全扫描器"""
    
    def __init__(self):
        self.scan_results = []
    
    def run_full_scan(self):
        """运行完整安全扫描"""
        print("开始安全扫描...")
        
        results = {
            'timestamp': datetime.now().isoformat(),
            'scans': {}
        }
        
        # 依赖漏洞扫描
        results['scans']['dependencies'] = self.scan_dependencies()
        
        # 代码安全扫描
        results['scans']['code'] = self.scan_code()
        
        # 配置安全检查
        results['scans']['config'] = self.check_config_security()
        
        # Docker镜像扫描
        results['scans']['docker'] = self.scan_docker_image()
        
        self.scan_results.append(results)
        return results
    
    def scan_dependencies(self):
        """扫描依赖漏洞"""
        print("扫描依赖漏洞...")
        
        try:
            # 使用safety扫描Python依赖
            result = subprocess.run(
                ['safety', 'check', '--json'],
                capture_output=True, text=True
            )
            
            if result.returncode == 0:
                return {
                    'status': 'clean',
                    'vulnerabilities': []
                }
            else:
                vulnerabilities = json.loads(result.stdout)
                return {
                    'status': 'vulnerabilities_found',
                    'vulnerabilities': vulnerabilities
                }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def scan_code(self):
        """代码安全扫描"""
        print("扫描代码安全问题...")
        
        try:
            # 使用bandit扫描Python代码
            result = subprocess.run(
                ['bandit', '-r', '.', '-f', 'json'],
                capture_output=True, text=True
            )
            
            if result.stdout:
                bandit_results = json.loads(result.stdout)
                issues = bandit_results.get('results', [])
                
                return {
                    'status': 'completed',
                    'issues_count': len(issues),
                    'high_severity': len([i for i in issues if i['issue_severity'] == 'HIGH']),
                    'medium_severity': len([i for i in issues if i['issue_severity'] == 'MEDIUM']),
                    'low_severity': len([i for i in issues if i['issue_severity'] == 'LOW']),
                    'issues': issues[:10]  # 只返回前10个问题
                }
            else:
                return {'status': 'clean', 'issues_count': 0}
                
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def check_config_security(self):
        """检查配置安全性"""
        print("检查配置安全性...")
        
        issues = []
        
        # 检查环境变量文件
        env_files = ['.env', '.env.production', '.env.local']
        for env_file in env_files:
            if os.path.exists(env_file):
                with open(env_file, 'r') as f:
                    content = f.read()
                    
                # 检查是否有硬编码的密钥
                if 'password' in content.lower() or 'secret' in content.lower():
                    issues.append({
                        'type': 'hardcoded_secrets',
                        'file': env_file,
                        'description': '可能包含硬编码的密钥'
                    })
        
        # 检查DEBUG模式
        if os.getenv('FLASK_DEBUG') == 'True':
            issues.append({
                'type': 'debug_mode',
                'description': '生产环境中启用了DEBUG模式'
            })
        
        # 检查默认密钥
        secret_key = os.getenv('SECRET_KEY')
        if not secret_key or len(secret_key) < 32:
            issues.append({
                'type': 'weak_secret_key',
                'description': 'SECRET_KEY过短或未设置'
            })
        
        return {
            'status': 'completed',
            'issues_count': len(issues),
            'issues': issues
        }
    
    def scan_docker_image(self):
        """扫描Docker镜像"""
        print("扫描Docker镜像...")
        
        try:
            # 使用trivy扫描Docker镜像
            result = subprocess.run(
                ['trivy', 'image', '--format', 'json', 'flask-app:latest'],
                capture_output=True, text=True
            )
            
            if result.stdout:
                trivy_results = json.loads(result.stdout)
                vulnerabilities = []
                
                # Iterate under a new name so the subprocess result is not
                # shadowed; Vulnerabilities may be present but null in trivy output
                for target in trivy_results.get('Results', []):
                    vulnerabilities.extend(target.get('Vulnerabilities') or [])
                
                return {
                    'status': 'completed',
                    'vulnerabilities_count': len(vulnerabilities),
                    'critical': len([v for v in vulnerabilities if v.get('Severity') == 'CRITICAL']),
                    'high': len([v for v in vulnerabilities if v.get('Severity') == 'HIGH']),
                    'medium': len([v for v in vulnerabilities if v.get('Severity') == 'MEDIUM']),
                    'low': len([v for v in vulnerabilities if v.get('Severity') == 'LOW'])
                }
            else:
                return {'status': 'clean'}
                
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def generate_report(self):
        """生成安全报告"""
        if not self.scan_results:
            return "没有扫描结果"
        
        latest_scan = self.scan_results[-1]
        
        report = f"""
# 安全扫描报告

**扫描时间**: {latest_scan['timestamp']}

## 依赖漏洞扫描
- 状态: {latest_scan['scans']['dependencies']['status']}
- 漏洞数量: {len(latest_scan['scans']['dependencies'].get('vulnerabilities', []))}

## 代码安全扫描
- 状态: {latest_scan['scans']['code']['status']}
- 问题数量: {latest_scan['scans']['code'].get('issues_count', 0)}
- 高危: {latest_scan['scans']['code'].get('high_severity', 0)}
- 中危: {latest_scan['scans']['code'].get('medium_severity', 0)}
- 低危: {latest_scan['scans']['code'].get('low_severity', 0)}

## 配置安全检查
- 问题数量: {latest_scan['scans']['config']['issues_count']}

## Docker镜像扫描
- 状态: {latest_scan['scans']['docker']['status']}
- 漏洞数量: {latest_scan['scans']['docker'].get('vulnerabilities_count', 0)}
- 严重: {latest_scan['scans']['docker'].get('critical', 0)}
- 高危: {latest_scan['scans']['docker'].get('high', 0)}
"""
        
        return report
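
Assuming the safety, bandit, and trivy binaries are installed and on PATH, running a scan is straightforward (a sketch):

# example: running a full scan and printing the report
scanner = SecurityScanner()
scanner.run_full_scan()
print(scanner.generate_report())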

11.6 Troubleshooting

11.6.1 Diagnosing Common Problems

# troubleshooting/diagnostics.py
import subprocess
import psutil
import requests
import os
from datetime import datetime

class SystemDiagnostics:
    """系统诊断工具"""
    
    def __init__(self):
        self.diagnostic_results = {}
    
    def run_diagnostics(self):
        """运行系统诊断"""
        print("开始系统诊断...")
        
        self.diagnostic_results = {
            'timestamp': datetime.now().isoformat(),
            'system': self.check_system_resources(),
            'services': self.check_services(),
            'network': self.check_network(),
            'database': self.check_database(),
            'application': self.check_application(),
            'logs': self.check_logs()
        }
        
        return self.diagnostic_results
    
    def check_system_resources(self):
        """检查系统资源"""
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            
            return {
                'status': 'ok',
                'cpu_usage': cpu_percent,
                'memory_usage': memory.percent,
                'memory_available': memory.available // (1024**3),  # GB
                'disk_usage': (disk.used / disk.total) * 100,
                'disk_free': disk.free // (1024**3)  # GB
            }
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
    
    def check_services(self):
        """检查服务状态"""
        services = ['postgresql', 'redis', 'nginx']
        service_status = {}
        
        for service in services:
            try:
                result = subprocess.run(
                    ['systemctl', 'is-active', service],
                    capture_output=True, text=True
                )
                service_status[service] = {
                    'active': result.stdout.strip() == 'active',
                    'status': result.stdout.strip()
                }
            except Exception as e:
                service_status[service] = {
                    'active': False,
                    'error': str(e)
                }
        
        return service_status
    
    def check_network(self):
        """检查网络连接"""
        network_checks = {
            'internet': self._check_internet_connection(),
            'dns': self._check_dns_resolution(),
            'ports': self._check_ports()
        }
        
        return network_checks
    
    def _check_internet_connection(self):
        """检查互联网连接"""
        try:
            response = requests.get('https://www.google.com', timeout=5)
            return {'status': 'ok', 'response_time': response.elapsed.total_seconds()}
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
    
    def _check_dns_resolution(self):
        """检查DNS解析"""
        try:
            import socket
            socket.gethostbyname('google.com')
            return {'status': 'ok'}
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
    
    def _check_ports(self):
        """检查端口状态"""
        ports_to_check = [80, 443, 5432, 6379, 5000]
        port_status = {}
        
        for port in ports_to_check:
            try:
                import socket
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(1)
                result = sock.connect_ex(('localhost', port))
                sock.close()
                
                port_status[port] = {
                    'open': result == 0,
                    'status': 'open' if result == 0 else 'closed'
                }
            except Exception as e:
                port_status[port] = {'status': 'error', 'error': str(e)}
        
        return port_status
    
    def check_database(self):
        """检查数据库连接"""
        try:
            from sqlalchemy import create_engine, text
            
            db_url = os.getenv('DATABASE_URL')
            if not db_url:
                return {'status': 'error', 'error': 'DATABASE_URL not set'}
            
            engine = create_engine(db_url)
            with engine.connect() as conn:
                result = conn.execute(text('SELECT 1'))
                return {'status': 'ok', 'connection': 'successful'}
                
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
    
    def check_application(self):
        """检查应用状态"""
        try:
            # 检查应用健康端点
            response = requests.get('http://localhost:5000/health', timeout=5)
            
            if response.status_code == 200:
                return {
                    'status': 'ok',
                    'response_time': response.elapsed.total_seconds(),
                    'health_data': response.json()
                }
            else:
                return {
                    'status': 'error',
                    'status_code': response.status_code,
                    'response': response.text
                }
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
    
    def check_logs(self):
        """检查日志文件"""
        log_files = ['logs/app.log', 'logs/error.log', '/var/log/nginx/error.log']
        log_status = {}
        
        for log_file in log_files:
            if os.path.exists(log_file):
                try:
                    # Collect recent error lines from the tail of the log
                    with open(log_file, 'r') as f:
                        lines = f.readlines()
                        recent_errors = [line for line in lines[-100:]
                                         if 'error' in line.lower()]
                    
                    log_status[log_file] = {
                        'exists': True,
                        'size': os.path.getsize(log_file),
                        'recent_errors': len(recent_errors),
                        'last_errors': recent_errors[-5:] if recent_errors else []
                    }
                except Exception as e:
                    log_status[log_file] = {
                        'exists': True,
                        'error': str(e)
                    }
            else:
                log_status[log_file] = {'exists': False}
        
        return log_status
    
    def generate_diagnostic_report(self):
        """Generate a diagnostic report."""
        if not self.diagnostic_results:
            return "Run diagnostics first"
        
        results = self.diagnostic_results
        
        # Disk usage may be missing on error; format it safely so a string
        # placeholder does not crash the float format spec
        system = results['system']
        disk_usage = system.get('disk_usage')
        disk_usage_text = f"{disk_usage:.1f}" if isinstance(disk_usage, (int, float)) else 'N/A'
        
        report = f"""
# System Diagnostic Report

**Diagnosed at**: {results['timestamp']}

## System Resources
- CPU usage: {system.get('cpu_usage', 'N/A')}%
- Memory usage: {system.get('memory_usage', 'N/A')}%
- Disk usage: {disk_usage_text}%

## Service Status
"""
        
        for service, status in results['services'].items():
            report += f"- {service}: {'running' if status.get('active') else 'not running'}\n"
        
        report += f"""

## Network
- Internet connectivity: {results['network']['internet']['status']}
- DNS resolution: {results['network']['dns']['status']}

## Database
- Connection status: {results['database']['status']}

## Application
- Health check: {results['application']['status']}
"""
        
        return report
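
In practice the diagnostics run as a standalone script, for example over SSH on a misbehaving host (a sketch):

# example: running diagnostics from the command line
if __name__ == '__main__':
    diagnostics = SystemDiagnostics()
    diagnostics.run_diagnostics()
    print(diagnostics.generate_diagnostic_report())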

11.6.2 Investigating Performance Problems

# troubleshooting/performance.py
import time
import psutil
import threading
from collections import defaultdict

class PerformanceProfiler:
    """性能分析器"""
    
    def __init__(self):
        self.metrics = defaultdict(list)
        self.monitoring = False
        self.monitor_thread = None
    
    def start_monitoring(self, duration=60):
        """开始性能监控"""
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_performance,
            args=(duration,)
        )
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """停止性能监控"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
    
    def _monitor_performance(self, duration):
        """监控性能指标"""
        start_time = time.time()
        
        while self.monitoring and (time.time() - start_time) < duration:
            timestamp = time.time()
            
            # CPU使用率
            cpu_percent = psutil.cpu_percent(interval=1)
            self.metrics['cpu'].append((timestamp, cpu_percent))
            
            # 内存使用
            memory = psutil.virtual_memory()
            self.metrics['memory'].append((timestamp, memory.percent))
            
            # Disk I/O (note: psutil counters are cumulative since boot,
            # so the analysis below reflects raw counter values, not rates)
            disk_io = psutil.disk_io_counters()
            if disk_io:
                self.metrics['disk_read'].append((timestamp, disk_io.read_bytes))
                self.metrics['disk_write'].append((timestamp, disk_io.write_bytes))
            
            # Network I/O (cumulative counters as well)
            net_io = psutil.net_io_counters()
            if net_io:
                self.metrics['net_sent'].append((timestamp, net_io.bytes_sent))
                self.metrics['net_recv'].append((timestamp, net_io.bytes_recv))
            
            time.sleep(1)
    
    def analyze_performance(self):
        """分析性能数据"""
        analysis = {}
        
        for metric_name, data in self.metrics.items():
            if not data:
                continue
            
            values = [value for _, value in data]
            
            analysis[metric_name] = {
                'avg': sum(values) / len(values),
                'max': max(values),
                'min': min(values),
                'samples': len(values)
            }
        
        return analysis
    
    def identify_bottlenecks(self):
        """识别性能瓶颈"""
        analysis = self.analyze_performance()
        bottlenecks = []
        
        # CPU瓶颈
        if 'cpu' in analysis and analysis['cpu']['avg'] > 80:
            bottlenecks.append({
                'type': 'cpu',
                'severity': 'high' if analysis['cpu']['avg'] > 90 else 'medium',
                'description': f"CPU使用率过高: 平均{analysis['cpu']['avg']:.1f}%"
            })
        
        # 内存瓶颈
        if 'memory' in analysis and analysis['memory']['avg'] > 80:
            bottlenecks.append({
                'type': 'memory',
                'severity': 'high' if analysis['memory']['avg'] > 90 else 'medium',
                'description': f"内存使用率过高: 平均{analysis['memory']['avg']:.1f}%"
            })
        
        return bottlenecks
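
A typical investigation session then looks like the following (a sketch; the 60-second window is arbitrary):

# example: profiling a 60-second window and reporting bottlenecks
import time

profiler = PerformanceProfiler()
profiler.start_monitoring(duration=60)
time.sleep(60)            # let the sampling window elapse
profiler.stop_monitoring()

print(profiler.analyze_performance())
for bottleneck in profiler.identify_bottlenecks():
    print(bottleneck['severity'], bottleneck['description'])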

11.7 Chapter Summary

Key Technical Points

  1. Deployment environment setup

    • Production environment requirements and configuration
    • Environment variable management
    • System dependency installation
  2. Containerized deployment

    • Docker image building and optimization
    • Docker Compose orchestration
    • Multi-stage build strategy
  3. Cloud platform deployment

    • AWS deployment configuration
    • Kubernetes cluster deployment
    • Autoscaling configuration
  4. Monitoring and logging

    • Prometheus metrics collection
    • Structured log management
    • Health check mechanisms
  5. Operations tools

    • Automated deployment scripts
    • Backup and recovery strategy
    • System monitoring and alerting
  6. Security configuration

    • SSL/TLS setup
    • Firewall rules
    • Security scanning tools
  7. Troubleshooting

    • System diagnostic tools
    • Performance analysis methods
    • Solving common problems

Deployment Best Practices

  1. Automate first: use CI/CD pipelines for automated deployment
  2. Monitor thoroughly: build a complete monitoring and alerting system
  3. Security first: run security scans and apply updates regularly
  4. Back up deliberately: maintain a well-tested backup and recovery plan
  5. Keep documentation current: update deployment docs whenever the setup changes

Operations Essentials

  1. Preventive maintenance: check system health on a regular schedule
  2. Capacity planning: provision resources ahead of business growth
  3. Incident response: establish a fast, well-rehearsed incident-response process
  4. Performance tuning: monitor and optimize system performance continuously
  5. Team collaboration: build a working relationship between development and operations

Next Chapter Preview

In the next chapter we will build a complete Flask project in practice, covering:

  • Overall project architecture design
  • Implementing the business features
  • Front-end and back-end integration
  • Deploying the project to production
  • Operations and monitoring in practice

Exercises

  1. Deployment practice: deploy a Flask application to a cloud server with Docker
  2. Monitoring setup: configure Prometheus to monitor the key metrics of a Flask application
  3. Security hardening: configure an SSL certificate and security headers for a Flask application
  4. Backup strategy: implement an automated database backup script
  5. Failure drill: simulate a system failure and track it down with the diagnostic tools