## 7.1 集群架构概述

### 7.1.1 MinIO分布式架构
MinIO采用分布式架构设计,支持水平扩展和高可用性,其核心特性包括:

- 无单点故障:所有节点地位平等,没有主从之分
- 自动故障恢复:节点或磁盘故障后,可基于纠删码自动修复受影响的数据
- 线性扩展:从4个节点起步,可按需水平扩展到更大规模的集群
- 强一致性:读写操作具有严格的一致性保证;数据则通过纠删码(Erasure Code)进行冗余保护(参见下面的示意计算)
- 高性能:各节点并行读写,充分利用集群整体资源
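为了直观理解纠删码在容量开销与容错能力之间的权衡,下面给出一个简化的示意计算。其中"16 块盘、EC:4"只是假设的参数,实际的纠删集合大小与默认奇偶校验请以所部署 MinIO 版本的官方文档为准:

```python
def erasure_capacity(total_drives: int, parity: int) -> dict:
    """示意计算:估算单个纠删集合的可用容量比例与容错能力"""
    data_drives = total_drives - parity
    return {
        'data_drives': data_drives,                   # 存放数据分片的盘数
        'parity_drives': parity,                      # 存放校验分片的盘数
        'usable_ratio': data_drives / total_drives,   # 可用容量占总容量的比例
        'readable_with_failures': parity              # 仍可恢复数据时允许故障的盘数
    }

# 示例:16块盘、EC:4 时,可用容量约为总容量的 75%,
# 单个纠删集合最多允许 4 块盘故障而不丢失数据
print(erasure_capacity(16, 4))
```

下面的 MinIOClusterManager 示例类封装了集群配置生成、部署文件输出、健康检查与监控等管理逻辑:

```python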
import json
import os
import threading
import time
from datetime import datetime
from typing import Any, Dict, List

import yaml
from minio import Minio

class MinIOClusterManager:
"""MinIO集群管理器"""
def __init__(self):
self.cluster_config = {}
self.nodes = []
self.clients = {}
self.health_check_interval = 30
self.monitoring_active = False
self.cluster_stats = {
'total_nodes': 0,
'healthy_nodes': 0,
'total_storage': 0,
'used_storage': 0,
'total_objects': 0,
'last_updated': None
}
def generate_cluster_config(self, nodes: List[Dict],
data_dirs_per_node: int = 4,
console_port: int = 9001) -> Dict[str, Any]:
"""生成集群配置"""
try:
# 验证节点数量(必须是4的倍数且至少4个)
if len(nodes) < 4 or len(nodes) % 4 != 0:
return {
'success': False,
'error': '节点数量必须是4的倍数且至少4个节点',
'node_count': len(nodes)
}
# 验证节点配置
for i, node in enumerate(nodes):
required_fields = ['host', 'data_dirs']
for field in required_fields:
if field not in node:
return {
'success': False,
'error': f'节点{i}缺少必需字段: {field}',
'node': node
}
# 生成集群配置
cluster_config = {
'version': '1.0',
'cluster_name': f"minio-cluster-{datetime.now().strftime('%Y%m%d')}",
'created_at': datetime.now().isoformat(),
'nodes': [],
'erasure_sets': len(nodes) // 4,
'total_drives': len(nodes) * data_dirs_per_node,
'console_port': console_port,
'api_port': 9000,
'access_key': 'minioadmin',
'secret_key': 'minioadmin123',
'region': 'us-east-1'
}
# 配置每个节点
for i, node in enumerate(nodes):
node_config = {
'node_id': i + 1,
'host': node['host'],
'api_endpoint': f"http://{node['host']}:9000",
'console_endpoint': f"http://{node['host']}:{console_port}",
'data_dirs': node['data_dirs'],
'status': 'pending',
'role': 'storage',
'zone': node.get('zone', 'default'),
'rack': node.get('rack', f'rack-{i // 4 + 1}'),
'environment_vars': {
'MINIO_ROOT_USER': cluster_config['access_key'],
'MINIO_ROOT_PASSWORD': cluster_config['secret_key'],
'MINIO_REGION': cluster_config['region'],
'MINIO_CONSOLE_ADDRESS': f":{console_port}",
'MINIO_PROMETHEUS_AUTH_TYPE': 'public'
}
}
cluster_config['nodes'].append(node_config)
# 生成启动命令
cluster_config['startup_commands'] = self._generate_startup_commands(cluster_config)
# 生成Docker Compose配置
cluster_config['docker_compose'] = self._generate_docker_compose(cluster_config)
# 生成Kubernetes配置
cluster_config['kubernetes'] = self._generate_kubernetes_config(cluster_config)
self.cluster_config = cluster_config
self.nodes = cluster_config['nodes']
return {
'success': True,
'message': f'集群配置生成成功: {len(nodes)}个节点',
'cluster_config': cluster_config,
'erasure_sets': cluster_config['erasure_sets'],
'total_drives': cluster_config['total_drives']
}
except Exception as e:
return {
'success': False,
'error': f'生成集群配置失败: {e}',
'nodes': nodes
}
def _generate_startup_commands(self, config: Dict) -> Dict[str, Any]:
"""生成启动命令"""
commands = {
'distributed_mode': [],
'systemd_services': [],
'docker_commands': []
}
# 构建分布式模式的服务器列表
server_list = []
for node in config['nodes']:
for data_dir in node['data_dirs']:
server_list.append(f"http://{node['host']}:9000{data_dir}")
servers_arg = ' '.join(server_list)
# 为每个节点生成启动命令
for node in config['nodes']:
# 分布式模式命令
cmd = f"minio server {servers_arg} --console-address :{config['console_port']}"
commands['distributed_mode'].append({
'node_id': node['node_id'],
'host': node['host'],
'command': cmd,
'working_dir': '/opt/minio',
'user': 'minio'
})
# Systemd服务配置
systemd_service = f"""
[Unit]
Description=MinIO Object Storage Server
Documentation=https://docs.min.io
Wants=network-online.target
After=network-online.target
AssertFileIsExecutable=/usr/local/bin/minio
[Service]
WorkingDirectory=/opt/minio
User=minio
Group=minio
ProtectProc=invisible
EnvironmentFile=-/etc/default/minio
ExecStartPre=/bin/bash -c "if [ -z \"${{MINIO_VOLUMES}}\" ]; then echo \"Variable MINIO_VOLUMES not set in /etc/default/minio\"; exit 1; fi"
ExecStart=/usr/local/bin/minio server $MINIO_OPTS $MINIO_VOLUMES
Restart=always
LimitNOFILE=65536
TasksMax=infinity
TimeoutStopSec=infinity
SendSIGKILL=no
[Install]
WantedBy=multi-user.target
"""
commands['systemd_services'].append({
'node_id': node['node_id'],
'host': node['host'],
'service_file': f'/etc/systemd/system/minio-node{node["node_id"]}.service',
'service_content': systemd_service,
'environment_file': '/etc/default/minio',
'environment_vars': {
'MINIO_VOLUMES': servers_arg,
'MINIO_OPTS': f'--console-address :{config["console_port"]}',
'MINIO_ROOT_USER': config['access_key'],
'MINIO_ROOT_PASSWORD': config['secret_key']
}
})
# Docker命令
docker_cmd = f"""
docker run -d \
--name minio-node{node['node_id']} \
--hostname {node['host']} \
--network minio-cluster \
-p {9000 + node['node_id'] - 1}:9000 \
-p {config['console_port'] + node['node_id'] - 1}:{config['console_port']} \
-e MINIO_ROOT_USER={config['access_key']} \
-e MINIO_ROOT_PASSWORD={config['secret_key']} \
-e MINIO_REGION={config['region']} \
{' '.join('-v /data/minio/node%s%s:%s' % (node['node_id'], d, d) for d in node['data_dirs'])} \
minio/minio server {servers_arg} --console-address :{config['console_port']}
"""
commands['docker_commands'].append({
'node_id': node['node_id'],
'host': node['host'],
'command': docker_cmd.strip()
})
return commands
def _generate_docker_compose(self, config: Dict) -> str:
"""生成Docker Compose配置"""
compose_config = {
'version': '3.8',
'services': {},
'networks': {
'minio-cluster': {
'driver': 'bridge'
}
},
'volumes': {}
}
# 构建服务器列表
server_list = []
for node in config['nodes']:
for data_dir in node['data_dirs']:
server_list.append(f"http://minio-node{node['node_id']}:9000{data_dir}")
servers_arg = ' '.join(server_list)
# 为每个节点创建服务
for node in config['nodes']:
service_name = f"minio-node{node['node_id']}"
# 创建数据卷
for i, data_dir in enumerate(node['data_dirs']):
volume_name = f"minio-data-node{node['node_id']}-{i+1}"
compose_config['volumes'][volume_name] = None
# 创建服务配置
service_config = {
'image': 'minio/minio:latest',
'container_name': service_name,
'hostname': f"minio-node{node['node_id']}",
'ports': [
f"{9000 + node['node_id'] - 1}:9000",
f"{config['console_port'] + node['node_id'] - 1}:{config['console_port']}"
],
'environment': {
'MINIO_ROOT_USER': config['access_key'],
'MINIO_ROOT_PASSWORD': config['secret_key'],
'MINIO_REGION': config['region']
},
'command': f"server {servers_arg} --console-address :{config['console_port']}",
'volumes': [],
'networks': ['minio-cluster'],
'restart': 'unless-stopped',
'healthcheck': {
'test': ['CMD', 'curl', '-f', 'http://localhost:9000/minio/health/live'],
'interval': '30s',
'timeout': '20s',
'retries': 3,
'start_period': '40s'
}
}
# 添加数据卷挂载
for i, data_dir in enumerate(node['data_dirs']):
volume_name = f"minio-data-node{node['node_id']}-{i+1}"
service_config['volumes'].append(f"{volume_name}:{data_dir}")
compose_config['services'][service_name] = service_config
return yaml.dump(compose_config, default_flow_style=False, sort_keys=False)
def _generate_kubernetes_config(self, config: Dict) -> Dict[str, str]:
"""生成Kubernetes配置"""
k8s_configs = {}
# Namespace配置
namespace_config = f"""
apiVersion: v1
kind: Namespace
metadata:
name: minio-cluster
labels:
name: minio-cluster
"""
k8s_configs['namespace.yaml'] = namespace_config
# Secret配置
secret_config = f"""
apiVersion: v1
kind: Secret
metadata:
name: minio-credentials
namespace: minio-cluster
type: Opaque
data:
root-user: {self._base64_encode(config['access_key'])}
root-password: {self._base64_encode(config['secret_key'])}
"""
k8s_configs['secret.yaml'] = secret_config
# StatefulSet配置
statefulset_config = f"""
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: minio-cluster
namespace: minio-cluster
spec:
serviceName: minio-headless
replicas: {len(config['nodes'])}
selector:
matchLabels:
app: minio
template:
metadata:
labels:
app: minio
spec:
containers:
- name: minio
image: minio/minio:latest
args:
- server
- http://minio-cluster-{{0...{len(config['nodes'])-1}}}.minio-headless.minio-cluster.svc.cluster.local/data
- --console-address
- :{config['console_port']}
env:
- name: MINIO_ROOT_USER
valueFrom:
secretKeyRef:
name: minio-credentials
key: root-user
- name: MINIO_ROOT_PASSWORD
valueFrom:
secretKeyRef:
name: minio-credentials
key: root-password
- name: MINIO_REGION
value: {config['region']}
ports:
- containerPort: 9000
name: api
- containerPort: {config['console_port']}
name: console
volumeMounts:
- name: data
mountPath: /data
livenessProbe:
httpGet:
path: /minio/health/live
port: 9000
initialDelaySeconds: 120
periodSeconds: 30
readinessProbe:
httpGet:
path: /minio/health/ready
port: 9000
initialDelaySeconds: 120
periodSeconds: 15
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 100Gi
storageClassName: fast-ssd
"""
k8s_configs['statefulset.yaml'] = statefulset_config
# Headless Service配置
headless_service_config = f"""
apiVersion: v1
kind: Service
metadata:
name: minio-headless
namespace: minio-cluster
spec:
clusterIP: None
selector:
app: minio
ports:
- port: 9000
name: api
- port: {config['console_port']}
name: console
"""
k8s_configs['headless-service.yaml'] = headless_service_config
# LoadBalancer Service配置
loadbalancer_service_config = f"""
apiVersion: v1
kind: Service
metadata:
name: minio-service
namespace: minio-cluster
spec:
type: LoadBalancer
selector:
app: minio
ports:
- port: 9000
targetPort: 9000
name: api
- port: {config['console_port']}
targetPort: {config['console_port']}
name: console
"""
k8s_configs['service.yaml'] = loadbalancer_service_config
# Ingress配置
ingress_config = f"""
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: minio-ingress
namespace: minio-cluster
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/proxy-body-size: "0"
nginx.ingress.kubernetes.io/proxy-read-timeout: "600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "600"
spec:
tls:
- hosts:
- minio.example.com
- console.minio.example.com
secretName: minio-tls
rules:
- host: minio.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: minio-service
port:
number: 9000
- host: console.minio.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: minio-service
port:
number: {config['console_port']}
"""
k8s_configs['ingress.yaml'] = ingress_config
return k8s_configs
def _base64_encode(self, text: str) -> str:
"""Base64编码"""
import base64
return base64.b64encode(text.encode('utf-8')).decode('utf-8')
def save_cluster_config(self, config_dir: str = './cluster-config') -> Dict[str, Any]:
"""保存集群配置"""
try:
import os
# 创建配置目录
os.makedirs(config_dir, exist_ok=True)
if not self.cluster_config:
return {
'success': False,
'error': '没有可保存的集群配置',
'config_dir': config_dir
}
saved_files = []
# 保存主配置文件
config_file = os.path.join(config_dir, 'cluster-config.json')
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(self.cluster_config, f, indent=2, ensure_ascii=False)
saved_files.append(config_file)
# 保存Docker Compose配置
compose_file = os.path.join(config_dir, 'docker-compose.yml')
with open(compose_file, 'w', encoding='utf-8') as f:
f.write(self.cluster_config['docker_compose'])
saved_files.append(compose_file)
# 保存Kubernetes配置
k8s_dir = os.path.join(config_dir, 'kubernetes')
os.makedirs(k8s_dir, exist_ok=True)
for filename, content in self.cluster_config['kubernetes'].items():
k8s_file = os.path.join(k8s_dir, filename)
with open(k8s_file, 'w', encoding='utf-8') as f:
f.write(content)
saved_files.append(k8s_file)
# 保存启动脚本
scripts_dir = os.path.join(config_dir, 'scripts')
os.makedirs(scripts_dir, exist_ok=True)
# 生成启动脚本
startup_script = self._generate_startup_script()
script_file = os.path.join(scripts_dir, 'start-cluster.sh')
with open(script_file, 'w', encoding='utf-8') as f:
f.write(startup_script)
saved_files.append(script_file)
# 生成停止脚本
stop_script = self._generate_stop_script()
stop_script_file = os.path.join(scripts_dir, 'stop-cluster.sh')
with open(stop_script_file, 'w', encoding='utf-8') as f:
f.write(stop_script)
saved_files.append(stop_script_file)
# 生成健康检查脚本
health_script = self._generate_health_check_script()
health_script_file = os.path.join(scripts_dir, 'health-check.sh')
with open(health_script_file, 'w', encoding='utf-8') as f:
f.write(health_script)
saved_files.append(health_script_file)
return {
'success': True,
'message': f'集群配置已保存到: {config_dir}',
'config_dir': config_dir,
'saved_files': saved_files,
'total_files': len(saved_files)
}
except Exception as e:
return {
'success': False,
'error': f'保存集群配置失败: {e}',
'config_dir': config_dir
}
def _generate_startup_script(self) -> str:
"""生成启动脚本"""
script = f"""
#!/bin/bash
# MinIO集群启动脚本
# 生成时间: {datetime.now().isoformat()}
set -e
echo "启动MinIO集群..."
echo "集群名称: {self.cluster_config.get('cluster_name', 'unknown')}"
echo "节点数量: {len(self.cluster_config.get('nodes', []))}"
echo "纠删码集合: {self.cluster_config.get('erasure_sets', 0)}"
# 检查Docker是否运行
if ! docker info > /dev/null 2>&1; then
echo "错误: Docker未运行或无法访问"
exit 1
fi
# 创建网络
echo "创建Docker网络..."
docker network create minio-cluster 2>/dev/null || echo "网络已存在"
# 启动容器
echo "启动MinIO容器..."
"""
# 添加每个节点的启动命令
for cmd_info in self.cluster_config['startup_commands']['docker_commands']:
script += f"""
echo "启动节点 {cmd_info['node_id']} ({cmd_info['host']})..."
{cmd_info['command']}
"""
script += f"""
echo "等待集群初始化..."
sleep 30
# 健康检查
echo "执行健康检查..."
./health-check.sh
echo "MinIO集群启动完成!"
echo "API端点: http://localhost:9000"
echo "控制台: http://localhost:{self.cluster_config.get('console_port', 9001)}"
echo "用户名: {self.cluster_config.get('access_key', 'minioadmin')}"
echo "密码: {self.cluster_config.get('secret_key', 'minioadmin123')}"
"""
return script
def _generate_stop_script(self) -> str:
"""生成停止脚本"""
script = f"""
#!/bin/bash
# MinIO集群停止脚本
# 生成时间: {datetime.now().isoformat()}
set -e
echo "停止MinIO集群..."
# 停止所有MinIO容器
echo "停止MinIO容器..."
"""
# 添加每个节点的停止命令
for node in self.cluster_config.get('nodes', []):
container_name = f"minio-node{node['node_id']}"
script += f"""
echo "停止容器 {container_name}..."
docker stop {container_name} 2>/dev/null || echo "容器 {container_name} 未运行"
docker rm {container_name} 2>/dev/null || echo "容器 {container_name} 不存在"
"""
script += f"""
# 删除网络
echo "删除Docker网络..."
docker network rm minio-cluster 2>/dev/null || echo "网络不存在或仍在使用中"
echo "MinIO集群已停止"
"""
return script
def _generate_health_check_script(self) -> str:
"""生成健康检查脚本"""
script = f"""
#!/bin/bash
# MinIO集群健康检查脚本
# 生成时间: {datetime.now().isoformat()}
echo "执行MinIO集群健康检查..."
healthy_nodes=0
total_nodes={len(self.cluster_config.get('nodes', []))}
"""
# 添加每个节点的健康检查
for node in self.cluster_config.get('nodes', []):
port = 9000 + node['node_id'] - 1
script += f"""
echo "检查节点 {node['node_id']} ({node['host']})..."
if curl -f -s http://localhost:{port}/minio/health/live > /dev/null; then
echo "✅ 节点 {node['node_id']} 健康"
((healthy_nodes++))
else
echo "❌ 节点 {node['node_id']} 不健康"
fi
"""
script += f"""
echo "健康检查完成"
echo "健康节点: $healthy_nodes/$total_nodes"
if [ $healthy_nodes -eq $total_nodes ]; then
echo "✅ 集群完全健康"
exit 0
elif [ $healthy_nodes -gt 0 ]; then
echo "⚠️ 集群部分健康"
exit 1
else
echo "❌ 集群不健康"
exit 2
fi
"""
return script
def load_cluster_config(self, config_file: str) -> Dict[str, Any]:
"""加载集群配置"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
self.cluster_config = json.load(f)
self.nodes = self.cluster_config.get('nodes', [])
return {
'success': True,
'message': f'集群配置加载成功: {config_file}',
'cluster_name': self.cluster_config.get('cluster_name', 'unknown'),
'node_count': len(self.nodes),
'config_file': config_file
}
except Exception as e:
return {
'success': False,
'error': f'加载集群配置失败: {e}',
'config_file': config_file
}
def initialize_cluster_clients(self) -> Dict[str, Any]:
"""初始化集群客户端"""
try:
if not self.cluster_config:
return {
'success': False,
'error': '没有可用的集群配置',
'clients': {}
}
self.clients = {}
successful_connections = 0
failed_connections = []
for node in self.nodes:
try:
# 解析端点
endpoint = node['api_endpoint'].replace('http://', '')
# 创建客户端
client = Minio(
endpoint=endpoint,
access_key=self.cluster_config['access_key'],
secret_key=self.cluster_config['secret_key'],
secure=False
)
# 测试连接
client.list_buckets()
self.clients[f"node_{node['node_id']}"] = {
'client': client,
'node_info': node,
'status': 'connected',
'connected_at': datetime.now().isoformat()
}
successful_connections += 1
except Exception as node_error:
failed_connections.append({
'node_id': node['node_id'],
'host': node['host'],
'error': str(node_error)
})
return {
'success': successful_connections > 0,
'message': f'成功连接 {successful_connections}/{len(self.nodes)} 个节点',
'successful_connections': successful_connections,
'failed_connections': failed_connections,
'total_nodes': len(self.nodes),
'clients': list(self.clients.keys())
}
except Exception as e:
return {
'success': False,
'error': f'初始化集群客户端失败: {e}',
'clients': {}
}
def check_cluster_health(self) -> Dict[str, Any]:
"""检查集群健康状态"""
try:
if not self.clients:
init_result = self.initialize_cluster_clients()
if not init_result['success']:
return init_result
health_results = []
healthy_nodes = 0
total_storage = 0
used_storage = 0
total_objects = 0
for client_name, client_info in self.clients.items():
try:
client = client_info['client']
node_info = client_info['node_info']
# 检查节点健康状态
start_time = time.time()
# 列出存储桶(测试连接)
buckets = client.list_buckets()
response_time = time.time() - start_time
# 获取存储统计(模拟)
node_storage = {
'total_capacity': 100 * 1024 * 1024 * 1024, # 100GB模拟
'used_capacity': 0,
'available_capacity': 0,
'object_count': 0
}
# 统计存储桶信息
bucket_count = len(buckets)
for bucket in buckets:
try:
# 获取存储桶中的对象(限制数量以避免性能问题)
objects = list(client.list_objects(bucket.name, recursive=True))
node_storage['object_count'] += len(objects)
# 计算使用的存储空间
for obj in objects[:100]: # 限制检查的对象数量
try:
stat = client.stat_object(bucket.name, obj.object_name)
node_storage['used_capacity'] += stat.size
except:
pass
except:
pass
node_storage['available_capacity'] = node_storage['total_capacity'] - node_storage['used_capacity']
health_result = {
'node_id': node_info['node_id'],
'host': node_info['host'],
'endpoint': node_info['api_endpoint'],
'status': 'healthy',
'response_time_ms': response_time * 1000,
'bucket_count': bucket_count,
'storage': node_storage,
'last_checked': datetime.now().isoformat()
}
health_results.append(health_result)
healthy_nodes += 1
total_storage += node_storage['total_capacity']
used_storage += node_storage['used_capacity']
total_objects += node_storage['object_count']
except Exception as node_error:
health_result = {
'node_id': node_info['node_id'],
'host': node_info['host'],
'endpoint': node_info['api_endpoint'],
'status': 'unhealthy',
'error': str(node_error),
'last_checked': datetime.now().isoformat()
}
health_results.append(health_result)
# 更新集群统计
self.cluster_stats.update({
'total_nodes': len(self.nodes),
'healthy_nodes': healthy_nodes,
'total_storage': total_storage,
'used_storage': used_storage,
'total_objects': total_objects,
'last_updated': datetime.now().isoformat()
})
# 计算集群健康度
health_percentage = (healthy_nodes / len(self.nodes)) * 100 if self.nodes else 0
cluster_status = 'healthy' if health_percentage == 100 else (
'degraded' if health_percentage >= 50 else 'critical'
)
return {
'success': True,
'cluster_status': cluster_status,
'health_percentage': health_percentage,
'healthy_nodes': healthy_nodes,
'total_nodes': len(self.nodes),
'cluster_stats': self.cluster_stats,
'node_health': health_results,
'checked_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'检查集群健康状态失败: {e}',
'cluster_status': 'unknown'
}
def start_health_monitoring(self, interval: int = 30) -> Dict[str, Any]:
"""启动健康监控"""
try:
if self.monitoring_active:
return {
'success': False,
'error': '健康监控已在运行',
'monitoring_active': True
}
self.health_check_interval = interval
self.monitoring_active = True
def monitoring_loop():
while self.monitoring_active:
try:
health_result = self.check_cluster_health()
if health_result['success']:
status = health_result['cluster_status']
healthy_nodes = health_result['healthy_nodes']
total_nodes = health_result['total_nodes']
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"集群状态: {status} ({healthy_nodes}/{total_nodes} 节点健康)")
# 如果集群状态异常,输出详细信息
if status != 'healthy':
for node_health in health_result['node_health']:
if node_health['status'] != 'healthy':
print(f" ❌ 节点 {node_health['node_id']} ({node_health['host']}): "
f"{node_health.get('error', 'Unknown error')}")
else:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"健康检查失败: {health_result['error']}")
time.sleep(self.health_check_interval)
except Exception as monitor_error:
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"监控错误: {monitor_error}")
time.sleep(self.health_check_interval)
# 在后台线程中启动监控
monitor_thread = threading.Thread(target=monitoring_loop, daemon=True)
monitor_thread.start()
return {
'success': True,
'message': f'健康监控已启动,检查间隔: {interval}秒',
'monitoring_active': True,
'check_interval': interval,
'started_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'启动健康监控失败: {e}',
'monitoring_active': False
}
def stop_health_monitoring(self) -> Dict[str, Any]:
"""停止健康监控"""
try:
if not self.monitoring_active:
return {
'success': False,
'error': '健康监控未在运行',
'monitoring_active': False
}
self.monitoring_active = False
return {
'success': True,
'message': '健康监控已停止',
'monitoring_active': False,
'stopped_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'停止健康监控失败: {e}',
'monitoring_active': self.monitoring_active
}
def get_cluster_statistics(self) -> Dict[str, Any]:
"""获取集群统计信息"""
try:
# 执行健康检查以更新统计信息
health_result = self.check_cluster_health()
if not health_result['success']:
return health_result
stats = self.cluster_stats.copy()
# 计算额外统计信息
stats['storage_utilization_percentage'] = (
(stats['used_storage'] / stats['total_storage']) * 100
) if stats['total_storage'] > 0 else 0
stats['average_objects_per_node'] = (
stats['total_objects'] / stats['healthy_nodes']
) if stats['healthy_nodes'] > 0 else 0
stats['cluster_health_percentage'] = (
(stats['healthy_nodes'] / stats['total_nodes']) * 100
) if stats['total_nodes'] > 0 else 0
# 格式化存储大小
stats['formatted_storage'] = {
'total_storage': self._format_bytes(stats['total_storage']),
'used_storage': self._format_bytes(stats['used_storage']),
'available_storage': self._format_bytes(stats['total_storage'] - stats['used_storage'])
}
return {
'success': True,
'cluster_statistics': stats,
'cluster_config': {
'cluster_name': self.cluster_config.get('cluster_name', 'unknown'),
'erasure_sets': self.cluster_config.get('erasure_sets', 0),
'total_drives': self.cluster_config.get('total_drives', 0),
'created_at': self.cluster_config.get('created_at', 'unknown')
},
'generated_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'获取集群统计信息失败: {e}',
'cluster_statistics': {}
}
def _format_bytes(self, bytes_value: int) -> str:
"""格式化字节数"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
if bytes_value < 1024.0:
return f"{bytes_value:.2f} {unit}"
bytes_value /= 1024.0
return f"{bytes_value:.2f} EB"
## 7.2 高可用配置
### 7.2.1 负载均衡和故障转移

在多节点集群中,客户端的访问需要在节点之间进行负载均衡,并在某个节点不可用时自动切换到其他节点。下面的 MinIOLoadBalancer 示例实现了轮询、最少连接、加权轮询和基于健康状态四种选路策略,并提供带重试的故障转移执行逻辑:
```python
class MinIOLoadBalancer:
"""MinIO负载均衡器"""
def __init__(self, cluster_manager: MinIOClusterManager):
self.cluster_manager = cluster_manager
self.load_balancing_strategies = {
'round_robin': self._round_robin_strategy,
'least_connections': self._least_connections_strategy,
'weighted_round_robin': self._weighted_round_robin_strategy,
'health_based': self._health_based_strategy
}
self.current_strategy = 'round_robin'
self.current_index = 0
self.connection_counts = {}
self.node_weights = {}
self.failed_nodes = set()
self.retry_attempts = 3
self.retry_delay = 1
def set_load_balancing_strategy(self, strategy: str) -> Dict[str, Any]:
"""设置负载均衡策略"""
try:
if strategy not in self.load_balancing_strategies:
return {
'success': False,
'error': f'不支持的负载均衡策略: {strategy}',
'supported_strategies': list(self.load_balancing_strategies.keys())
}
self.current_strategy = strategy
# 初始化策略相关数据
if strategy == 'least_connections':
for client_name in self.cluster_manager.clients:
self.connection_counts[client_name] = 0
elif strategy == 'weighted_round_robin':
# 设置默认权重
for client_name in self.cluster_manager.clients:
self.node_weights[client_name] = 1
return {
'success': True,
'message': f'负载均衡策略已设置为: {strategy}',
'strategy': strategy,
'set_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'设置负载均衡策略失败: {e}',
'strategy': strategy
}
def set_node_weights(self, weights: Dict[str, int]) -> Dict[str, Any]:
"""设置节点权重(用于加权轮询)"""
try:
for client_name, weight in weights.items():
if client_name in self.cluster_manager.clients:
self.node_weights[client_name] = max(1, weight) # 权重至少为1
return {
'success': True,
'message': '节点权重设置成功',
'node_weights': self.node_weights,
'set_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'设置节点权重失败: {e}',
'weights': weights
}
def get_next_client(self) -> Dict[str, Any]:
"""获取下一个可用的客户端"""
try:
if not self.cluster_manager.clients:
return {
'success': False,
'error': '没有可用的客户端',
'client': None
}
# 获取健康的客户端列表
healthy_clients = {
name: info for name, info in self.cluster_manager.clients.items()
if name not in self.failed_nodes
}
if not healthy_clients:
# 如果所有节点都失败,重置失败列表并重试
self.failed_nodes.clear()
healthy_clients = self.cluster_manager.clients
# 根据策略选择客户端
strategy_func = self.load_balancing_strategies[self.current_strategy]
selected_client_name = strategy_func(healthy_clients)
if not selected_client_name:
return {
'success': False,
'error': '无法选择可用的客户端',
'client': None
}
client_info = self.cluster_manager.clients[selected_client_name]
# 测试客户端连接
try:
client = client_info['client']
client.list_buckets() # 简单的连接测试
# 更新连接计数
if selected_client_name in self.connection_counts:
self.connection_counts[selected_client_name] += 1
return {
'success': True,
'client': client,
'client_name': selected_client_name,
'node_info': client_info['node_info'],
'strategy': self.current_strategy,
'selected_at': datetime.now().isoformat()
}
except Exception as client_error:
# 标记节点为失败
self.failed_nodes.add(selected_client_name)
# 递归重试其他节点
return self.get_next_client()
except Exception as e:
return {
'success': False,
'error': f'获取客户端失败: {e}',
'client': None
}
def _round_robin_strategy(self, healthy_clients: Dict) -> str:
"""轮询策略"""
client_names = list(healthy_clients.keys())
if not client_names:
return None
selected = client_names[self.current_index % len(client_names)]
self.current_index += 1
return selected
def _least_connections_strategy(self, healthy_clients: Dict) -> str:
"""最少连接策略"""
if not healthy_clients:
return None
# 找到连接数最少的客户端
min_connections = float('inf')
selected_client = None
for client_name in healthy_clients:
connections = self.connection_counts.get(client_name, 0)
if connections < min_connections:
min_connections = connections
selected_client = client_name
return selected_client
def _weighted_round_robin_strategy(self, healthy_clients: Dict) -> str:
"""加权轮询策略"""
if not healthy_clients:
return None
# 构建加权列表
weighted_clients = []
for client_name in healthy_clients:
weight = self.node_weights.get(client_name, 1)
weighted_clients.extend([client_name] * weight)
if not weighted_clients:
return None
selected = weighted_clients[self.current_index % len(weighted_clients)]
self.current_index += 1
return selected
def _health_based_strategy(self, healthy_clients: Dict) -> str:
"""基于健康状态的策略"""
if not healthy_clients:
return None
# 获取集群健康状态
health_result = self.cluster_manager.check_cluster_health()
if not health_result['success']:
# 如果无法获取健康状态,回退到轮询
return self._round_robin_strategy(healthy_clients)
# 根据响应时间选择最快的节点
best_client = None
best_response_time = float('inf')
for node_health in health_result['node_health']:
if node_health['status'] == 'healthy':
client_name = f"node_{node_health['node_id']}"
if client_name in healthy_clients:
response_time = node_health.get('response_time_ms', float('inf'))
if response_time < best_response_time:
best_response_time = response_time
best_client = client_name
return best_client or self._round_robin_strategy(healthy_clients)
def execute_with_failover(self, operation_func, *args, **kwargs) -> Dict[str, Any]:
"""执行操作并支持故障转移"""
last_error = None
for attempt in range(self.retry_attempts):
try:
# 获取可用客户端
client_result = self.get_next_client()
if not client_result['success']:
last_error = client_result['error']
continue
client = client_result['client']
client_name = client_result['client_name']
# 执行操作
start_time = time.time()
result = operation_func(client, *args, **kwargs)
execution_time = time.time() - start_time
# 操作成功,从失败列表中移除节点
self.failed_nodes.discard(client_name)
# 减少连接计数
if client_name in self.connection_counts:
self.connection_counts[client_name] = max(0, self.connection_counts[client_name] - 1)
return {
'success': True,
'result': result,
'client_name': client_name,
'node_info': client_result['node_info'],
'execution_time_ms': execution_time * 1000,
'attempt': attempt + 1,
'strategy': self.current_strategy
}
except Exception as operation_error:
last_error = str(operation_error)
# 如果有客户端信息,标记为失败
if 'client_result' in locals() and client_result['success']:
self.failed_nodes.add(client_result['client_name'])
# 等待后重试
if attempt < self.retry_attempts - 1:
time.sleep(self.retry_delay)
return {
'success': False,
'error': f'操作失败,已重试 {self.retry_attempts} 次: {last_error}',
'attempts': self.retry_attempts,
'strategy': self.current_strategy
}
def get_load_balancer_statistics(self) -> Dict[str, Any]:
"""获取负载均衡器统计信息"""
try:
return {
'success': True,
'load_balancer_statistics': {
'current_strategy': self.current_strategy,
'total_clients': len(self.cluster_manager.clients),
'healthy_clients': len(self.cluster_manager.clients) - len(self.failed_nodes),
'failed_nodes': list(self.failed_nodes),
'connection_counts': self.connection_counts.copy(),
'node_weights': self.node_weights.copy(),
'current_index': self.current_index,
'retry_attempts': self.retry_attempts,
'retry_delay': self.retry_delay
},
'generated_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'获取负载均衡器统计信息失败: {e}',
'load_balancer_statistics': {}
}
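```

下面是负载均衡器的一个简化用法示例,假设 cluster_manager 已经成功初始化了各节点客户端:

```python
# 示意:以故障转移方式执行一次只读操作
lb = MinIOLoadBalancer(cluster_manager)
lb.set_load_balancing_strategy('least_connections')

def list_bucket_names(client):
    return [bucket.name for bucket in client.list_buckets()]

result = lb.execute_with_failover(list_bucket_names)
if result['success']:
    print(f"由节点 {result['client_name']} 返回: {result['result']}")
else:
    print(f"操作失败: {result['error']}")
```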
## 7.3 实际应用示例
### 7.3.1 完整的集群部署示例

下面的 MinIOClusterDeployment 类把前两节的集群管理器与负载均衡器组合在一起,演示从节点定义、配置生成、配置保存到负载均衡设置的完整部署流程:
```python
class MinIOClusterDeployment:
"""MinIO集群部署管理"""
def __init__(self):
self.cluster_manager = MinIOClusterManager()
self.load_balancer = None
self.deployment_status = 'not_started'
self.deployment_log = []
def deploy_cluster_example(self) -> Dict[str, Any]:
"""部署集群示例"""
try:
print("=== MinIO集群部署示例 ===")
self.deployment_status = 'in_progress'
self._log("开始集群部署")
# 1. 定义集群节点
nodes = [
{
'host': 'minio-node1',
'data_dirs': ['/data1', '/data2', '/data3', '/data4'],
'zone': 'zone-1',
'rack': 'rack-1'
},
{
'host': 'minio-node2',
'data_dirs': ['/data1', '/data2', '/data3', '/data4'],
'zone': 'zone-1',
'rack': 'rack-1'
},
{
'host': 'minio-node3',
'data_dirs': ['/data1', '/data2', '/data3', '/data4'],
'zone': 'zone-2',
'rack': 'rack-2'
},
{
'host': 'minio-node4',
'data_dirs': ['/data1', '/data2', '/data3', '/data4'],
'zone': 'zone-2',
'rack': 'rack-2'
}
]
self._log(f"定义了 {len(nodes)} 个集群节点")
# 2. 生成集群配置
print("\n1. 生成集群配置...")
config_result = self.cluster_manager.generate_cluster_config(nodes)
if not config_result['success']:
self._log(f"集群配置生成失败: {config_result['error']}")
return config_result
self._log(f"集群配置生成成功: {config_result['erasure_sets']} 个纠删码集合")
print(f"✅ 集群配置生成成功")
print(f" - 节点数量: {len(nodes)}")
print(f" - 纠删码集合: {config_result['erasure_sets']}")
print(f" - 总驱动器: {config_result['total_drives']}")
# 3. 保存配置文件
print("\n2. 保存配置文件...")
save_result = self.cluster_manager.save_cluster_config('./minio-cluster-config')
if save_result['success']:
self._log(f"配置文件保存成功: {save_result['total_files']} 个文件")
print(f"✅ 配置文件已保存到: {save_result['config_dir']}")
print(f" - 保存文件数: {save_result['total_files']}")
# 显示保存的文件
for file_path in save_result['saved_files'][:5]:
print(f" - {file_path}")
if len(save_result['saved_files']) > 5:
print(f" - ... 还有 {len(save_result['saved_files']) - 5} 个文件")
else:
self._log(f"配置文件保存失败: {save_result['error']}")
print(f"❌ 配置文件保存失败: {save_result['error']}")
# 4. 初始化客户端连接(模拟)
print("\n3. 初始化客户端连接...")
# 注意:在实际部署中,这里需要等待集群启动完成
print("⚠️ 注意:请先启动集群,然后运行客户端初始化")
print(" 启动命令示例:")
print(" docker-compose -f ./minio-cluster-config/docker-compose.yml up -d")
# 模拟客户端初始化
self._log("模拟客户端初始化")
# 5. 设置负载均衡
print("\n4. 配置负载均衡...")
self.load_balancer = MinIOLoadBalancer(self.cluster_manager)
# 设置负载均衡策略
lb_result = self.load_balancer.set_load_balancing_strategy('health_based')
if lb_result['success']:
self._log(f"负载均衡策略设置成功: {lb_result['strategy']}")
print(f"✅ 负载均衡策略: {lb_result['strategy']}")
# 6. 生成部署总结
deployment_summary = {
'cluster_name': config_result['cluster_config']['cluster_name'],
'node_count': len(nodes),
'erasure_sets': config_result['erasure_sets'],
'total_drives': config_result['total_drives'],
'config_directory': save_result.get('config_dir', 'unknown'),
'load_balancing_strategy': 'health_based',
'deployment_status': 'configured',
'next_steps': [
'启动Docker容器或Kubernetes集群',
'验证集群健康状态',
'创建初始存储桶和策略',
'配置监控和告警',
'执行性能测试'
]
}
self.deployment_status = 'configured'
self._log("集群部署配置完成")
print("\n=== 部署配置完成 ===")
print(f"集群名称: {deployment_summary['cluster_name']}")
print(f"配置目录: {deployment_summary['config_directory']}")
print("\n下一步操作:")
for i, step in enumerate(deployment_summary['next_steps'], 1):
print(f"{i}. {step}")
return {
'success': True,
'message': '集群部署配置完成',
'deployment_summary': deployment_summary,
'deployment_log': self.deployment_log,
'completed_at': datetime.now().isoformat()
}
except Exception as e:
self.deployment_status = 'failed'
self._log(f"集群部署失败: {e}")
return {
'success': False,
'error': f'集群部署失败: {e}',
'deployment_status': self.deployment_status,
'deployment_log': self.deployment_log
}
def _log(self, message: str):
"""记录部署日志"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'message': message
}
self.deployment_log.append(log_entry)
def test_cluster_operations(self) -> Dict[str, Any]:
"""测试集群操作"""
try:
if not self.load_balancer:
return {
'success': False,
'error': '负载均衡器未初始化',
'test_results': []
}
test_results = []
# 测试1:创建存储桶
def create_bucket_operation(client, bucket_name):
if not client.bucket_exists(bucket_name):
client.make_bucket(bucket_name)
return f"存储桶 {bucket_name} 创建成功"
bucket_result = self.load_balancer.execute_with_failover(
create_bucket_operation, 'test-cluster-bucket'
)
test_results.append({
'test_name': '创建存储桶',
'success': bucket_result['success'],
'result': bucket_result
})
# 测试2:上传文件
def upload_file_operation(client, bucket_name, object_name, data):
from io import BytesIO
client.put_object(
bucket_name, object_name,
BytesIO(data.encode('utf-8')),
len(data.encode('utf-8'))
)
return f"文件 {object_name} 上传成功"
upload_result = self.load_balancer.execute_with_failover(
upload_file_operation, 'test-cluster-bucket',
'test-file.txt', 'Hello MinIO Cluster!'
)
test_results.append({
'test_name': '上传文件',
'success': upload_result['success'],
'result': upload_result
})
# 测试3:下载文件
def download_file_operation(client, bucket_name, object_name):
response = client.get_object(bucket_name, object_name)
data = response.read().decode('utf-8')
response.close()
return f"文件内容: {data}"
download_result = self.load_balancer.execute_with_failover(
download_file_operation, 'test-cluster-bucket', 'test-file.txt'
)
test_results.append({
'test_name': '下载文件',
'success': download_result['success'],
'result': download_result
})
# 计算成功率
successful_tests = sum(1 for test in test_results if test['success'])
success_rate = (successful_tests / len(test_results)) * 100
return {
'success': success_rate > 0,
'message': f'集群操作测试完成,成功率: {success_rate:.1f}%',
'success_rate': success_rate,
'successful_tests': successful_tests,
'total_tests': len(test_results),
'test_results': test_results,
'tested_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'集群操作测试失败: {e}',
'test_results': []
}
# 使用示例
if __name__ == "__main__":
# 创建集群部署管理器
deployment = MinIOClusterDeployment()
# 执行集群部署
result = deployment.deploy_cluster_example()
if result['success']:
print("\n集群部署配置成功!")
print("请按照生成的配置文件启动集群。")
else:
print(f"\n集群部署失败: {result['error']}")
```

## 7.4 总结
本章详细介绍了MinIO的集群部署和高可用配置,主要内容包括:
### 7.4.1 核心功能

**集群架构设计**
- 分布式架构原理
- 纠删码技术应用
- 无单点故障设计
- 自动故障恢复机制
**集群配置管理**
- 自动生成集群配置
- Docker Compose部署
- Kubernetes部署
- 启动脚本生成
**高可用特性**
- 负载均衡策略
- 故障转移机制
- 健康状态监控
- 自动重试机制
### 7.4.2 技术特点
- 易于部署:提供多种部署方式和自动化脚本
- 高可用性:支持节点故障自动恢复
- 负载均衡:多种负载均衡策略可选
- 监控完善:实时健康检查和统计信息
- 扩展性强:支持水平扩展和动态配置
### 7.4.3 最佳实践
- 部署规划:合理规划节点数量和存储配置
- 网络优化:确保节点间网络连接稳定
- 监控告警:建立完善的监控和告警机制
- 备份策略:制定数据备份和恢复计划
- 性能调优:根据业务需求优化集群性能
下一章我们将介绍MinIO的监控和运维管理,包括性能监控、日志分析、故障诊断等内容。