容器化部署
1. Docker 部署
Dockerfile 配置
# Multi-stage build -- build stage
FROM python:3.11-slim AS builder

WORKDIR /app

# Build-time toolchain for compiling native wheels; discarded with this stage.
RUN apt-get update && apt-get install -y --no-install-recommends \
        g++ \
        gcc \
        make \
    && rm -rf /var/lib/apt/lists/*

# Install runtime deps into /root/.local (--user) so they can be copied as one tree.
# NOTE: requirements-dev.txt was previously copied but never installed -- removed.
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# Multi-stage build -- runtime stage
FROM python:3.11-slim AS runtime

# Dedicated non-root user for the service.
RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser

# curl is required by the HEALTHCHECK below.
RUN apt-get update && apt-get install -y --no-install-recommends \
        curl \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Copy installed packages from the builder; --chown so mcpuser owns its own tree
# (previously left root-owned).
COPY --from=builder --chown=mcpuser:mcpuser /root/.local /home/mcpuser/.local

# Application code, owned by the runtime user.
COPY --chown=mcpuser:mcpuser . .

# Runtime configuration (overridable at `docker run` / compose level).
ENV PATH=/home/mcpuser/.local/bin:$PATH \
    PYTHONPATH=/app \
    MCP_ENV=production \
    MCP_LOG_LEVEL=INFO \
    MCP_HOST=0.0.0.0 \
    MCP_PORT=8080

# Writable directories for logs and data.
RUN mkdir -p /app/logs /app/data && \
    chown -R mcpuser:mcpuser /app

# Drop privileges before the entrypoint.
USER mcpuser

# Shell-form CMD so ${MCP_PORT} is expanded at container runtime.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:${MCP_PORT}/health || exit 1

# Documentation only; publish with -p or compose `ports`.
EXPOSE ${MCP_PORT}

CMD ["python", "-m", "mcp_server.main"]
Docker Compose 配置
# docker-compose.yml
# NOTE: the top-level `version` key is obsolete under Compose V2 but harmless.
version: '3.8'

services:
  mcp-server:
    build:
      context: .
      dockerfile: Dockerfile
      target: runtime          # stop at the runtime stage of the multi-stage build
    container_name: mcp-server
    restart: unless-stopped
    ports:
      - "8080:8080"
    environment:
      - MCP_ENV=production
      - MCP_LOG_LEVEL=INFO
      - MCP_DATABASE_URL=postgresql://mcpuser:mcppass@postgres:5432/mcpdb
      # FIX: redis is started with --requirepass, so the URL must carry the
      # password or every connection is rejected with NOAUTH.
      - MCP_REDIS_URL=redis://:redispass@redis:6379/0
      - MCP_SECRET_KEY=${MCP_SECRET_KEY}
    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
      - ./config:/app/config:ro
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - mcp-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  postgres:
    image: postgres:15-alpine
    container_name: mcp-postgres
    restart: unless-stopped
    environment:
      # WARNING: hard-coded credentials are acceptable for local development
      # only; use an .env file or secrets for shared environments.
      - POSTGRES_DB=mcpdb
      - POSTGRES_USER=mcpuser
      - POSTGRES_PASSWORD=mcppass
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql:ro
    networks:
      - mcp-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mcpuser -d mcpdb"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    container_name: mcp-redis
    restart: unless-stopped
    command: redis-server --appendonly yes --requirepass redispass
    volumes:
      - redis_data:/data
    networks:
      - mcp-network
    healthcheck:
      # FIX: authenticate; the previous unauthenticated `incr ping` check is
      # rejected (NOAUTH) once --requirepass is set.
      test: ["CMD", "redis-cli", "-a", "redispass", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5

  nginx:
    image: nginx:alpine
    container_name: mcp-nginx
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - mcp-server
    networks:
      - mcp-network

  prometheus:
    image: prom/prometheus:latest
    container_name: mcp-prometheus
    restart: unless-stopped
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=200h'
      - '--web.enable-lifecycle'
    networks:
      - mcp-network

  grafana:
    image: grafana/grafana:latest
    container_name: mcp-grafana
    restart: unless-stopped
    ports:
      - "3000:3000"
    environment:
      # Overridable via GRAFANA_ADMIN_PASSWORD; defaults to the old value.
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards:ro
      - ./grafana/datasources:/etc/grafana/provisioning/datasources:ro
    networks:
      - mcp-network

volumes:
  postgres_data:
  redis_data:
  prometheus_data:
  grafana_data:

networks:
  mcp-network:
    driver: bridge
Nginx 配置
# nginx.conf
events {
    worker_connections 1024;
}

http {
    # Load-balance across MCP backend instances (least open connections).
    upstream mcp_backend {
        least_conn;
        server mcp-server:8080 max_fails=3 fail_timeout=30s;
        # Additional instances can be added here:
        # server mcp-server-2:8080 max_fails=3 fail_timeout=30s;
    }

    # Rate limiting: 10 req/s for the API, 1 req/s for login attempts.
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    limit_req_zone $binary_remote_addr zone=login:10m rate=1r/s;

    # Access-log format including upstream timings.
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for" '
                    'rt=$request_time uct="$upstream_connect_time" '
                    'uht="$upstream_header_time" urt="$upstream_response_time"';
    access_log /var/log/nginx/access.log main;
    error_log /var/log/nginx/error.log warn;

    # Basic settings
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    types_hash_max_size 2048;
    client_max_body_size 10M;

    # Gzip compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_types text/plain text/css text/xml text/javascript application/json application/javascript application/xml+rss application/atom+xml;

    # HTTP: redirect everything to HTTPS.
    server {
        listen 80;
        server_name localhost;
        # FIX: use $host (the name the client actually requested) instead of
        # $server_name, which expands to the literal "localhost" and would
        # break redirects for any real domain.
        return 301 https://$host$request_uri;
    }

    # HTTPS server
    server {
        listen 443 ssl http2;
        server_name localhost;

        # SSL certificates
        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/key.pem;
        ssl_session_timeout 1d;
        ssl_session_cache shared:SSL:50m;
        ssl_session_tickets off;

        # Modern TLS configuration
        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
        ssl_prefer_server_ciphers off;

        # HSTS
        add_header Strict-Transport-Security "max-age=63072000" always;

        # Health endpoint (not logged)
        location /health {
            access_log off;
            proxy_pass http://mcp_backend/health;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        # API endpoints
        location /api/ {
            limit_req zone=api burst=20 nodelay;
            proxy_pass http://mcp_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            # Timeouts
            proxy_connect_timeout 30s;
            proxy_send_timeout 30s;
            proxy_read_timeout 30s;
            # Buffering
            proxy_buffering on;
            proxy_buffer_size 4k;
            proxy_buffers 8 4k;
        }

        # Login endpoint (stricter rate limit)
        location /api/auth/login {
            limit_req zone=login burst=5 nodelay;
            proxy_pass http://mcp_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        # WebSocket support
        location /ws/ {
            proxy_pass http://mcp_backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            # Long-lived connections: 24h read timeout
            proxy_read_timeout 86400;
        }

        # Static files
        location /static/ {
            expires 1y;
            add_header Cache-Control "public, immutable";
            # root /var/www;   # enable when a static directory exists
        }
    }
}
2. Kubernetes 部署
命名空间配置
# namespace.yaml
# Dedicated namespace scoping all MCP resources (RBAC, quotas, cleanup).
apiVersion: v1
kind: Namespace
metadata:
  name: mcp-system
  labels:
    name: mcp-system
    app.kubernetes.io/name: mcp
    app.kubernetes.io/version: "1.0.0"
ConfigMap 配置
# configmap.yaml
# Non-secret application and Prometheus configuration, mounted read-only at
# /app/config by the deployment.
apiVersion: v1
kind: ConfigMap
metadata:
  name: mcp-config
  namespace: mcp-system
data:
  # Application settings consumed by the MCP server.
  app.yaml: |
    server:
      host: "0.0.0.0"
      port: 8080
      workers: 4
      max_connections: 1000
    database:
      pool_size: 20
      max_overflow: 30
      pool_timeout: 30
      pool_recycle: 3600
    cache:
      backend: "redis"
      default_timeout: 300
      key_prefix: "mcp:"
    logging:
      level: "INFO"
      format: "json"
      handlers:
        - "console"
        - "file"
    monitoring:
      metrics_enabled: true
      health_check_interval: 30
      performance_monitoring: true
  # Prometheus scrape configuration.
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    rule_files:
      - "mcp_rules.yml"
    scrape_configs:
      - job_name: 'mcp-server'
        static_configs:
          - targets: ['mcp-server:8080']
        metrics_path: '/metrics'
        scrape_interval: 10s
      - job_name: 'kubernetes-pods'
        kubernetes_sd_configs:
          - role: pod
            namespaces:
              names:
                - mcp-system
        relabel_configs:
          # Only scrape pods annotated prometheus.io/scrape: "true".
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          # Honor a per-pod metrics path annotation if present.
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
Secret 配置
# secret.yaml
# WARNING: base64 is encoding, not encryption. Never commit real credentials;
# prefer SealedSecrets / ExternalSecrets / SOPS for anything non-local.
apiVersion: v1
kind: Secret
metadata:
  name: mcp-secrets
  namespace: mcp-system
type: Opaque
data:
  # postgresql://mcpuser:mcppass@postgres:5432/mcpdb
  # FIX: the previous value encoded port 35432 instead of 5432, which does
  # not match the postgres service used elsewhere in this chapter.
  database-url: cG9zdGdyZXNxbDovL21jcHVzZXI6bWNwcGFzc0Bwb3N0Z3Jlczo1NDMyL21jcGRi
  # redis://:redispass@redis:6379/0
  redis-url: cmVkaXM6Ly86cmVkaXNwYXNzQHJlZGlzOjYzNzkvMA==
  # placeholder: your-secret-key-here
  secret-key: eW91ci1zZWNyZXQta2V5LWhlcmU=
  # placeholder: your-jwt-secret-key
  jwt-secret: eW91ci1qd3Qtc2VjcmV0LWtleQ==
Deployment 配置
# deployment.yaml
# Three-replica rolling-update deployment of the MCP server.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  namespace: mcp-system
  labels:
    app: mcp-server
    version: v1.0.0
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
        version: v1.0.0
      annotations:
        # Picked up by the Prometheus 'kubernetes-pods' scrape job.
        prometheus.io/scrape: "true"
        prometheus.io/port: "8080"
        prometheus.io/path: "/metrics"
    spec:
      serviceAccountName: mcp-service-account
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      containers:
      - name: mcp-server
        image: mcp-server:1.0.0
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 8080
          name: http
          protocol: TCP
        env:
        - name: MCP_ENV
          value: "production"
        - name: MCP_HOST
          value: "0.0.0.0"
        - name: MCP_PORT
          value: "8080"
        # Credentials come from the mcp-secrets Secret, never from the image.
        - name: MCP_DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: database-url
        - name: MCP_REDIS_URL
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: redis-url
        - name: MCP_SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: secret-key
        - name: MCP_JWT_SECRET
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: jwt-secret
        volumeMounts:
        - name: config
          mountPath: /app/config
          readOnly: true
        # Writable log dir; needed because the root filesystem is read-only.
        - name: logs
          mountPath: /app/logs
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /ready
            port: http
          initialDelaySeconds: 5
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 3
        securityContext:
          allowPrivilegeEscalation: false
          # NOTE(review): with a read-only root fs the app may also need a
          # writable /tmp (emptyDir) -- confirm against the application.
          readOnlyRootFilesystem: true
          capabilities:
            drop:
            - ALL
      volumes:
      - name: config
        configMap:
          name: mcp-config
      - name: logs
        emptyDir: {}
      nodeSelector:
        kubernetes.io/os: linux
      tolerations:
      # Tolerate failing/unreachable nodes for 5 minutes before eviction.
      - key: "node.kubernetes.io/not-ready"
        operator: "Exists"
        effect: "NoExecute"
        tolerationSeconds: 300
      - key: "node.kubernetes.io/unreachable"
        operator: "Exists"
        effect: "NoExecute"
        tolerationSeconds: 300
Service 配置
# service.yaml
# Cluster-internal service fronting the mcp-server deployment.
apiVersion: v1
kind: Service
metadata:
  name: mcp-server
  namespace: mcp-system
  labels:
    app: mcp-server
spec:
  type: ClusterIP
  ports:
  - port: 8080
    targetPort: http
    protocol: TCP
    name: http
  selector:
    app: mcp-server
---
# Headless variant (clusterIP: None) so clients can resolve individual pod
# addresses directly via DNS.
apiVersion: v1
kind: Service
metadata:
  name: mcp-server-headless
  namespace: mcp-system
  labels:
    app: mcp-server
spec:
  type: ClusterIP
  clusterIP: None
  ports:
  - port: 8080
    targetPort: http
    protocol: TCP
    name: http
  selector:
    app: mcp-server
Ingress 配置
# ingress.yaml
# TLS-terminated ingress with rate limiting, managed by cert-manager.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: mcp-ingress
  namespace: mcp-system
  annotations:
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
    nginx.ingress.kubernetes.io/rate-limit: "100"
    nginx.ingress.kubernetes.io/rate-limit-window: "1m"
    nginx.ingress.kubernetes.io/proxy-body-size: "10m"
    nginx.ingress.kubernetes.io/proxy-connect-timeout: "30"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "30"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "30"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  # FIX: ingressClassName replaces the deprecated
  # kubernetes.io/ingress.class annotation (removed in favor of this field).
  ingressClassName: nginx
  tls:
  - hosts:
    - mcp.example.com
    secretName: mcp-tls
  rules:
  - host: mcp.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: mcp-server
            port:
              number: 8080
HorizontalPodAutoscaler 配置
# hpa.yaml
# Autoscales the mcp-server deployment between 3 and 10 replicas.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
  namespace: mcp-system
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  minReplicas: 3
  maxReplicas: 10
  metrics:
  # Scale on average CPU (70%) and memory (80%) utilization.
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    # Scale down conservatively: at most 10% of pods per minute, after 5
    # minutes of stable readings.
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
    # Scale up faster: the larger of +50% or +2 pods per minute.
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
      - type: Pods
        value: 2
        periodSeconds: 60
      selectPolicy: Max
监控与日志
1. Prometheus 监控配置
# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry, generate_latest
from typing import Dict, Any
import time
from functools import wraps
class MCPMetrics:
    """Prometheus metric collector for an MCP server.

    Bundles Counters/Histograms/Gauges for requests, connections, tool
    executions, resource access, caching and errors on a single registry so
    the whole set can be exported together via :meth:`get_metrics`.
    """
    def __init__(self, registry: CollectorRegistry = None):
        # Fall back to a private registry so multiple instances don't collide
        # on the process-global default registry.
        self.registry = registry or CollectorRegistry()
        # --- request metrics ---
        self.request_count = Counter(
            'mcp_requests_total',
            'Total number of MCP requests',
            ['method', 'endpoint', 'status'],
            registry=self.registry
        )
        self.request_duration = Histogram(
            'mcp_request_duration_seconds',
            'MCP request duration in seconds',
            ['method', 'endpoint'],
            registry=self.registry
        )
        # --- connection metrics ---
        self.active_connections = Gauge(
            'mcp_active_connections',
            'Number of active MCP connections',
            registry=self.registry
        )
        self.connection_pool_size = Gauge(
            'mcp_connection_pool_size',
            'Size of connection pool',
            ['pool_name'],
            registry=self.registry
        )
        # --- tool execution metrics ---
        self.tool_executions = Counter(
            'mcp_tool_executions_total',
            'Total number of tool executions',
            ['tool_name', 'status'],
            registry=self.registry
        )
        self.tool_execution_duration = Histogram(
            'mcp_tool_execution_duration_seconds',
            'Tool execution duration in seconds',
            ['tool_name'],
            registry=self.registry
        )
        # --- resource metrics ---
        self.resource_reads = Counter(
            'mcp_resource_reads_total',
            'Total number of resource reads',
            ['resource_type', 'status'],
            registry=self.registry
        )
        self.resource_subscriptions = Gauge(
            'mcp_resource_subscriptions',
            'Number of active resource subscriptions',
            ['resource_type'],
            registry=self.registry
        )
        # --- cache metrics ---
        self.cache_hits = Counter(
            'mcp_cache_hits_total',
            'Total number of cache hits',
            ['cache_name'],
            registry=self.registry
        )
        self.cache_misses = Counter(
            'mcp_cache_misses_total',
            'Total number of cache misses',
            ['cache_name'],
            registry=self.registry
        )
        # --- error metrics ---
        self.errors = Counter(
            'mcp_errors_total',
            'Total number of errors',
            ['error_type', 'component'],
            registry=self.registry
        )

    def record_request(self, method: str, endpoint: str, status: str, duration: float):
        """Record one request: increments the counter and observes duration."""
        self.request_count.labels(method=method, endpoint=endpoint, status=status).inc()
        self.request_duration.labels(method=method, endpoint=endpoint).observe(duration)

    def record_tool_execution(self, tool_name: str, status: str, duration: float):
        """Record one tool execution (outcome counter + duration histogram)."""
        self.tool_executions.labels(tool_name=tool_name, status=status).inc()
        self.tool_execution_duration.labels(tool_name=tool_name).observe(duration)

    def record_resource_read(self, resource_type: str, status: str):
        """Record one resource read."""
        self.resource_reads.labels(resource_type=resource_type, status=status).inc()

    def record_cache_hit(self, cache_name: str):
        """Record a cache hit."""
        self.cache_hits.labels(cache_name=cache_name).inc()

    def record_cache_miss(self, cache_name: str):
        """Record a cache miss."""
        self.cache_misses.labels(cache_name=cache_name).inc()

    def record_error(self, error_type: str, component: str):
        """Record an error, labelled by exception type and component."""
        self.errors.labels(error_type=error_type, component=component).inc()

    def set_active_connections(self, count: int):
        """Set the active-connection gauge to an absolute value."""
        self.active_connections.set(count)

    def set_connection_pool_size(self, pool_name: str, size: int):
        """Set the size of a named connection pool."""
        self.connection_pool_size.labels(pool_name=pool_name).set(size)

    def set_resource_subscriptions(self, resource_type: str, count: int):
        """Set the subscription count for a resource type."""
        self.resource_subscriptions.labels(resource_type=resource_type).set(count)

    def get_metrics(self) -> str:
        """Render all metrics in the Prometheus text exposition format."""
        return generate_latest(self.registry).decode('utf-8')
def monitor_request(metrics: MCPMetrics):
    """Decorator factory that records request metrics for an async handler.

    The ``method`` and ``endpoint`` labels are read from the call's keyword
    arguments, falling back to 'unknown'. Counts and durations are always
    recorded; errors additionally increment the error counter and re-raise.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            began = time.time()
            req_method = kwargs.get('method', 'unknown')
            req_endpoint = kwargs.get('endpoint', 'unknown')
            outcome = 'success'
            try:
                return await func(*args, **kwargs)
            except Exception as exc:
                outcome = 'error'
                # Attribute the error to the wrapped function's name.
                metrics.record_error(type(exc).__name__, func.__name__)
                raise
            finally:
                # Record regardless of whether the handler raised.
                metrics.record_request(req_method, req_endpoint, outcome,
                                       time.time() - began)
        return wrapper
    return decorator
def monitor_tool_execution(metrics: MCPMetrics):
    """Decorator factory that times tool executions and counts outcomes.

    The tool name is taken from the ``tool_name`` keyword argument when
    present, otherwise from the wrapped function's name.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            began = time.time()
            name_of_tool = kwargs.get('tool_name', func.__name__)
            outcome = 'success'
            try:
                return await func(*args, **kwargs)
            except Exception as exc:
                outcome = 'error'
                metrics.record_error(type(exc).__name__, 'tool_execution')
                raise
            finally:
                # Duration and outcome are recorded even on failure.
                metrics.record_tool_execution(name_of_tool, outcome,
                                              time.time() - began)
        return wrapper
    return decorator
2. 日志配置
# logging/config.py
import json
import logging
import logging.config
import os
import sys
import time
from datetime import datetime, timezone
from typing import Dict, Any
class JSONFormatter(logging.Formatter):
    """Formatter that renders each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        """Serialize ``record`` to a JSON string with a UTC 'Z' timestamp."""
        log_entry = {
            # FIX: datetime.utcnow() is deprecated (naive datetime); use a
            # timezone-aware UTC timestamp and normalize '+00:00' to 'Z'.
            'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
            'process_id': os.getpid(),
            'thread_id': record.thread,
        }
        # Append the formatted traceback when an exception is attached.
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        # Merge structured context supplied via extra={'extra_fields': ...}.
        if hasattr(record, 'extra_fields'):
            log_entry.update(record.extra_fields)
        return json.dumps(log_entry, ensure_ascii=False)
class StructuredLogger:
    """Thin wrapper around :class:`logging.Logger` that funnels keyword
    context into a single ``extra_fields`` dict (consumed by JSONFormatter).

    Known context keys with a value of ``None`` are dropped; any remaining
    keyword arguments are passed through unchanged.
    """

    # Recognised context keys, lifted out of **kwargs in this order.
    _KNOWN_KEYS = (
        'component', 'request_id', 'user_id', 'session_id', 'tool_name',
        'resource_uri', 'duration', 'status_code', 'error_code',
    )

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def _log(self, level: int, message: str, **kwargs):
        """Emit ``message`` at ``level`` with structured context attached."""
        fields = {}
        for key in self._KNOWN_KEYS:
            value = kwargs.pop(key, None)
            if value is not None:
                fields[key] = value
        # Unrecognised keyword arguments are forwarded as-is.
        fields.update(kwargs)
        self.logger.log(level, message, extra={'extra_fields': fields})

    def debug(self, message: str, **kwargs):
        self._log(logging.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs):
        self._log(logging.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs):
        self._log(logging.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs):
        self._log(logging.ERROR, message, **kwargs)

    def critical(self, message: str, **kwargs):
        self._log(logging.CRITICAL, message, **kwargs)
def setup_logging(config: Dict[str, Any]):
    """Configure the logging subsystem via ``logging.config.dictConfig``.

    Supported config keys: ``level``, ``format`` ('json' or 'text'),
    ``handlers`` (any of 'console', 'file', 'syslog'), ``file_path``,
    ``max_bytes``, ``backup_count``, ``syslog_address``.
    """
    log_level = config.get('level', 'INFO').upper()
    log_format = config.get('format', 'text')
    handlers = config.get('handlers', ['console'])
    logging_config = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'json': {
                # dictConfig factory syntax: instantiates JSONFormatter().
                '()': JSONFormatter,
            },
            'text': {
                'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                'datefmt': '%Y-%m-%d %H:%M:%S'
            }
        },
        'handlers': {},
        'loggers': {
            # Root logger: all messages funnel through the selected handlers.
            '': {
                'level': log_level,
                'handlers': handlers,
                'propagate': False
            }
        }
    }
    # Console handler (stdout)
    if 'console' in handlers:
        logging_config['handlers']['console'] = {
            'class': 'logging.StreamHandler',
            'stream': sys.stdout,
            'formatter': log_format,
            'level': log_level
        }
    # Rotating file handler
    if 'file' in handlers:
        log_file = config.get('file_path', '/app/logs/mcp.log')
        # Ensure the log directory exists before the handler opens the file.
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        logging_config['handlers']['file'] = {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': log_file,
            'maxBytes': config.get('max_bytes', 10 * 1024 * 1024),  # 10MB per file
            'backupCount': config.get('backup_count', 5),
            'formatter': log_format,
            'level': log_level
        }
    # Syslog handler
    if 'syslog' in handlers:
        logging_config['handlers']['syslog'] = {
            'class': 'logging.handlers.SysLogHandler',
            'address': config.get('syslog_address', '/dev/log'),
            'formatter': log_format,
            'level': log_level
        }
    logging.config.dictConfig(logging_config)
# Logging middleware
class LoggingMiddleware:
    """ASGI middleware that logs request start/completion/failure, tagging
    all lines of one request with a shared correlation id.

    Requires ``import time`` at module level (previously missing).
    """

    def __init__(self, app, logger: "StructuredLogger"):
        self.app = app
        self.logger = logger

    async def __call__(self, scope, receive, send):
        # Pass non-HTTP traffic (lifespan, websocket, ...) through untouched.
        if scope['type'] != 'http':
            await self.app(scope, receive, send)
            return
        start_time = time.time()
        request_id = self._generate_request_id()
        # FIX: the ASGI spec allows 'client' to be absent or None -- guard
        # before indexing instead of assuming a (host, port) pair.
        client = scope.get('client') or ('unknown', 0)
        self.logger.info(
            "Request started",
            request_id=request_id,
            method=scope['method'],
            path=scope['path'],
            query_string=scope['query_string'].decode(),
            client=client[0]
        )

        async def send_wrapper(message):
            # Log once, when the response head is emitted.
            if message['type'] == 'http.response.start':
                self.logger.info(
                    "Request completed",
                    request_id=request_id,
                    status_code=message['status'],
                    duration=time.time() - start_time,
                    method=scope['method'],
                    path=scope['path']
                )
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        except Exception as e:
            # Log and re-raise so the server's error handling still runs.
            self.logger.error(
                "Request failed",
                request_id=request_id,
                error=str(e),
                error_type=type(e).__name__,
                duration=time.time() - start_time,
                method=scope['method'],
                path=scope['path']
            )
            raise

    def _generate_request_id(self) -> str:
        """Return a unique id correlating the log lines of one request."""
        import uuid
        return str(uuid.uuid4())
本章总结
本章详细介绍了MCP协议的部署与运维策略:
容器化部署方面:
- Docker部署 - 多阶段构建、安全配置和健康检查
- Docker Compose - 完整的服务编排和依赖管理
- Kubernetes部署 - 生产级的容器编排和自动扩缩容
监控与日志方面:
- Prometheus监控 - 全面的指标收集和监控
- 结构化日志 - JSON格式日志和日志中间件
- 可观测性 - 完整的监控、日志和追踪体系
这些部署和运维实践确保MCP服务能够:
- 在生产环境中稳定运行
- 具备高可用性和可扩展性
- 提供完整的监控和故障排查能力
下一章我们将学习MCP协议的最佳实践与案例分析,包括实际项目经验和常见问题解决方案。