4.1 预签名URL概述
4.1.1 预签名URL的概念和优势
预签名URL是一种临时的、带有签名的URL,允许客户端在不提供访问凭证的情况下访问MinIO中的对象。这种机制提供了安全、灵活的访问控制方式。
主要优势:

- 安全性:无需暴露访问密钥
- 时效性:可设置过期时间
- 灵活性:支持不同的HTTP方法
- 便利性:可直接在浏览器中使用
import hmac
import io
import json
import secrets
import string
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional, Union
from urllib.parse import urlparse, parse_qs

import requests
from minio import Minio
from minio.error import S3Error
class PresignedURLManager:
    """Create and inspect presigned URLs for objects stored in MinIO.

    Every public method reports its outcome as a dictionary: ``success`` is
    always present, failures add an ``error`` message, and successes add the
    generated data. S3 errors are converted into failure dictionaries instead
    of being raised.
    """

    def __init__(self, client: Minio):
        """Keep a reference to the client used for all signing calls."""
        self.client = client

    def generate_download_url(self, bucket_name: str, object_name: str,
                              expires: timedelta = timedelta(hours=1),
                              response_headers: Optional[Dict[str, str]] = None,
                              version_id: Optional[str] = None) -> Dict[str, Any]:
        """Build a presigned GET URL for an existing object.

        Args:
            bucket_name: Bucket that holds the object.
            object_name: Key of the object to expose.
            expires: Lifetime of the URL.
            response_headers: Optional response header overrides forwarded to
                ``presigned_get_object``.
            version_id: Optional object version to sign for.

        Returns:
            A result dictionary; on success it contains ``presigned_url``,
            ``expires_at`` and ``generated_at`` among other fields.
        """
        try:
            # Check that the object exists before signing so a missing key is
            # reported here with a dedicated message.
            try:
                self.client.stat_object(bucket_name, object_name, version_id=version_id)
            except S3Error as e:
                if e.code == 'NoSuchKey':
                    return {
                        'success': False,
                        'error': f"对象不存在: {bucket_name}/{object_name}",
                        'bucket_name': bucket_name,
                        'object_name': object_name
                    }
                raise

            presigned_url = self.client.presigned_get_object(
                bucket_name=bucket_name,
                object_name=object_name,
                expires=expires,
                response_headers=response_headers,
                version_id=version_id
            )

            # Read the clock once so expires_at - generated_at == expires.
            generated_at = datetime.now()
            return {
                'success': True,
                'message': f"下载URL生成成功: {bucket_name}/{object_name}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'version_id': version_id,
                'presigned_url': presigned_url,
                'expires_in_seconds': int(expires.total_seconds()),
                'expires_at': (generated_at + expires).isoformat(),
                'response_headers': response_headers,
                'method': 'GET',
                'generated_at': generated_at.isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"生成下载URL失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name
            }

    def generate_upload_url(self, bucket_name: str, object_name: str,
                            expires: timedelta = timedelta(hours=1),
                            content_type: Optional[str] = None,
                            content_length_range: Optional[tuple] = None) -> Dict[str, Any]:
        """Build a presigned PUT URL for uploading one object.

        ``content_type`` and ``content_length_range`` are not passed to the
        signing call; they are only echoed back in the result for the caller.
        Use :meth:`generate_post_policy` when such constraints must be part of
        the signed request.

        Returns:
            A result dictionary; on success it contains ``presigned_url``.
        """
        try:
            presigned_url = self.client.presigned_put_object(
                bucket_name=bucket_name,
                object_name=object_name,
                expires=expires
            )

            generated_at = datetime.now()
            return {
                'success': True,
                'message': f"上传URL生成成功: {bucket_name}/{object_name}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'presigned_url': presigned_url,
                'expires_in_seconds': int(expires.total_seconds()),
                'expires_at': (generated_at + expires).isoformat(),
                'content_type': content_type,
                'content_length_range': content_length_range,
                'method': 'PUT',
                'generated_at': generated_at.isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"生成上传URL失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name
            }

    def generate_post_policy(self, bucket_name: str, object_name_prefix: str = "",
                             expires: timedelta = timedelta(hours=1),
                             content_length_range: Optional[tuple] = None,
                             content_type_starts_with: Optional[str] = None) -> Dict[str, Any]:
        """Create signed form fields for a browser POST upload.

        Args:
            bucket_name: Target bucket.
            object_name_prefix: Prefix every uploaded key must start with; an
                empty prefix places no restriction on the key.
            expires: Lifetime of the policy.
            content_length_range: Optional ``(min_bytes, max_bytes)`` pair.
            content_type_starts_with: Optional required Content-Type prefix.

        Returns:
            A result dictionary; on success ``form_data`` holds the value
            returned by ``presigned_post_policy``. Any exception, including a
            missing ``PostPolicy`` class, is reported as a failure dictionary.
        """
        try:
            # The keyword-style client calls used by this class match
            # minio-py 7.x, where PostPolicy lives in minio.datatypes.
            from minio.datatypes import PostPolicy

            # The policy expiration is an absolute instant, so use an aware
            # UTC datetime rather than naive local time.
            policy = PostPolicy(bucket_name, datetime.now(timezone.utc) + expires)

            # NOTE(review): a key condition is always added, since the client
            # appears to require one; an empty prefix matches every key.
            policy.add_starts_with_condition("key", object_name_prefix)

            if content_length_range:
                policy.add_content_length_range_condition(
                    content_length_range[0], content_length_range[1]
                )

            if content_type_starts_with:
                policy.add_starts_with_condition("Content-Type", content_type_starts_with)

            form_data = self.client.presigned_post_policy(policy)

            generated_at = datetime.now()
            return {
                'success': True,
                'message': f"POST策略生成成功: {bucket_name}",
                'bucket_name': bucket_name,
                'object_name_prefix': object_name_prefix,
                'form_data': form_data,
                'expires_in_seconds': int(expires.total_seconds()),
                'expires_at': (generated_at + expires).isoformat(),
                'content_length_range': content_length_range,
                'content_type_starts_with': content_type_starts_with,
                'method': 'POST',
                'generated_at': generated_at.isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"生成POST策略失败: {e}",
                'bucket_name': bucket_name
            }

    def validate_url(self, presigned_url: str) -> Dict[str, Any]:
        """Inspect the signature parameters embedded in a presigned URL.

        This only parses the query string; the signature itself is not
        verified.

        Returns:
            A result dictionary with ``is_valid_format`` (signature, expiry
            and date are all present) and ``is_expired`` (derived from
            ``X-Amz-Date`` plus ``X-Amz-Expires``). ``expires_in_seconds`` is
            the raw query-string value.
        """
        try:
            parsed_url = urlparse(presigned_url)
            query_params = parse_qs(parsed_url.query)

            signature = query_params.get('X-Amz-Signature', [None])[0]
            expires = query_params.get('X-Amz-Expires', [None])[0]
            date = query_params.get('X-Amz-Date', [None])[0]

            is_valid_format = all([signature, expires, date])

            is_expired = False
            if date and expires:
                try:
                    # The trailing "Z" marks the timestamp as UTC, so it must
                    # be compared with the current UTC time, not local time.
                    signed_at = datetime.strptime(date, '%Y%m%dT%H%M%SZ').replace(
                        tzinfo=timezone.utc
                    )
                    expiry_time = signed_at + timedelta(seconds=int(expires))
                    is_expired = datetime.now(timezone.utc) > expiry_time
                except (ValueError, OverflowError):
                    # Unparseable values: keep the best-effort default of
                    # "not expired" rather than failing the whole call.
                    pass

            return {
                'success': True,
                'presigned_url': presigned_url,
                'is_valid_format': is_valid_format,
                'is_expired': is_expired,
                'signature': signature,
                'expires_in_seconds': expires,
                'date': date,
                'host': parsed_url.netloc,
                'path': parsed_url.path,
                'validated_at': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"验证URL失败: {e}",
                'presigned_url': presigned_url
            }
4.2 安全访问控制
4.2.1 访问策略管理
”`python import json from minio.policy import Policy, Statement, Effect, Principal, Action, Resource, Condition
class AccessPolicyManager:
    """Build S3-style bucket policy documents and apply them through MinIO.

    The ``create_*`` methods are pure: they only return a JSON string. The
    ``*_bucket_policy`` methods call the client and report their outcome as
    a dictionary with a ``success`` flag.
    """

    def __init__(self, client: Minio):
        """Keep a reference to the client used for policy calls."""
        self.client = client

    @staticmethod
    def _format_policy_time(moment: datetime) -> str:
        """Render a datetime in the ``YYYY-MM-DDTHH:MM:SSZ`` form used below.

        Aware datetimes are converted to UTC first so the ``Z`` suffix is
        truthful; naive datetimes are formatted as given (treated as UTC).
        """
        if moment.tzinfo is not None:
            moment = moment.astimezone(timezone.utc)
        return moment.strftime("%Y-%m-%dT%H:%M:%SZ")

    def create_read_only_policy(self, bucket_name: str, object_prefix: str = "*") -> str:
        """Return a policy JSON granting everyone ``s3:GetObject``.

        Args:
            bucket_name: Bucket the policy applies to.
            object_prefix: Key pattern appended to the bucket ARN.
        """
        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": "*"},
                    "Action": ["s3:GetObject"],
                    "Resource": [f"arn:aws:s3:::{bucket_name}/{object_prefix}"]
                }
            ]
        }
        return json.dumps(policy, indent=2)

    def create_read_write_policy(self, bucket_name: str, object_prefix: str = "*") -> str:
        """Return a policy JSON granting everyone get/put/delete and listing.

        The second statement allows ``s3:ListBucket`` with an ``s3:prefix``
        condition equal to ``object_prefix`` with trailing ``*`` removed.
        """
        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": "*"},
                    "Action": [
                        "s3:GetObject",
                        "s3:PutObject",
                        "s3:DeleteObject"
                    ],
                    "Resource": [f"arn:aws:s3:::{bucket_name}/{object_prefix}"]
                },
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": "*"},
                    "Action": ["s3:ListBucket"],
                    "Resource": [f"arn:aws:s3:::{bucket_name}"],
                    "Condition": {
                        "StringLike": {
                            "s3:prefix": [object_prefix.rstrip('*')]
                        }
                    }
                }
            ]
        }
        return json.dumps(policy, indent=2)

    def create_time_limited_policy(self, bucket_name: str, object_prefix: str = "*",
                                   start_time: Optional[datetime] = None,
                                   end_time: Optional[datetime] = None) -> str:
        """Return a read-only policy JSON valid only inside a time window.

        Args:
            bucket_name: Bucket the policy applies to.
            object_prefix: Key pattern appended to the bucket ARN.
            start_time: Window start; defaults to the current UTC time.
            end_time: Window end; defaults to 24 hours after ``start_time``.
        """
        if start_time is None:
            # The rendered timestamps carry a "Z" suffix, so the default must
            # be taken in UTC rather than local time.
            start_time = datetime.now(timezone.utc)
        if end_time is None:
            end_time = start_time + timedelta(hours=24)

        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": "*"},
                    "Action": ["s3:GetObject"],
                    "Resource": [f"arn:aws:s3:::{bucket_name}/{object_prefix}"],
                    "Condition": {
                        "DateGreaterThan": {
                            "aws:CurrentTime": self._format_policy_time(start_time)
                        },
                        "DateLessThan": {
                            "aws:CurrentTime": self._format_policy_time(end_time)
                        }
                    }
                }
            ]
        }
        return json.dumps(policy, indent=2)

    def create_ip_restricted_policy(self, bucket_name: str, allowed_ips: List[str],
                                    object_prefix: str = "*") -> str:
        """Return a read-only policy JSON limited to the given source IPs.

        Args:
            bucket_name: Bucket the policy applies to.
            allowed_ips: Values placed under the ``aws:SourceIp`` condition.
            object_prefix: Key pattern appended to the bucket ARN.
        """
        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": "*"},
                    "Action": ["s3:GetObject"],
                    "Resource": [f"arn:aws:s3:::{bucket_name}/{object_prefix}"],
                    "Condition": {
                        "IpAddress": {
                            "aws:SourceIp": allowed_ips
                        }
                    }
                }
            ]
        }
        return json.dumps(policy, indent=2)

    def set_bucket_policy(self, bucket_name: str, policy: str) -> Dict[str, Any]:
        """Validate that ``policy`` is JSON, then apply it to the bucket.

        Returns:
            A result dictionary; invalid JSON and S3 errors are reported with
            ``success`` set to False.
        """
        try:
            # Reject malformed JSON locally before calling the server.
            try:
                json.loads(policy)
            except json.JSONDecodeError as e:
                return {
                    'success': False,
                    'error': f"策略JSON格式无效: {e}",
                    'bucket_name': bucket_name
                }

            self.client.set_bucket_policy(bucket_name, policy)

            return {
                'success': True,
                'message': f"存储桶策略设置成功: {bucket_name}",
                'bucket_name': bucket_name,
                'policy': policy,
                'set_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"设置存储桶策略失败: {e}",
                'bucket_name': bucket_name
            }

    def get_bucket_policy(self, bucket_name: str) -> Dict[str, Any]:
        """Fetch the bucket policy and, when possible, its parsed form.

        Returns:
            A result dictionary. ``policy`` is the raw value from the client
            and ``policy_dict`` its parsed JSON (None if it cannot be
            parsed). A bucket without a policy is reported as a success with
            ``policy`` set to None.
        """
        try:
            policy = self.client.get_bucket_policy(bucket_name)

            try:
                policy_dict = json.loads(policy)
            except (ValueError, TypeError):
                # Not valid JSON, or not a str/bytes value: keep the raw
                # policy and report the parsed form as unavailable.
                policy_dict = None

            return {
                'success': True,
                'bucket_name': bucket_name,
                'policy': policy,
                'policy_dict': policy_dict,
                'retrieved_at': datetime.now().isoformat()
            }
        except S3Error as e:
            if e.code == 'NoSuchBucketPolicy':
                return {
                    'success': True,
                    'bucket_name': bucket_name,
                    'policy': None,
                    'message': "存储桶没有设置策略",
                    'retrieved_at': datetime.now().isoformat()
                }
            return {
                'success': False,
                'error': f"获取存储桶策略失败: {e}",
                'bucket_name': bucket_name
            }

    def delete_bucket_policy(self, bucket_name: str) -> Dict[str, Any]:
        """Remove the bucket policy.

        Returns:
            A result dictionary; S3 errors are reported with ``success`` set
            to False.
        """
        try:
            self.client.delete_bucket_policy(bucket_name)
            return {
                'success': True,
                'message': f"存储桶策略删除成功: {bucket_name}",
                'bucket_name': bucket_name,
                'deleted_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除存储桶策略失败: {e}",
                'bucket_name': bucket_name
            }
4.2.2 用户和组管理
class UserAccessManager:
    """In-memory bookkeeping of user credentials and permissions.

    Users are stored in a plain dictionary on the instance; nothing in this
    class registers the generated keys with the MinIO server. Every method
    reports its outcome as a dictionary with a ``success`` flag.
    """

    # Permissions that satisfy each requested action.
    _ACTION_PERMISSIONS = {
        'read': ['read', 'read_write', 'admin'],
        'write': ['write', 'read_write', 'admin'],
        'delete': ['delete', 'admin'],
        'admin': ['admin']
    }

    def __init__(self, admin_client: Minio):
        """Store the admin client and create the empty in-memory stores."""
        self.admin_client = admin_client
        self.users = {}   # simulated user database, keyed by username
        self.groups = {}  # simulated group database

    def create_user_credentials(self, username: str, permissions: List[str],
                                expires: timedelta = timedelta(days=30)) -> Dict[str, Any]:
        """Generate an access/secret key pair and remember it for ``username``.

        An existing entry with the same username is overwritten.

        Args:
            username: Name the credentials are stored under.
            permissions: Permission labels later checked by
                :meth:`validate_user_access`.
            expires: Lifetime of the credentials.

        Returns:
            A result dictionary containing the generated keys on success.
        """
        try:
            access_key = f"user_{username}_{secrets.token_hex(8)}"
            alphabet = string.ascii_letters + string.digits
            secret_key = ''.join(secrets.choice(alphabet) for _ in range(32))

            created_at = datetime.now()
            expires_at = created_at + expires

            self.users[username] = {
                'username': username,
                'access_key': access_key,
                'secret_key': secret_key,
                'permissions': permissions,
                'created_at': created_at.isoformat(),
                'expires_at': expires_at.isoformat(),
                'is_active': True
            }

            return {
                'success': True,
                'message': f"用户凭证创建成功: {username}",
                'username': username,
                'access_key': access_key,
                'secret_key': secret_key,
                'permissions': permissions,
                'expires_at': expires_at.isoformat(),
                'created_at': created_at.isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"创建用户凭证失败: {e}",
                'username': username
            }

    def create_temporary_access(self, bucket_name: str, object_prefix: str,
                                permissions: List[str],
                                expires: timedelta = timedelta(hours=1)) -> Dict[str, Any]:
        """Create short-lived credentials plus optional presigned URLs.

        A random ``temp_*`` user is created, a client is built with its keys,
        and presigned URLs are requested for the ``read`` and ``write``
        permissions. URLs that could not be generated are simply omitted.

        NOTE(review): the temporary keys are only stored in ``self.users``;
        presumably the server must also know them for the signed URLs to be
        accepted — confirm how these credentials are provisioned.

        Returns:
            A result dictionary with ``credentials`` and ``presigned_urls``.
        """
        try:
            temp_credentials = self.create_user_credentials(
                username=f"temp_{secrets.token_hex(4)}",
                permissions=permissions,
                expires=expires
            )
            if not temp_credentials['success']:
                return temp_credentials

            # NOTE(review): _base_url is a private client attribute. Newer
            # minio-py releases appear to expose host/is_https on it, while
            # this code was written against netloc/scheme; support both.
            base_url = self.admin_client._base_url
            endpoint = getattr(base_url, 'host', None) or base_url.netloc
            secure = getattr(base_url, 'is_https', None)
            if secure is None:
                secure = base_url.scheme == 'https'

            temp_client = Minio(
                endpoint=endpoint,
                access_key=temp_credentials['access_key'],
                secret_key=temp_credentials['secret_key'],
                secure=secure
            )
            url_manager = PresignedURLManager(temp_client)

            presigned_urls = {}

            if 'read' in permissions:
                download_url = url_manager.generate_download_url(
                    bucket_name=bucket_name,
                    object_name=f"{object_prefix}*",
                    expires=expires
                )
                if download_url['success']:
                    presigned_urls['download'] = download_url['presigned_url']

            if 'write' in permissions:
                upload_url = url_manager.generate_upload_url(
                    bucket_name=bucket_name,
                    object_name=f"{object_prefix}temp_upload",
                    expires=expires
                )
                if upload_url['success']:
                    presigned_urls['upload'] = upload_url['presigned_url']

            return {
                'success': True,
                'message': f"临时访问凭证创建成功",
                'bucket_name': bucket_name,
                'object_prefix': object_prefix,
                'permissions': permissions,
                'credentials': {
                    'access_key': temp_credentials['access_key'],
                    'secret_key': temp_credentials['secret_key']
                },
                'presigned_urls': presigned_urls,
                'expires_at': temp_credentials['expires_at'],
                'created_at': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"创建临时访问失败: {e}",
                'bucket_name': bucket_name
            }

    def validate_user_access(self, username: str, bucket_name: str,
                             object_name: str, action: str) -> Dict[str, Any]:
        """Decide whether ``username`` may perform ``action``.

        The decision depends only on the stored user record (existence,
        active flag, expiry, permissions); ``bucket_name`` and
        ``object_name`` are echoed back but not evaluated.

        Returns:
            A result dictionary with ``access_granted``. Unknown, disabled
            or expired users yield ``success`` False; an unknown action
            yields ``success`` True with ``access_granted`` False.
        """
        try:
            if username not in self.users:
                return {
                    'success': False,
                    'error': f"用户不存在: {username}",
                    'username': username,
                    'access_granted': False
                }

            user_info = self.users[username]

            if not user_info['is_active']:
                return {
                    'success': False,
                    'error': f"用户已被禁用: {username}",
                    'username': username,
                    'access_granted': False
                }

            expires_at = datetime.fromisoformat(user_info['expires_at'])
            if datetime.now() > expires_at:
                return {
                    'success': False,
                    'error': f"用户凭证已过期: {username}",
                    'username': username,
                    'expires_at': user_info['expires_at'],
                    'access_granted': False
                }

            permissions = user_info['permissions']
            required_permissions = self._ACTION_PERMISSIONS.get(action, [])
            has_permission = any(perm in permissions for perm in required_permissions)

            return {
                'success': True,
                'username': username,
                'bucket_name': bucket_name,
                'object_name': object_name,
                'action': action,
                'access_granted': has_permission,
                'user_permissions': permissions,
                'required_permissions': required_permissions,
                'validated_at': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"验证用户访问失败: {e}",
                'username': username
            }

    def revoke_user_access(self, username: str) -> Dict[str, Any]:
        """Mark a stored user as inactive and record when it happened.

        Returns:
            A result dictionary; an unknown username yields ``success`` False.
        """
        try:
            if username not in self.users:
                return {
                    'success': False,
                    'error': f"用户不存在: {username}",
                    'username': username
                }

            revoked_at = datetime.now().isoformat()
            self.users[username]['is_active'] = False
            self.users[username]['revoked_at'] = revoked_at

            return {
                'success': True,
                'message': f"用户访问已撤销: {username}",
                'username': username,
                'revoked_at': revoked_at
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"撤销用户访问失败: {e}",
                'username': username
            }
4.3 安全最佳实践
4.3.1 安全配置检查器
class SecurityAuditor:
    """Score the security configuration of MinIO buckets.

    An audit runs a fixed list of checks. Each check appends an entry to
    ``checks`` and adds its ``score`` to ``security_score``; the final score
    is mapped to a ``security_level``.

    NOTE(review): the positive scores below sum to 70, so the ``excellent``
    threshold (80) cannot be reached with the current checks — confirm
    whether the weights or the thresholds should change.
    """

    # Actions treated as exposing read or write access when granted publicly.
    _READ_ACTIONS = frozenset({'s3:GetObject', 's3:*', '*'})
    _WRITE_ACTIONS = frozenset({'s3:PutObject', 's3:DeleteObject', 's3:*', '*'})

    # Substrings that flag an object name as possibly sensitive.
    _SENSITIVE_PATTERNS = ('password', 'secret', 'key', 'token', 'credential')

    def __init__(self, client: Minio):
        """Store the client and build the policy manager used by the audit."""
        self.client = client
        self.policy_manager = AccessPolicyManager(client)

    def audit_bucket_security(self, bucket_name: str) -> Dict[str, Any]:
        """Run all checks against one bucket.

        Returns:
            The audit dictionary. ``security_level`` is ``'unknown'`` when
            the bucket does not exist or the audit aborted with an error
            (``error`` is then set); otherwise it is one of ``excellent``,
            ``good``, ``fair`` or ``poor``.
        """
        audit_results = {
            'bucket_name': bucket_name,
            'audit_timestamp': datetime.now().isoformat(),
            'security_score': 0,
            'max_score': 100,
            # Always present so callers can index it on every return path.
            'security_level': 'unknown',
            'checks': [],
            'recommendations': []
        }

        try:
            # Check 1: the bucket must exist; nothing else can be audited.
            if not self.client.bucket_exists(bucket_name):
                self._record(audit_results, 'bucket_exists', 'fail', '存储桶不存在', 0)
                return audit_results
            self._record(audit_results, 'bucket_exists', 'pass', '存储桶存在', 10)

            self._check_bucket_policy(audit_results, bucket_name)   # check 2
            self._check_versioning(audit_results, bucket_name)      # check 3
            self._check_encryption(audit_results, bucket_name)      # check 4
            self._check_object_inventory(audit_results, bucket_name)  # check 5

            audit_results['security_level'] = self._rate(audit_results['security_score'])
            return audit_results
        except Exception as e:
            audit_results['error'] = f"安全审计失败: {e}"
            return audit_results

    @staticmethod
    def _record(audit_results: Dict[str, Any], check: str, status: str,
                message: str, score: int, **extra: Any) -> None:
        """Append one check entry and apply its score to the running total."""
        entry = {'check': check, 'status': status, 'message': message, 'score': score}
        entry.update(extra)
        audit_results['checks'].append(entry)
        audit_results['security_score'] += score

    @staticmethod
    def _rate(score: int) -> str:
        """Map a numeric score to its security level label."""
        if score >= 80:
            return 'excellent'
        if score >= 60:
            return 'good'
        if score >= 40:
            return 'fair'
        return 'poor'

    @staticmethod
    def _as_list(value: Any) -> list:
        """Normalize a policy field that may be a scalar, a list or absent."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    def _public_access(self, policy_dict: Dict[str, Any]) -> tuple:
        """Return ``(has_public_read, has_public_write)`` for a policy.

        Only statements that are not explicit ``Deny`` and whose principal is
        ``"*"`` or has ``"*"`` under ``AWS`` (as a string or inside a list)
        are considered.
        """
        has_public_read = False
        has_public_write = False
        for statement in self._as_list(policy_dict.get('Statement', [])):
            if statement.get('Effect') == 'Deny':
                continue
            principal = statement.get('Principal', {})
            if isinstance(principal, dict):
                is_public = '*' in self._as_list(principal.get('AWS'))
            else:
                is_public = principal == '*'
            if not is_public:
                continue
            actions = set(self._as_list(statement.get('Action', [])))
            if actions & self._READ_ACTIONS:
                has_public_read = True
            if actions & self._WRITE_ACTIONS:
                has_public_write = True
        return has_public_read, has_public_write

    def _check_bucket_policy(self, audit_results: Dict[str, Any], bucket_name: str) -> None:
        """Check 2: a policy exists and does not grant public access."""
        policy_result = self.policy_manager.get_bucket_policy(bucket_name)

        if not (policy_result['success'] and policy_result['policy']):
            self._record(audit_results, 'bucket_policy', 'warning', '存储桶未设置访问策略', 0)
            audit_results['recommendations'].append(
                '设置适当的存储桶访问策略以控制访问权限'
            )
            return

        self._record(audit_results, 'bucket_policy', 'pass', '存储桶已设置访问策略', 20)

        try:
            policy_dict = json.loads(policy_result['policy'])
            has_public_read, has_public_write = self._public_access(policy_dict)
        except (ValueError, TypeError, AttributeError):
            # Malformed JSON or an unexpected document shape.
            self._record(audit_results, 'policy_analysis', 'warning', '无法解析存储桶策略', -5)
            return

        if has_public_write:
            self._record(audit_results, 'public_write_access', 'warning',
                         '存储桶允许公共写入访问', -10)
            audit_results['recommendations'].append(
                '限制公共写入访问,使用预签名URL或特定用户权限'
            )

        if has_public_read:
            self._record(audit_results, 'public_read_access', 'info',
                         '存储桶允许公共读取访问', 0)
            audit_results['recommendations'].append(
                '确认公共读取访问是否必要,考虑使用预签名URL'
            )

    def _check_versioning(self, audit_results: Dict[str, Any], bucket_name: str) -> None:
        """Check 3: bucket versioning is enabled."""
        try:
            versioning = self.client.get_bucket_versioning(bucket_name)
            enabled = bool(versioning) and versioning.status == 'Enabled'
        except Exception:
            self._record(audit_results, 'versioning', 'unknown', '无法检查版本控制状态', 0)
            return

        if enabled:
            self._record(audit_results, 'versioning', 'pass', '版本控制已启用', 15)
        else:
            self._record(audit_results, 'versioning', 'warning', '版本控制未启用', 0)
            audit_results['recommendations'].append(
                '启用版本控制以防止意外删除和修改'
            )

    def _check_encryption(self, audit_results: Dict[str, Any], bucket_name: str) -> None:
        """Check 4: a bucket encryption configuration is present."""
        try:
            encryption = self.client.get_bucket_encryption(bucket_name)
        except Exception:
            self._record(audit_results, 'encryption', 'unknown', '无法检查加密配置', 0)
            return

        if encryption:
            self._record(audit_results, 'encryption', 'pass', '存储桶已启用加密', 20)
        else:
            self._record(audit_results, 'encryption', 'warning', '存储桶未启用加密', 0)
            audit_results['recommendations'].append(
                '启用服务器端加密以保护数据安全'
            )

    def _check_object_inventory(self, audit_results: Dict[str, Any], bucket_name: str) -> None:
        """Check 5: summarize objects and flag sensitive-looking names.

        Best effort: any failure (including the helper module not being
        importable) is recorded as an ``unknown`` check instead of aborting
        the audit.
        """
        try:
            # NOTE(review): ObjectLister comes from a sibling module that is
            # not visible here; its result keys are used as the original
            # code used them — verify against that module.
            from ..object_operations import ObjectLister

            lister = ObjectLister(self.client)
            objects_result = lister.list_objects(bucket_name, max_keys=1000)
            if not objects_result['success']:
                return

            object_count = objects_result['total_objects']
            total_size_mb = objects_result['total_size_mb']
            self._record(
                audit_results, 'object_inventory', 'info',
                f'存储桶包含 {object_count} 个对象,总大小 {total_size_mb} MB', 5
            )

            sensitive_objects = [
                obj['object_name']
                for obj in objects_result['objects']
                if any(pattern in obj['object_name'].lower()
                       for pattern in self._SENSITIVE_PATTERNS)
            ]
            if sensitive_objects:
                self._record(
                    audit_results, 'sensitive_object_names', 'warning',
                    f'发现 {len(sensitive_objects)} 个可能包含敏感信息的对象名称', -5,
                    details=sensitive_objects[:5]  # only the first five names
                )
                audit_results['recommendations'].append(
                    '避免在对象名称中包含敏感信息,使用通用的命名规则'
                )
        except Exception:
            self._record(audit_results, 'object_inventory', 'unknown', '无法检查对象清单', 0)

    def generate_security_report(self, bucket_names: List[str]) -> Dict[str, Any]:
        """Audit several buckets and aggregate the results.

        Returns:
            A report with one audit per bucket, a count per security level
            (audits whose level is not one of the four known labels are not
            counted), overall recommendations and an overall score expressed
            as a percentage of ``100 * len(bucket_names)``.
        """
        report = {
            'report_timestamp': datetime.now().isoformat(),
            'total_buckets': len(bucket_names),
            'bucket_audits': [],
            'summary': {
                'excellent': 0,
                'good': 0,
                'fair': 0,
                'poor': 0
            },
            'overall_recommendations': []
        }

        for bucket_name in bucket_names:
            audit_result = self.audit_bucket_security(bucket_name)
            report['bucket_audits'].append(audit_result)

            security_level = audit_result.get('security_level', 'unknown')
            if security_level in report['summary']:
                report['summary'][security_level] += 1

        if report['summary']['poor'] > 0:
            report['overall_recommendations'].append(
                f"{report['summary']['poor']} 个存储桶安全配置较差,需要立即改进"
            )

        if report['summary']['fair'] > 0:
            report['overall_recommendations'].append(
                f"{report['summary']['fair']} 个存储桶安全配置一般,建议优化"
            )

        total_score = sum(
            audit['security_score'] for audit in report['bucket_audits']
            if 'security_score' in audit
        )
        max_total_score = len(bucket_names) * 100
        report['overall_security_score'] = (
            (total_score / max_total_score * 100) if max_total_score > 0 else 0
        )

        return report
4.4 实际应用示例
4.4.1 安全文件分享系统
class SecureFileSharing:
    """Share objects through presigned URLs guarded by extra checks.

    Shares are kept in an in-memory dictionary keyed by a random share id.
    Access can be limited by password, client IP, expiry and download count.
    Every method reports its outcome as a dictionary with a ``success`` flag.
    """

    def __init__(self, client: Minio):
        """Build the helper managers and the empty share registry."""
        self.client = client
        self.url_manager = PresignedURLManager(client)
        self.policy_manager = AccessPolicyManager(client)
        self.user_manager = UserAccessManager(client)
        self.auditor = SecurityAuditor(client)
        self.shares = {}  # share records keyed by share id

    def create_secure_share(self, bucket_name: str, object_name: str,
                            share_type: str = "download",
                            expires: timedelta = timedelta(hours=24),
                            password: Optional[str] = None,
                            max_downloads: Optional[int] = None,
                            allowed_ips: Optional[List[str]] = None) -> Dict[str, Any]:
        """Create a share record backed by a presigned URL.

        Args:
            bucket_name: Bucket that holds (or will hold) the object.
            object_name: Object key to share.
            share_type: ``"download"`` or ``"upload"``.
            expires: Lifetime of the share and its presigned URL.
            password: Optional password required by :meth:`access_share`.
            max_downloads: Optional cap on successful download accesses;
                ``0`` means no download is allowed, ``None`` means unlimited.
            allowed_ips: Optional allow-list of client IP strings.

        Returns:
            A result dictionary with ``share_id`` and ``secure_url`` on
            success; a failed URL generation result is returned unchanged.
        """
        try:
            share_id = secrets.token_urlsafe(16)

            if share_type == "download":
                url_result = self.url_manager.generate_download_url(
                    bucket_name=bucket_name,
                    object_name=object_name,
                    expires=expires
                )
            elif share_type == "upload":
                url_result = self.url_manager.generate_upload_url(
                    bucket_name=bucket_name,
                    object_name=object_name,
                    expires=expires
                )
            else:
                return {
                    'success': False,
                    'error': f"不支持的分享类型: {share_type}"
                }

            if not url_result['success']:
                return url_result

            created_at = datetime.now().isoformat()
            self.shares[share_id] = {
                'share_id': share_id,
                'bucket_name': bucket_name,
                'object_name': object_name,
                'share_type': share_type,
                'presigned_url': url_result['presigned_url'],
                'expires_at': url_result['expires_at'],
                'password': password,
                'max_downloads': max_downloads,
                'download_count': 0,
                # Copy so later changes to the caller's list do not alter
                # the stored allow-list.
                'allowed_ips': list(allowed_ips) if allowed_ips else [],
                'created_at': created_at,
                'is_active': True,
                'access_log': []
            }

            secure_url = f"https://your-app.com/share/{share_id}"

            return {
                'success': True,
                'message': f"安全分享创建成功: {bucket_name}/{object_name}",
                'share_id': share_id,
                'secure_url': secure_url,
                'presigned_url': url_result['presigned_url'],
                'share_type': share_type,
                'expires_at': url_result['expires_at'],
                'has_password': password is not None,
                'max_downloads': max_downloads,
                'allowed_ips': allowed_ips,
                'created_at': created_at
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"创建安全分享失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name
            }

    @staticmethod
    def _log_access(share_info: Dict[str, Any], client_ip: Optional[str],
                    status: str, message: str) -> None:
        """Append one entry to the share's access log."""
        share_info['access_log'].append({
            'timestamp': datetime.now().isoformat(),
            'client_ip': client_ip,
            'status': status,
            'message': message
        })

    def access_share(self, share_id: str, password: Optional[str] = None,
                     client_ip: Optional[str] = None) -> Dict[str, Any]:
        """Validate an access attempt and hand out the presigned URL.

        Checks run in this order: share exists, is active, not expired,
        password, IP allow-list, download limit. Password and IP failures
        and successful accesses are written to the access log. Expired or
        exhausted shares are deactivated.

        Returns:
            A result dictionary containing ``presigned_url`` on success.
        """
        try:
            if share_id not in self.shares:
                return {
                    'success': False,
                    'error': '分享链接不存在或已过期',
                    'share_id': share_id
                }

            share_info = self.shares[share_id]

            if not share_info['is_active']:
                return {
                    'success': False,
                    'error': '分享链接已被禁用',
                    'share_id': share_id
                }

            expires_at = datetime.fromisoformat(share_info['expires_at'])
            if datetime.now() > expires_at:
                share_info['is_active'] = False
                return {
                    'success': False,
                    'error': '分享链接已过期',
                    'share_id': share_id,
                    'expires_at': share_info['expires_at']
                }

            stored_password = share_info['password']
            if stored_password:
                # Constant-time comparison so response timing does not leak
                # how much of the password matched.
                supplied = password if isinstance(password, str) else ''
                if not hmac.compare_digest(supplied.encode(), stored_password.encode()):
                    self._log_access(share_info, client_ip, 'password_failed', '密码错误')
                    return {
                        'success': False,
                        'error': '密码错误',
                        'share_id': share_id
                    }

            if share_info['allowed_ips'] and client_ip not in share_info['allowed_ips']:
                self._log_access(share_info, client_ip, 'ip_blocked', 'IP地址不在允许列表中')
                return {
                    'success': False,
                    'error': 'IP地址不在允许列表中',
                    'share_id': share_id
                }

            # Compare against None so a limit of 0 is enforced rather than
            # being mistaken for "no limit".
            max_downloads = share_info['max_downloads']
            if max_downloads is not None and share_info['download_count'] >= max_downloads:
                share_info['is_active'] = False
                return {
                    'success': False,
                    'error': '已达到最大下载次数限制',
                    'share_id': share_id,
                    'max_downloads': max_downloads
                }

            self._log_access(share_info, client_ip, 'success', '访问成功')

            # Only download shares consume the download quota.
            if share_info['share_type'] == 'download':
                share_info['download_count'] += 1

            return {
                'success': True,
                'message': '分享访问成功',
                'share_id': share_id,
                'presigned_url': share_info['presigned_url'],
                'bucket_name': share_info['bucket_name'],
                'object_name': share_info['object_name'],
                'share_type': share_info['share_type'],
                'download_count': share_info['download_count'],
                'max_downloads': share_info['max_downloads'],
                'accessed_at': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"访问分享失败: {e}",
                'share_id': share_id
            }

    def revoke_share(self, share_id: str) -> Dict[str, Any]:
        """Deactivate a share and record when it was revoked.

        Returns:
            A result dictionary; an unknown share id yields ``success`` False.
        """
        try:
            if share_id not in self.shares:
                return {
                    'success': False,
                    'error': '分享不存在',
                    'share_id': share_id
                }

            revoked_at = datetime.now().isoformat()
            self.shares[share_id]['is_active'] = False
            self.shares[share_id]['revoked_at'] = revoked_at

            return {
                'success': True,
                'message': '分享已撤销',
                'share_id': share_id,
                'revoked_at': revoked_at
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"撤销分享失败: {e}",
                'share_id': share_id
            }

    def get_share_analytics(self, share_id: str) -> Dict[str, Any]:
        """Summarize the access log of one share.

        Returns:
            A result dictionary whose ``statistics`` holds total, successful
            and failed access counts, the success rate in percent, download
            counters and per-IP counts. Log entries without a client IP are
            grouped under ``'unknown'``.
        """
        try:
            if share_id not in self.shares:
                return {
                    'success': False,
                    'error': '分享不存在',
                    'share_id': share_id
                }

            share_info = self.shares[share_id]
            access_log = share_info['access_log']

            total_accesses = len(access_log)
            successful_accesses = sum(1 for log in access_log if log['status'] == 'success')
            failed_accesses = total_accesses - successful_accesses

            ip_stats = {}
            for log in access_log:
                # The key always exists in log entries but may hold None.
                ip = log.get('client_ip') or 'unknown'
                counters = ip_stats.setdefault(ip, {'total': 0, 'success': 0, 'failed': 0})
                counters['total'] += 1
                if log['status'] == 'success':
                    counters['success'] += 1
                else:
                    counters['failed'] += 1

            return {
                'success': True,
                'share_id': share_id,
                'bucket_name': share_info['bucket_name'],
                'object_name': share_info['object_name'],
                'share_type': share_info['share_type'],
                'created_at': share_info['created_at'],
                'expires_at': share_info['expires_at'],
                'is_active': share_info['is_active'],
                'statistics': {
                    'total_accesses': total_accesses,
                    'successful_accesses': successful_accesses,
                    'failed_accesses': failed_accesses,
                    'success_rate': (successful_accesses / total_accesses * 100) if total_accesses > 0 else 0,
                    'download_count': share_info['download_count'],
                    'max_downloads': share_info['max_downloads'],
                    'ip_statistics': ip_stats
                },
                'access_log': access_log,
                'analyzed_at': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"获取分享分析失败: {e}",
                'share_id': share_id
            }
使用示例
def security_example():
    """Run an end-to-end demo against a local MinIO server.

    The demo creates a bucket if needed, uploads a small text object, shares
    it with a password, simulates a failed and a successful access, audits
    the bucket and prints the share analytics. Any exception is caught and
    printed so the demo never raises.
    """
    print("=== MinIO安全访问示例 ===")

    try:
        client = Minio(
            endpoint="localhost:9000",
            access_key="minioadmin",
            secret_key="minioadmin",
            secure=False
        )

        bucket_name = "security-demo"

        if not client.bucket_exists(bucket_name):
            client.make_bucket(bucket_name)
            print(f"✅ 创建存储桶: {bucket_name}")

        sharing_system = SecureFileSharing(client)

        # 1. Upload a test object.
        print("\n1. 上传测试文件...")
        test_content = "这是一个需要安全分享的文件\n" + datetime.now().isoformat()
        # Encode once so the stream and the declared length always agree.
        payload = test_content.encode()
        client.put_object(
            bucket_name=bucket_name,
            object_name="secure/test_document.txt",
            data=io.BytesIO(payload),
            length=len(payload),
            content_type="text/plain"
        )

        # 2. Create a password-protected share.
        print("\n2. 创建安全分享...")
        share_result = sharing_system.create_secure_share(
            bucket_name=bucket_name,
            object_name="secure/test_document.txt",
            share_type="download",
            expires=timedelta(hours=1),
            password="secure123",
            max_downloads=5,
            allowed_ips=["127.0.0.1", "192.168.1.100"]
        )

        share_created = share_result['success']
        if share_created:
            print(f"分享ID: {share_result['share_id']}")
            print(f"安全URL: {share_result['secure_url']}")
            print(f"过期时间: {share_result['expires_at']}")

            # 3. Simulate accesses; this needs the share id, so it only runs
            # when the share was actually created.
            print("\n3. 模拟访问分享...")

            access_result = sharing_system.access_share(
                share_id=share_result['share_id'],
                password="wrong_password",
                client_ip="127.0.0.1"
            )
            print(f"错误密码访问: {access_result['success']}")

            access_result = sharing_system.access_share(
                share_id=share_result['share_id'],
                password="secure123",
                client_ip="127.0.0.1"
            )
            print(f"正确密码访问: {access_result['success']}")

        # 4. Audit the bucket.
        print("\n4. 执行安全审计...")
        auditor = SecurityAuditor(client)
        audit_result = auditor.audit_bucket_security(bucket_name)

        print(f"安全评分: {audit_result['security_score']}/100")
        # The level may be absent when the audit ended early.
        print(f"安全等级: {audit_result.get('security_level', 'unknown')}")
        print(f"检查项目: {len(audit_result['checks'])}")
        print(f"建议数量: {len(audit_result['recommendations'])}")

        # 5. Print the share analytics.
        if share_created:
            print("\n5. 获取分享分析...")
            analytics_result = sharing_system.get_share_analytics(share_result['share_id'])

            if analytics_result['success']:
                stats = analytics_result['statistics']
                print(f"总访问次数: {stats['total_accesses']}")
                print(f"成功访问: {stats['successful_accesses']}")
                print(f"失败访问: {stats['failed_accesses']}")
                print(f"成功率: {stats['success_rate']:.1f}%")

        print("\n✅ 安全访问示例完成!")

    except Exception as e:
        print(f"❌ 示例执行失败: {e}")
# Run the demo only when the file is executed as a script.
if __name__ == "__main__":
    security_example()
4.5 总结
本章详细介绍了MinIO的预签名URL和安全访问控制:
4.5.1 核心功能
- 预签名URL:下载、上传、POST策略生成
- 访问策略:只读、读写、时间限制、IP限制策略
- 用户管理:临时凭证、权限验证、访问撤销
- 安全审计:配置检查、安全评分、报告生成
- 安全分享:密码保护、访问限制、分析统计
4.5.2 安全最佳实践
- 设置合理的URL过期时间
- 使用最小权限原则
- 定期审计安全配置
- 监控访问日志
- 实施多层安全防护
4.5.3 下一步学习
- 多部分上传和大文件处理
- 对象生命周期管理
- 跨区域复制和灾备
- 性能优化和监控
下一章将介绍MinIO的多部分上传和大文件处理技术。