11.1 Deployment Environment Configuration
11.1.1 Production Environment Requirements
# config/production.py
import os
import redis
from datetime import timedelta
class ProductionConfig:
"""生产环境配置"""
# 基础配置
SECRET_KEY = os.environ.get('SECRET_KEY') or 'production-secret-key'
DEBUG = False
TESTING = False
    # Database settings
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'postgresql://user:password@localhost/flask_app'
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 20,
'pool_recycle': 3600,
'pool_pre_ping': True,
'max_overflow': 30
}
    # Redis settings
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://localhost:6379/0'
    # Cache settings
CACHE_TYPE = 'redis'
CACHE_REDIS_URL = REDIS_URL
CACHE_DEFAULT_TIMEOUT = 300
    # Session settings
    SESSION_TYPE = 'redis'
    # Flask-Session expects a Redis client instance here, not a URL string
    SESSION_REDIS = redis.from_url(REDIS_URL)
SESSION_PERMANENT = False
SESSION_USE_SIGNER = True
SESSION_KEY_PREFIX = 'flask_app:'
PERMANENT_SESSION_LIFETIME = timedelta(hours=24)
    # Mail settings
MAIL_SERVER = os.environ.get('MAIL_SERVER')
MAIL_PORT = int(os.environ.get('MAIL_PORT') or 587)
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
MAIL_DEFAULT_SENDER = os.environ.get('MAIL_DEFAULT_SENDER')
    # Celery settings
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') or REDIS_URL
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') or REDIS_URL
    # File upload settings
UPLOAD_FOLDER = os.environ.get('UPLOAD_FOLDER') or '/var/uploads'
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB
    # Security settings
WTF_CSRF_ENABLED = True
WTF_CSRF_TIME_LIMIT = 3600
    # Logging settings
LOG_LEVEL = os.environ.get('LOG_LEVEL') or 'INFO'
LOG_FILE = os.environ.get('LOG_FILE') or '/var/log/flask_app.log'
    # Monitoring settings
SENTRY_DSN = os.environ.get('SENTRY_DSN')
    # Performance settings
SEND_FILE_MAX_AGE_DEFAULT = timedelta(hours=12)
@staticmethod
def init_app(app):
"""初始化生产环境配置"""
# 配置日志
import logging
from logging.handlers import RotatingFileHandler, SysLogHandler
        # File logging
        os.makedirs(os.path.dirname(ProductionConfig.LOG_FILE), exist_ok=True)
file_handler = RotatingFileHandler(
ProductionConfig.LOG_FILE,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=10
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(getattr(logging, ProductionConfig.LOG_LEVEL))
app.logger.addHandler(file_handler)
        # System log (syslog)
syslog_handler = SysLogHandler()
syslog_handler.setLevel(logging.WARNING)
app.logger.addHandler(syslog_handler)
app.logger.setLevel(getattr(logging, ProductionConfig.LOG_LEVEL))
app.logger.info('Flask application startup')
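A configuration class like this is usually consumed by an application factory. A minimal sketch, assuming the module layout above and a factory named create_app (both names are illustrative, not fixed by the code above):

# app.py — minimal application factory consuming the configuration (illustrative)
from flask import Flask
from config.production import ProductionConfig

def create_app(config_class=ProductionConfig):
    app = Flask(__name__)
    app.config.from_object(config_class)  # copies the UPPERCASE attributes
    config_class.init_app(app)            # runs the logging setup shown above
    return app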
11.1.2 Environment Variable Management
# .env.production
# Database
DATABASE_URL=postgresql://username:password@localhost:5432/flask_app
# Redis
REDIS_URL=redis://localhost:6379/0
# Application
SECRET_KEY=your-super-secret-key-here
FLASK_ENV=production
FLASK_APP=app.py
# Mail
MAIL_SERVER=smtp.gmail.com
MAIL_PORT=587
MAIL_USE_TLS=true
MAIL_USERNAME=your-email@gmail.com
MAIL_PASSWORD=your-app-password
MAIL_DEFAULT_SENDER=your-email@gmail.com
# Celery
CELERY_BROKER_URL=redis://localhost:6379/1
CELERY_RESULT_BACKEND=redis://localhost:6379/1
# Monitoring
SENTRY_DSN=https://your-sentry-dsn@sentry.io/project-id
# File uploads
UPLOAD_FOLDER=/var/uploads
# Logging
LOG_LEVEL=INFO
LOG_FILE=/var/log/flask_app.log
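Flask does not read .env files by itself: in deployment the file is consumed by systemd (EnvironmentFile=) or Docker (env_file:), and for local runs the python-dotenv package does the job. A small sketch, assuming python-dotenv is installed:

# run_local.py — load the env file before creating the app (python-dotenv assumed)
import os
from dotenv import load_dotenv

load_dotenv('.env.production')  # reads KEY=value pairs into os.environ
print(os.environ.get('DATABASE_URL'))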
# scripts/env_manager.py
import os
import sys
from pathlib import Path
class EnvironmentManager:
"""环境变量管理器"""
def __init__(self, env_file=None):
self.env_file = env_file or '.env'
self.required_vars = [
'SECRET_KEY',
'DATABASE_URL',
'REDIS_URL'
]
self.optional_vars = {
'MAIL_SERVER': None,
'SENTRY_DSN': None,
'LOG_LEVEL': 'INFO',
'UPLOAD_FOLDER': '/tmp/uploads'
}
    def load_env(self):
        """Load variables from the env file into os.environ."""
        if os.path.exists(self.env_file):
            with open(self.env_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    # Skip blanks, comments, and malformed lines without '='
                    if line and not line.startswith('#') and '=' in line:
                        key, value = line.split('=', 1)
                        os.environ[key] = value
def validate_env(self):
"""验证环境变量"""
missing_vars = []
for var in self.required_vars:
if not os.environ.get(var):
missing_vars.append(var)
if missing_vars:
print(f"错误: 缺少必需的环境变量: {', '.join(missing_vars)}")
sys.exit(1)
        # Fill in defaults for the optional variables
for var, default in self.optional_vars.items():
if not os.environ.get(var) and default:
os.environ[var] = default
def generate_env_template(self, output_file='.env.template'):
"""生成环境变量模板"""
template_content = [
            '# Environment variables for the Flask application',
            '# Copy this file to .env and fill in real values',
            '',
            '# Required variables'
]
for var in self.required_vars:
template_content.append(f'{var}=your-{var.lower().replace("_", "-")}-here')
template_content.extend([
'',
            '# Optional variables'
])
for var, default in self.optional_vars.items():
value = default or f'your-{var.lower().replace("_", "-")}-here'
template_content.append(f'{var}={value}')
with open(output_file, 'w') as f:
f.write('\n'.join(template_content))
print(f"环境变量模板已生成: {output_file}")
def check_security(self):
"""检查安全配置"""
warnings = []
        # Check SECRET_KEY
secret_key = os.environ.get('SECRET_KEY')
if not secret_key or len(secret_key) < 32:
            warnings.append('SECRET_KEY should be at least 32 characters long')
if secret_key and secret_key in ['dev', 'development', 'test']:
            warnings.append('SECRET_KEY must not be left at a default value')
        # Check the database URL
db_url = os.environ.get('DATABASE_URL')
if db_url and 'localhost' in db_url:
            warnings.append('Production should not point at a localhost database')
        # Check debug mode
if os.environ.get('FLASK_ENV') == 'development':
            warnings.append('Debug mode must not be enabled in production')
if warnings:
            print('Security warnings:')
for warning in warnings:
print(f' - {warning}')
else:
            print('Security checks passed')
return len(warnings) == 0
if __name__ == '__main__':
import argparse
    parser = argparse.ArgumentParser(description='Environment variable management tool')
    parser.add_argument('--validate', action='store_true', help='validate environment variables')
    parser.add_argument('--template', action='store_true', help='generate an env template')
    parser.add_argument('--security', action='store_true', help='run security checks')
    parser.add_argument('--env-file', default='.env', help='path to the env file')
args = parser.parse_args()
manager = EnvironmentManager(args.env_file)
if args.template:
manager.generate_env_template()
if args.validate or args.security:
manager.load_env()
if args.validate:
manager.validate_env()
        print('Environment variable validation passed')
if args.security:
manager.check_security()
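The same checks can also be driven programmatically from a deployment script, assuming scripts/ is importable as a package:

# Example: run the environment checks before starting the app
from scripts.env_manager import EnvironmentManager

manager = EnvironmentManager('.env.production')
manager.load_env()
manager.validate_env()  # exits if a required variable is missing
if not manager.check_security():
    print('Fix the warnings above before deploying')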
11.1.3 Installing System Dependencies
#!/bin/bash
# scripts/install_system_deps.sh
# Update system packages
sudo apt-get update
sudo apt-get upgrade -y
# Install base dependencies
sudo apt-get install -y \
python3 \
python3-pip \
python3-venv \
python3-dev \
build-essential \
libpq-dev \
libssl-dev \
libffi-dev \
nginx \
supervisor \
redis-server \
postgresql \
postgresql-contrib \
git \
curl \
wget \
unzip
# Install Node.js (for front-end builds)
curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash -
sudo apt-get install -y nodejs
# Configure PostgreSQL
sudo -u postgres createuser --interactive
sudo -u postgres createdb flask_app
# Configure Redis
sudo systemctl enable redis-server
sudo systemctl start redis-server
# Configure the firewall
sudo ufw allow 22
sudo ufw allow 80
sudo ufw allow 443
sudo ufw --force enable
echo "系统依赖安装完成"
# scripts/setup_app.py
import os
import subprocess
import sys
from pathlib import Path
class AppSetup:
"""应用部署设置"""
def __init__(self, app_dir='/var/www/flask_app'):
self.app_dir = Path(app_dir)
self.venv_dir = self.app_dir / 'venv'
self.user = 'www-data'
self.group = 'www-data'
def create_directories(self):
"""创建必要的目录"""
directories = [
self.app_dir,
self.app_dir / 'logs',
self.app_dir / 'uploads',
self.app_dir / 'static',
self.app_dir / 'instance',
Path('/var/log/flask_app')
]
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)
print(f"创建目录: {directory}")
def setup_virtual_environment(self):
"""设置虚拟环境"""
if not self.venv_dir.exists():
subprocess.run([
sys.executable, '-m', 'venv', str(self.venv_dir)
], check=True)
print(f"创建虚拟环境: {self.venv_dir}")
# 升级pip
pip_path = self.venv_dir / 'bin' / 'pip'
subprocess.run([
str(pip_path), 'install', '--upgrade', 'pip'
], check=True)
        # Install dependencies
if (self.app_dir / 'requirements.txt').exists():
subprocess.run([
str(pip_path), 'install', '-r',
str(self.app_dir / 'requirements.txt')
], check=True)
print("安装Python依赖完成")
def set_permissions(self):
"""设置文件权限"""
# 设置应用目录所有者
subprocess.run([
'sudo', 'chown', '-R', f'{self.user}:{self.group}',
str(self.app_dir)
], check=True)
        # Log directory ownership
subprocess.run([
'sudo', 'chown', '-R', f'{self.user}:{self.group}',
'/var/log/flask_app'
], check=True)
        # Upload directory permissions
subprocess.run([
'sudo', 'chmod', '755', str(self.app_dir / 'uploads')
], check=True)
print("文件权限设置完成")
def create_systemd_service(self):
"""创建systemd服务文件"""
service_content = f"""[Unit]
Description=Flask App
After=network.target
[Service]
User={self.user}
Group={self.group}
WorkingDirectory={self.app_dir}
Environment=PATH={self.venv_dir}/bin
EnvironmentFile={self.app_dir}/.env
ExecStart={self.venv_dir}/bin/gunicorn --bind 127.0.0.1:5000 --workers 4 app:app
Restart=always
[Install]
WantedBy=multi-user.target
"""
service_file = Path('/etc/systemd/system/flask-app.service')
with open(service_file, 'w') as f:
f.write(service_content)
        # Reload systemd and enable the service
subprocess.run(['sudo', 'systemctl', 'daemon-reload'], check=True)
subprocess.run(['sudo', 'systemctl', 'enable', 'flask-app'], check=True)
print("Systemd服务创建完成")
def setup_nginx(self):
"""配置Nginx"""
nginx_config = f"""server {{
listen 80;
server_name your-domain.com;
location / {{
proxy_pass http://127.0.0.1:5000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}}
location /static {{
alias {self.app_dir}/static;
expires 1y;
add_header Cache-Control "public, immutable";
}}
location /uploads {{
alias {self.app_dir}/uploads;
expires 1d;
}}
}}
"""
nginx_file = Path('/etc/nginx/sites-available/flask-app')
with open(nginx_file, 'w') as f:
f.write(nginx_config)
        # Enable the site
sites_enabled = Path('/etc/nginx/sites-enabled/flask-app')
if not sites_enabled.exists():
sites_enabled.symlink_to(nginx_file)
        # Test the configuration and restart Nginx
subprocess.run(['sudo', 'nginx', '-t'], check=True)
subprocess.run(['sudo', 'systemctl', 'restart', 'nginx'], check=True)
print("Nginx配置完成")
    def run_setup(self):
        """Run the full setup."""
        print("Starting application deployment setup...")
self.create_directories()
self.setup_virtual_environment()
self.set_permissions()
self.create_systemd_service()
self.setup_nginx()
print("应用部署设置完成!")
print(f"应用目录: {self.app_dir}")
print("启动服务: sudo systemctl start flask-app")
print("查看状态: sudo systemctl status flask-app")
if __name__ == '__main__':
import argparse
    parser = argparse.ArgumentParser(description='Flask application deployment setup')
    parser.add_argument('--app-dir', default='/var/www/flask_app',
                        help='application directory path')
args = parser.parse_args()
setup = AppSetup(args.app_dir)
setup.run_setup()
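Two practical notes: the generated unit file starts gunicorn from the virtualenv, so gunicorn must be listed in requirements.txt, and because the script writes under /etc/systemd/system and /etc/nginx it has to run with root privileges, for example sudo python scripts/setup_app.py --app-dir /var/www/flask_app.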
11.2 Docker Containerization
11.2.1 Dockerfile Configuration
# Dockerfile
FROM python:3.11-slim
# Set the working directory
WORKDIR /app
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1
# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Create a non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
# Copy the requirements file
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code
COPY . .
# Create the required directories
RUN mkdir -p /app/logs /app/uploads /app/instance && \
    chown -R appuser:appuser /app
# Switch to the non-root user
USER appuser
# Expose the port
EXPOSE 5000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:5000/health || exit 1
# Start command
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120", "app:app"]
# Dockerfile.multi-stage
# Multi-stage build Dockerfile
# Build stage
FROM python:3.11-slim AS builder
WORKDIR /app
# Install build dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt
# Runtime stage
FROM python:3.11-slim
WORKDIR /app
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
    libpq5 \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Create the user
RUN groupadd -r appuser && useradd -r -g appuser appuser
# Copy the Python packages from the build stage (chown so appuser can use them)
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local
# Copy the application code
COPY --chown=appuser:appuser . .
# Create directories
RUN mkdir -p logs uploads instance && \
    chown -R appuser:appuser /app
# Put the user-level install on the PATH
ENV PATH=/home/appuser/.local/bin:$PATH
USER appuser
EXPOSE 5000
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:5000/health || exit 1
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "app:app"]
11.2.2 Docker Compose Configuration
# docker-compose.yml
version: '3.8'
services:
web:
build: .
ports:
- "5000:5000"
environment:
- FLASK_ENV=production
- DATABASE_URL=postgresql://postgres:password@db:5432/flask_app
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/1
- CELERY_RESULT_BACKEND=redis://redis:6379/1
volumes:
- ./uploads:/app/uploads
- ./logs:/app/logs
depends_on:
- db
- redis
restart: unless-stopped
networks:
- app-network
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
worker:
build: .
command: celery -A app.celery worker --loglevel=info
environment:
- FLASK_ENV=production
- DATABASE_URL=postgresql://postgres:password@db:5432/flask_app
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/1
- CELERY_RESULT_BACKEND=redis://redis:6379/1
volumes:
- ./uploads:/app/uploads
- ./logs:/app/logs
depends_on:
- db
- redis
restart: unless-stopped
networks:
- app-network
beat:
build: .
command: celery -A app.celery beat --loglevel=info
environment:
- FLASK_ENV=production
- DATABASE_URL=postgresql://postgres:password@db:5432/flask_app
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/1
- CELERY_RESULT_BACKEND=redis://redis:6379/1
volumes:
- ./logs:/app/logs
depends_on:
- db
- redis
restart: unless-stopped
networks:
- app-network
db:
image: postgres:13
environment:
- POSTGRES_DB=flask_app
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=password
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
restart: unless-stopped
networks:
- app-network
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s
timeout: 5s
retries: 5
redis:
image: redis:6-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
networks:
- app-network
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
- ./uploads:/var/www/uploads
depends_on:
- web
restart: unless-stopped
networks:
- app-network
volumes:
postgres_data:
redis_data:
networks:
app-network:
driver: bridge
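After docker-compose up -d, a simple smoke test against the published port confirms the stack actually came up (assuming the requests package):

# smoke_test.py — poll the health endpoint until the web service responds
import time
import requests

for attempt in range(30):
    try:
        r = requests.get('http://localhost:5000/health', timeout=2)
        if r.ok:
            print('web service is healthy:', r.json())
            break
    except requests.ConnectionError:
        pass
    time.sleep(2)
else:
    raise SystemExit('web service did not become healthy in time')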
# docker-compose.prod.yml
# Production Docker Compose configuration
version: '3.8'
services:
web:
build:
context: .
dockerfile: Dockerfile.multi-stage
environment:
- FLASK_ENV=production
- DATABASE_URL=postgresql://postgres:${POSTGRES_PASSWORD}@db:5432/flask_app
- REDIS_URL=redis://redis:6379/0
- SECRET_KEY=${SECRET_KEY}
- SENTRY_DSN=${SENTRY_DSN}
env_file:
- .env.production
volumes:
- uploads_data:/app/uploads
- logs_data:/app/logs
deploy:
replicas: 3
resources:
limits:
cpus: '0.5'
memory: 512M
reservations:
cpus: '0.25'
memory: 256M
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
depends_on:
- db
- redis
networks:
- app-network
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
worker:
build:
context: .
dockerfile: Dockerfile.multi-stage
command: celery -A app.celery worker --loglevel=info --concurrency=4
environment:
- FLASK_ENV=production
- DATABASE_URL=postgresql://postgres:${POSTGRES_PASSWORD}@db:5432/flask_app
- REDIS_URL=redis://redis:6379/0
env_file:
- .env.production
volumes:
- uploads_data:/app/uploads
- logs_data:/app/logs
deploy:
replicas: 2
resources:
limits:
cpus: '0.5'
memory: 512M
depends_on:
- db
- redis
networks:
- app-network
db:
image: postgres:13
environment:
- POSTGRES_DB=flask_app
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
- ./backups:/backups
deploy:
resources:
limits:
cpus: '1'
memory: 1G
networks:
- app-network
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s
timeout: 5s
retries: 5
redis:
image: redis:6-alpine
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
volumes:
- redis_data:/data
deploy:
resources:
limits:
cpus: '0.5'
memory: 512M
networks:
- app-network
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/ssl:/etc/nginx/ssl
- uploads_data:/var/www/uploads
deploy:
resources:
limits:
cpus: '0.25'
memory: 128M
depends_on:
- web
networks:
- app-network
volumes:
postgres_data:
redis_data:
uploads_data:
logs_data:
networks:
app-network:
driver: overlay
attachable: true
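One caveat: the deploy: blocks (replicas, resources, restart_policy) and the overlay network driver are Docker Swarm features, applied with docker stack deploy -c docker-compose.prod.yml flask_app on a host where docker swarm init has been run; standalone docker-compose has historically ignored most deploy: keys, though newer Compose releases honor some of them, such as resource limits.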
11.2.3 Container Management Scripts
# scripts/docker_manager.py
import subprocess
import sys
import time
import json
from pathlib import Path
class DockerManager:
    """Docker container manager."""
    def __init__(self, compose_file='docker-compose.yml'):
        self.compose_file = compose_file
        self.project_name = 'flask-app'
    def run_command(self, command, check=True):
        """Run a shell command and capture its output."""
        print(f"Running: {' '.join(command)}")
        result = subprocess.run(command, capture_output=True, text=True)
        if check and result.returncode != 0:
            print(f"Command failed: {result.stderr}")
            sys.exit(1)
        return result
    def build(self, no_cache=False):
        """Build the images."""
        command = ['docker-compose', '-f', self.compose_file, 'build']
        if no_cache:
            command.append('--no-cache')
        self.run_command(command)
        print("Image build complete")
    def up(self, detach=True, scale=None):
        """Start the services."""
        command = ['docker-compose', '-f', self.compose_file, 'up']
        if detach:
            command.append('-d')
        if scale:
            for service, count in scale.items():
                command.extend(['--scale', f'{service}={count}'])
        self.run_command(command)
        print("Services started")
    def down(self, volumes=False):
        """Stop the services."""
        command = ['docker-compose', '-f', self.compose_file, 'down']
        if volumes:
            command.append('-v')
        self.run_command(command)
        print("Services stopped")
    def restart(self, service=None):
        """Restart one service, or all of them."""
        command = ['docker-compose', '-f', self.compose_file, 'restart']
        if service:
            command.append(service)
        self.run_command(command)
        print(f"Restarted: {service or 'all services'}")
    def logs(self, service=None, follow=False, tail=100):
        """Show service logs."""
        command = ['docker-compose', '-f', self.compose_file, 'logs']
        if follow:
            command.append('-f')
        if tail:
            command.extend(['--tail', str(tail)])
        if service:
            command.append(service)
        self.run_command(command, check=False)
    def exec_command(self, service, command):
        """Run a command inside a service container."""
        exec_cmd = ['docker-compose', '-f', self.compose_file, 'exec', service] + command
        self.run_command(exec_cmd, check=False)
    def status(self):
        """Show service status."""
        # Note: docker-compose v1 prints a JSON array here; newer Compose
        # releases emit one JSON object per line instead
        result = self.run_command([
            'docker-compose', '-f', self.compose_file, 'ps', '--format', 'json'
        ])
        try:
            services = json.loads(result.stdout)
            print("Service status:")
            for service in services:
                print(f"  {service['Service']}: {service['State']} ({service['Status']})")
        except json.JSONDecodeError:
            print("Unable to parse service status")
    def health_check(self):
        """Report healthy/unhealthy services from the compose status."""
        result = self.run_command([
            'docker-compose', '-f', self.compose_file, 'ps', '--format', 'json'
        ])
        try:
            services = json.loads(result.stdout)
            healthy_services = []
            unhealthy_services = []
            for service in services:
                # Test 'unhealthy' first: 'healthy' is a substring of 'unhealthy'
                if 'unhealthy' in service['Status'].lower():
                    unhealthy_services.append(service['Service'])
                elif 'healthy' in service['Status'].lower():
                    healthy_services.append(service['Service'])
            print(f"Healthy services: {', '.join(healthy_services) or 'none'}")
            print(f"Unhealthy services: {', '.join(unhealthy_services) or 'none'}")
            return len(unhealthy_services) == 0
        except json.JSONDecodeError:
            print("Unable to determine service health")
            return False
    def backup_data(self, backup_dir='./backups'):
        """Back up the database and uploaded files."""
        backup_path = Path(backup_dir)
        backup_path.mkdir(exist_ok=True)
        timestamp = time.strftime('%Y%m%d_%H%M%S')
        # Back up the database; pg_dump writes to stdout, which run_command
        # captures, so the output has to be written out to the backup file
        db_backup = backup_path / f'db_backup_{timestamp}.sql'
        result = self.run_command([
            'docker-compose', '-f', self.compose_file, 'exec', '-T', 'db',
            'pg_dump', '-U', 'postgres', 'flask_app'
        ])
        db_backup.write_text(result.stdout)
        # Back up uploaded files
        self.run_command([
            'docker', 'run', '--rm', '-v', 'flask-app_uploads_data:/data',
            '-v', f'{backup_path.absolute()}:/backup',
            'alpine', 'tar', 'czf', f'/backup/uploads_backup_{timestamp}.tar.gz', '/data'
        ])
        print(f"Backup complete: {backup_path}")
    def deploy(self, environment='production'):
        """Deploy the application."""
        print(f"Deploying to the {environment} environment...")
        # Build the images
        self.build(no_cache=True)
        # Stop the old services
        self.down()
        # Start the new services
        if environment == 'production':
            self.up(scale={'web': 3, 'worker': 2})
        else:
            self.up()
        # Wait for the services to come up
        print("Waiting for services to start...")
        time.sleep(30)
        # Health check
        if self.health_check():
            print("Deployment succeeded!")
        else:
            print("Deployment may have problems; check the service status")
            self.logs(tail=50)
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Docker container management tool')
    parser.add_argument('--compose-file', '-f', default='docker-compose.yml',
                        help='path to the Docker Compose file')
    subparsers = parser.add_subparsers(dest='command', help='available commands')
    # build command
    build_parser = subparsers.add_parser('build', help='build images')
    build_parser.add_argument('--no-cache', action='store_true', help='build without cache')
    # up command
    up_parser = subparsers.add_parser('up', help='start services')
    up_parser.add_argument('--no-detach', action='store_true', help='run in the foreground')
    # down command
    down_parser = subparsers.add_parser('down', help='stop services')
    down_parser.add_argument('--volumes', action='store_true', help='also remove volumes')
    # restart command
    restart_parser = subparsers.add_parser('restart', help='restart services')
    restart_parser.add_argument('service', nargs='?', help='service name')
    # logs command
    logs_parser = subparsers.add_parser('logs', help='show logs')
    logs_parser.add_argument('service', nargs='?', help='service name')
    logs_parser.add_argument('-f', '--follow', action='store_true', help='follow the logs')
    logs_parser.add_argument('--tail', type=int, default=100, help='show the last N lines')
    # exec command
    exec_parser = subparsers.add_parser('exec', help='run a command in a container')
    exec_parser.add_argument('service', help='service name')
    exec_parser.add_argument('command', nargs='+', help='command to run')
    # status command
    subparsers.add_parser('status', help='show status')
    # health command
    subparsers.add_parser('health', help='health check')
    # backup command
    backup_parser = subparsers.add_parser('backup', help='back up data')
    backup_parser.add_argument('--dir', default='./backups', help='backup directory')
    # deploy command
    deploy_parser = subparsers.add_parser('deploy', help='deploy the application')
    deploy_parser.add_argument('--env', default='production', help='target environment')
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        sys.exit(1)
    manager = DockerManager(args.compose_file)
    if args.command == 'build':
        manager.build(no_cache=args.no_cache)
    elif args.command == 'up':
        manager.up(detach=not args.no_detach)
    elif args.command == 'down':
        manager.down(volumes=args.volumes)
    elif args.command == 'restart':
        manager.restart(args.service)
    elif args.command == 'logs':
        manager.logs(args.service, follow=args.follow, tail=args.tail)
    elif args.command == 'exec':
        manager.exec_command(args.service, args.command)
    elif args.command == 'status':
        manager.status()
    elif args.command == 'health':
        manager.health_check()
    elif args.command == 'backup':
        manager.backup_data(args.dir)
    elif args.command == 'deploy':
        manager.deploy(args.env)
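With this script in place, a production rollout reduces to a single command, python scripts/docker_manager.py deploy --env production, while day-to-day operations use the build, up, down, logs, status, health, and backup subcommands.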
11.3 Cloud Platform Deployment
11.3.1 AWS Deployment
# scripts/aws_deploy.py
import boto3
import json
import time
from pathlib import Path
class AWSDeployer:
"""AWS部署管理器"""
def __init__(self, region='us-west-2'):
self.region = region
self.ec2 = boto3.client('ec2', region_name=region)
self.ecs = boto3.client('ecs', region_name=region)
self.ecr = boto3.client('ecr', region_name=region)
self.rds = boto3.client('rds', region_name=region)
self.elasticache = boto3.client('elasticache', region_name=region)
self.elbv2 = boto3.client('elbv2', region_name=region)
def create_vpc_infrastructure(self):
"""创建VPC基础设施"""
# 创建VPC
vpc_response = self.ec2.create_vpc(
CidrBlock='10.0.0.0/16',
TagSpecifications=[
{
'ResourceType': 'vpc',
'Tags': [
{'Key': 'Name', 'Value': 'flask-app-vpc'}
]
}
]
)
vpc_id = vpc_response['Vpc']['VpcId']
        # Create subnets
public_subnet = self.ec2.create_subnet(
VpcId=vpc_id,
CidrBlock='10.0.1.0/24',
AvailabilityZone=f'{self.region}a'
)
private_subnet = self.ec2.create_subnet(
VpcId=vpc_id,
CidrBlock='10.0.2.0/24',
AvailabilityZone=f'{self.region}b'
)
        # Create an internet gateway
igw_response = self.ec2.create_internet_gateway()
igw_id = igw_response['InternetGateway']['InternetGatewayId']
        # Attach it to the VPC
self.ec2.attach_internet_gateway(
InternetGatewayId=igw_id,
VpcId=vpc_id
)
print(f"VPC基础设施创建完成: {vpc_id}")
return {
'vpc_id': vpc_id,
'public_subnet_id': public_subnet['Subnet']['SubnetId'],
'private_subnet_id': private_subnet['Subnet']['SubnetId'],
'igw_id': igw_id
}
def create_rds_instance(self, vpc_id, subnet_ids):
"""创建RDS数据库实例"""
# 创建数据库子网组
subnet_group_response = self.rds.create_db_subnet_group(
DBSubnetGroupName='flask-app-subnet-group',
DBSubnetGroupDescription='Flask App Database Subnet Group',
SubnetIds=subnet_ids
)
        # Create a security group
security_group = self.ec2.create_security_group(
GroupName='flask-app-db-sg',
Description='Flask App Database Security Group',
VpcId=vpc_id
)
sg_id = security_group['GroupId']
        # Add an ingress rule
self.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 5432,
'ToPort': 5432,
'IpRanges': [{'CidrIp': '10.0.0.0/16'}]
}
]
)
        # Create the RDS instance
db_response = self.rds.create_db_instance(
DBInstanceIdentifier='flask-app-db',
DBInstanceClass='db.t3.micro',
Engine='postgres',
MasterUsername='postgres',
MasterUserPassword='your-secure-password',
AllocatedStorage=20,
DBSubnetGroupName='flask-app-subnet-group',
VpcSecurityGroupIds=[sg_id],
BackupRetentionPeriod=7,
MultiAZ=False,
StorageEncrypted=True
)
print("RDS实例创建中...")
return db_response['DBInstance']['DBInstanceIdentifier']
def create_elasticache_cluster(self, vpc_id, subnet_ids):
"""创建ElastiCache Redis集群"""
# 创建缓存子网组
subnet_group_response = self.elasticache.create_cache_subnet_group(
CacheSubnetGroupName='flask-app-cache-subnet-group',
CacheSubnetGroupDescription='Flask App Cache Subnet Group',
SubnetIds=subnet_ids
)
        # Create a security group
security_group = self.ec2.create_security_group(
GroupName='flask-app-cache-sg',
Description='Flask App Cache Security Group',
VpcId=vpc_id
)
sg_id = security_group['GroupId']
        # Add an ingress rule
self.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 6379,
'ToPort': 6379,
'IpRanges': [{'CidrIp': '10.0.0.0/16'}]
}
]
)
        # Create the Redis cluster
cache_response = self.elasticache.create_cache_cluster(
CacheClusterId='flask-app-redis',
CacheNodeType='cache.t3.micro',
Engine='redis',
NumCacheNodes=1,
CacheSubnetGroupName='flask-app-cache-subnet-group',
SecurityGroupIds=[sg_id]
)
print("ElastiCache集群创建中...")
return cache_response['CacheCluster']['CacheClusterId']
def create_ecs_cluster(self):
"""创建ECS集群"""
cluster_response = self.ecs.create_cluster(
clusterName='flask-app-cluster',
capacityProviders=['FARGATE'],
defaultCapacityProviderStrategy=[
{
'capacityProvider': 'FARGATE',
'weight': 1
}
]
)
print(f"ECS集群创建完成: {cluster_response['cluster']['clusterName']}")
return cluster_response['cluster']['clusterArn']
def create_ecr_repository(self):
"""创建ECR仓库"""
try:
repo_response = self.ecr.create_repository(
repositoryName='flask-app',
imageScanningConfiguration={'scanOnPush': True},
encryptionConfiguration={'encryptionType': 'AES256'}
)
repo_uri = repo_response['repository']['repositoryUri']
print(f"ECR仓库创建完成: {repo_uri}")
return repo_uri
except self.ecr.exceptions.RepositoryAlreadyExistsException:
            # The repository already exists, so look up its URI
repo_response = self.ecr.describe_repositories(
repositoryNames=['flask-app']
)
repo_uri = repo_response['repositories'][0]['repositoryUri']
print(f"使用现有ECR仓库: {repo_uri}")
return repo_uri
def create_task_definition(self, image_uri, db_endpoint, cache_endpoint):
"""创建ECS任务定义"""
task_definition = {
'family': 'flask-app-task',
'networkMode': 'awsvpc',
'requiresCompatibilities': ['FARGATE'],
'cpu': '256',
'memory': '512',
'executionRoleArn': 'arn:aws:iam::YOUR_ACCOUNT:role/ecsTaskExecutionRole',
'containerDefinitions': [
{
'name': 'flask-app',
'image': image_uri,
'portMappings': [
{
'containerPort': 5000,
'protocol': 'tcp'
}
],
'environment': [
{'name': 'FLASK_ENV', 'value': 'production'},
{'name': 'DATABASE_URL', 'value': f'postgresql://postgres:password@{db_endpoint}:5432/flask_app'},
{'name': 'REDIS_URL', 'value': f'redis://{cache_endpoint}:6379/0'}
],
'logConfiguration': {
'logDriver': 'awslogs',
'options': {
'awslogs-group': '/ecs/flask-app',
'awslogs-region': self.region,
'awslogs-stream-prefix': 'ecs'
}
},
'healthCheck': {
'command': ['CMD-SHELL', 'curl -f http://localhost:5000/health || exit 1'],
'interval': 30,
'timeout': 5,
'retries': 3,
'startPeriod': 60
}
}
]
}
response = self.ecs.register_task_definition(**task_definition)
print(f"任务定义创建完成: {response['taskDefinition']['taskDefinitionArn']}")
return response['taskDefinition']['taskDefinitionArn']
def create_load_balancer(self, vpc_id, subnet_ids):
"""创建应用负载均衡器"""
# 创建安全组
security_group = self.ec2.create_security_group(
GroupName='flask-app-alb-sg',
Description='Flask App ALB Security Group',
VpcId=vpc_id
)
sg_id = security_group['GroupId']
        # Add ingress rules
self.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 80,
'ToPort': 80,
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
},
{
'IpProtocol': 'tcp',
'FromPort': 443,
'ToPort': 443,
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
}
]
)
        # Create the load balancer (an internet-facing ALB needs subnets in at least two AZs)
alb_response = self.elbv2.create_load_balancer(
Name='flask-app-alb',
Subnets=subnet_ids,
SecurityGroups=[sg_id],
Scheme='internet-facing',
Type='application',
IpAddressType='ipv4'
)
alb_arn = alb_response['LoadBalancers'][0]['LoadBalancerArn']
alb_dns = alb_response['LoadBalancers'][0]['DNSName']
print(f"负载均衡器创建完成: {alb_dns}")
return alb_arn, alb_dns
    def deploy_full_stack(self):
        """Deploy the full application stack."""
        print("Starting full-stack AWS deployment...")
        # 1. Create the VPC infrastructure
        vpc_info = self.create_vpc_infrastructure()
        # 2. Create the ECR repository
        ecr_uri = self.create_ecr_repository()
        # 3. Create the RDS instance
        db_id = self.create_rds_instance(
            vpc_info['vpc_id'],
            [vpc_info['private_subnet_id']]
        )
        # 4. Create the ElastiCache cluster
        cache_id = self.create_elasticache_cluster(
            vpc_info['vpc_id'],
            [vpc_info['private_subnet_id']]
        )
        # 5. Create the ECS cluster
        cluster_arn = self.create_ecs_cluster()
        # 6. Create the load balancer
        alb_arn, alb_dns = self.create_load_balancer(
            vpc_info['vpc_id'],
            [vpc_info['public_subnet_id']]
        )
        print("\nDeployment details:")
        print(f"VPC ID: {vpc_info['vpc_id']}")
        print(f"ECR URI: {ecr_uri}")
        print(f"Database ID: {db_id}")
        print(f"Cache ID: {cache_id}")
        print(f"ECS cluster: {cluster_arn}")
        print(f"Load balancer: {alb_dns}")
return {
'vpc_info': vpc_info,
'ecr_uri': ecr_uri,
'db_id': db_id,
'cache_id': cache_id,
'cluster_arn': cluster_arn,
'alb_arn': alb_arn,
'alb_dns': alb_dns
}
if __name__ == '__main__':
deployer = AWSDeployer()
deployment_info = deployer.deploy_full_stack()
    # Save the deployment details
with open('aws_deployment_info.json', 'w') as f:
json.dump(deployment_info, f, indent=2, default=str)
print("\n部署信息已保存到 aws_deployment_info.json")
11.3.2 Kubernetes Deployment
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: flask-app
labels:
name: flask-app
---
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flask-app-config
namespace: flask-app
data:
FLASK_ENV: "production"
REDIS_URL: "redis://redis-service:6379/0"
CELERY_BROKER_URL: "redis://redis-service:6379/1"
CELERY_RESULT_BACKEND: "redis://redis-service:6379/1"
---
# k8s/secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: flask-app-secret
namespace: flask-app
type: Opaque
data:
SECRET_KEY: eW91ci1zdXBlci1zZWNyZXQta2V5LWhlcmU= # base64-encoded
DATABASE_URL: cG9zdGdyZXNxbDovL3Bvc3RncmVzOnBhc3N3b3JkQHBvc3RncmVzLXNlcnZpY2U6NTQzMi9mbGFza19hcHA=
MAIL_PASSWORD: eW91ci1tYWlsLXBhc3N3b3JkLWhlcmU=
---
# k8s/postgres.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres
namespace: flask-app
spec:
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:13
env:
- name: POSTGRES_DB
value: "flask_app"
- name: POSTGRES_USER
value: "postgres"
- name: POSTGRES_PASSWORD
value: "password"
ports:
- containerPort: 5432
volumeMounts:
- name: postgres-storage
mountPath: /var/lib/postgresql/data
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
volumes:
- name: postgres-storage
persistentVolumeClaim:
claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
name: postgres-service
namespace: flask-app
spec:
selector:
app: postgres
ports:
- port: 5432
targetPort: 5432
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: postgres-pvc
namespace: flask-app
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
---
# k8s/redis.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis
namespace: flask-app
spec:
replicas: 1
selector:
matchLabels:
app: redis
template:
metadata:
labels:
app: redis
spec:
containers:
- name: redis
image: redis:6-alpine
command: ["redis-server"]
args: ["--appendonly", "yes", "--maxmemory", "256mb", "--maxmemory-policy", "allkeys-lru"]
ports:
- containerPort: 6379
volumeMounts:
- name: redis-storage
mountPath: /data
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "200m"
volumes:
- name: redis-storage
persistentVolumeClaim:
claimName: redis-pvc
---
apiVersion: v1
kind: Service
metadata:
name: redis-service
namespace: flask-app
spec:
selector:
app: redis
ports:
- port: 6379
targetPort: 6379
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: redis-pvc
namespace: flask-app
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
---
# k8s/flask-app.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flask-app
namespace: flask-app
spec:
replicas: 3
selector:
matchLabels:
app: flask-app
template:
metadata:
labels:
app: flask-app
spec:
containers:
- name: flask-app
image: your-registry/flask-app:latest
ports:
- containerPort: 5000
env:
- name: SECRET_KEY
valueFrom:
secretKeyRef:
name: flask-app-secret
key: SECRET_KEY
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: flask-app-secret
key: DATABASE_URL
envFrom:
- configMapRef:
name: flask-app-config
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 5
periodSeconds: 5
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
volumeMounts:
- name: uploads-storage
mountPath: /app/uploads
volumes:
- name: uploads-storage
persistentVolumeClaim:
claimName: uploads-pvc
---
apiVersion: v1
kind: Service
metadata:
name: flask-app-service
namespace: flask-app
spec:
selector:
app: flask-app
ports:
- port: 80
targetPort: 5000
type: ClusterIP
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: uploads-pvc
namespace: flask-app
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 20Gi
---
# k8s/celery-worker.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-worker
namespace: flask-app
spec:
replicas: 2
selector:
matchLabels:
app: celery-worker
template:
metadata:
labels:
app: celery-worker
spec:
containers:
- name: celery-worker
image: your-registry/flask-app:latest
command: ["celery"]
args: ["-A", "app.celery", "worker", "--loglevel=info", "--concurrency=4"]
env:
- name: SECRET_KEY
valueFrom:
secretKeyRef:
name: flask-app-secret
key: SECRET_KEY
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: flask-app-secret
key: DATABASE_URL
envFrom:
- configMapRef:
name: flask-app-config
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: flask-app-ingress
namespace: flask-app
annotations:
kubernetes.io/ingress.class: "nginx"
cert-manager.io/cluster-issuer: "letsencrypt-prod"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/proxy-body-size: "16m"
spec:
tls:
- hosts:
- your-domain.com
secretName: flask-app-tls
rules:
- host: your-domain.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: flask-app-service
port:
number: 80
---
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: flask-app-hpa
namespace: flask-app
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: flask-app
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
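One practical note on secret.yaml: the data: values must be base64-encoded (the strings above encode the sample placeholders). A small helper to produce them; kubectl create secret generic with --from-literal arguments achieves the same without hand-encoding:

# encode_secrets.py — produce base64 values for secret.yaml
import base64

values = {
    'SECRET_KEY': 'your-super-secret-key-here',
    'DATABASE_URL': 'postgresql://postgres:password@postgres-service:5432/flask_app',
}
for key, value in values.items():
    print(f"{key}: {base64.b64encode(value.encode()).decode()}")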
# scripts/k8s_deploy.py
import subprocess
import yaml
import time
from pathlib import Path
class KubernetesDeployer:
    """Kubernetes deployment manager."""
    def __init__(self, namespace='flask-app', kubeconfig=None):
        self.namespace = namespace
        self.kubeconfig = kubeconfig
        self.k8s_dir = Path('k8s')
    def run_kubectl(self, command, check=True):
        """Run a kubectl command."""
        cmd = ['kubectl'] + command
        if self.kubeconfig:
            cmd.extend(['--kubeconfig', self.kubeconfig])
        print(f"Running: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if check and result.returncode != 0:
            print(f"Command failed: {result.stderr}")
            raise subprocess.CalledProcessError(result.returncode, cmd)
        return result
    def create_namespace(self):
        """Create the namespace."""
        try:
            self.run_kubectl(['create', 'namespace', self.namespace])
            print(f"Namespace {self.namespace} created")
        except subprocess.CalledProcessError:
            print(f"Namespace {self.namespace} already exists")
    def apply_manifests(self, manifest_files=None):
        """Apply the Kubernetes manifest files."""
        if manifest_files is None:
            manifest_files = [
                'namespace.yaml',
                'configmap.yaml',
                'secret.yaml',
                'postgres.yaml',
                'redis.yaml',
                'flask-app.yaml',
                'celery-worker.yaml',
                'ingress.yaml',
                'hpa.yaml'
            ]
        for manifest_file in manifest_files:
            manifest_path = self.k8s_dir / manifest_file
            if manifest_path.exists():
                self.run_kubectl(['apply', '-f', str(manifest_path)])
                print(f"Applied manifest: {manifest_file}")
            else:
                print(f"Manifest file not found: {manifest_file}")
    def wait_for_deployment(self, deployment_name, timeout=300):
        """Wait for a deployment to become available."""
        print(f"Waiting for deployment {deployment_name}...")
        self.run_kubectl([
            'wait', '--for=condition=available',
            f'deployment/{deployment_name}',
            f'--namespace={self.namespace}',
            f'--timeout={timeout}s'
        ])
        print(f"Deployment {deployment_name} is available")
    def get_pod_status(self):
        """Show pod status."""
        result = self.run_kubectl([
            'get', 'pods', f'--namespace={self.namespace}', '-o', 'wide'
        ])
        print("Pod status:")
        print(result.stdout)
    def get_service_info(self):
        """Show service information."""
        result = self.run_kubectl([
            'get', 'services', f'--namespace={self.namespace}', '-o', 'wide'
        ])
        print("Services:")
        print(result.stdout)
    def get_ingress_info(self):
        """Show ingress information."""
        result = self.run_kubectl([
            'get', 'ingress', f'--namespace={self.namespace}', '-o', 'wide'
        ])
        print("Ingress:")
        print(result.stdout)
    def scale_deployment(self, deployment_name, replicas):
        """Scale a deployment up or down."""
        self.run_kubectl([
            'scale', f'deployment/{deployment_name}',
            f'--replicas={replicas}',
            f'--namespace={self.namespace}'
        ])
        print(f"Deployment {deployment_name} scaled to {replicas} replicas")
    def rolling_update(self, deployment_name, image):
        """Perform a rolling update."""
        self.run_kubectl([
            'set', 'image',
            f'deployment/{deployment_name}',
            f'{deployment_name}={image}',
            f'--namespace={self.namespace}'
        ])
        print(f"Rolling update of {deployment_name} to image {image} started")
        # Wait for the rollout to finish
        self.wait_for_deployment(deployment_name)
    def rollback_deployment(self, deployment_name, revision=None):
        """Roll back a deployment."""
        cmd = ['rollout', 'undo', f'deployment/{deployment_name}', f'--namespace={self.namespace}']
        if revision:
            cmd.extend(['--to-revision', str(revision)])
        self.run_kubectl(cmd)
        print(f"Rolled back deployment {deployment_name}")
        # Wait for the rollback to finish
        self.wait_for_deployment(deployment_name)
    def get_logs(self, pod_name=None, container=None, follow=False, tail=100):
        """Fetch pod logs."""
        if pod_name is None:
            # Pick the first flask-app pod
            result = self.run_kubectl([
                'get', 'pods', f'--namespace={self.namespace}',
                '-l', 'app=flask-app', '-o', 'jsonpath={.items[0].metadata.name}'
            ])
            pod_name = result.stdout.strip()
        cmd = ['logs', pod_name, f'--namespace={self.namespace}']
        if container:
            cmd.extend(['-c', container])
        if follow:
            cmd.append('-f')
        if tail:
            cmd.extend(['--tail', str(tail)])
        self.run_kubectl(cmd, check=False)
    def exec_pod(self, pod_name, command):
        """Run a command inside a pod."""
        cmd = ['exec', '-it', pod_name, f'--namespace={self.namespace}', '--'] + command
        self.run_kubectl(cmd, check=False)
    def port_forward(self, service_name, local_port, remote_port):
        """Forward a local port to a service."""
        cmd = [
            'port-forward', f'service/{service_name}',
            f'{local_port}:{remote_port}',
            f'--namespace={self.namespace}'
        ]
        print(f"Port forwarding: localhost:{local_port} -> {service_name}:{remote_port}")
        self.run_kubectl(cmd, check=False)
    def deploy_full_stack(self):
        """Deploy the full application stack."""
        print("Starting full-stack Kubernetes deployment...")
        # 1. Create the namespace
        self.create_namespace()
        # 2. Apply all manifest files
        self.apply_manifests()
        # 3. Wait for the key deployments
        deployments = ['postgres', 'redis', 'flask-app']
        for deployment in deployments:
            try:
                self.wait_for_deployment(deployment)
            except subprocess.CalledProcessError:
                print(f"Deployment {deployment} may have problems")
        # 4. Show the deployment state
        print("\n=== Deployment status ===")
        self.get_pod_status()
        print("\n=== Services ===")
        self.get_service_info()
        print("\n=== Ingress ===")
        self.get_ingress_info()
        print("\nKubernetes deployment complete!")
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Kubernetes deployment management tool')
    parser.add_argument('--namespace', '-n', default='flask-app', help='namespace')
    parser.add_argument('--kubeconfig', help='path to the kubeconfig file')
    subparsers = parser.add_subparsers(dest='command', help='available commands')
    # deploy command
    subparsers.add_parser('deploy', help='deploy the application')
    # status command
    subparsers.add_parser('status', help='show status')
    # scale command
    scale_parser = subparsers.add_parser('scale', help='scale a deployment')
    scale_parser.add_argument('deployment', help='deployment name')
    scale_parser.add_argument('replicas', type=int, help='replica count')
    # update command
    update_parser = subparsers.add_parser('update', help='rolling update')
    update_parser.add_argument('deployment', help='deployment name')
    update_parser.add_argument('image', help='new image')
    # rollback command
    rollback_parser = subparsers.add_parser('rollback', help='roll back a deployment')
    rollback_parser.add_argument('deployment', help='deployment name')
    rollback_parser.add_argument('--revision', type=int, help='revision to roll back to')
    # logs command
    logs_parser = subparsers.add_parser('logs', help='show logs')
    logs_parser.add_argument('--pod', help='pod name')
    logs_parser.add_argument('-f', '--follow', action='store_true', help='follow the logs')
    logs_parser.add_argument('--tail', type=int, default=100, help='show the last N lines')
    # port-forward command
    pf_parser = subparsers.add_parser('port-forward', help='port forwarding')
    pf_parser.add_argument('service', help='service name')
    pf_parser.add_argument('local_port', type=int, help='local port')
    pf_parser.add_argument('remote_port', type=int, help='remote port')
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        exit(1)
    deployer = KubernetesDeployer(args.namespace, args.kubeconfig)
    if args.command == 'deploy':
        deployer.deploy_full_stack()
    elif args.command == 'status':
        deployer.get_pod_status()
        deployer.get_service_info()
        deployer.get_ingress_info()
    elif args.command == 'scale':
        deployer.scale_deployment(args.deployment, args.replicas)
    elif args.command == 'update':
        deployer.rolling_update(args.deployment, args.image)
    elif args.command == 'rollback':
        deployer.rollback_deployment(args.deployment, args.revision)
    elif args.command == 'logs':
        deployer.get_logs(args.pod, follow=args.follow, tail=args.tail)
    elif args.command == 'port-forward':
        deployer.port_forward(args.service, args.local_port, args.remote_port)
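Typical usage: python scripts/k8s_deploy.py deploy for the first rollout, python scripts/k8s_deploy.py update flask-app your-registry/flask-app:v2 for a release, and python scripts/k8s_deploy.py rollback flask-app if the release misbehaves. Remember that the manifests reference the placeholder image your-registry/flask-app:latest, which must point at a registry the cluster can pull from.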
11.4 Monitoring and Logging
11.4.1 Application Monitoring
# monitoring/prometheus_metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from flask import Response
import time
import psutil
import threading
class PrometheusMetrics:
"""Prometheus指标收集器"""
def __init__(self, app=None):
self.app = app
        # Request metrics
self.request_count = Counter(
'flask_requests_total',
'Total number of requests',
['method', 'endpoint', 'status']
)
self.request_duration = Histogram(
'flask_request_duration_seconds',
'Request duration in seconds',
['method', 'endpoint']
)
        # Business metrics
self.user_registrations = Counter(
'flask_user_registrations_total',
'Total number of user registrations'
)
self.active_users = Gauge(
'flask_active_users',
'Number of active users'
)
self.database_connections = Gauge(
'flask_database_connections',
'Number of database connections'
)
        # System metrics
self.cpu_usage = Gauge(
'flask_cpu_usage_percent',
'CPU usage percentage'
)
self.memory_usage = Gauge(
'flask_memory_usage_bytes',
'Memory usage in bytes'
)
self.disk_usage = Gauge(
'flask_disk_usage_percent',
'Disk usage percentage'
)
if app:
self.init_app(app)
    def init_app(self, app):
        """Initialize the Flask application."""
        self.app = app
        # Register request hooks
        app.before_request(self._before_request)
        app.after_request(self._after_request)
        # Register the metrics endpoint
        app.add_url_rule('/metrics', 'metrics', self.metrics_endpoint)
        # Start the system metrics collection thread
        self._start_system_metrics_collection()
def _before_request(self):
"""请求开始前的钩子"""
from flask import g
g.start_time = time.time()
def _after_request(self, response):
"""请求结束后的钩子"""
from flask import request, g
        # Count the request
self.request_count.labels(
method=request.method,
endpoint=request.endpoint or 'unknown',
status=response.status_code
).inc()
        # Record the request duration
if hasattr(g, 'start_time'):
duration = time.time() - g.start_time
self.request_duration.labels(
method=request.method,
endpoint=request.endpoint or 'unknown'
).observe(duration)
return response
    def _start_system_metrics_collection(self):
        """Collect system metrics in a background thread."""
        def collect_system_metrics():
            while True:
                try:
                    # CPU usage
                    cpu_percent = psutil.cpu_percent(interval=1)
                    self.cpu_usage.set(cpu_percent)
                    # Memory usage
                    memory = psutil.virtual_memory()
                    self.memory_usage.set(memory.used)
                    # Disk usage
                    disk = psutil.disk_usage('/')
                    disk_percent = (disk.used / disk.total) * 100
                    self.disk_usage.set(disk_percent)
                except Exception as e:
                    print(f"System metrics collection error: {e}")
                time.sleep(30)  # collect every 30 seconds
        thread = threading.Thread(target=collect_system_metrics, daemon=True)
        thread.start()
    def metrics_endpoint(self):
        """Prometheus metrics endpoint."""
        return Response(generate_latest(), mimetype='text/plain')
    def record_user_registration(self):
        """Record a user registration."""
        self.user_registrations.inc()
    def update_active_users(self, count):
        """Update the active-user gauge."""
        self.active_users.set(count)
    def update_database_connections(self, count):
        """Update the database-connection gauge."""
        self.database_connections.set(count)
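Wiring the collector into the application is one line, after which Prometheus can scrape /metrics; business counters are bumped from the relevant views. A sketch (the /register route is illustrative):

# Example wiring
from flask import Flask
from monitoring.prometheus_metrics import PrometheusMetrics

app = Flask(__name__)
metrics = PrometheusMetrics(app)  # registers /metrics plus the request hooks

@app.route('/register', methods=['POST'])
def register():
    # ... create the user here ...
    metrics.record_user_registration()  # increment the business counter
    return {'status': 'ok'}, 201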
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alert_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
scrape_configs:
- job_name: 'flask-app'
static_configs:
- targets: ['flask-app:5000']
metrics_path: '/metrics'
scrape_interval: 30s
- job_name: 'postgres'
static_configs:
- targets: ['postgres-exporter:9187']
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121']
- job_name: 'nginx'
static_configs:
- targets: ['nginx-exporter:9113']
# monitoring/alert_rules.yml
groups:
- name: flask-app-alerts
rules:
- alert: HighErrorRate
expr: rate(flask_requests_total{status=~"5.."}[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value }} errors per second"
- alert: HighResponseTime
expr: histogram_quantile(0.95, rate(flask_request_duration_seconds_bucket[5m])) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "High response time detected"
description: "95th percentile response time is {{ $value }} seconds"
- alert: HighCPUUsage
expr: flask_cpu_usage_percent > 80
for: 10m
labels:
severity: warning
annotations:
summary: "High CPU usage"
description: "CPU usage is {{ $value }}%"
- alert: HighMemoryUsage
expr: flask_memory_usage_bytes / (1024*1024*1024) > 1
for: 10m
labels:
severity: warning
annotations:
summary: "High memory usage"
description: "Memory usage is {{ $value }}GB"
- alert: DatabaseConnectionsHigh
expr: flask_database_connections > 50
for: 5m
labels:
severity: warning
annotations:
summary: "High database connections"
description: "Database connections: {{ $value }}"
# monitoring/health_check.py
from flask import Blueprint, jsonify, current_app
from sqlalchemy import text
import redis
import time
health_bp = Blueprint('health', __name__)
class HealthChecker:
"""健康检查器"""
def __init__(self, app=None):
self.app = app
if app:
self.init_app(app)
def init_app(self, app):
"""初始化Flask应用"""
self.app = app
app.register_blueprint(health_bp)
@staticmethod
def check_database():
"""检查数据库连接"""
try:
from app import db
db.session.execute(text('SELECT 1'))
return {'status': 'healthy', 'response_time': 0}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
@staticmethod
def check_redis():
"""检查Redis连接"""
try:
r = redis.Redis.from_url(current_app.config.get('REDIS_URL'))
start_time = time.time()
r.ping()
response_time = time.time() - start_time
return {'status': 'healthy', 'response_time': response_time}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
@staticmethod
def check_disk_space():
"""检查磁盘空间"""
try:
import shutil
total, used, free = shutil.disk_usage('/')
usage_percent = (used / total) * 100
status = 'healthy'
if usage_percent > 90:
status = 'critical'
elif usage_percent > 80:
status = 'warning'
return {
'status': status,
'usage_percent': usage_percent,
'free_gb': free // (1024**3)
}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
@staticmethod
def check_memory():
"""检查内存使用"""
try:
import psutil
memory = psutil.virtual_memory()
status = 'healthy'
if memory.percent > 90:
status = 'critical'
elif memory.percent > 80:
status = 'warning'
return {
'status': status,
'usage_percent': memory.percent,
'available_gb': memory.available // (1024**3)
}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
@health_bp.route('/health')
def health_check():
"""基础健康检查"""
return jsonify({
'status': 'healthy',
'timestamp': time.time(),
'version': current_app.config.get('VERSION', '1.0.0')
})
@health_bp.route('/health/detailed')
def detailed_health_check():
"""详细健康检查"""
checker = HealthChecker()
checks = {
'database': checker.check_database(),
'redis': checker.check_redis(),
'disk': checker.check_disk_space(),
'memory': checker.check_memory()
}
    # Determine the overall status
overall_status = 'healthy'
for check_name, check_result in checks.items():
if check_result['status'] == 'critical':
overall_status = 'critical'
break
elif check_result['status'] in ['unhealthy', 'warning']:
overall_status = 'warning'
return jsonify({
'status': overall_status,
'timestamp': time.time(),
'checks': checks
})
@health_bp.route('/health/ready')
def readiness_check():
"""就绪检查"""
checker = HealthChecker()
# 检查关键依赖
db_check = checker.check_database()
redis_check = checker.check_redis()
if db_check['status'] == 'healthy' and redis_check['status'] == 'healthy':
return jsonify({'status': 'ready'})
else:
return jsonify({
'status': 'not_ready',
'database': db_check,
'redis': redis_check
}), 503
@health_bp.route('/health/live')
def liveness_check():
"""存活检查"""
return jsonify({'status': 'alive'})
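HealthChecker(app) registers all four routes. /health/live and /health/ready map naturally onto Kubernetes liveness and readiness probes; the manifests in 11.3.2 point both probes at /health, and switching them to the split endpoints gives more precise signals. A wiring sketch (create_app is the assumed factory from earlier sections):

# Example wiring
from app import create_app
from monitoring.health_check import HealthChecker

app = create_app()
HealthChecker(app)  # registers /health, /health/detailed, /health/ready, /health/live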
11.4.2 Log Management
# logging_config.py
import logging
import logging.handlers
import os
from datetime import datetime
import json
class JSONFormatter(logging.Formatter):
"""JSON格式日志格式化器"""
def format(self, record):
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
        # Attach exception details
if record.exc_info:
log_entry['exception'] = self.formatException(record.exc_info)
        # Attach extra fields
if hasattr(record, 'user_id'):
log_entry['user_id'] = record.user_id
if hasattr(record, 'request_id'):
log_entry['request_id'] = record.request_id
if hasattr(record, 'ip_address'):
log_entry['ip_address'] = record.ip_address
return json.dumps(log_entry)
class LoggingConfig:
"""日志配置管理器"""
def __init__(self, app=None):
self.app = app
if app:
self.init_app(app)
def init_app(self, app):
"""初始化Flask应用日志"""
self.app = app
        # Create the log directory
log_dir = app.config.get('LOG_DIR', 'logs')
os.makedirs(log_dir, exist_ok=True)
        # Configure the root logger
self._configure_root_logger(app, log_dir)
        # Configure the application logger
self._configure_app_logger(app, log_dir)
        # Configure the access log
self._configure_access_logger(app, log_dir)
        # Configure the error log
self._configure_error_logger(app, log_dir)
        # Register request hooks
self._register_request_hooks(app)
def _configure_root_logger(self, app, log_dir):
"""配置根日志器"""
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, app.config.get('LOG_LEVEL', 'INFO')))
        # Console handler
if app.config.get('LOG_TO_CONSOLE', True):
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(console_formatter)
root_logger.addHandler(console_handler)
def _configure_app_logger(self, app, log_dir):
"""配置应用日志器"""
app_logger = logging.getLogger('flask_app')
        # File handler
app_log_file = os.path.join(log_dir, 'app.log')
file_handler = logging.handlers.RotatingFileHandler(
app_log_file,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=10
)
file_handler.setLevel(logging.INFO)
        # JSON formatter
json_formatter = JSONFormatter()
file_handler.setFormatter(json_formatter)
app_logger.addHandler(file_handler)
app_logger.setLevel(logging.INFO)
        # Use it as the application's logger
app.logger = app_logger
def _configure_access_logger(self, app, log_dir):
"""配置访问日志器"""
access_logger = logging.getLogger('access')
access_log_file = os.path.join(log_dir, 'access.log')
access_handler = logging.handlers.RotatingFileHandler(
access_log_file,
maxBytes=50 * 1024 * 1024, # 50MB
backupCount=20
)
access_formatter = logging.Formatter(
'%(asctime)s - %(remote_addr)s - "%(method)s %(url)s %(protocol)s" '
'%(status_code)s %(content_length)s "%(user_agent)s" %(response_time)s'
)
access_handler.setFormatter(access_formatter)
access_logger.addHandler(access_handler)
access_logger.setLevel(logging.INFO)
access_logger.propagate = False
def _configure_error_logger(self, app, log_dir):
"""配置错误日志器"""
error_logger = logging.getLogger('error')
error_log_file = os.path.join(log_dir, 'error.log')
error_handler = logging.handlers.RotatingFileHandler(
error_log_file,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=10
)
error_handler.setLevel(logging.ERROR)
error_formatter = JSONFormatter()
error_handler.setFormatter(error_formatter)
error_logger.addHandler(error_handler)
error_logger.setLevel(logging.ERROR)
error_logger.propagate = False
def _register_request_hooks(self, app):
"""注册请求钩子"""
import uuid
import time
from flask import request, g
@app.before_request
def before_request():
g.start_time = time.time()
g.request_id = str(uuid.uuid4())
@app.after_request
def after_request(response):
            # Write the access log entry
access_logger = logging.getLogger('access')
response_time = time.time() - g.start_time
            # Build a LogRecord by hand so custom fields can be attached
log_record = logging.LogRecord(
name='access',
level=logging.INFO,
pathname='',
lineno=0,
msg='',
args=(),
exc_info=None
)
            # Attach the custom fields the access formatter expects
log_record.remote_addr = request.remote_addr
log_record.method = request.method
log_record.url = request.url
log_record.protocol = request.environ.get('SERVER_PROTOCOL')
log_record.status_code = response.status_code
log_record.content_length = response.content_length or 0
log_record.user_agent = request.headers.get('User-Agent', '')
log_record.response_time = f'{response_time:.3f}s'
access_logger.handle(log_record)
return response
@app.errorhandler(Exception)
def handle_exception(e):
error_logger = logging.getLogger('error')
            # Build the error log entry
extra = {
'request_id': getattr(g, 'request_id', None),
'user_id': getattr(g, 'user_id', None),
'ip_address': request.remote_addr,
'url': request.url,
'method': request.method
}
error_logger.error(f'Unhandled exception: {str(e)}', exc_info=True, extra=extra)
            # Re-raise so Flask's normal error handling still runs
raise e
class StructuredLogger:
"""结构化日志器"""
def __init__(self, name):
self.logger = logging.getLogger(name)
def info(self, message, **kwargs):
"""记录信息日志"""
extra = self._prepare_extra(**kwargs)
self.logger.info(message, extra=extra)
def warning(self, message, **kwargs):
"""记录警告日志"""
extra = self._prepare_extra(**kwargs)
self.logger.warning(message, extra=extra)
def error(self, message, **kwargs):
"""记录错误日志"""
extra = self._prepare_extra(**kwargs)
self.logger.error(message, extra=extra)
def debug(self, message, **kwargs):
"""记录调试日志"""
extra = self._prepare_extra(**kwargs)
self.logger.debug(message, extra=extra)
    def _prepare_extra(self, **kwargs):
        """Prepare the extra fields."""
        from flask import g, has_request_context, request
        extra = kwargs.copy()
        # Attach request-related information; checking has_request_context()
        # avoids a RuntimeError when logging outside of a request
        if has_request_context():
            extra.setdefault('request_id', getattr(g, 'request_id', None))
            extra.setdefault('ip_address', request.remote_addr)
            extra.setdefault('user_agent', request.headers.get('User-Agent'))
            # Attach user information
            extra.setdefault('user_id', getattr(g, 'user_id', None))
        return extra
# Usage example
def get_logger(name):
"""获取结构化日志器"""
return StructuredLogger(name)
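A short usage sketch: initialize LoggingConfig once, then log through StructuredLogger; request_id, ip_address, and user_id are attached automatically inside a request and serialized by JSONFormatter:

# Example usage
from flask import Flask
from logging_config import LoggingConfig, get_logger

app = Flask(__name__)
LoggingConfig(app)
log = get_logger('flask_app')

@app.route('/orders', methods=['POST'])
def create_order():
    log.info('order created', user_id=42)  # ends up as a JSON field in logs/app.log
    return {'status': 'created'}, 201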
11.4.3 Operations Tools
# ops/deployment_manager.py
import subprocess
import os
import yaml
import json
from datetime import datetime
import shutil
class DeploymentManager:
"""部署管理器"""
def __init__(self, config_file='deploy_config.yml'):
self.config = self._load_config(config_file)
self.deployment_history = []
def _load_config(self, config_file):
"""加载部署配置"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except FileNotFoundError:
return self._default_config()
def _default_config(self):
"""默认配置"""
return {
'app_name': 'flask-app',
'environments': {
'development': {
'host': 'localhost',
'port': 5000,
'workers': 1
},
'staging': {
'host': '0.0.0.0',
'port': 8000,
'workers': 2
},
'production': {
'host': '0.0.0.0',
'port': 8000,
'workers': 4
}
},
'backup': {
'enabled': True,
'retention_days': 30,
'backup_dir': '/backups'
}
}
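# 对应的deploy_config.yml示例(取值为示意,字段与上面的默认配置一一对应):
#   app_name: flask-app
#   environments:
#     production:
#       host: 0.0.0.0
#       port: 8000
#       workers: 4
#   backup:
#     enabled: true
#     retention_days: 30
#     backup_dir: /backups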
def deploy(self, environment='production', version=None):
"""部署应用"""
print(f"开始部署到 {environment} 环境...")
deployment_info = {
'environment': environment,
'version': version or self._get_current_version(),
'timestamp': datetime.now().isoformat(),
'status': 'started'
}
try:
# 1. 备份当前版本
if self.config['backup']['enabled']:
self._backup_current_version(environment)
# 2. 构建应用
self._build_application()
# 3. 运行测试
self._run_tests()
# 4. 部署到目标环境
self._deploy_to_environment(environment)
# 5. 健康检查
self._health_check(environment)
deployment_info['status'] = 'success'
print(f"部署成功完成!")
except Exception as e:
deployment_info['status'] = 'failed'
deployment_info['error'] = str(e)
print(f"部署失败: {e}")
# 回滚到上一个版本
self._rollback(environment)
raise
finally:
self.deployment_history.append(deployment_info)
self._save_deployment_history()
def _get_current_version(self):
"""获取当前版本"""
try:
result = subprocess.run(['git', 'rev-parse', '--short', 'HEAD'],
capture_output=True, text=True, check=True)
return result.stdout.strip()
except (subprocess.CalledProcessError, FileNotFoundError):
# 不在git仓库或未安装git时,退化为时间戳版本号
return datetime.now().strftime('%Y%m%d%H%M%S')
def _backup_current_version(self, environment):
"""备份当前版本"""
backup_dir = self.config['backup']['backup_dir']
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = f"{backup_dir}/{environment}_{timestamp}"
os.makedirs(backup_path, exist_ok=True)
# 备份应用代码
shutil.copytree('.', f"{backup_path}/code",
ignore=shutil.ignore_patterns('.git', '__pycache__', '*.pyc'))
# 备份数据库
self._backup_database(backup_path)
print(f"备份完成: {backup_path}")
def _backup_database(self, backup_path):
"""备份数据库"""
db_url = os.getenv('DATABASE_URL')
if db_url and 'postgresql' in db_url:
backup_file = f"{backup_path}/database.sql"
cmd = f"pg_dump {db_url} > {backup_file}"
subprocess.run(cmd, shell=True, check=True)
def _build_application(self):
"""构建应用"""
print("构建应用...")
# 安装依赖
subprocess.run(['pip', 'install', '-r', 'requirements.txt'], check=True)
# 编译静态资源
if os.path.exists('webpack.config.js'):
subprocess.run(['npm', 'run', 'build'], check=True)
def _run_tests(self):
"""运行测试"""
print("运行测试...")
result = subprocess.run(['python', '-m', 'pytest', 'tests/'],
capture_output=True, text=True)
if result.returncode != 0:
# pytest的失败详情主要输出在stdout上
raise Exception(f"测试失败:\n{result.stdout}\n{result.stderr}")
def _deploy_to_environment(self, environment):
"""部署到指定环境"""
env_config = self.config['environments'][environment]
if environment == 'production':
# 使用Docker部署
self._deploy_with_docker(env_config)
else:
# 直接部署
self._deploy_direct(env_config)
def _deploy_with_docker(self, config):
"""使用Docker部署"""
print("使用Docker部署...")
# 构建镜像
subprocess.run(['docker', 'build', '-t', self.config['app_name'], '.'], check=True)
# 停止旧容器
subprocess.run(['docker', 'stop', self.config['app_name']],
capture_output=True)
subprocess.run(['docker', 'rm', self.config['app_name']],
capture_output=True)
# 启动新容器
cmd = [
'docker', 'run', '-d',
'--name', self.config['app_name'],
'-p', f"{config['port']}:{config['port']}",
'--env-file', '.env.production',
self.config['app_name']
]
subprocess.run(cmd, check=True)
def _deploy_direct(self, config):
"""直接部署"""
print("直接部署...")
# 重启应用服务
if os.path.exists('/etc/systemd/system/flask-app.service'):
subprocess.run(['sudo', 'systemctl', 'restart', 'flask-app'], check=True)
def _health_check(self, environment):
"""健康检查"""
import requests
import time
env_config = self.config['environments'][environment]
health_url = f"http://{env_config['host']}:{env_config['port']}/health"
print("执行健康检查...")
for _ in range(30): # 最多重试30次,每次间隔1秒(请求本身还有5秒超时)
try:
response = requests.get(health_url, timeout=5)
if response.status_code == 200:
print("健康检查通过")
return
except requests.RequestException:
pass
time.sleep(1)
raise Exception("健康检查失败")
def _rollback(self, environment):
"""回滚到上一个版本"""
print(f"回滚 {environment} 环境...")
# 查找最近的成功部署
for deployment in reversed(self.deployment_history):
if (deployment['environment'] == environment and
deployment['status'] == 'success'):
# 恢复备份
self._restore_backup(environment, deployment['timestamp'])
break
def _restore_backup(self, environment, timestamp):
"""恢复备份"""
backup_dir = self.config['backup']['backup_dir']
# 实现备份恢复逻辑
pass
def _save_deployment_history(self):
"""保存部署历史"""
with open('deployment_history.json', 'w') as f:
json.dump(self.deployment_history, f, indent=2)
def get_deployment_status(self):
"""获取部署状态"""
return {
'last_deployments': self.deployment_history[-5:],
'total_deployments': len(self.deployment_history)
}
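一个最小的调用草图(脚本路径为假设,假定在项目根目录执行):

# scripts/deploy.py(假设的脚本位置)
from ops.deployment_manager import DeploymentManager

if __name__ == '__main__':
    manager = DeploymentManager()
    manager.deploy(environment='staging')
    print(manager.get_deployment_status())

注意 _health_check 假定应用暴露了 /health 端点;一个最小实现的草图如下(蓝图名与模块路径均为假设):

# app/health.py(最小健康检查端点草图)
from flask import Blueprint, jsonify

health_bp = Blueprint('health', __name__)

@health_bp.route('/health')
def health():
    return jsonify({'status': 'healthy'}), 200

# 在创建应用时注册: app.register_blueprint(health_bp)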
class SystemMonitor:
"""系统监控器"""
def __init__(self):
self.alerts = []
def check_system_health(self):
"""检查系统健康状态"""
health_status = {
'timestamp': datetime.now().isoformat(),
'status': 'healthy',
'checks': {}
}
# 检查CPU使用率
cpu_usage = self._get_cpu_usage()
health_status['checks']['cpu'] = {
'usage_percent': cpu_usage,
'status': 'warning' if cpu_usage > 80 else 'healthy'
}
# 检查内存使用率
memory_usage = self._get_memory_usage()
health_status['checks']['memory'] = {
'usage_percent': memory_usage,
'status': 'warning' if memory_usage > 80 else 'healthy'
}
# 检查磁盘空间
disk_usage = self._get_disk_usage()
health_status['checks']['disk'] = {
'usage_percent': disk_usage,
'status': 'warning' if disk_usage > 80 else 'healthy'
}
# 检查服务状态(包装成与其他检查一致的结构,便于统一汇总)
services_status = self._check_services()
health_status['checks']['services'] = {
'status': 'healthy' if all(s.get('active') for s in services_status.values()) else 'warning',
'details': services_status
}
# 确定整体状态
if any(check['status'] == 'critical' for check in health_status['checks'].values()):
health_status['status'] = 'critical'
elif any(check['status'] == 'warning' for check in health_status['checks'].values()):
health_status['status'] = 'warning'
return health_status
def _get_cpu_usage(self):
"""获取CPU使用率"""
try:
import psutil
return psutil.cpu_percent(interval=1)
except ImportError:
return 0
def _get_memory_usage(self):
"""获取内存使用率"""
try:
import psutil
return psutil.virtual_memory().percent
except ImportError:
return 0
def _get_disk_usage(self):
"""获取磁盘使用率"""
try:
import shutil
total, used, free = shutil.disk_usage('/')
return (used / total) * 100
except OSError:
return 0
def _check_services(self):
"""检查服务状态"""
services = ['postgresql', 'redis', 'nginx']
status = {}
for service in services:
try:
result = subprocess.run(
['systemctl', 'is-active', service],
capture_output=True, text=True
)
status[service] = {
'status': 'healthy' if result.stdout.strip() == 'active' else 'unhealthy',
'active': result.stdout.strip() == 'active'
}
except (OSError, subprocess.SubprocessError):
status[service] = {'status': 'unknown', 'active': False}
return status
def send_alert(self, message, severity='warning'):
"""发送告警"""
alert = {
'timestamp': datetime.now().isoformat(),
'message': message,
'severity': severity
}
self.alerts.append(alert)
# 发送到外部告警系统
self._send_to_alertmanager(alert)
def _send_to_alertmanager(self, alert):
"""发送到Alertmanager"""
try:
import requests
alertmanager_url = os.getenv('ALERTMANAGER_URL')
if alertmanager_url:
# Alertmanager的告警API要求labels/annotations结构
payload = [{
'labels': {'alertname': 'flask_app_alert', 'severity': alert['severity']},
'annotations': {'description': alert['message']}
}]
requests.post(f"{alertmanager_url}/api/v1/alerts", json=payload, timeout=5)
except Exception:
# 告警发送失败不应影响主流程
pass
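SystemMonitor 的一种使用方式:周期性调用(例如交给 cron 或 Celery 定时任务,调度方式为假设),状态异常时汇总告警。

monitor = SystemMonitor()
health = monitor.check_system_health()
if health['status'] != 'healthy':
    # 找出所有非healthy的检查项
    failed = [name for name, check in health['checks'].items()
              if check['status'] != 'healthy']
    monitor.send_alert(
        f"系统状态{health['status']}: {', '.join(failed)}",
        severity=health['status']
    )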
class BackupManager:
"""备份管理器"""
def __init__(self, config=None):
self.config = config or {
'backup_dir': '/backups',
'retention_days': 30,
'compress': True
}
def create_backup(self, backup_type='full'):
"""创建备份"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_name = f"{backup_type}_{timestamp}"
backup_path = os.path.join(self.config['backup_dir'], backup_name)
os.makedirs(backup_path, exist_ok=True)
try:
if backup_type == 'full':
self._backup_application(backup_path)
self._backup_database(backup_path)
self._backup_uploads(backup_path)
elif backup_type == 'database':
self._backup_database(backup_path)
elif backup_type == 'files':
self._backup_application(backup_path)
self._backup_uploads(backup_path)
# 压缩备份
if self.config['compress']:
self._compress_backup(backup_path)
# 清理旧备份
self._cleanup_old_backups()
return backup_path
except Exception as e:
# 清理失败的备份
if os.path.exists(backup_path):
shutil.rmtree(backup_path)
raise e
def _backup_application(self, backup_path):
"""备份应用代码"""
app_backup_path = os.path.join(backup_path, 'application')
shutil.copytree('.', app_backup_path,
ignore=shutil.ignore_patterns(
'.git', '__pycache__', '*.pyc', 'node_modules',
'logs', 'backups', '.env*'
))
def _backup_database(self, backup_path):
"""备份数据库"""
db_url = os.getenv('DATABASE_URL')
if not db_url:
return
db_backup_file = os.path.join(backup_path, 'database.sql')
if 'postgresql' in db_url:
cmd = f"pg_dump {db_url} > {db_backup_file}"
subprocess.run(cmd, shell=True, check=True)
elif 'mysql' in db_url:
# MySQL备份逻辑
pass
def _backup_uploads(self, backup_path):
"""备份上传文件"""
uploads_dir = 'uploads'
if os.path.exists(uploads_dir):
uploads_backup_path = os.path.join(backup_path, 'uploads')
shutil.copytree(uploads_dir, uploads_backup_path)
def _compress_backup(self, backup_path):
"""压缩备份"""
archive_path = f"{backup_path}.tar.gz"
shutil.make_archive(backup_path, 'gztar', backup_path)
shutil.rmtree(backup_path)
return archive_path
def _cleanup_old_backups(self):
"""清理旧备份"""
retention_days = self.config['retention_days']
cutoff_time = datetime.now().timestamp() - (retention_days * 24 * 3600)
backup_dir = self.config['backup_dir']
if not os.path.exists(backup_dir):
return
for item in os.listdir(backup_dir):
item_path = os.path.join(backup_dir, item)
if os.path.getctime(item_path) < cutoff_time:
if os.path.isfile(item_path):
os.remove(item_path)
else:
shutil.rmtree(item_path)
def restore_backup(self, backup_name):
"""恢复备份"""
backup_path = os.path.join(self.config['backup_dir'], backup_name)
if not os.path.exists(backup_path):
# 尝试解压
archive_path = f"{backup_path}.tar.gz"
if os.path.exists(archive_path):
shutil.unpack_archive(archive_path, backup_path)
else:
raise FileNotFoundError(f"备份不存在: {backup_name}")
# 恢复应用
app_backup = os.path.join(backup_path, 'application')
if os.path.exists(app_backup):
self._restore_application(app_backup)
# 恢复数据库
db_backup = os.path.join(backup_path, 'database.sql')
if os.path.exists(db_backup):
self._restore_database(db_backup)
# 恢复上传文件
uploads_backup = os.path.join(backup_path, 'uploads')
if os.path.exists(uploads_backup):
self._restore_uploads(uploads_backup)
def _restore_application(self, app_backup_path):
"""恢复应用代码"""
import tempfile
# 先把当前代码复制到临时目录留底(不能把当前目录移动到自身的子目录)
current_backup = os.path.join(
tempfile.gettempdir(),
f"current_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
shutil.copytree('.', current_backup,
ignore=shutil.ignore_patterns('.git', '__pycache__'))
# 用备份内容覆盖当前代码(dirs_exist_ok需要Python 3.8+)
shutil.copytree(app_backup_path, '.', dirs_exist_ok=True)
def _restore_database(self, db_backup_file):
"""恢复数据库"""
db_url = os.getenv('DATABASE_URL')
if not db_url:
return
if 'postgresql' in db_url:
cmd = f"psql {db_url} < {db_backup_file}"
subprocess.run(cmd, shell=True, check=True)
def _restore_uploads(self, uploads_backup_path):
"""恢复上传文件"""
uploads_dir = 'uploads'
if os.path.exists(uploads_dir):
shutil.rmtree(uploads_dir)
shutil.copytree(uploads_backup_path, uploads_dir)
def list_backups(self):
"""列出所有备份"""
backup_dir = self.config['backup_dir']
if not os.path.exists(backup_dir):
return []
backups = []
for item in os.listdir(backup_dir):
item_path = os.path.join(backup_dir, item)
stat = os.stat(item_path)
backups.append({
'name': item,
'size': stat.st_size,
'created': datetime.fromtimestamp(stat.st_ctime).isoformat(),
'type': 'archive' if item.endswith('.tar.gz') else 'directory'
})
return sorted(backups, key=lambda x: x['created'], reverse=True)
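BackupManager 可以包装成命令行脚本交给 cron 定时执行,下面是一个示意(脚本路径与计划任务均为假设):

# scripts/backup.py(假设的脚本位置)
# crontab示例: 0 2 * * * python /path/to/scripts/backup.py
from ops.deployment_manager import BackupManager

if __name__ == '__main__':
    manager = BackupManager()
    path = manager.create_backup('full')
    print(f"备份完成: {path}")
    # 查看最近的5个备份
    for backup in manager.list_backups()[:5]:
        print(backup['name'], backup['created'], backup['size'])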
11.5 安全配置
11.5.1 SSL/TLS配置
# nginx/ssl.conf
server {
listen 80;
server_name yourdomain.com www.yourdomain.com;
return 301 https://$host$request_uri;  # 用$host保留客户端请求的主机名
}
server {
listen 443 ssl http2;
server_name yourdomain.com www.yourdomain.com;
# SSL证书配置
ssl_certificate /etc/ssl/certs/yourdomain.com.crt;
ssl_certificate_key /etc/ssl/private/yourdomain.com.key;
# SSL安全配置
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers off;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
# HSTS
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# 其他安全头
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
add_header Referrer-Policy "strict-origin-when-cross-origin";
location / {
proxy_pass http://flask-app:5000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
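上面的配置假设证书文件已经就位;一种常见做法是用 Let's Encrypt 免费签发,例如通过 certbot 的 nginx 插件(域名为占位符,命令供参考):

# 申请证书并自动写入nginx配置
sudo certbot --nginx -d yourdomain.com -d www.yourdomain.com
# 验证自动续期是否正常
sudo certbot renew --dry-run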
11.5.2 防火墙配置
#!/bin/bash
# scripts/setup_firewall.sh
# 启用UFW
sudo ufw --force enable
# 默认策略
sudo ufw default deny incoming
sudo ufw default allow outgoing
# 允许SSH
sudo ufw allow ssh
# 允许HTTP和HTTPS
sudo ufw allow 80/tcp
sudo ufw allow 443/tcp
# 允许应用端口(仅从本地)
sudo ufw allow from 127.0.0.1 to any port 5000
sudo ufw allow from 127.0.0.1 to any port 6379 # Redis
sudo ufw allow from 127.0.0.1 to any port 5432 # PostgreSQL
# 显示状态
sudo ufw status verbose
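如果 PostgreSQL 或 Redis 与应用不在同一台主机上,还需放行来自应用服务器所在内网的访问(下面的网段为假设,请按实际拓扑调整):

# 仅允许内网应用服务器访问数据库和Redis
sudo ufw allow from 10.0.0.0/24 to any port 5432 proto tcp
sudo ufw allow from 10.0.0.0/24 to any port 6379 proto tcp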
11.5.3 安全扫描
# security/scanner.py
import subprocess
import json
import os
from datetime import datetime
class SecurityScanner:
"""安全扫描器"""
def __init__(self):
self.scan_results = []
def run_full_scan(self):
"""运行完整安全扫描"""
print("开始安全扫描...")
results = {
'timestamp': datetime.now().isoformat(),
'scans': {}
}
# 依赖漏洞扫描
results['scans']['dependencies'] = self.scan_dependencies()
# 代码安全扫描
results['scans']['code'] = self.scan_code()
# 配置安全检查
results['scans']['config'] = self.check_config_security()
# Docker镜像扫描
results['scans']['docker'] = self.scan_docker_image()
self.scan_results.append(results)
return results
def scan_dependencies(self):
"""扫描依赖漏洞"""
print("扫描依赖漏洞...")
try:
# 使用safety扫描Python依赖
result = subprocess.run(
['safety', 'check', '--json'],
capture_output=True, text=True
)
if result.returncode == 0:
return {
'status': 'clean',
'vulnerabilities': []
}
else:
vulnerabilities = json.loads(result.stdout)
return {
'status': 'vulnerabilities_found',
'vulnerabilities': vulnerabilities
}
except Exception as e:
return {
'status': 'error',
'error': str(e)
}
def scan_code(self):
"""代码安全扫描"""
print("扫描代码安全问题...")
try:
# 使用bandit扫描Python代码
result = subprocess.run(
['bandit', '-r', '.', '-f', 'json'],
capture_output=True, text=True
)
if result.stdout:
bandit_results = json.loads(result.stdout)
issues = bandit_results.get('results', [])
return {
'status': 'completed',
'issues_count': len(issues),
'high_severity': len([i for i in issues if i['issue_severity'] == 'HIGH']),
'medium_severity': len([i for i in issues if i['issue_severity'] == 'MEDIUM']),
'low_severity': len([i for i in issues if i['issue_severity'] == 'LOW']),
'issues': issues[:10] # 只返回前10个问题
}
else:
return {'status': 'clean', 'issues_count': 0}
except Exception as e:
return {
'status': 'error',
'error': str(e)
}
def check_config_security(self):
"""检查配置安全性"""
print("检查配置安全性...")
issues = []
# 检查环境变量文件
env_files = ['.env', '.env.production', '.env.local']
for env_file in env_files:
if os.path.exists(env_file):
with open(env_file, 'r') as f:
content = f.read()
# 简单启发式:出现password/secret字样时提示人工复核
if 'password' in content.lower() or 'secret' in content.lower():
issues.append({
'type': 'hardcoded_secrets',
'file': env_file,
'description': f'{env_file} 中出现password/secret字样,请确认密钥未硬编码或误提交'
})
# 检查DEBUG模式(FLASK_DEBUG常见取值为1/true)
if os.getenv('FLASK_DEBUG', '').lower() in ('1', 'true'):
issues.append({
'type': 'debug_mode',
'description': '生产环境中启用了DEBUG模式'
})
# 检查默认密钥
secret_key = os.getenv('SECRET_KEY')
if not secret_key or len(secret_key) < 32:
issues.append({
'type': 'weak_secret_key',
'description': 'SECRET_KEY过短或未设置'
})
return {
'status': 'completed',
'issues_count': len(issues),
'issues': issues
}
def scan_docker_image(self):
"""扫描Docker镜像"""
print("扫描Docker镜像...")
try:
# 使用trivy扫描Docker镜像
result = subprocess.run(
['trivy', 'image', '--format', 'json', 'flask-app:latest'],
capture_output=True, text=True
)
if result.stdout:
trivy_results = json.loads(result.stdout)
vulnerabilities = []
# 循环变量不要复用外层的result
for target in trivy_results.get('Results', []):
# 某些扫描目标的Vulnerabilities字段可能为null
vulnerabilities.extend(target.get('Vulnerabilities') or [])
return {
'status': 'completed',
'vulnerabilities_count': len(vulnerabilities),
'critical': len([v for v in vulnerabilities if v.get('Severity') == 'CRITICAL']),
'high': len([v for v in vulnerabilities if v.get('Severity') == 'HIGH']),
'medium': len([v for v in vulnerabilities if v.get('Severity') == 'MEDIUM']),
'low': len([v for v in vulnerabilities if v.get('Severity') == 'LOW'])
}
else:
return {'status': 'clean'}
except Exception as e:
return {
'status': 'error',
'error': str(e)
}
def generate_report(self):
"""生成安全报告"""
if not self.scan_results:
return "没有扫描结果"
latest_scan = self.scan_results[-1]
report = f"""
# 安全扫描报告
**扫描时间**: {latest_scan['timestamp']}
## 依赖漏洞扫描
- 状态: {latest_scan['scans']['dependencies']['status']}
- 漏洞数量: {len(latest_scan['scans']['dependencies'].get('vulnerabilities', []))}
## 代码安全扫描
- 状态: {latest_scan['scans']['code']['status']}
- 问题数量: {latest_scan['scans']['code'].get('issues_count', 0)}
- 高危: {latest_scan['scans']['code'].get('high_severity', 0)}
- 中危: {latest_scan['scans']['code'].get('medium_severity', 0)}
- 低危: {latest_scan['scans']['code'].get('low_severity', 0)}
## 配置安全检查
- 问题数量: {latest_scan['scans']['config']['issues_count']}
## Docker镜像扫描
- 状态: {latest_scan['scans']['docker']['status']}
- 漏洞数量: {latest_scan['scans']['docker'].get('vulnerabilities_count', 0)}
- 严重: {latest_scan['scans']['docker'].get('critical', 0)}
- 高危: {latest_scan['scans']['docker'].get('high', 0)}
"""
return report
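扫描器可以直接接入 CI 流水线,在发布前执行并根据结果决定是否放行(集成方式为示意):

if __name__ == '__main__':
    scanner = SecurityScanner()
    results = scanner.run_full_scan()
    print(scanner.generate_report())
    # 存在高危代码问题时让CI失败
    if results['scans']['code'].get('high_severity', 0) > 0:
        raise SystemExit(1)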
11.6 故障排除
11.6.1 常见问题诊断
# troubleshooting/diagnostics.py
import subprocess
import psutil
import requests
import os
from datetime import datetime
class SystemDiagnostics:
"""系统诊断工具"""
def __init__(self):
self.diagnostic_results = {}
def run_diagnostics(self):
"""运行系统诊断"""
print("开始系统诊断...")
self.diagnostic_results = {
'timestamp': datetime.now().isoformat(),
'system': self.check_system_resources(),
'services': self.check_services(),
'network': self.check_network(),
'database': self.check_database(),
'application': self.check_application(),
'logs': self.check_logs()
}
return self.diagnostic_results
def check_system_resources(self):
"""检查系统资源"""
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
return {
'status': 'ok',
'cpu_usage': cpu_percent,
'memory_usage': memory.percent,
'memory_available': memory.available // (1024**3), # GB
'disk_usage': (disk.used / disk.total) * 100,
'disk_free': disk.free // (1024**3) # GB
}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def check_services(self):
"""检查服务状态"""
services = ['postgresql', 'redis', 'nginx']
service_status = {}
for service in services:
try:
result = subprocess.run(
['systemctl', 'is-active', service],
capture_output=True, text=True
)
service_status[service] = {
'active': result.stdout.strip() == 'active',
'status': result.stdout.strip()
}
except Exception as e:
service_status[service] = {
'active': False,
'error': str(e)
}
return service_status
def check_network(self):
"""检查网络连接"""
network_checks = {
'internet': self._check_internet_connection(),
'dns': self._check_dns_resolution(),
'ports': self._check_ports()
}
return network_checks
def _check_internet_connection(self):
"""检查互联网连接"""
try:
response = requests.get('https://www.google.com', timeout=5)
return {'status': 'ok', 'response_time': response.elapsed.total_seconds()}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def _check_dns_resolution(self):
"""检查DNS解析"""
try:
import socket
socket.gethostbyname('google.com')
return {'status': 'ok'}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def _check_ports(self):
"""检查端口状态"""
ports_to_check = [80, 443, 5432, 6379, 5000]
port_status = {}
for port in ports_to_check:
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(('localhost', port))
sock.close()
port_status[port] = {
'open': result == 0,
'status': 'open' if result == 0 else 'closed'
}
except Exception as e:
port_status[port] = {'status': 'error', 'error': str(e)}
return port_status
def check_database(self):
"""检查数据库连接"""
try:
from sqlalchemy import create_engine, text
db_url = os.getenv('DATABASE_URL')
if not db_url:
return {'status': 'error', 'error': 'DATABASE_URL not set'}
engine = create_engine(db_url)
with engine.connect() as conn:
result = conn.execute(text('SELECT 1'))
return {'status': 'ok', 'connection': 'successful'}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def check_application(self):
"""检查应用状态"""
try:
# 检查应用健康端点
response = requests.get('http://localhost:5000/health', timeout=5)
if response.status_code == 200:
return {
'status': 'ok',
'response_time': response.elapsed.total_seconds(),
'health_data': response.json()
}
else:
return {
'status': 'error',
'status_code': response.status_code,
'response': response.text
}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def check_logs(self):
"""检查日志文件"""
log_files = ['logs/app.log', 'logs/error.log', '/var/log/nginx/error.log']
log_status = {}
for log_file in log_files:
if os.path.exists(log_file):
try:
# 获取最近的错误日志
with open(log_file, 'r') as f:
lines = f.readlines()
recent_errors = [line for line in lines[-100:]
if 'error' in line.lower()]
log_status[log_file] = {
'exists': True,
'size': os.path.getsize(log_file),
'recent_errors': len(recent_errors),
'last_errors': recent_errors[-5:] if recent_errors else []
}
except Exception as e:
log_status[log_file] = {
'exists': True,
'error': str(e)
}
else:
log_status[log_file] = {'exists': False}
return log_status
def generate_diagnostic_report(self):
"""生成诊断报告"""
if not self.diagnostic_results:
return "请先运行诊断"
results = self.diagnostic_results
report = f"""
# 系统诊断报告
**诊断时间**: {results['timestamp']}
## 系统资源
- CPU使用率: {results['system'].get('cpu_usage', 'N/A')}%
- 内存使用率: {results['system'].get('memory_usage', 'N/A')}%
- 磁盘使用率: {results['system'].get('disk_usage', 0):.1f}%
## 服务状态
"""
for service, status in results['services'].items():
report += f"- {service}: {'运行中' if status.get('active') else '未运行'}\n"
report += f"""
## 网络连接
- 互联网连接: {results['network']['internet']['status']}
- DNS解析: {results['network']['dns']['status']}
## 数据库
- 连接状态: {results['database']['status']}
## 应用状态
- 健康检查: {results['application']['status']}
"""
return report
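诊断工具的典型用法:故障时手工执行一次,先看整体报告,再按模块深入排查。

if __name__ == '__main__':
    diagnostics = SystemDiagnostics()
    diagnostics.run_diagnostics()
    print(diagnostics.generate_diagnostic_report())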
11.6.2 性能问题排查
# troubleshooting/performance.py
import time
import psutil
import threading
from collections import defaultdict
class PerformanceProfiler:
"""性能分析器"""
def __init__(self):
self.metrics = defaultdict(list)
self.monitoring = False
self.monitor_thread = None
def start_monitoring(self, duration=60):
"""开始性能监控"""
self.monitoring = True
self.monitor_thread = threading.Thread(
target=self._monitor_performance,
args=(duration,)
)
self.monitor_thread.start()
def stop_monitoring(self):
"""停止性能监控"""
self.monitoring = False
if self.monitor_thread:
self.monitor_thread.join()
def _monitor_performance(self, duration):
"""监控性能指标"""
start_time = time.time()
prev_disk = psutil.disk_io_counters()
prev_net = psutil.net_io_counters()
while self.monitoring and (time.time() - start_time) < duration:
timestamp = time.time()
# CPU使用率(interval=1会阻塞1秒完成采样)
cpu_percent = psutil.cpu_percent(interval=1)
self.metrics['cpu'].append((timestamp, cpu_percent))
# 内存使用率
memory = psutil.virtual_memory()
self.metrics['memory'].append((timestamp, memory.percent))
# 磁盘I/O(计数器是自启动以来的累计值,这里记录两次采样间的增量)
disk_io = psutil.disk_io_counters()
if disk_io and prev_disk:
self.metrics['disk_read'].append((timestamp, disk_io.read_bytes - prev_disk.read_bytes))
self.metrics['disk_write'].append((timestamp, disk_io.write_bytes - prev_disk.write_bytes))
prev_disk = disk_io
# 网络I/O(同样记录增量)
net_io = psutil.net_io_counters()
if net_io and prev_net:
self.metrics['net_sent'].append((timestamp, net_io.bytes_sent - prev_net.bytes_sent))
self.metrics['net_recv'].append((timestamp, net_io.bytes_recv - prev_net.bytes_recv))
prev_net = net_io
time.sleep(1)
def analyze_performance(self):
"""分析性能数据"""
analysis = {}
for metric_name, data in self.metrics.items():
if not data:
continue
values = [value for _, value in data]
analysis[metric_name] = {
'avg': sum(values) / len(values),
'max': max(values),
'min': min(values),
'samples': len(values)
}
return analysis
def identify_bottlenecks(self):
"""识别性能瓶颈"""
analysis = self.analyze_performance()
bottlenecks = []
# CPU瓶颈
if 'cpu' in analysis and analysis['cpu']['avg'] > 80:
bottlenecks.append({
'type': 'cpu',
'severity': 'high' if analysis['cpu']['avg'] > 90 else 'medium',
'description': f"CPU使用率过高: 平均{analysis['cpu']['avg']:.1f}%"
})
# 内存瓶颈
if 'memory' in analysis and analysis['memory']['avg'] > 80:
bottlenecks.append({
'type': 'memory',
'severity': 'high' if analysis['memory']['avg'] > 90 else 'medium',
'description': f"内存使用率过高: 平均{analysis['memory']['avg']:.1f}%"
})
return bottlenecks
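PerformanceProfiler 的使用草图:先采样一个时间窗口,再分析数据并输出瓶颈(采样时长为示例值):

profiler = PerformanceProfiler()
profiler.start_monitoring(duration=30)
time.sleep(30)              # 等待采样窗口结束
profiler.stop_monitoring()
print(profiler.analyze_performance())
for bottleneck in profiler.identify_bottlenecks():
    print(bottleneck['severity'], bottleneck['description'])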
11.7 本章小结
技术要点总结
部署环境配置
- 生产环境要求和配置
- 环境变量管理
- 系统依赖安装
容器化部署
- Docker镜像构建和优化
- Docker Compose编排
- 多阶段构建策略
云平台部署
- AWS部署配置
- Kubernetes集群部署
- 自动扩缩容配置
监控与日志
- Prometheus指标收集
- 结构化日志管理
- 健康检查机制
运维工具
- 自动化部署脚本
- 备份恢复策略
- 系统监控告警
安全配置
- SSL/TLS配置
- 防火墙设置
- 安全扫描工具
故障排除
- 系统诊断工具
- 性能分析方法
- 常见问题解决
部署最佳实践
- 自动化优先:使用CI/CD流水线实现自动化部署
- 监控完善:建立完整的监控和告警体系
- 安全第一:定期进行安全扫描和更新
- 备份策略:制定完善的备份和恢复计划
- 文档维护:保持部署文档的及时更新
运维要点
- 预防性维护:定期检查系统健康状态
- 容量规划:根据业务增长规划资源
- 故障响应:建立快速故障响应机制
- 性能优化:持续监控和优化系统性能
- 团队协作:建立开发和运维团队的协作机制
下一章预告
下一章我们将学习Flask项目实战,包括:
- 完整项目架构设计
- 业务功能实现
- 前后端集成
- 项目部署上线
- 运维监控实践
练习题
- 部署实践:使用Docker部署一个Flask应用到云服务器
- 监控配置:配置Prometheus监控Flask应用的关键指标
- 安全加固:为Flask应用配置SSL证书和安全头
- 备份策略:实现自动化的数据库备份脚本
- 故障演练:模拟系统故障并使用诊断工具排查问题