6.1 集群架构设计
6.1.1 集群规划
在部署Kafka集群之前,需要结合业务负载进行详细的架构设计和容量规划,涵盖吞吐量、存储、可用性与延迟等维度。
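在进入完整的规划工具之前,先看一个最小化的估算示例(其中的流量、保留期与副本因子均为假设值),它演示了后文规划器所依赖的核心算式:总存储需求 ≈ 每秒字节数 × 86400 × 保留天数 × 副本因子。
# 最小容量估算示例(参数均为假设值,仅用于演示算式)
bytes_per_second = 50 * 1024 * 1024     # 假设入站流量为50MB/s
retention_days = 7                      # 假设数据保留7天
replication_factor = 3                  # 假设副本因子为3
daily_data_gb = bytes_per_second * 86400 / 1024**3                       # 每天约4218.8 GB
total_storage_gb = daily_data_gb * retention_days * replication_factor   # 约88593.8 GB,约合86.5 TB
print(f"每日数据量: {daily_data_gb:.1f} GB, 总存储需求: {total_storage_gb:.1f} GB")
在此基础上,下面的示例代码实现了一个更完整的集群规划与配置生成工具。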
import math
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import json
class NodeType(Enum):
"""节点类型"""
BROKER = "broker"
ZOOKEEPER = "zookeeper"
KAFKA_CONNECT = "kafka_connect"
SCHEMA_REGISTRY = "schema_registry"
KSQL = "ksql"
class DeploymentMode(Enum):
"""部署模式"""
STANDALONE = "standalone"
CLUSTER = "cluster"
DOCKER = "docker"
KUBERNETES = "kubernetes"
@dataclass
class HardwareSpec:
"""硬件规格"""
cpu_cores: int
memory_gb: int
disk_size_gb: int
disk_type: str # SSD, HDD
network_bandwidth_mbps: int
def calculate_score(self) -> float:
"""计算硬件评分"""
# 基础评分算法
cpu_score = self.cpu_cores * 10
memory_score = self.memory_gb * 2
disk_score = self.disk_size_gb * 0.1
# SSD加分
if self.disk_type.upper() == "SSD":
disk_score *= 2
network_score = self.network_bandwidth_mbps * 0.01
return cpu_score + memory_score + disk_score + network_score
@dataclass
class NodeConfig:
"""节点配置"""
node_id: str
node_type: NodeType
hostname: str
ip_address: str
hardware: HardwareSpec
rack_id: Optional[str] = None
availability_zone: Optional[str] = None
# Kafka特定配置
broker_id: Optional[int] = None
log_dirs: List[str] = field(default_factory=list)
# ZooKeeper特定配置
zk_id: Optional[int] = None
zk_data_dir: Optional[str] = None
@dataclass
class WorkloadRequirement:
"""工作负载需求"""
# 吞吐量需求
messages_per_second: int
bytes_per_second: int
# 存储需求
retention_days: int
replication_factor: int
# 可用性需求
availability_target: float # 99.9%
max_downtime_minutes: int
# 延迟需求
max_latency_ms: int
# 并发需求
max_producers: int
max_consumers: int
max_topics: int
max_partitions_per_topic: int
class ClusterPlanner:
"""集群规划器"""
def __init__(self):
self.min_brokers = 3
self.min_zookeepers = 3
self.safety_factor = 1.5 # 安全系数
def plan_cluster(self, workload: WorkloadRequirement,
available_hardware: List[HardwareSpec]) -> Dict[str, Any]:
"""规划集群"""
plan = {
"workload_analysis": self._analyze_workload(workload),
"capacity_planning": self._plan_capacity(workload),
"hardware_selection": self._select_hardware(workload, available_hardware),
"topology_design": self._design_topology(workload),
"configuration_recommendations": self._recommend_configurations(workload)
}
return plan
def _analyze_workload(self, workload: WorkloadRequirement) -> Dict[str, Any]:
"""分析工作负载"""
# 计算存储需求
daily_data_gb = (workload.bytes_per_second * 86400) / (1024**3)
total_storage_gb = daily_data_gb * workload.retention_days * workload.replication_factor
# 计算网络需求
network_in_mbps = (workload.bytes_per_second * 8) / (1024**2)
network_out_mbps = network_in_mbps * (workload.max_consumers / workload.max_producers if workload.max_producers > 0 else 1)
# 计算分区需求
recommended_partitions = max(
math.ceil(workload.messages_per_second / 10000), # 基于吞吐量
workload.max_consumers, # 基于消费者数量
6 # 最小分区数
)
return {
"daily_data_gb": daily_data_gb,
"total_storage_gb": total_storage_gb,
"network_in_mbps": network_in_mbps,
"network_out_mbps": network_out_mbps,
"recommended_partitions": recommended_partitions,
"workload_type": self._classify_workload(workload)
}
def _classify_workload(self, workload: WorkloadRequirement) -> str:
"""分类工作负载"""
if workload.messages_per_second > 100000:
return "high_throughput"
elif workload.max_latency_ms < 10:
return "low_latency"
elif workload.retention_days > 30:
return "long_retention"
else:
return "standard"
def _plan_capacity(self, workload: WorkloadRequirement) -> Dict[str, Any]:
"""容量规划"""
# 计算所需Broker数量
# 基于吞吐量
throughput_brokers = math.ceil(workload.messages_per_second / 50000) # 每个Broker 5万消息/秒
# 基于存储
daily_data_gb = (workload.bytes_per_second * 86400) / (1024**3)
total_storage_gb = daily_data_gb * workload.retention_days * workload.replication_factor
storage_brokers = math.ceil(total_storage_gb / 1000) # 每个Broker 1TB
# 基于分区数
total_partitions = workload.max_topics * workload.max_partitions_per_topic
partition_brokers = math.ceil(total_partitions / 1000) # 每个Broker 1000分区
# 取最大值并应用安全系数
required_brokers = max(throughput_brokers, storage_brokers, partition_brokers, self.min_brokers)
recommended_brokers = math.ceil(required_brokers * self.safety_factor)
# ZooKeeper数量(奇数)
if recommended_brokers <= 5:
zk_nodes = 3
elif recommended_brokers <= 10:
zk_nodes = 5
else:
zk_nodes = 7
return {
"required_brokers": required_brokers,
"recommended_brokers": recommended_brokers,
"zookeeper_nodes": zk_nodes,
"calculation_details": {
"throughput_based": throughput_brokers,
"storage_based": storage_brokers,
"partition_based": partition_brokers
}
}
def _select_hardware(self, workload: WorkloadRequirement,
available_hardware: List[HardwareSpec]) -> Dict[str, Any]:
"""选择硬件"""
workload_type = self._classify_workload(workload)
# 根据工作负载类型推荐硬件
if workload_type == "high_throughput":
min_cpu = 16
min_memory = 64
min_disk = 2000
preferred_disk = "SSD"
min_network = 10000
elif workload_type == "low_latency":
min_cpu = 8
min_memory = 32
min_disk = 500
preferred_disk = "SSD"
min_network = 1000
elif workload_type == "long_retention":
min_cpu = 8
min_memory = 32
min_disk = 5000
preferred_disk = "HDD"
min_network = 1000
else:
min_cpu = 8
min_memory = 16
min_disk = 1000
preferred_disk = "SSD"
min_network = 1000
# 筛选合适的硬件
suitable_hardware = []
for hw in available_hardware:
if (hw.cpu_cores >= min_cpu and
hw.memory_gb >= min_memory and
hw.disk_size_gb >= min_disk and
hw.network_bandwidth_mbps >= min_network):
suitable_hardware.append(hw)
# 按评分排序
suitable_hardware.sort(key=lambda x: x.calculate_score(), reverse=True)
return {
"requirements": {
"min_cpu_cores": min_cpu,
"min_memory_gb": min_memory,
"min_disk_gb": min_disk,
"preferred_disk_type": preferred_disk,
"min_network_mbps": min_network
},
"suitable_options": suitable_hardware[:3], # 前3个选项
"recommended": suitable_hardware[0] if suitable_hardware else None
}
def _design_topology(self, workload: WorkloadRequirement) -> Dict[str, Any]:
"""设计拓扑"""
# 机架感知配置
rack_awareness = workload.availability_target >= 0.999
# 跨可用区部署
multi_az = workload.max_downtime_minutes < 60
# 网络分段
network_segmentation = workload.availability_target >= 0.9999
return {
"rack_awareness": rack_awareness,
"multi_availability_zone": multi_az,
"network_segmentation": network_segmentation,
"load_balancer_required": workload.max_producers > 10,
"monitoring_required": True,
"backup_strategy": self._design_backup_strategy(workload)
}
def _design_backup_strategy(self, workload: WorkloadRequirement) -> Dict[str, Any]:
"""设计备份策略"""
if workload.availability_target >= 0.9999:
return {
"type": "continuous_replication",
"cross_region": True,
"rpo_minutes": 5,
"rto_minutes": 15
}
elif workload.availability_target >= 0.999:
return {
"type": "daily_snapshot",
"cross_region": False,
"rpo_minutes": 60,
"rto_minutes": 30
}
else:
return {
"type": "weekly_backup",
"cross_region": False,
"rpo_minutes": 1440,
"rto_minutes": 120
}
def _recommend_configurations(self, workload: WorkloadRequirement) -> Dict[str, Any]:
"""推荐配置"""
workload_type = self._classify_workload(workload)
broker_config = {
"num.network.threads": 8 if workload_type == "high_throughput" else 4,
"num.io.threads": 16 if workload_type == "high_throughput" else 8,
"socket.send.buffer.bytes": 102400,
"socket.receive.buffer.bytes": 102400,
"socket.request.max.bytes": 104857600,
"log.retention.hours": workload.retention_days * 24,
"log.segment.bytes": 1073741824,
"log.retention.check.interval.ms": 300000,
"default.replication.factor": workload.replication_factor,
"min.insync.replicas": max(1, workload.replication_factor - 1)
}
if workload_type == "low_latency":
broker_config.update({
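                # 注意:按消息强制刷盘会显著增加磁盘压力和尾部延迟,生产环境通常依赖副本机制与操作系统页缓存保证持久性,此处仅作演示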
"log.flush.interval.messages": 1,
"log.flush.interval.ms": 1
})
producer_config = {
"batch.size": 65536 if workload_type == "high_throughput" else 16384,
"linger.ms": 10 if workload_type == "high_throughput" else 0,
"compression.type": "lz4",
"acks": "all" if workload.availability_target >= 0.999 else "1"
}
consumer_config = {
"fetch.min.bytes": 50000 if workload_type == "high_throughput" else 1,
"fetch.max.wait.ms": 500,
"max.partition.fetch.bytes": 1048576
}
return {
"broker": broker_config,
"producer": producer_config,
"consumer": consumer_config,
"jvm_settings": self._recommend_jvm_settings(workload_type)
}
def _recommend_jvm_settings(self, workload_type: str) -> Dict[str, str]:
"""推荐JVM设置"""
if workload_type == "high_throughput":
return {
"heap_size": "8G",
"gc_algorithm": "G1GC",
"gc_options": "-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
}
elif workload_type == "low_latency":
return {
"heap_size": "4G",
"gc_algorithm": "ParallelGC",
"gc_options": "-XX:MaxGCPauseMillis=10"
}
else:
return {
"heap_size": "6G",
"gc_algorithm": "G1GC",
"gc_options": "-XX:MaxGCPauseMillis=50"
}
class ClusterConfigGenerator:
"""集群配置生成器"""
def __init__(self):
self.base_port = 9092
self.zk_base_port = 2181
def generate_cluster_config(self, nodes: List[NodeConfig],
workload: WorkloadRequirement) -> Dict[str, Any]:
"""生成集群配置"""
broker_nodes = [n for n in nodes if n.node_type == NodeType.BROKER]
zk_nodes = [n for n in nodes if n.node_type == NodeType.ZOOKEEPER]
config = {
"cluster_info": {
"cluster_id": "kafka-cluster-001",
"broker_count": len(broker_nodes),
"zookeeper_count": len(zk_nodes)
},
"zookeeper_config": self._generate_zk_config(zk_nodes),
"broker_configs": self._generate_broker_configs(broker_nodes, zk_nodes, workload),
"client_config": self._generate_client_config(broker_nodes),
"monitoring_config": self._generate_monitoring_config(nodes)
}
return config
def _generate_zk_config(self, zk_nodes: List[NodeConfig]) -> Dict[str, Any]:
"""生成ZooKeeper配置"""
zk_config = {
"ensemble": [],
"common_config": {
"tickTime": 2000,
"initLimit": 10,
"syncLimit": 5,
"maxClientCnxns": 60,
"autopurge.snapRetainCount": 3,
"autopurge.purgeInterval": 24
}
}
for i, node in enumerate(zk_nodes, 1):
node.zk_id = i
node.zk_data_dir = f"/var/lib/zookeeper"
zk_config["ensemble"].append({
"id": i,
"hostname": node.hostname,
"ip": node.ip_address,
"client_port": self.zk_base_port,
"peer_port": self.zk_base_port + 1000,
"leader_port": self.zk_base_port + 2000,
"data_dir": node.zk_data_dir
})
# 生成连接字符串
zk_connect = ",".join([f"{node.hostname}:{self.zk_base_port}" for node in zk_nodes])
zk_config["connect_string"] = zk_connect
return zk_config
def _generate_broker_configs(self, broker_nodes: List[NodeConfig],
zk_nodes: List[NodeConfig],
workload: WorkloadRequirement) -> List[Dict[str, Any]]:
"""生成Broker配置"""
zk_connect = ",".join([f"{node.hostname}:{self.zk_base_port}" for node in zk_nodes])
broker_configs = []
for i, node in enumerate(broker_nodes, 1):
node.broker_id = i
node.log_dirs = [f"/var/kafka-logs-{j}" for j in range(1, 4)] # 3个日志目录
config = {
"broker_id": i,
"hostname": node.hostname,
"ip": node.ip_address,
"port": self.base_port,
"log_dirs": node.log_dirs,
"zookeeper_connect": zk_connect,
"properties": {
# 基础配置
"broker.id": i,
"listeners": f"PLAINTEXT://{node.ip_address}:{self.base_port}",
"log.dirs": ",".join(node.log_dirs),
"zookeeper.connect": zk_connect,
# 网络配置
"num.network.threads": 8,
"num.io.threads": 16,
"socket.send.buffer.bytes": 102400,
"socket.receive.buffer.bytes": 102400,
"socket.request.max.bytes": 104857600,
# 日志配置
"log.retention.hours": workload.retention_days * 24,
"log.segment.bytes": 1073741824,
"log.retention.check.interval.ms": 300000,
"log.cleanup.policy": "delete",
# 副本配置
"default.replication.factor": workload.replication_factor,
"min.insync.replicas": max(1, workload.replication_factor - 1),
"unclean.leader.election.enable": "false",
# 其他配置
"group.initial.rebalance.delay.ms": 3000,
"offsets.topic.replication.factor": min(3, len(broker_nodes)),
"transaction.state.log.replication.factor": min(3, len(broker_nodes)),
"transaction.state.log.min.isr": min(2, len(broker_nodes))
}
}
# 机架感知
if node.rack_id:
config["properties"]["broker.rack"] = node.rack_id
broker_configs.append(config)
return broker_configs
def _generate_client_config(self, broker_nodes: List[NodeConfig]) -> Dict[str, Any]:
"""生成客户端配置"""
bootstrap_servers = ",".join([f"{node.hostname}:{self.base_port}" for node in broker_nodes])
return {
"bootstrap_servers": bootstrap_servers,
"producer": {
"acks": "all",
"retries": 2147483647,
"batch.size": 16384,
"linger.ms": 0,
"buffer.memory": 33554432,
"compression.type": "lz4",
"max.in.flight.requests.per.connection": 5,
"enable.idempotence": "true"
},
"consumer": {
"enable.auto.commit": "true",
"auto.commit.interval.ms": 1000,
"session.timeout.ms": 30000,
"fetch.min.bytes": 1,
"fetch.max.wait.ms": 500
}
}
def _generate_monitoring_config(self, nodes: List[NodeConfig]) -> Dict[str, Any]:
"""生成监控配置"""
return {
"jmx_enabled": True,
"jmx_port": 9999,
"metrics_reporters": [
"org.apache.kafka.common.metrics.JmxReporter"
],
"log_level": "INFO",
"log_retention_days": 7,
"health_check_endpoints": [
f"http://{node.hostname}:8080/health" for node in nodes
]
}
# 使用示例
if __name__ == "__main__":
# 定义工作负载需求
workload = WorkloadRequirement(
messages_per_second=50000,
bytes_per_second=50 * 1024 * 1024, # 50MB/s
retention_days=7,
replication_factor=3,
availability_target=0.999,
max_downtime_minutes=30,
max_latency_ms=100,
max_producers=10,
max_consumers=20,
max_topics=100,
max_partitions_per_topic=12
)
# 可用硬件选项
available_hardware = [
HardwareSpec(cpu_cores=8, memory_gb=32, disk_size_gb=1000, disk_type="SSD", network_bandwidth_mbps=1000),
HardwareSpec(cpu_cores=16, memory_gb=64, disk_size_gb=2000, disk_type="SSD", network_bandwidth_mbps=10000),
HardwareSpec(cpu_cores=12, memory_gb=48, disk_size_gb=4000, disk_type="HDD", network_bandwidth_mbps=1000)
]
# 创建集群规划器
planner = ClusterPlanner()
# 生成集群规划
plan = planner.plan_cluster(workload, available_hardware)
print("=== Kafka集群规划报告 ===")
print(f"工作负载类型: {plan['workload_analysis']['workload_type']}")
print(f"推荐Broker数量: {plan['capacity_planning']['recommended_brokers']}")
print(f"ZooKeeper节点数量: {plan['capacity_planning']['zookeeper_nodes']}")
print(f"存储需求: {plan['workload_analysis']['total_storage_gb']:.1f} GB")
print(f"网络需求: 入站 {plan['workload_analysis']['network_in_mbps']:.1f} Mbps, 出站 {plan['workload_analysis']['network_out_mbps']:.1f} Mbps")
if plan['hardware_selection']['recommended']:
hw = plan['hardware_selection']['recommended']
print(f"推荐硬件: {hw.cpu_cores}核 {hw.memory_gb}GB内存 {hw.disk_size_gb}GB {hw.disk_type}")
# 创建节点配置
nodes = []
# ZooKeeper节点
for i in range(plan['capacity_planning']['zookeeper_nodes']):
node = NodeConfig(
node_id=f"zk-{i+1}",
node_type=NodeType.ZOOKEEPER,
hostname=f"zk-{i+1}.kafka.local",
ip_address=f"10.0.1.{i+10}",
hardware=plan['hardware_selection']['recommended'] or available_hardware[0],
rack_id=f"rack-{(i % 3) + 1}"
)
nodes.append(node)
# Broker节点
for i in range(plan['capacity_planning']['recommended_brokers']):
node = NodeConfig(
node_id=f"broker-{i+1}",
node_type=NodeType.BROKER,
hostname=f"kafka-{i+1}.kafka.local",
ip_address=f"10.0.2.{i+10}",
hardware=plan['hardware_selection']['recommended'] or available_hardware[0],
rack_id=f"rack-{(i % 3) + 1}"
)
nodes.append(node)
# 生成集群配置
config_generator = ClusterConfigGenerator()
cluster_config = config_generator.generate_cluster_config(nodes, workload)
print("\n=== 集群配置 ===")
print(f"ZooKeeper连接字符串: {cluster_config['zookeeper_config']['connect_string']}")
print(f"Bootstrap服务器: {cluster_config['client_config']['bootstrap_servers']}")
# 保存配置到文件
with open('kafka_cluster_config.json', 'w', encoding='utf-8') as f:
json.dump(cluster_config, f, indent=2, ensure_ascii=False)
print("\n集群配置已保存到 kafka_cluster_config.json")
6.1.2 网络架构设计
网络架构直接影响集群的安全性与吞吐能力,需要统一规划网段划分、防火墙规则、负载均衡与DNS解析,下面的示例代码实现了一个简单的网络架构设计工具。
import ipaddress
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
class NetworkZone(Enum):
"""网络区域"""
PUBLIC = "public"
PRIVATE = "private"
DMZ = "dmz"
MANAGEMENT = "management"
class ProtocolType(Enum):
"""协议类型"""
TCP = "tcp"
UDP = "udp"
ICMP = "icmp"
@dataclass
class NetworkRule:
"""网络规则"""
name: str
source: str # IP/CIDR
destination: str # IP/CIDR
protocol: ProtocolType
port_range: str # "9092" or "9092-9094"
action: str # "allow" or "deny"
description: str = ""
@dataclass
class NetworkSegment:
"""网络段"""
name: str
cidr: str
zone: NetworkZone
vlan_id: Optional[int] = None
gateway: Optional[str] = None
dns_servers: List[str] = field(default_factory=list)
def contains_ip(self, ip: str) -> bool:
"""检查IP是否在此网段中"""
network = ipaddress.ip_network(self.cidr, strict=False)
return ipaddress.ip_address(ip) in network
class NetworkArchitect:
"""网络架构师"""
def __init__(self):
self.kafka_ports = {
"broker": [9092, 9093, 9094],
"jmx": [9999],
"metrics": [8080, 8081]
}
self.zk_ports = {
"client": [2181],
"peer": [2888],
"leader": [3888]
}
def design_network(self, nodes: List[NodeConfig],
security_level: str = "standard") -> Dict[str, Any]:
"""设计网络架构"""
design = {
"network_segments": self._design_segments(nodes, security_level),
"firewall_rules": self._design_firewall_rules(nodes, security_level),
"load_balancer_config": self._design_load_balancer(nodes),
"dns_config": self._design_dns_config(nodes),
"monitoring_network": self._design_monitoring_network(nodes)
}
return design
def _design_segments(self, nodes: List[NodeConfig],
security_level: str) -> List[NetworkSegment]:
"""设计网络段"""
segments = []
if security_level == "high":
# 高安全级别:每种服务一个网段
segments.extend([
NetworkSegment(
name="kafka-brokers",
cidr="10.0.10.0/24",
zone=NetworkZone.PRIVATE,
vlan_id=10,
gateway="10.0.10.1",
dns_servers=["10.0.1.10", "10.0.1.11"]
),
NetworkSegment(
name="zookeeper",
cidr="10.0.11.0/24",
zone=NetworkZone.PRIVATE,
vlan_id=11,
gateway="10.0.11.1",
dns_servers=["10.0.1.10", "10.0.1.11"]
),
NetworkSegment(
name="kafka-clients",
cidr="10.0.12.0/24",
zone=NetworkZone.PRIVATE,
vlan_id=12,
gateway="10.0.12.1",
dns_servers=["10.0.1.10", "10.0.1.11"]
),
NetworkSegment(
name="management",
cidr="10.0.1.0/24",
zone=NetworkZone.MANAGEMENT,
vlan_id=1,
gateway="10.0.1.1",
dns_servers=["10.0.1.10", "10.0.1.11"]
)
])
else:
# 标准安全级别:Kafka服务共享网段
segments.extend([
NetworkSegment(
name="kafka-cluster",
cidr="10.0.10.0/24",
zone=NetworkZone.PRIVATE,
vlan_id=10,
gateway="10.0.10.1",
dns_servers=["10.0.1.10", "10.0.1.11"]
),
NetworkSegment(
name="kafka-clients",
cidr="10.0.11.0/24",
zone=NetworkZone.PRIVATE,
vlan_id=11,
gateway="10.0.11.1",
dns_servers=["10.0.1.10", "10.0.1.11"]
),
NetworkSegment(
name="management",
cidr="10.0.1.0/24",
zone=NetworkZone.MANAGEMENT,
vlan_id=1,
gateway="10.0.1.1",
dns_servers=["10.0.1.10", "10.0.1.11"]
)
])
return segments
def _design_firewall_rules(self, nodes: List[NodeConfig],
security_level: str) -> List[NetworkRule]:
"""设计防火墙规则"""
rules = []
# 基础规则
rules.extend([
# 允许SSH管理访问
NetworkRule(
name="ssh-management",
source="10.0.1.0/24",
destination="10.0.0.0/8",
protocol=ProtocolType.TCP,
port_range="22",
action="allow",
description="SSH管理访问"
),
# 允许ICMP
NetworkRule(
name="icmp-internal",
source="10.0.0.0/8",
destination="10.0.0.0/8",
protocol=ProtocolType.ICMP,
port_range="*",
action="allow",
description="内部ICMP通信"
)
])
# Kafka Broker规则
broker_ips = [node.ip_address for node in nodes if node.node_type == NodeType.BROKER]
if broker_ips:
broker_cidr = self._calculate_cidr(broker_ips)
# Broker之间通信
rules.append(NetworkRule(
name="kafka-inter-broker",
source=broker_cidr,
destination=broker_cidr,
protocol=ProtocolType.TCP,
port_range="9092-9094",
action="allow",
description="Broker间通信"
))
# 客户端到Broker
rules.append(NetworkRule(
name="kafka-client-to-broker",
source="10.0.11.0/24", # 客户端网段
destination=broker_cidr,
protocol=ProtocolType.TCP,
port_range="9092",
action="allow",
description="客户端到Broker通信"
))
# JMX监控
rules.append(NetworkRule(
name="kafka-jmx-monitoring",
source="10.0.1.0/24", # 管理网段
destination=broker_cidr,
protocol=ProtocolType.TCP,
port_range="9999",
action="allow",
description="JMX监控访问"
))
# ZooKeeper规则
zk_ips = [node.ip_address for node in nodes if node.node_type == NodeType.ZOOKEEPER]
if zk_ips:
zk_cidr = self._calculate_cidr(zk_ips)
# ZooKeeper集群内部通信
rules.extend([
NetworkRule(
name="zk-peer-communication",
source=zk_cidr,
destination=zk_cidr,
protocol=ProtocolType.TCP,
port_range="2888",
action="allow",
description="ZooKeeper节点间通信"
),
NetworkRule(
name="zk-leader-election",
source=zk_cidr,
destination=zk_cidr,
protocol=ProtocolType.TCP,
port_range="3888",
action="allow",
description="ZooKeeper Leader选举"
)
])
# Broker到ZooKeeper
if broker_ips:
rules.append(NetworkRule(
name="broker-to-zk",
source=broker_cidr,
destination=zk_cidr,
protocol=ProtocolType.TCP,
port_range="2181",
action="allow",
description="Broker到ZooKeeper通信"
))
# 高安全级别的额外规则
if security_level == "high":
# 拒绝所有其他流量
rules.append(NetworkRule(
name="deny-all-other",
source="*",
destination="*",
protocol=ProtocolType.TCP,
port_range="*",
action="deny",
description="拒绝所有其他流量"
))
return rules
def _calculate_cidr(self, ips: List[str]) -> str:
"""计算IP列表的CIDR"""
if not ips:
return "0.0.0.0/0"
# 简化实现:假设所有IP在同一个/24网段
first_ip = ipaddress.ip_address(ips[0])
network = ipaddress.ip_network(f"{first_ip}/24", strict=False)
return str(network)
def _design_load_balancer(self, nodes: List[NodeConfig]) -> Dict[str, Any]:
"""设计负载均衡器"""
broker_nodes = [n for n in nodes if n.node_type == NodeType.BROKER]
if len(broker_nodes) < 2:
return {"enabled": False, "reason": "单节点无需负载均衡"}
return {
"enabled": True,
"type": "layer4", # TCP负载均衡
"algorithm": "round_robin",
"health_check": {
"protocol": "tcp",
"port": 9092,
"interval_seconds": 30,
"timeout_seconds": 5,
"healthy_threshold": 2,
"unhealthy_threshold": 3
},
"backend_servers": [
{
"ip": node.ip_address,
"port": 9092,
"weight": 100
} for node in broker_nodes
],
"virtual_ip": "10.0.10.100",
"virtual_port": 9092
}
def _design_dns_config(self, nodes: List[NodeConfig]) -> Dict[str, Any]:
"""设计DNS配置"""
dns_records = []
# 为每个节点创建A记录
for node in nodes:
dns_records.append({
"type": "A",
"name": node.hostname,
"value": node.ip_address,
"ttl": 300
})
# 创建服务记录
broker_nodes = [n for n in nodes if n.node_type == NodeType.BROKER]
zk_nodes = [n for n in nodes if n.node_type == NodeType.ZOOKEEPER]
if broker_nodes:
# Kafka集群SRV记录
for node in broker_nodes:
dns_records.append({
"type": "SRV",
"name": "_kafka._tcp.kafka.local",
"value": f"0 5 9092 {node.hostname}",
"ttl": 300
})
if zk_nodes:
# ZooKeeper集群SRV记录
for node in zk_nodes:
dns_records.append({
"type": "SRV",
"name": "_zookeeper._tcp.kafka.local",
"value": f"0 5 2181 {node.hostname}",
"ttl": 300
})
return {
"domain": "kafka.local",
"records": dns_records,
"search_domains": ["kafka.local", "local"],
"nameservers": ["10.0.1.10", "10.0.1.11"]
}
def _design_monitoring_network(self, nodes: List[NodeConfig]) -> Dict[str, Any]:
"""设计监控网络"""
return {
"monitoring_subnet": "10.0.1.0/24",
"metrics_collection": {
"protocol": "http",
"port": 8080,
"path": "/metrics",
"interval_seconds": 30
},
"log_collection": {
"protocol": "syslog",
"port": 514,
"facility": "local0"
},
"alerting": {
"webhook_url": "http://10.0.1.20:9093/api/v1/alerts",
"smtp_server": "10.0.1.25:587"
}
}
# 使用示例
if __name__ == "__main__":
# 创建示例节点
nodes = [
NodeConfig(
node_id="zk-1", node_type=NodeType.ZOOKEEPER,
hostname="zk-1.kafka.local", ip_address="10.0.10.10",
hardware=HardwareSpec(8, 16, 500, "SSD", 1000)
),
NodeConfig(
node_id="zk-2", node_type=NodeType.ZOOKEEPER,
hostname="zk-2.kafka.local", ip_address="10.0.10.11",
hardware=HardwareSpec(8, 16, 500, "SSD", 1000)
),
NodeConfig(
node_id="zk-3", node_type=NodeType.ZOOKEEPER,
hostname="zk-3.kafka.local", ip_address="10.0.10.12",
hardware=HardwareSpec(8, 16, 500, "SSD", 1000)
),
NodeConfig(
node_id="broker-1", node_type=NodeType.BROKER,
hostname="kafka-1.kafka.local", ip_address="10.0.10.20",
hardware=HardwareSpec(16, 32, 1000, "SSD", 1000)
),
NodeConfig(
node_id="broker-2", node_type=NodeType.BROKER,
hostname="kafka-2.kafka.local", ip_address="10.0.10.21",
hardware=HardwareSpec(16, 32, 1000, "SSD", 1000)
),
NodeConfig(
node_id="broker-3", node_type=NodeType.BROKER,
hostname="kafka-3.kafka.local", ip_address="10.0.10.22",
hardware=HardwareSpec(16, 32, 1000, "SSD", 1000)
)
]
# 创建网络架构师
architect = NetworkArchitect()
# 设计网络架构
network_design = architect.design_network(nodes, security_level="standard")
print("=== Kafka网络架构设计 ===")
print("\n网络段:")
for segment in network_design["network_segments"]:
print(f" {segment.name}: {segment.cidr} (VLAN {segment.vlan_id})")
print("\n防火墙规则:")
for rule in network_design["firewall_rules"][:5]: # 显示前5条规则
print(f" {rule.name}: {rule.source} -> {rule.destination}:{rule.port_range} ({rule.action})")
if network_design["load_balancer_config"]["enabled"]:
lb_config = network_design["load_balancer_config"]
print(f"\n负载均衡器: {lb_config['virtual_ip']}:{lb_config['virtual_port']}")
print(f" 后端服务器数量: {len(lb_config['backend_servers'])}")
print(f"\nDNS记录数量: {len(network_design['dns_config']['records'])}")
# 保存网络设计
import json
with open('kafka_network_design.json', 'w', encoding='utf-8') as f:
        # 转换dataclass与枚举对象,便于JSON序列化(network_design中包含NetworkSegment/NetworkRule等dataclass实例)
        from dataclasses import asdict, is_dataclass
        def json_converter(obj):
            if is_dataclass(obj):
                return asdict(obj)
            if isinstance(obj, Enum):
                return obj.value
            raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
        json.dump(network_design, f, indent=2, ensure_ascii=False, default=json_converter)
print("\n网络设计已保存到 kafka_network_design.json")
6.2 自动化部署
6.2.1 Ansible部署脚本
使用Ansible可以将上一节生成的集群配置转化为可重复执行的部署流程,下面的示例代码用于自动生成inventory、playbook与配置模板。
import yaml
import os
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from pathlib import Path
class AnsiblePlaybookGenerator:
"""Ansible Playbook生成器"""
def __init__(self, output_dir: str = "ansible"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def generate_kafka_deployment(self, cluster_config: Dict[str, Any]) -> Dict[str, str]:
"""生成Kafka部署的Ansible Playbook"""
files = {}
# 生成inventory文件
files["inventory.yml"] = self._generate_inventory(cluster_config)
# 生成主playbook
files["site.yml"] = self._generate_site_playbook()
# 生成ZooKeeper playbook
files["zookeeper.yml"] = self._generate_zookeeper_playbook(cluster_config)
# 生成Kafka playbook
files["kafka.yml"] = self._generate_kafka_playbook(cluster_config)
# 生成配置模板
files["templates/zookeeper.properties.j2"] = self._generate_zk_template()
files["templates/server.properties.j2"] = self._generate_kafka_template()
# 生成变量文件
files["group_vars/all.yml"] = self._generate_group_vars(cluster_config)
# 写入文件
for filename, content in files.items():
file_path = self.output_dir / filename
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return {filename: str(self.output_dir / filename) for filename in files.keys()}
def _generate_inventory(self, cluster_config: Dict[str, Any]) -> str:
"""生成inventory文件"""
inventory = {
"all": {
"children": {
"zookeeper": {
"hosts": {}
},
"kafka": {
"hosts": {}
}
}
}
}
# 添加ZooKeeper主机
for zk_config in cluster_config["zookeeper_config"]["ensemble"]:
host_vars = {
"ansible_host": zk_config["ip"],
"zk_id": zk_config["id"],
"zk_client_port": zk_config["client_port"],
"zk_peer_port": zk_config["peer_port"],
"zk_leader_port": zk_config["leader_port"]
}
inventory["all"]["children"]["zookeeper"]["hosts"][zk_config["hostname"]] = host_vars
# 添加Kafka主机
for broker_config in cluster_config["broker_configs"]:
host_vars = {
"ansible_host": broker_config["ip"],
"broker_id": broker_config["broker_id"],
"kafka_port": broker_config["port"],
"log_dirs": broker_config["log_dirs"]
}
inventory["all"]["children"]["kafka"]["hosts"][broker_config["hostname"]] = host_vars
return yaml.dump(inventory, default_flow_style=False, allow_unicode=True)
def _generate_site_playbook(self) -> str:
"""生成主playbook"""
        # 注意: import_playbook 只能作为playbook的顶层语句使用,不能嵌套在tasks中
        playbook = [
            {
                "name": "Deploy ZooKeeper",
                "import_playbook": "zookeeper.yml"
            },
            {
                "name": "Deploy Kafka",
                "import_playbook": "kafka.yml"
            }
        ]
return yaml.dump(playbook, default_flow_style=False, allow_unicode=True)
def _generate_zookeeper_playbook(self, cluster_config: Dict[str, Any]) -> str:
"""生成ZooKeeper playbook"""
zk_ensemble = []
for zk in cluster_config["zookeeper_config"]["ensemble"]:
zk_ensemble.append(f"server.{zk['id']}={zk['hostname']}:{zk['peer_port']}:{zk['leader_port']}")
playbook = [
{
"name": "Deploy ZooKeeper",
"hosts": "zookeeper",
"become": True,
"vars": {
"zk_version": "3.8.0",
"zk_user": "zookeeper",
"zk_group": "zookeeper",
"zk_data_dir": "/var/lib/zookeeper",
"zk_log_dir": "/var/log/zookeeper",
"zk_ensemble": zk_ensemble
},
"tasks": [
{
"name": "Create ZooKeeper user",
"user": {
"name": "{{ zk_user }}",
"group": "{{ zk_group }}",
"system": True,
"shell": "/bin/false",
"home": "/var/lib/zookeeper",
"create_home": False
}
},
                    {
                        "name": "Create ZooKeeper directories",
                        "file": {
                            "path": "{{ item }}",
                            "state": "directory",
                            "owner": "{{ zk_user }}",
                            "group": "{{ zk_group }}",
                            "mode": "0755"
                        },
                        "loop": ["{{ zk_data_dir }}", "{{ zk_log_dir }}"]
                    }
]
}
]
return yaml.dump(playbook, default_flow_style=False, allow_unicode=True)
def _generate_kafka_playbook(self, cluster_config: Dict[str, Any]) -> str:
"""生成Kafka playbook"""
playbook = [
{
"name": "Deploy Kafka",
"hosts": "kafka",
"become": True,
"vars": {
"kafka_version": "2.8.1",
"scala_version": "2.13",
"kafka_user": "kafka",
"kafka_group": "kafka",
"kafka_home": "/opt/kafka",
"kafka_log_dirs": "{{ log_dirs | join(',') }}",
"zk_connect": cluster_config["zookeeper_config"]["connect_string"]
},
"tasks": [
{
"name": "Install Java",
"package": {
"name": "openjdk-11-jdk",
"state": "present"
}
},
{
"name": "Create Kafka user",
"user": {
"name": "{{ kafka_user }}",
"group": "{{ kafka_group }}",
"system": True,
"shell": "/bin/false",
"home": "/var/lib/kafka",
"create_home": False
}
},
{
"name": "Create Kafka directories",
"file": {
"path": "{{ item }}",
"state": "directory",
"owner": "{{ kafka_user }}",
"group": "{{ kafka_group }}",
"mode": "0755"
},
"loop": "{{ ['{{ kafka_home }}', '/var/log/kafka'] + log_dirs }}"
},
{
"name": "Download Kafka",
"get_url": {
"url": "https://archive.apache.org/dist/kafka/{{ kafka_version }}/kafka_{{ scala_version }}-{{ kafka_version }}.tgz",
"dest": "/tmp/kafka_{{ scala_version }}-{{ kafka_version }}.tgz",
"mode": "0644"
}
},
{
"name": "Extract Kafka",
"unarchive": {
"src": "/tmp/kafka_{{ scala_version }}-{{ kafka_version }}.tgz",
"dest": "/opt",
"remote_src": True,
"owner": "{{ kafka_user }}",
"group": "{{ kafka_group }}",
"creates": "/opt/kafka_{{ scala_version }}-{{ kafka_version }}"
}
},
{
"name": "Create Kafka symlink",
"file": {
"src": "/opt/kafka_{{ scala_version }}-{{ kafka_version }}",
"dest": "{{ kafka_home }}/current",
"state": "link",
"owner": "{{ kafka_user }}",
"group": "{{ kafka_group }}"
}
},
{
"name": "Generate Kafka configuration",
"template": {
"src": "server.properties.j2",
"dest": "{{ kafka_home }}/current/config/server.properties",
"owner": "{{ kafka_user }}",
"group": "{{ kafka_group }}",
"mode": "0644"
},
"notify": "restart kafka"
},
{
"name": "Create Kafka systemd service",
"template": {
"src": "kafka.service.j2",
"dest": "/etc/systemd/system/kafka.service",
"mode": "0644"
},
"notify": [
"reload systemd",
"restart kafka"
]
},
{
"name": "Start and enable Kafka",
"systemd": {
"name": "kafka",
"state": "started",
"enabled": True,
"daemon_reload": True
}
}
],
"handlers": [
{
"name": "reload systemd",
"systemd": {
"daemon_reload": True
}
},
{
"name": "restart kafka",
"systemd": {
"name": "kafka",
"state": "restarted"
}
}
]
}
]
return yaml.dump(playbook, default_flow_style=False, allow_unicode=True)
def _generate_zk_template(self) -> str:
"""生成ZooKeeper配置模板"""
template = """# ZooKeeper配置文件
# 基本配置
tickTime={{ zk_tick_time | default(2000) }}
initLimit={{ zk_init_limit | default(10) }}
syncLimit={{ zk_sync_limit | default(5) }}
# 数据目录
dataDir={{ zk_data_dir }}
# 客户端端口
clientPort={{ zk_client_port }}
# 最大客户端连接数
maxClientCnxns={{ zk_max_client_cnxns | default(60) }}
# 自动清理
autopurge.snapRetainCount={{ zk_autopurge_snap_retain_count | default(3) }}
autopurge.purgeInterval={{ zk_autopurge_purge_interval | default(24) }}
# 集群配置
{% for server in zk_ensemble %}
{{ server }}
{% endfor %}
# 管理员服务器
admin.enableServer=true
admin.serverPort=8080
"""
return template
def _generate_kafka_template(self) -> str:
"""生成Kafka配置模板"""
template = """# Kafka配置文件
# Broker ID
broker.id={{ broker_id }}
# 监听器配置
listeners=PLAINTEXT://{{ ansible_host }}:{{ kafka_port }}
advertised.listeners=PLAINTEXT://{{ ansible_host }}:{{ kafka_port }}
# 日志目录
log.dirs={{ kafka_log_dirs }}
# ZooKeeper连接
zookeeper.connect={{ zk_connect }}
zookeeper.connection.timeout.ms=18000
# 网络配置
num.network.threads={{ kafka_num_network_threads | default(8) }}
num.io.threads={{ kafka_num_io_threads | default(16) }}
socket.send.buffer.bytes={{ kafka_socket_send_buffer_bytes | default(102400) }}
socket.receive.buffer.bytes={{ kafka_socket_receive_buffer_bytes | default(102400) }}
socket.request.max.bytes={{ kafka_socket_request_max_bytes | default(104857600) }}
# 日志配置
log.retention.hours={{ kafka_log_retention_hours | default(168) }}
log.segment.bytes={{ kafka_log_segment_bytes | default(1073741824) }}
log.retention.check.interval.ms={{ kafka_log_retention_check_interval_ms | default(300000) }}
log.cleanup.policy={{ kafka_log_cleanup_policy | default('delete') }}
# 副本配置
default.replication.factor={{ kafka_default_replication_factor | default(3) }}
min.insync.replicas={{ kafka_min_insync_replicas | default(2) }}
unclean.leader.election.enable={{ kafka_unclean_leader_election_enable | default('false') }}
# 其他配置
group.initial.rebalance.delay.ms={{ kafka_group_initial_rebalance_delay_ms | default(3000) }}
offsets.topic.replication.factor={{ kafka_offsets_topic_replication_factor | default(3) }}
transaction.state.log.replication.factor={{ kafka_transaction_state_log_replication_factor | default(3) }}
transaction.state.log.min.isr={{ kafka_transaction_state_log_min_isr | default(2) }}
# JMX配置
{% if kafka_jmx_enabled | default(true) %}
jmx.port={{ kafka_jmx_port | default(9999) }}
{% endif %}
# 机架感知
{% if broker_rack is defined %}
broker.rack={{ broker_rack }}
{% endif %}
"""
return template
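    def _generate_kafka_service_template(self) -> str:
        """生成Kafka systemd服务模板(示例性补充:kafka.yml中引用了该模板但原脚本未生成,以下为一个最小可用的unit文件)"""
        template = """[Unit]
Description=Apache Kafka Broker
After=network.target

[Service]
Type=simple
User={{ kafka_user }}
Group={{ kafka_group }}
Environment="JAVA_HOME={{ java_home }}"
ExecStart={{ kafka_home }}/current/bin/kafka-server-start.sh {{ kafka_home }}/current/config/server.properties
ExecStop={{ kafka_home }}/current/bin/kafka-server-stop.sh
Restart=on-failure
LimitNOFILE=100000

[Install]
WantedBy=multi-user.target
"""
        return template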
def _generate_group_vars(self, cluster_config: Dict[str, Any]) -> str:
"""生成组变量文件"""
vars_dict = {
"# 全局配置": None,
"ansible_user": "ubuntu",
"ansible_ssh_private_key_file": "~/.ssh/kafka-cluster.pem",
"ansible_become": True,
"# Java配置": None,
"java_home": "/usr/lib/jvm/java-11-openjdk-amd64",
"# ZooKeeper配置": None,
"zk_tick_time": 2000,
"zk_init_limit": 10,
"zk_sync_limit": 5,
"zk_max_client_cnxns": 60,
"zk_autopurge_snap_retain_count": 3,
"zk_autopurge_purge_interval": 24,
"# Kafka配置": None,
"kafka_num_network_threads": 8,
"kafka_num_io_threads": 16,
"kafka_socket_send_buffer_bytes": 102400,
"kafka_socket_receive_buffer_bytes": 102400,
"kafka_socket_request_max_bytes": 104857600,
"kafka_log_retention_hours": 168,
"kafka_log_segment_bytes": 1073741824,
"kafka_log_retention_check_interval_ms": 300000,
"kafka_default_replication_factor": 3,
"kafka_min_insync_replicas": 2,
"kafka_unclean_leader_election_enable": False,
"kafka_group_initial_rebalance_delay_ms": 3000,
"kafka_jmx_enabled": True,
"kafka_jmx_port": 9999,
"# 监控配置": None,
"monitoring_enabled": True,
"metrics_retention_days": 30,
"# 安全配置": None,
"security_enabled": False,
"ssl_enabled": False,
"sasl_enabled": False
}
# 过滤掉注释键
filtered_vars = {k: v for k, v in vars_dict.items() if not k.startswith("#")}
return yaml.dump(filtered_vars, default_flow_style=False, allow_unicode=True)
# Docker部署配置生成器
class DockerComposeGenerator:
"""Docker Compose配置生成器"""
def __init__(self, output_dir: str = "docker"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def generate_kafka_compose(self, cluster_config: Dict[str, Any]) -> str:
"""生成Kafka集群的Docker Compose文件"""
services = {}
networks = {
"kafka-network": {
"driver": "bridge",
"ipam": {
"config": [
{"subnet": "172.20.0.0/16"}
]
}
}
}
volumes = {}
# 生成ZooKeeper服务
zk_ensemble = []
for i, zk_config in enumerate(cluster_config["zookeeper_config"]["ensemble"], 1):
service_name = f"zookeeper-{i}"
zk_ensemble.append(f"{service_name}:2181")
services[service_name] = {
"image": "confluentinc/cp-zookeeper:7.4.0",
"hostname": zk_config["hostname"],
"container_name": service_name,
"ports": [
f"{zk_config['client_port']}:2181"
],
"environment": {
"ZOOKEEPER_CLIENT_PORT": 2181,
"ZOOKEEPER_SERVER_ID": i,
"ZOOKEEPER_SERVERS": self._generate_zk_servers(cluster_config["zookeeper_config"]["ensemble"])
},
"volumes": [
f"zk-data-{i}:/var/lib/zookeeper/data",
f"zk-logs-{i}:/var/lib/zookeeper/log"
],
"networks": {
"kafka-network": {
"ipv4_address": f"172.20.1.{i+10}"
}
},
"restart": "unless-stopped",
"healthcheck": {
"test": ["CMD", "nc", "-z", "localhost", "2181"],
"interval": "30s",
"timeout": "10s",
"retries": 3
}
}
# 添加数据卷
volumes[f"zk-data-{i}"] = None
volumes[f"zk-logs-{i}"] = None
# 生成Kafka服务
for i, broker_config in enumerate(cluster_config["broker_configs"], 1):
service_name = f"kafka-{i}"
services[service_name] = {
"image": "confluentinc/cp-kafka:7.4.0",
"hostname": broker_config["hostname"],
"container_name": service_name,
"depends_on": [f"zookeeper-{j}" for j in range(1, len(cluster_config["zookeeper_config"]["ensemble"]) + 1)],
"ports": [
f"{broker_config['port']}:9092",
f"{9999 + i}:9999" # JMX端口
],
"environment": {
"KAFKA_BROKER_ID": broker_config["broker_id"],
"KAFKA_ZOOKEEPER_CONNECT": cluster_config["zookeeper_config"]["connect_string"],
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
"KAFKA_ADVERTISED_LISTENERS": f"PLAINTEXT://172.20.2.{i+10}:29092,PLAINTEXT_HOST://localhost:{broker_config['port']}",
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092",
"KAFKA_INTER_BROKER_LISTENER_NAME": "PLAINTEXT",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": min(3, len(cluster_config["broker_configs"])),
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": min(3, len(cluster_config["broker_configs"])),
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": min(2, len(cluster_config["broker_configs"])),
"KAFKA_DEFAULT_REPLICATION_FACTOR": min(3, len(cluster_config["broker_configs"])),
"KAFKA_MIN_INSYNC_REPLICAS": min(2, len(cluster_config["broker_configs"])),
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS": 0,
"KAFKA_JMX_PORT": 9999,
"KAFKA_JMX_HOSTNAME": "localhost"
},
"volumes": [
f"kafka-data-{i}:/var/lib/kafka/data",
f"kafka-logs-{i}:/var/log/kafka"
],
"networks": {
"kafka-network": {
"ipv4_address": f"172.20.2.{i+10}"
}
},
"restart": "unless-stopped",
"healthcheck": {
"test": ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"],
"interval": "30s",
"timeout": "10s",
"retries": 3
}
}
# 添加数据卷
volumes[f"kafka-data-{i}"] = None
volumes[f"kafka-logs-{i}"] = None
# 添加Kafka UI服务
services["kafka-ui"] = {
"image": "provectuslabs/kafka-ui:latest",
"container_name": "kafka-ui",
"depends_on": [f"kafka-{i}" for i in range(1, len(cluster_config["broker_configs"]) + 1)],
"ports": ["8080:8080"],
"environment": {
"KAFKA_CLUSTERS_0_NAME": "local",
"KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS": ",".join([f"kafka-{i}:29092" for i in range(1, len(cluster_config["broker_configs"]) + 1)]),
"KAFKA_CLUSTERS_0_ZOOKEEPER": ",".join([f"zookeeper-{i}:2181" for i in range(1, len(cluster_config["zookeeper_config"]["ensemble"]) + 1)])
},
"networks": ["kafka-network"],
"restart": "unless-stopped"
}
compose_config = {
"version": "3.8",
"services": services,
"networks": networks,
"volumes": volumes
}
compose_content = yaml.dump(compose_config, default_flow_style=False, allow_unicode=True)
# 写入文件
compose_file = self.output_dir / "docker-compose.yml"
with open(compose_file, 'w', encoding='utf-8') as f:
f.write(compose_content)
return str(compose_file)
def _generate_zk_servers(self, zk_ensemble: List[Dict[str, Any]]) -> str:
"""生成ZooKeeper服务器列表"""
servers = []
for i, zk in enumerate(zk_ensemble, 1):
servers.append(f"zookeeper-{i}:2888:3888")
return ";".join(servers)
def generate_env_file(self, cluster_config: Dict[str, Any]) -> str:
"""生成环境变量文件"""
env_content = """# Kafka集群环境变量
# 集群配置
CLUSTER_ID=kafka-cluster-001
KAFKA_VERSION=7.4.0
ZOOKEEPER_VERSION=7.4.0
# 网络配置
KAFKA_NETWORK=kafka-network
KAFKA_SUBNET=172.20.0.0/16
# 存储配置
DATA_DIR=./data
LOGS_DIR=./logs
# JVM配置
KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
ZOOKEEPER_HEAP_OPTS=-Xmx512M -Xms512M
# 监控配置
JMX_ENABLED=true
METRICS_ENABLED=true
# 安全配置
SECURITY_ENABLED=false
SSL_ENABLED=false
SASL_ENABLED=false
"""
env_file = self.output_dir / ".env"
with open(env_file, 'w', encoding='utf-8') as f:
f.write(env_content)
return str(env_file)
# 使用示例
if __name__ == "__main__":
# 使用之前生成的集群配置
from pathlib import Path
import json
# 读取集群配置
config_file = Path("kafka_cluster_config.json")
if config_file.exists():
with open(config_file, 'r', encoding='utf-8') as f:
cluster_config = json.load(f)
print("=== 生成Ansible部署脚本 ===")
ansible_gen = AnsiblePlaybookGenerator()
ansible_files = ansible_gen.generate_kafka_deployment(cluster_config)
print("生成的Ansible文件:")
for filename, filepath in ansible_files.items():
print(f" {filename}: {filepath}")
print("\n=== 生成Docker Compose配置 ===")
docker_gen = DockerComposeGenerator()
compose_file = docker_gen.generate_kafka_compose(cluster_config)
env_file = docker_gen.generate_env_file(cluster_config)
print(f"Docker Compose文件: {compose_file}")
print(f"环境变量文件: {env_file}")
print("\n=== 部署说明 ===")
print("Ansible部署:")
print(" 1. 配置SSH密钥和inventory")
print(" 2. 运行: ansible-playbook -i inventory.yml site.yml")
print("\nDocker部署:")
print(" 1. 确保Docker和Docker Compose已安装")
print(" 2. 运行: docker-compose up -d")
print(" 3. 访问Kafka UI: http://localhost:8080")
else:
print("请先运行集群规划生成kafka_cluster_config.json文件")
6.2.2 Kubernetes部署
在Kubernetes上通常使用StatefulSet配合Headless Service部署Kafka与ZooKeeper,为每个节点提供稳定的网络标识和持久化存储,下面的示例代码用于生成相应的清单文件。
import yaml
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pathlib import Path
class KubernetesManifestGenerator:
"""Kubernetes清单生成器"""
def __init__(self, namespace: str = "kafka", output_dir: str = "k8s"):
self.namespace = namespace
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def generate_kafka_manifests(self, cluster_config: Dict[str, Any]) -> Dict[str, str]:
"""生成Kafka集群的Kubernetes清单"""
manifests = {}
# 生成命名空间
manifests["00-namespace.yaml"] = self._generate_namespace()
# 生成ConfigMap
manifests["01-configmap.yaml"] = self._generate_configmaps(cluster_config)
# 生成ZooKeeper StatefulSet
manifests["02-zookeeper.yaml"] = self._generate_zookeeper_statefulset(cluster_config)
# 生成Kafka StatefulSet
manifests["03-kafka.yaml"] = self._generate_kafka_statefulset(cluster_config)
# 生成Services
manifests["04-services.yaml"] = self._generate_services(cluster_config)
# 生成Ingress
manifests["05-ingress.yaml"] = self._generate_ingress()
# 生成监控相关
manifests["06-monitoring.yaml"] = self._generate_monitoring()
# 写入文件
for filename, content in manifests.items():
file_path = self.output_dir / filename
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return {filename: str(self.output_dir / filename) for filename in manifests.keys()}
def _generate_namespace(self) -> str:
"""生成命名空间"""
namespace = {
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {
"name": self.namespace,
"labels": {
"app": "kafka",
"component": "messaging"
}
}
}
return yaml.dump(namespace, default_flow_style=False)
def _generate_configmaps(self, cluster_config: Dict[str, Any]) -> str:
"""生成ConfigMap"""
# ZooKeeper配置
zk_config = {
"zoo.cfg": """tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper/data
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
admin.enableServer=true
admin.serverPort=8080
"""
}
# Kafka配置模板
kafka_config = {
"server.properties": """# Broker配置
broker.id=${KAFKA_BROKER_ID}
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://${POD_NAME}.kafka-headless.${POD_NAMESPACE}.svc.cluster.local:9092
# ZooKeeper连接
zookeeper.connect=zookeeper-0.zookeeper-headless.kafka.svc.cluster.local:2181,zookeeper-1.zookeeper-headless.kafka.svc.cluster.local:2181,zookeeper-2.zookeeper-headless.kafka.svc.cluster.local:2181
zookeeper.connection.timeout.ms=18000
# 日志配置
log.dirs=/var/lib/kafka/data
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleanup.policy=delete
# 副本配置
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
# 网络配置
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 其他配置
group.initial.rebalance.delay.ms=3000
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
"""
}
configmaps = [
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "zookeeper-config",
"namespace": self.namespace
},
"data": zk_config
},
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "kafka-config",
"namespace": self.namespace
},
"data": kafka_config
}
]
return yaml.dump_all(configmaps, default_flow_style=False)
def _generate_zookeeper_statefulset(self, cluster_config: Dict[str, Any]) -> str:
"""生成ZooKeeper StatefulSet"""
zk_count = len(cluster_config["zookeeper_config"]["ensemble"])
statefulset = {
"apiVersion": "apps/v1",
"kind": "StatefulSet",
"metadata": {
"name": "zookeeper",
"namespace": self.namespace,
"labels": {
"app": "zookeeper",
"component": "zookeeper"
}
},
"spec": {
"serviceName": "zookeeper-headless",
"replicas": zk_count,
"selector": {
"matchLabels": {
"app": "zookeeper"
}
},
"template": {
"metadata": {
"labels": {
"app": "zookeeper",
"component": "zookeeper"
}
},
"spec": {
"containers": [
{
"name": "zookeeper",
"image": "confluentinc/cp-zookeeper:7.4.0",
"ports": [
{"containerPort": 2181, "name": "client"},
{"containerPort": 2888, "name": "peer"},
{"containerPort": 3888, "name": "leader"}
],
"env": [
{"name": "ZOOKEEPER_CLIENT_PORT", "value": "2181"},
{"name": "ZOOKEEPER_TICK_TIME", "value": "2000"},
{"name": "ZOOKEEPER_INIT_LIMIT", "value": "10"},
{"name": "ZOOKEEPER_SYNC_LIMIT", "value": "5"},
{"name": "ZOOKEEPER_MAX_CLIENT_CNXNS", "value": "60"},
{
"name": "ZOOKEEPER_SERVER_ID",
"valueFrom": {
"fieldRef": {
"fieldPath": "metadata.name"
}
}
}
],
"command": [
"bash",
"-c",
"export ZOOKEEPER_SERVER_ID=$((${HOSTNAME##*-}+1)) && /etc/confluent/docker/run"
],
"volumeMounts": [
{
"name": "zookeeper-data",
"mountPath": "/var/lib/zookeeper/data"
},
{
"name": "zookeeper-logs",
"mountPath": "/var/lib/zookeeper/log"
}
],
"livenessProbe": {
"tcpSocket": {"port": 2181},
"initialDelaySeconds": 30,
"periodSeconds": 10
},
"readinessProbe": {
"tcpSocket": {"port": 2181},
"initialDelaySeconds": 10,
"periodSeconds": 5
},
"resources": {
"requests": {
"memory": "512Mi",
"cpu": "250m"
},
"limits": {
"memory": "1Gi",
"cpu": "500m"
}
}
}
]
}
},
"volumeClaimTemplates": [
{
"metadata": {
"name": "zookeeper-data"
},
"spec": {
"accessModes": ["ReadWriteOnce"],
"resources": {
"requests": {
"storage": "10Gi"
}
}
}
},
{
"metadata": {
"name": "zookeeper-logs"
},
"spec": {
"accessModes": ["ReadWriteOnce"],
"resources": {
"requests": {
"storage": "5Gi"
}
}
}
}
]
}
}
return yaml.dump(statefulset, default_flow_style=False)
def _generate_kafka_statefulset(self, cluster_config: Dict[str, Any]) -> str:
"""生成Kafka StatefulSet"""
broker_count = len(cluster_config["broker_configs"])
statefulset = {
"apiVersion": "apps/v1",
"kind": "StatefulSet",
"metadata": {
"name": "kafka",
"namespace": self.namespace,
"labels": {
"app": "kafka",
"component": "kafka"
}
},
"spec": {
"serviceName": "kafka-headless",
"replicas": broker_count,
"selector": {
"matchLabels": {
"app": "kafka"
}
},
"template": {
"metadata": {
"labels": {
"app": "kafka",
"component": "kafka"
}
},
"spec": {
"containers": [
{
"name": "kafka",
"image": "confluentinc/cp-kafka:7.4.0",
"ports": [
{"containerPort": 9092, "name": "kafka"},
{"containerPort": 9999, "name": "jmx"}
],
"env": [
{
"name": "KAFKA_BROKER_ID",
"valueFrom": {
"fieldRef": {
"fieldPath": "metadata.name"
}
}
},
{
"name": "POD_NAME",
"valueFrom": {
"fieldRef": {
"fieldPath": "metadata.name"
}
}
},
{
"name": "POD_NAMESPACE",
"valueFrom": {
"fieldRef": {
"fieldPath": "metadata.namespace"
}
}
},
{"name": "KAFKA_ZOOKEEPER_CONNECT", "value": "zk-0.zookeeper-headless.kafka.svc.cluster.local:2181,zk-1.zookeeper-headless.kafka.svc.cluster.local:2181,zk-2.zookeeper-headless.kafka.svc.cluster.local:2181"},
{"name": "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "value": "PLAINTEXT:PLAINTEXT"},
{"name": "KAFKA_LISTENERS", "value": "PLAINTEXT://0.0.0.0:9092"},
{"name": "KAFKA_INTER_BROKER_LISTENER_NAME", "value": "PLAINTEXT"},
{"name": "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "value": str(min(3, broker_count))},
{"name": "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "value": str(min(3, broker_count))},
{"name": "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "value": str(min(2, broker_count))},
{"name": "KAFKA_DEFAULT_REPLICATION_FACTOR", "value": str(min(3, broker_count))},
{"name": "KAFKA_MIN_INSYNC_REPLICAS", "value": str(min(2, broker_count))},
{"name": "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "value": "3000"},
{"name": "KAFKA_JMX_PORT", "value": "9999"},
{"name": "KAFKA_JMX_HOSTNAME", "value": "localhost"}
],
"command": [
"bash",
"-c",
"export KAFKA_BROKER_ID=$((${HOSTNAME##*-}+1)) && export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_NAME}.kafka-headless.${POD_NAMESPACE}.svc.cluster.local:9092 && /etc/confluent/docker/run"
],
"volumeMounts": [
{
"name": "kafka-data",
"mountPath": "/var/lib/kafka/data"
},
{
"name": "kafka-logs",
"mountPath": "/var/log/kafka"
}
],
"livenessProbe": {
"tcpSocket": {"port": 9092},
"initialDelaySeconds": 60,
"periodSeconds": 10
},
"readinessProbe": {
"tcpSocket": {"port": 9092},
"initialDelaySeconds": 30,
"periodSeconds": 5
},
"resources": {
"requests": {
"memory": "1Gi",
"cpu": "500m"
},
"limits": {
"memory": "2Gi",
"cpu": "1000m"
}
}
}
]
}
},
"volumeClaimTemplates": [
{
"metadata": {
"name": "kafka-data"
},
"spec": {
"accessModes": ["ReadWriteOnce"],
"resources": {
"requests": {
"storage": "50Gi"
}
}
}
},
{
"metadata": {
"name": "kafka-logs"
},
"spec": {
"accessModes": ["ReadWriteOnce"],
"resources": {
"requests": {
"storage": "10Gi"
}
}
}
}
]
}
}
return yaml.dump(statefulset, default_flow_style=False)
def _generate_services(self, cluster_config: Dict[str, Any]) -> str:
"""生成Services"""
services = [
# ZooKeeper Headless Service
{
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": "zookeeper-headless",
"namespace": self.namespace,
"labels": {
"app": "zookeeper"
}
},
"spec": {
"clusterIP": "None",
"selector": {
"app": "zookeeper"
},
"ports": [
{"port": 2181, "name": "client"},
{"port": 2888, "name": "peer"},
{"port": 3888, "name": "leader"}
]
}
},
# ZooKeeper Client Service
{
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": "zookeeper",
"namespace": self.namespace,
"labels": {
"app": "zookeeper"
}
},
"spec": {
"selector": {
"app": "zookeeper"
},
"ports": [
{"port": 2181, "name": "client"}
]
}
},
# Kafka Headless Service
{
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": "kafka-headless",
"namespace": self.namespace,
"labels": {
"app": "kafka"
}
},
"spec": {
"clusterIP": "None",
"selector": {
"app": "kafka"
},
"ports": [
{"port": 9092, "name": "kafka"},
{"port": 9999, "name": "jmx"}
]
}
},
# Kafka Client Service
{
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": "kafka",
"namespace": self.namespace,
"labels": {
"app": "kafka"
}
},
"spec": {
"selector": {
"app": "kafka"
},
"ports": [
{"port": 9092, "name": "kafka"}
]
}
}
]
return yaml.dump_all(services, default_flow_style=False)
def _generate_ingress(self) -> str:
"""生成Ingress"""
ingress = {
"apiVersion": "networking.k8s.io/v1",
"kind": "Ingress",
"metadata": {
"name": "kafka-ui-ingress",
"namespace": self.namespace,
"annotations": {
"nginx.ingress.kubernetes.io/rewrite-target": "/"
}
},
"spec": {
"rules": [
{
"host": "kafka-ui.local",
"http": {
"paths": [
{
"path": "/",
"pathType": "Prefix",
"backend": {
"service": {
"name": "kafka-ui",
"port": {
"number": 8080
}
}
}
}
]
}
}
]
}
}
return yaml.dump(ingress, default_flow_style=False)
def _generate_monitoring(self) -> str:
"""生成监控相关资源"""
# Kafka UI Deployment
kafka_ui = {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": "kafka-ui",
"namespace": self.namespace,
"labels": {
"app": "kafka-ui"
}
},
"spec": {
"replicas": 1,
"selector": {
"matchLabels": {
"app": "kafka-ui"
}
},
"template": {
"metadata": {
"labels": {
"app": "kafka-ui"
}
},
"spec": {
"containers": [
{
"name": "kafka-ui",
"image": "provectuslabs/kafka-ui:latest",
"ports": [
{"containerPort": 8080}
],
"env": [
{"name": "KAFKA_CLUSTERS_0_NAME", "value": "local"},
{"name": "KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS", "value": "kafka:9092"},
{"name": "KAFKA_CLUSTERS_0_ZOOKEEPER", "value": "zookeeper:2181"}
],
"resources": {
"requests": {
"memory": "256Mi",
"cpu": "100m"
},
"limits": {
"memory": "512Mi",
"cpu": "200m"
}
}
}
]
}
}
}
}
# Kafka UI Service
kafka_ui_service = {
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": "kafka-ui",
"namespace": self.namespace,
"labels": {
"app": "kafka-ui"
}
},
"spec": {
"selector": {
"app": "kafka-ui"
},
"ports": [
{"port": 8080, "targetPort": 8080}
],
"type": "ClusterIP"
}
}
return yaml.dump_all([kafka_ui, kafka_ui_service], default_flow_style=False)
# 使用示例
if __name__ == "__main__":
# 使用之前生成的集群配置
from pathlib import Path
import json
# 读取集群配置
config_file = Path("kafka_cluster_config.json")
if config_file.exists():
with open(config_file, 'r', encoding='utf-8') as f:
cluster_config = json.load(f)
print("=== 生成Kubernetes清单 ===")
k8s_gen = KubernetesManifestGenerator()
k8s_files = k8s_gen.generate_kafka_manifests(cluster_config)
print("生成的Kubernetes文件:")
for filename, filepath in k8s_files.items():
print(f" {filename}: {filepath}")
print("\n=== 部署说明 ===")
print("Kubernetes部署:")
print(" 1. 确保kubectl已配置并连接到集群")
print(" 2. 按顺序应用清单: kubectl apply -f k8s/")
print(" 3. 检查部署状态: kubectl get pods -n kafka")
print(" 4. 访问Kafka UI: kubectl port-forward svc/kafka-ui 8080:8080 -n kafka")
else:
print("请先运行集群规划生成kafka_cluster_config.json文件")
6.3 集群管理
6.3.1 集群状态监控
集群上线后需要持续关注Broker、主题和消费者组的运行状态,下面的示例基于Kafka自带的命令行工具采集并汇总这些信息。
import time
import json
import subprocess
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import threading
import logging
class ClusterStatus(Enum):
"""集群状态"""
HEALTHY = "healthy"
WARNING = "warning"
CRITICAL = "critical"
UNKNOWN = "unknown"
class NodeStatus(Enum):
"""节点状态"""
ONLINE = "online"
OFFLINE = "offline"
DEGRADED = "degraded"
UNKNOWN = "unknown"
@dataclass
class BrokerInfo:
"""Broker信息"""
broker_id: int
hostname: str
port: int
status: NodeStatus
leader_partitions: int = 0
follower_partitions: int = 0
under_replicated_partitions: int = 0
offline_partitions: int = 0
last_seen: Optional[datetime] = None
metrics: Dict[str, Any] = field(default_factory=dict)
@dataclass
class TopicInfo:
"""主题信息"""
name: str
partitions: int
replication_factor: int
under_replicated_partitions: int
offline_partitions: int
leader_imbalance: float
size_bytes: int = 0
message_rate: float = 0.0
byte_rate: float = 0.0
@dataclass
class ConsumerGroupInfo:
"""消费者组信息"""
group_id: str
state: str
members: int
lag: int
topics: List[str]
coordinator: Optional[int] = None
class KafkaClusterManager:
"""Kafka集群管理器"""
def __init__(self, bootstrap_servers: str, kafka_home: str = "/opt/kafka"):
self.bootstrap_servers = bootstrap_servers
self.kafka_home = kafka_home
self.logger = logging.getLogger(__name__)
self._monitoring_active = False
self._monitoring_thread = None
self._metrics_history = []
self._max_history_size = 1000
def get_cluster_status(self) -> Dict[str, Any]:
"""获取集群状态"""
try:
brokers = self.get_broker_list()
topics = self.get_topic_list()
consumer_groups = self.get_consumer_groups()
# 计算集群健康状态
cluster_status = self._calculate_cluster_status(brokers, topics)
return {
"status": cluster_status.value,
"timestamp": datetime.now().isoformat(),
"brokers": {
"total": len(brokers),
"online": len([b for b in brokers if b.status == NodeStatus.ONLINE]),
"offline": len([b for b in brokers if b.status == NodeStatus.OFFLINE]),
"details": [self._broker_to_dict(b) for b in brokers]
},
"topics": {
"total": len(topics),
"under_replicated": len([t for t in topics if t.under_replicated_partitions > 0]),
"offline": len([t for t in topics if t.offline_partitions > 0]),
"details": [self._topic_to_dict(t) for t in topics]
},
"consumer_groups": {
"total": len(consumer_groups),
"active": len([g for g in consumer_groups if g.state == "Stable"]),
"details": [self._consumer_group_to_dict(g) for g in consumer_groups]
}
}
except Exception as e:
self.logger.error(f"获取集群状态失败: {e}")
return {
"status": ClusterStatus.UNKNOWN.value,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
def get_broker_list(self) -> List[BrokerInfo]:
"""获取Broker列表"""
try:
# 使用kafka-broker-api-versions.sh获取broker信息
cmd = [
f"{self.kafka_home}/bin/kafka-broker-api-versions.sh",
"--bootstrap-server", self.bootstrap_servers
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise Exception(f"获取broker信息失败: {result.stderr}")
brokers = []
for line in result.stdout.strip().split('\n'):
if line and not line.startswith('Picked up'):
# 解析broker信息
parts = line.split()
if len(parts) >= 2:
broker_info = parts[0].split(':')
if len(broker_info) >= 2:
hostname = broker_info[0]
port = int(broker_info[1])
broker_id = self._extract_broker_id(hostname, port)
broker = BrokerInfo(
broker_id=broker_id,
hostname=hostname,
port=port,
status=NodeStatus.ONLINE,
last_seen=datetime.now()
)
# 获取broker详细信息
self._enrich_broker_info(broker)
brokers.append(broker)
return brokers
except Exception as e:
self.logger.error(f"获取broker列表失败: {e}")
return []
def get_topic_list(self) -> List[TopicInfo]:
"""获取主题列表"""
try:
# 获取主题列表
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise Exception(f"获取主题列表失败: {result.stderr}")
topics = []
for topic_name in result.stdout.strip().split('\n'):
if topic_name and not topic_name.startswith('Picked up'):
topic_info = self._get_topic_details(topic_name)
if topic_info:
topics.append(topic_info)
return topics
except Exception as e:
self.logger.error(f"获取主题列表失败: {e}")
return []
def get_consumer_groups(self) -> List[ConsumerGroupInfo]:
"""获取消费者组列表"""
try:
# 获取消费者组列表
cmd = [
f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise Exception(f"获取消费者组列表失败: {result.stderr}")
groups = []
for group_id in result.stdout.strip().split('\n'):
if group_id and not group_id.startswith('Picked up'):
group_info = self._get_consumer_group_details(group_id)
if group_info:
groups.append(group_info)
return groups
except Exception as e:
self.logger.error(f"获取消费者组列表失败: {e}")
return []
def _extract_broker_id(self, hostname: str, port: int) -> int:
"""从主机名和端口提取broker ID"""
# 简化实现:从主机名中提取数字
import re
match = re.search(r'(\d+)', hostname)
if match:
return int(match.group(1))
else:
# 如果无法从主机名提取,使用端口号
return port - 9092 + 1
def _enrich_broker_info(self, broker: BrokerInfo):
"""丰富broker信息"""
try:
# 获取broker的分区信息
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
leader_count = 0
follower_count = 0
under_replicated = 0
for line in result.stdout.split('\n'):
if 'Leader:' in line and f'Leader: {broker.broker_id}' in line:
leader_count += 1
elif f'Replicas: {broker.broker_id}' in line or f',{broker.broker_id},' in line:
follower_count += 1
elif 'UnderReplicated' in line and str(broker.broker_id) in line:
under_replicated += 1
broker.leader_partitions = leader_count
broker.follower_partitions = follower_count
broker.under_replicated_partitions = under_replicated
except Exception as e:
self.logger.warning(f"获取broker {broker.broker_id} 详细信息失败: {e}")
def _get_topic_details(self, topic_name: str) -> Optional[TopicInfo]:
"""获取主题详细信息"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe",
"--topic", topic_name
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return None
partitions = 0
replication_factor = 0
under_replicated = 0
offline = 0
for line in result.stdout.split('\n'):
if line.startswith('Topic:'):
parts = line.split()
for i, part in enumerate(parts):
if part == 'PartitionCount:':
partitions = int(parts[i + 1])
elif part == 'ReplicationFactor:':
replication_factor = int(parts[i + 1])
elif 'Leader: -1' in line:
offline += 1
elif 'Isr:' in line:
# 检查ISR是否小于副本数
isr_match = line.split('Isr: ')[1].split()[0]
isr_count = len(isr_match.split(','))
if isr_count < replication_factor:
under_replicated += 1
return TopicInfo(
name=topic_name,
partitions=partitions,
replication_factor=replication_factor,
under_replicated_partitions=under_replicated,
offline_partitions=offline,
leader_imbalance=0.0 # 需要额外计算
)
except Exception as e:
self.logger.warning(f"获取主题 {topic_name} 详细信息失败: {e}")
return None
def _get_consumer_group_details(self, group_id: str) -> Optional[ConsumerGroupInfo]:
"""获取消费者组详细信息"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe",
"--group", group_id
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return None
state = "Unknown"
members = 0
total_lag = 0
topics = set()
lines = result.stdout.strip().split('\n')
for line in lines:
if 'GROUP' in line and 'TOPIC' in line:
continue # 跳过标题行
elif line.strip():
parts = line.split()
if len(parts) >= 6:
topics.add(parts[1]) # TOPIC
try:
lag = int(parts[5]) if parts[5] != '-' else 0
total_lag += lag
except (ValueError, IndexError):
pass
# 获取组状态
state_cmd = [
f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe",
"--group", group_id,
"--state"
]
state_result = subprocess.run(state_cmd, capture_output=True, text=True, timeout=30)
if state_result.returncode == 0:
for line in state_result.stdout.split('\n'):
if group_id in line:
parts = line.split()
if len(parts) >= 3:
state = parts[2]
try:
members = int(parts[3]) if len(parts) > 3 and parts[3] != '-' else 0
except (ValueError, IndexError):
pass
break
return ConsumerGroupInfo(
group_id=group_id,
state=state,
members=members,
lag=total_lag,
topics=list(topics)
)
except Exception as e:
self.logger.warning(f"获取消费者组 {group_id} 详细信息失败: {e}")
return None
def _calculate_cluster_status(self, brokers: List[BrokerInfo], topics: List[TopicInfo]) -> ClusterStatus:
"""计算集群健康状态"""
if not brokers:
return ClusterStatus.CRITICAL
offline_brokers = len([b for b in brokers if b.status == NodeStatus.OFFLINE])
offline_ratio = offline_brokers / len(brokers)
under_replicated_topics = len([t for t in topics if t.under_replicated_partitions > 0])
offline_topics = len([t for t in topics if t.offline_partitions > 0])
# 严重状态:超过50%的broker离线或有离线分区
if offline_ratio > 0.5 or offline_topics > 0:
return ClusterStatus.CRITICAL
# 警告状态:有broker离线或有副本不足的分区
if offline_ratio > 0 or under_replicated_topics > 0:
return ClusterStatus.WARNING
return ClusterStatus.HEALTHY
def _broker_to_dict(self, broker: BrokerInfo) -> Dict[str, Any]:
"""将Broker信息转换为字典"""
return {
"broker_id": broker.broker_id,
"hostname": broker.hostname,
"port": broker.port,
"status": broker.status.value,
"leader_partitions": broker.leader_partitions,
"follower_partitions": broker.follower_partitions,
"under_replicated_partitions": broker.under_replicated_partitions,
"offline_partitions": broker.offline_partitions,
"last_seen": broker.last_seen.isoformat() if broker.last_seen else None
}
def _topic_to_dict(self, topic: TopicInfo) -> Dict[str, Any]:
"""将主题信息转换为字典"""
return {
"name": topic.name,
"partitions": topic.partitions,
"replication_factor": topic.replication_factor,
"under_replicated_partitions": topic.under_replicated_partitions,
"offline_partitions": topic.offline_partitions,
"leader_imbalance": topic.leader_imbalance,
"size_bytes": topic.size_bytes,
"message_rate": topic.message_rate,
"byte_rate": topic.byte_rate
}
def _consumer_group_to_dict(self, group: ConsumerGroupInfo) -> Dict[str, Any]:
"""将消费者组信息转换为字典"""
return {
"group_id": group.group_id,
"state": group.state,
"members": group.members,
"lag": group.lag,
"topics": group.topics,
"coordinator": group.coordinator
}
def start_monitoring(self, interval_seconds: int = 60):
"""开始监控"""
if self._monitoring_active:
return
self._monitoring_active = True
self._monitoring_thread = threading.Thread(
target=self._monitoring_loop,
args=(interval_seconds,),
daemon=True
)
self._monitoring_thread.start()
self.logger.info(f"开始监控集群,间隔 {interval_seconds} 秒")
def stop_monitoring(self):
"""停止监控"""
self._monitoring_active = False
if self._monitoring_thread:
self._monitoring_thread.join(timeout=5)
self.logger.info("停止监控集群")
def _monitoring_loop(self, interval_seconds: int):
"""监控循环"""
while self._monitoring_active:
try:
status = self.get_cluster_status()
# 保存历史记录
self._metrics_history.append(status)
if len(self._metrics_history) > self._max_history_size:
self._metrics_history.pop(0)
# 检查告警条件
self._check_alerts(status)
time.sleep(interval_seconds)
except Exception as e:
self.logger.error(f"监控循环出错: {e}")
time.sleep(interval_seconds)
def _check_alerts(self, status: Dict[str, Any]):
"""检查告警条件"""
cluster_status = status.get("status")
if cluster_status == ClusterStatus.CRITICAL.value:
self.logger.critical(f"集群状态严重: {status}")
elif cluster_status == ClusterStatus.WARNING.value:
self.logger.warning(f"集群状态警告: {status}")
def get_metrics_history(self, hours: int = 24) -> List[Dict[str, Any]]:
"""获取指标历史"""
cutoff_time = datetime.now() - timedelta(hours=hours)
filtered_history = []
for metric in self._metrics_history:
try:
timestamp = datetime.fromisoformat(metric["timestamp"])
if timestamp >= cutoff_time:
filtered_history.append(metric)
except (KeyError, ValueError):
continue
return filtered_history
# 使用示例
if __name__ == "__main__":
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 创建集群管理器
manager = KafkaClusterManager(
bootstrap_servers="localhost:9092",
kafka_home="/opt/kafka"
)
print("=== Kafka集群状态 ===")
status = manager.get_cluster_status()
print(json.dumps(status, indent=2, ensure_ascii=False))
# 开始监控(可选)
# manager.start_monitoring(interval_seconds=30)
# 等待一段时间后停止监控
# time.sleep(300) # 5分钟
# manager.stop_monitoring()
# 获取历史指标
# history = manager.get_metrics_history(hours=1)
# print(f"\n过去1小时的指标记录数: {len(history)}")
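除了上面通过解析命令行输出实现的KafkaClusterManager,也可以直接利用kafka-topics.sh自带的过滤参数做轻量级健康探测。下面是一个示意脚本(KAFKA_HOME与接入地址均为假设,按行计数属于简化实现,结果仅供参考):
import subprocess

KAFKA_HOME = "/opt/kafka"        # 假设的安装路径
BOOTSTRAP = "localhost:9092"     # 假设的接入地址

def count_problem_partitions(filter_flag: str) -> int:
    """用kafka-topics.sh --describe加过滤参数统计问题分区数(每个问题分区输出一行)"""
    cmd = [
        f"{KAFKA_HOME}/bin/kafka-topics.sh",
        "--bootstrap-server", BOOTSTRAP,
        "--describe", filter_flag
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        raise RuntimeError(result.stderr)
    return len([line for line in result.stdout.splitlines()
                if line.strip() and not line.startswith('Picked up')])

urp = count_problem_partitions("--under-replicated-partitions")
offline = count_problem_partitions("--unavailable-partitions")
print(f"副本不足分区: {urp}, 不可用分区: {offline}")
print("集群健康" if urp == 0 and offline == 0 else "集群需要关注")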
6.3.2 备份与恢复
import os
import json
import shutil
import tarfile
import subprocess
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import logging
@dataclass
class BackupInfo:
"""备份信息"""
backup_id: str
timestamp: datetime
backup_type: str # full, incremental, metadata
size_bytes: int
topics: List[str]
partitions: int
file_path: str
metadata: Dict[str, Any]
class KafkaBackupManager:
"""Kafka备份管理器"""
def __init__(self, kafka_home: str, backup_dir: str, bootstrap_servers: str):
self.kafka_home = kafka_home
self.backup_dir = Path(backup_dir)
self.backup_dir.mkdir(parents=True, exist_ok=True)
self.bootstrap_servers = bootstrap_servers
self.logger = logging.getLogger(__name__)
def create_metadata_backup(self, backup_id: Optional[str] = None) -> BackupInfo:
"""创建元数据备份"""
if not backup_id:
backup_id = f"metadata_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_path = self.backup_dir / f"{backup_id}.tar.gz"
temp_dir = self.backup_dir / f"temp_{backup_id}"
temp_dir.mkdir(exist_ok=True)
try:
# 备份主题配置
topics_file = temp_dir / "topics.json"
topics_data = self._export_topics()
with open(topics_file, 'w', encoding='utf-8') as f:
json.dump(topics_data, f, indent=2, ensure_ascii=False)
# 备份消费者组偏移量
offsets_file = temp_dir / "consumer_offsets.json"
offsets_data = self._export_consumer_offsets()
with open(offsets_file, 'w', encoding='utf-8') as f:
json.dump(offsets_data, f, indent=2, ensure_ascii=False)
# 备份ACL配置(如果启用)
acls_file = temp_dir / "acls.json"
acls_data = self._export_acls()
with open(acls_file, 'w', encoding='utf-8') as f:
json.dump(acls_data, f, indent=2, ensure_ascii=False)
# 创建备份元数据
metadata = {
"backup_id": backup_id,
"timestamp": datetime.now().isoformat(),
"backup_type": "metadata",
"kafka_version": self._get_kafka_version(),
"cluster_id": self._get_cluster_id(),
"topics_count": len(topics_data),
"consumer_groups_count": len(offsets_data)
}
metadata_file = temp_dir / "backup_metadata.json"
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
# 创建压缩包
with tarfile.open(backup_path, 'w:gz') as tar:
tar.add(temp_dir, arcname=backup_id)
# 清理临时目录
shutil.rmtree(temp_dir)
backup_info = BackupInfo(
backup_id=backup_id,
timestamp=datetime.now(),
backup_type="metadata",
size_bytes=backup_path.stat().st_size,
topics=list(topics_data.keys()),
partitions=sum(t["partitions"] for t in topics_data.values()),
file_path=str(backup_path),
metadata=metadata
)
self.logger.info(f"元数据备份完成: {backup_id}")
return backup_info
except Exception as e:
# 清理失败的备份
if temp_dir.exists():
shutil.rmtree(temp_dir)
if backup_path.exists():
backup_path.unlink()
self.logger.error(f"元数据备份失败: {e}")
raise
def create_data_backup(self, topics: List[str], backup_id: Optional[str] = None) -> BackupInfo:
"""创建数据备份"""
if not backup_id:
backup_id = f"data_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_path = self.backup_dir / f"{backup_id}.tar.gz"
temp_dir = self.backup_dir / f"temp_{backup_id}"
temp_dir.mkdir(exist_ok=True)
try:
total_size = 0
total_partitions = 0
for topic in topics:
topic_dir = temp_dir / topic
topic_dir.mkdir(exist_ok=True)
# 导出主题数据
topic_size, topic_partitions = self._export_topic_data(topic, topic_dir)
total_size += topic_size
total_partitions += topic_partitions
# 创建备份元数据
metadata = {
"backup_id": backup_id,
"timestamp": datetime.now().isoformat(),
"backup_type": "data",
"topics": topics,
"total_partitions": total_partitions,
"data_size_bytes": total_size
}
metadata_file = temp_dir / "backup_metadata.json"
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
# 创建压缩包
with tarfile.open(backup_path, 'w:gz') as tar:
tar.add(temp_dir, arcname=backup_id)
# 清理临时目录
shutil.rmtree(temp_dir)
backup_info = BackupInfo(
backup_id=backup_id,
timestamp=datetime.now(),
backup_type="data",
size_bytes=backup_path.stat().st_size,
topics=topics,
partitions=total_partitions,
file_path=str(backup_path),
metadata=metadata
)
self.logger.info(f"数据备份完成: {backup_id}")
return backup_info
except Exception as e:
# 清理失败的备份
if temp_dir.exists():
shutil.rmtree(temp_dir)
if backup_path.exists():
backup_path.unlink()
self.logger.error(f"数据备份失败: {e}")
raise
def restore_metadata(self, backup_id: str, force: bool = False) -> bool:
"""恢复元数据"""
backup_path = self.backup_dir / f"{backup_id}.tar.gz"
if not backup_path.exists():
raise FileNotFoundError(f"备份文件不存在: {backup_path}")
temp_dir = self.backup_dir / f"restore_{backup_id}"
temp_dir.mkdir(exist_ok=True)
try:
# 解压备份文件
with tarfile.open(backup_path, 'r:gz') as tar:
tar.extractall(temp_dir)
backup_content_dir = temp_dir / backup_id
# 读取备份元数据
metadata_file = backup_content_dir / "backup_metadata.json"
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata = json.load(f)
if not force:
# 检查兼容性
current_version = self._get_kafka_version()
backup_version = metadata.get("kafka_version")
if backup_version and backup_version != current_version:
self.logger.warning(f"版本不匹配: 当前={current_version}, 备份={backup_version}")
# 恢复主题配置
topics_file = backup_content_dir / "topics.json"
if topics_file.exists():
with open(topics_file, 'r', encoding='utf-8') as f:
topics_data = json.load(f)
self._restore_topics(topics_data, force)
# 恢复消费者组偏移量
offsets_file = backup_content_dir / "consumer_offsets.json"
if offsets_file.exists():
with open(offsets_file, 'r', encoding='utf-8') as f:
offsets_data = json.load(f)
self._restore_consumer_offsets(offsets_data)
# 恢复ACL配置
acls_file = backup_content_dir / "acls.json"
if acls_file.exists():
with open(acls_file, 'r', encoding='utf-8') as f:
acls_data = json.load(f)
self._restore_acls(acls_data)
# 清理临时目录
shutil.rmtree(temp_dir)
self.logger.info(f"元数据恢复完成: {backup_id}")
return True
except Exception as e:
# 清理临时目录
if temp_dir.exists():
shutil.rmtree(temp_dir)
self.logger.error(f"元数据恢复失败: {e}")
raise
def list_backups(self) -> List[BackupInfo]:
"""列出所有备份"""
backups = []
for backup_file in self.backup_dir.glob("*.tar.gz"):
try:
backup_info = self._get_backup_info(backup_file)
if backup_info:
backups.append(backup_info)
except Exception as e:
self.logger.warning(f"读取备份信息失败 {backup_file}: {e}")
# 按时间排序
backups.sort(key=lambda x: x.timestamp, reverse=True)
return backups
def delete_backup(self, backup_id: str) -> bool:
"""删除备份"""
backup_path = self.backup_dir / f"{backup_id}.tar.gz"
if backup_path.exists():
backup_path.unlink()
self.logger.info(f"备份已删除: {backup_id}")
return True
else:
self.logger.warning(f"备份文件不存在: {backup_id}")
return False
def _export_topics(self) -> Dict[str, Any]:
"""导出主题配置"""
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise Exception(f"获取主题列表失败: {result.stderr}")
topics_data = {}
for topic_name in result.stdout.strip().split('\n'):
if topic_name and not topic_name.startswith('Picked up'):
topic_config = self._get_topic_config(topic_name)
if topic_config:
topics_data[topic_name] = topic_config
return topics_data
def _get_topic_config(self, topic_name: str) -> Optional[Dict[str, Any]]:
"""获取主题配置"""
try:
# 获取主题描述
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe",
"--topic", topic_name
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return None
config = {"partitions": 0, "replication_factor": 0, "configs": {}}
for line in result.stdout.split('\n'):
if line.startswith('Topic:'):
parts = line.split()
for i, part in enumerate(parts):
if part == 'PartitionCount:':
config["partitions"] = int(parts[i + 1])
elif part == 'ReplicationFactor:':
config["replication_factor"] = int(parts[i + 1])
# 获取主题配置
config_cmd = [
f"{self.kafka_home}/bin/kafka-configs.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe",
"--entity-type", "topics",
"--entity-name", topic_name
]
config_result = subprocess.run(config_cmd, capture_output=True, text=True, timeout=30)
if config_result.returncode == 0:
for line in config_result.stdout.split('\n'):
if 'configs=' in line:
configs_str = line.split('configs=')[1]
if configs_str.strip():
for config_pair in configs_str.split(','):
if '=' in config_pair:
key, value = config_pair.split('=', 1)
config["configs"][key.strip()] = value.strip()
return config
except Exception as e:
self.logger.warning(f"获取主题 {topic_name} 配置失败: {e}")
return None
def _export_consumer_offsets(self) -> Dict[str, Any]:
"""导出消费者组偏移量"""
# 获取消费者组列表
cmd = [
f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return {}
offsets_data = {}
for group_id in result.stdout.strip().split('\n'):
if group_id and not group_id.startswith('Picked up'):
group_offsets = self._get_group_offsets(group_id)
if group_offsets:
offsets_data[group_id] = group_offsets
return offsets_data
def _get_group_offsets(self, group_id: str) -> Optional[Dict[str, Any]]:
"""获取消费者组偏移量"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe",
"--group", group_id
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return None
offsets = {}
for line in result.stdout.split('\n'):
if line.strip() and 'GROUP' not in line and 'TOPIC' not in line:
parts = line.split()
if len(parts) >= 6:
topic = parts[1]
partition = int(parts[2])
offset = int(parts[3]) if parts[3] != '-' else 0
if topic not in offsets:
offsets[topic] = {}
offsets[topic][partition] = offset
return offsets
except Exception as e:
self.logger.warning(f"获取消费者组 {group_id} 偏移量失败: {e}")
return None
def _export_acls(self) -> Dict[str, Any]:
"""导出ACL配置"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-acls.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return {}
# 简化实现:返回ACL列表的文本形式
return {"acls": result.stdout}
except Exception:
# ACL可能未启用
return {}
def _export_topic_data(self, topic: str, output_dir: Path) -> tuple[int, int]:
"""导出主题数据"""
# 使用kafka-console-consumer导出数据
output_file = output_dir / f"{topic}.json"
cmd = [
f"{self.kafka_home}/bin/kafka-console-consumer.sh",
"--bootstrap-server", self.bootstrap_servers,
"--topic", topic,
"--from-beginning",
"--timeout-ms", "10000"
]
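# 注意:kafka-console-consumer默认只输出消息value,不保留key、header、分区与偏移量等信息,
# --timeout-ms 10000表示10秒内无新消息即退出;严格意义的数据备份建议使用MirrorMaker 2等专用工具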
with open(output_file, 'w', encoding='utf-8') as f:
result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
# 获取文件大小和分区数
size_bytes = output_file.stat().st_size
partitions = self._get_topic_partition_count(topic)
return size_bytes, partitions
def _get_topic_partition_count(self, topic: str) -> int:
"""获取主题分区数"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe",
"--topic", topic
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return 0
for line in result.stdout.split('\n'):
if line.startswith('Topic:'):
parts = line.split()
for i, part in enumerate(parts):
if part == 'PartitionCount:':
return int(parts[i + 1])
return 0
except Exception:
return 0
def _restore_topics(self, topics_data: Dict[str, Any], force: bool = False):
"""恢复主题配置"""
for topic_name, config in topics_data.items():
try:
# 检查主题是否已存在
if not force and self._topic_exists(topic_name):
self.logger.info(f"主题已存在,跳过: {topic_name}")
continue
# 创建主题
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--create",
"--topic", topic_name,
"--partitions", str(config["partitions"]),
"--replication-factor", str(config["replication_factor"])
]
# 添加配置参数
if config.get("configs"):
config_str = ",".join([f"{k}={v}" for k, v in config["configs"].items()])
cmd.extend(["--config", config_str])
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
self.logger.info(f"主题创建成功: {topic_name}")
else:
self.logger.error(f"主题创建失败 {topic_name}: {result.stderr}")
except Exception as e:
self.logger.error(f"恢复主题 {topic_name} 失败: {e}")
def _restore_consumer_offsets(self, offsets_data: Dict[str, Any]):
"""恢复消费者组偏移量"""
for group_id, group_offsets in offsets_data.items():
try:
for topic, topic_offsets in group_offsets.items():
for partition, offset in topic_offsets.items():
# 重置偏移量
cmd = [
f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--group", group_id,
"--topic", f"{topic}:{partition}",
"--reset-offsets",
"--to-offset", str(offset),
"--execute"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
self.logger.warning(f"重置偏移量失败 {group_id}/{topic}:{partition}: {result.stderr}")
self.logger.info(f"消费者组偏移量恢复完成: {group_id}")
except Exception as e:
self.logger.error(f"恢复消费者组 {group_id} 偏移量失败: {e}")
def _restore_acls(self, acls_data: Dict[str, Any]):
"""恢复ACL配置"""
# 简化实现:记录ACL信息
if acls_data.get("acls"):
self.logger.info("ACL配置需要手动恢复")
self.logger.info(acls_data["acls"])
def _topic_exists(self, topic_name: str) -> bool:
"""检查主题是否存在"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
topics = result.stdout.strip().split('\n')
return topic_name in topics
return False
except Exception:
return False
def _get_kafka_version(self) -> str:
"""获取Kafka版本"""
try:
# 从kafka-topics.sh获取版本信息
cmd = [f"{self.kafka_home}/bin/kafka-topics.sh", "--version"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
# 解析版本信息
for line in result.stdout.split('\n'):
if 'kafka' in line.lower():
return line.strip()
return "unknown"
except Exception:
return "unknown"
def _get_cluster_id(self) -> str:
"""获取集群ID"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-metadata-shell.sh",
"--snapshot", "/var/kafka-logs/__cluster_metadata-0/00000000000000000000.log"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
# 简化实现:返回固定值
return "kafka-cluster"
except Exception:
return "unknown"
def _get_backup_info(self, backup_file: Path) -> Optional[BackupInfo]:
"""获取备份信息"""
try:
temp_dir = self.backup_dir / f"info_{backup_file.stem}"
temp_dir.mkdir(exist_ok=True)
# 解压备份文件
with tarfile.open(backup_file, 'r:gz') as tar:
tar.extractall(temp_dir)
# 查找元数据文件
metadata_file = None
for root, dirs, files in os.walk(temp_dir):
if "backup_metadata.json" in files:
metadata_file = Path(root) / "backup_metadata.json"
break
if not metadata_file:
return None
# 读取元数据
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata = json.load(f)
# 清理临时目录
shutil.rmtree(temp_dir)
return BackupInfo(
backup_id=metadata["backup_id"],
timestamp=datetime.fromisoformat(metadata["timestamp"]),
backup_type=metadata["backup_type"],
size_bytes=backup_file.stat().st_size,
topics=metadata.get("topics", []),
partitions=metadata.get("total_partitions", 0),
file_path=str(backup_file),
metadata=metadata
)
except Exception as e:
self.logger.warning(f"读取备份信息失败 {backup_file}: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 创建备份管理器
backup_manager = KafkaBackupManager(
kafka_home="/opt/kafka",
backup_dir="/opt/kafka/backups",
bootstrap_servers="localhost:9092"
)
print("=== Kafka备份管理 ===")
# 创建元数据备份
print("\n1. 创建元数据备份")
metadata_backup = backup_manager.create_metadata_backup()
print(f"备份ID: {metadata_backup.backup_id}")
print(f"备份大小: {metadata_backup.size_bytes} bytes")
# 创建数据备份
print("\n2. 创建数据备份")
topics_to_backup = ["test-topic", "important-topic"]
data_backup = backup_manager.create_data_backup(topics_to_backup)
print(f"备份ID: {data_backup.backup_id}")
print(f"备份主题: {data_backup.topics}")
# 列出所有备份
print("\n3. 列出所有备份")
backups = backup_manager.list_backups()
for backup in backups:
print(f" {backup.backup_id} - {backup.backup_type} - {backup.timestamp}")
# 恢复元数据(示例)
# print("\n4. 恢复元数据")
# backup_manager.restore_metadata(metadata_backup.backup_id)
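备份文件还需要配合保留策略定期清理。下面是一个基于上述KafkaBackupManager的清理示意(keep_latest的取值为假设,实际应结合备份类型与合规要求确定):
def cleanup_old_backups(manager: KafkaBackupManager, keep_latest: int = 7) -> int:
    """只保留最近keep_latest份备份,返回删除的备份数量(示意实现)"""
    backups = manager.list_backups()      # list_backups()已按时间倒序排列
    removed = 0
    for backup in backups[keep_latest:]:
        if manager.delete_backup(backup.backup_id):
            removed += 1
    return removed

# 示例:清理时只保留最近7份备份
# removed = cleanup_old_backups(backup_manager, keep_latest=7)
# print(f"已清理 {removed} 份过期备份")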
6.3.3 集群扩容与缩容
import json
import time
import subprocess
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import logging
@dataclass
class ScalingPlan:
"""扩缩容计划"""
operation: str # scale_out, scale_in
current_brokers: List[int]
target_brokers: List[int]
affected_topics: List[str]
estimated_duration: int # 预估时间(分钟)
reassignment_plan: Dict[str, Any]
@dataclass
class ReassignmentProgress:
"""重分配进度"""
topic: str
partition: int
current_replicas: List[int]
target_replicas: List[int]
status: str # pending, in_progress, completed, failed
progress_percent: float
class KafkaClusterScaler:
"""Kafka集群扩缩容管理器"""
def __init__(self, kafka_home: str, bootstrap_servers: str):
self.kafka_home = kafka_home
self.bootstrap_servers = bootstrap_servers
self.logger = logging.getLogger(__name__)
def plan_scale_out(self, new_broker_ids: List[int],
rebalance_topics: Optional[List[str]] = None) -> ScalingPlan:
"""规划扩容"""
try:
# 获取当前broker列表
current_brokers = self._get_current_brokers()
target_brokers = current_brokers + new_broker_ids
# 获取需要重平衡的主题
if rebalance_topics is None:
rebalance_topics = self._get_all_topics()
# 生成重分配计划
reassignment_plan = self._generate_reassignment_plan(
rebalance_topics, target_brokers
)
# 估算执行时间
estimated_duration = self._estimate_reassignment_duration(reassignment_plan)
return ScalingPlan(
operation="scale_out",
current_brokers=current_brokers,
target_brokers=target_brokers,
affected_topics=rebalance_topics,
estimated_duration=estimated_duration,
reassignment_plan=reassignment_plan
)
except Exception as e:
self.logger.error(f"扩容规划失败: {e}")
raise
def plan_scale_in(self, remove_broker_ids: List[int]) -> ScalingPlan:
"""规划缩容"""
try:
# 获取当前broker列表
current_brokers = self._get_current_brokers()
# 检查要移除的broker是否存在
for broker_id in remove_broker_ids:
if broker_id not in current_brokers:
raise ValueError(f"Broker {broker_id} 不存在")
target_brokers = [b for b in current_brokers if b not in remove_broker_ids]
if len(target_brokers) == 0:
raise ValueError("不能移除所有broker")
# 获取受影响的主题
affected_topics = self._get_topics_on_brokers(remove_broker_ids)
# 生成重分配计划
reassignment_plan = self._generate_reassignment_plan(
affected_topics, target_brokers
)
# 估算执行时间
estimated_duration = self._estimate_reassignment_duration(reassignment_plan)
return ScalingPlan(
operation="scale_in",
current_brokers=current_brokers,
target_brokers=target_brokers,
affected_topics=affected_topics,
estimated_duration=estimated_duration,
reassignment_plan=reassignment_plan
)
except Exception as e:
self.logger.error(f"缩容规划失败: {e}")
raise
def execute_scaling(self, plan: ScalingPlan, dry_run: bool = False) -> bool:
"""执行扩缩容"""
try:
self.logger.info(f"开始执行{plan.operation}计划")
if dry_run:
self.logger.info("干运行模式,不会实际执行")
return True
# 生成重分配JSON文件
reassignment_file = "/tmp/kafka-reassignment.json"
with open(reassignment_file, 'w') as f:
json.dump(plan.reassignment_plan, f, indent=2)
# 执行重分配
success = self._execute_reassignment(reassignment_file)
if success:
self.logger.info(f"{plan.operation}执行成功")
# 如果是缩容,等待重分配完成后再移除broker
if plan.operation == "scale_in":
self._wait_for_reassignment_completion()
self._decommission_brokers(plan.current_brokers, plan.target_brokers)
return success
except Exception as e:
self.logger.error(f"执行扩缩容失败: {e}")
return False
def monitor_reassignment_progress(self) -> List[ReassignmentProgress]:
"""监控重分配进度"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
"--bootstrap-server", self.bootstrap_servers,
"--reassignment-json-file", "/tmp/kafka-reassignment.json",
"--verify"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
self.logger.error(f"检查重分配进度失败: {result.stderr}")
return []
progress_list = []
for line in result.stdout.split('\n'):
if 'Status of partition reassignment:' in line:
continue
elif line.strip() and 'Reassignment of partition' in line:
# 解析进度信息
progress = self._parse_reassignment_line(line)
if progress:
progress_list.append(progress)
return progress_list
except Exception as e:
self.logger.error(f"监控重分配进度失败: {e}")
return []
def _get_current_brokers(self) -> List[int]:
"""获取当前broker列表"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-broker-api-versions.sh",
"--bootstrap-server", self.bootstrap_servers
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise Exception(f"获取broker列表失败: {result.stderr}")
brokers = []
for line in result.stdout.strip().split('\n'):
if line and not line.startswith('Picked up'):
parts = line.split()
if len(parts) >= 2:
broker_info = parts[0].split(':')
if len(broker_info) >= 2:
# 从主机名提取broker ID
hostname = broker_info[0]
port = int(broker_info[1])
broker_id = self._extract_broker_id(hostname, port)
brokers.append(broker_id)
return sorted(brokers)
except Exception as e:
self.logger.error(f"获取broker列表失败: {e}")
return []
def _get_all_topics(self) -> List[str]:
"""获取所有主题"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise Exception(f"获取主题列表失败: {result.stderr}")
topics = []
for topic in result.stdout.strip().split('\n'):
if topic and not topic.startswith('Picked up'):
topics.append(topic)
return topics
except Exception as e:
self.logger.error(f"获取主题列表失败: {e}")
return []
def _get_topics_on_brokers(self, broker_ids: List[int]) -> List[str]:
"""获取指定broker上的主题"""
try:
affected_topics = set()
# 获取所有主题的分区分布
cmd = [
f"{self.kafka_home}/bin/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"获取主题描述失败: {result.stderr}")
current_topic = None
for line in result.stdout.split('\n'):
if line.startswith('Topic:'):
parts = line.split()
for i, part in enumerate(parts):
if part == 'Topic:':
current_topic = parts[i + 1]
break
elif line.strip() and current_topic and 'Replicas:' in line:
# 检查副本是否在要移除的broker上
replicas_str = line.split('Replicas: ')[1].split()[0]
replicas = [int(r) for r in replicas_str.split(',')]
for broker_id in broker_ids:
if broker_id in replicas:
affected_topics.add(current_topic)
break
return list(affected_topics)
except Exception as e:
self.logger.error(f"获取broker上的主题失败: {e}")
return []
def _generate_reassignment_plan(self, topics: List[str],
target_brokers: List[int]) -> Dict[str, Any]:
"""生成重分配计划"""
try:
# 创建主题列表文件
topics_file = "/tmp/kafka-topics-to-move.json"
topics_data = {"topics": [{"topic": topic} for topic in topics], "version": 1}
with open(topics_file, 'w') as f:
json.dump(topics_data, f, indent=2)
# 生成重分配计划
cmd = [
f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
"--bootstrap-server", self.bootstrap_servers,
"--topics-to-move-json-file", topics_file,
"--broker-list", ",".join(map(str, target_brokers)),
"--generate"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"生成重分配计划失败: {result.stderr}")
# 解析输出,提取重分配JSON
lines = result.stdout.split('\n')
json_start = False
json_lines = []
for line in lines:
if 'Proposed partition reassignment configuration' in line:
json_start = True
continue
elif json_start and line.strip():
if line.startswith('{'):
json_lines.append(line)
elif json_lines:
json_lines.append(line)
if line.strip().endswith('}'):
break
if json_lines:
json_str = '\n'.join(json_lines)
return json.loads(json_str)
else:
raise Exception("无法解析重分配计划")
except Exception as e:
self.logger.error(f"生成重分配计划失败: {e}")
raise
def _estimate_reassignment_duration(self, reassignment_plan: Dict[str, Any]) -> int:
"""估算重分配持续时间(分钟)"""
try:
partitions = reassignment_plan.get("partitions", [])
partition_count = len(partitions)
# 简化估算:每个分区平均2分钟
base_time = partition_count * 2
# 考虑并发度(假设最多10个分区并发)
concurrent_partitions = min(10, partition_count)
if concurrent_partitions > 0:
estimated_time = max(base_time // concurrent_partitions, partition_count // 5)
else:
estimated_time = 0
return max(estimated_time, 5) # 最少5分钟
except Exception:
return 30 # 默认30分钟
def _execute_reassignment(self, reassignment_file: str) -> bool:
"""执行重分配"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
"--bootstrap-server", self.bootstrap_servers,
"--reassignment-json-file", reassignment_file,
"--execute"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode == 0:
self.logger.info("重分配任务已启动")
return True
else:
self.logger.error(f"启动重分配失败: {result.stderr}")
return False
except Exception as e:
self.logger.error(f"执行重分配失败: {e}")
return False
def _wait_for_reassignment_completion(self, timeout_minutes: int = 60):
"""等待重分配完成"""
start_time = time.time()
timeout_seconds = timeout_minutes * 60
while time.time() - start_time < timeout_seconds:
progress = self.monitor_reassignment_progress()
if not progress:
self.logger.info("重分配已完成")
return
completed = sum(1 for p in progress if p.status == "completed")
total = len(progress)
self.logger.info(f"重分配进度: {completed}/{total} ({completed/total*100:.1f}%)")
if completed == total:
self.logger.info("重分配已完成")
return
time.sleep(30) # 每30秒检查一次
self.logger.warning(f"重分配超时({timeout_minutes}分钟)")
def _decommission_brokers(self, current_brokers: List[int], target_brokers: List[int]):
"""下线broker"""
brokers_to_remove = [b for b in current_brokers if b not in target_brokers]
for broker_id in brokers_to_remove:
self.logger.info(f"准备下线broker {broker_id}")
# 这里应该执行实际的broker下线操作
# 例如:停止broker进程、从集群中移除等
# 具体实现取决于部署方式(systemd、docker、k8s等)
def _extract_broker_id(self, hostname: str, port: int) -> int:
"""从主机名和端口提取broker ID"""
import re
match = re.search(r'(\d+)', hostname)
if match:
return int(match.group(1))
else:
return port - 9092 + 1
def _parse_reassignment_line(self, line: str) -> Optional[ReassignmentProgress]:
"""解析重分配进度行"""
try:
# 简化解析实现
if 'completed successfully' in line:
status = "completed"
progress = 100.0
elif 'in progress' in line:
status = "in_progress"
progress = 50.0 # 简化进度
elif 'failed' in line:
status = "failed"
progress = 0.0
else:
status = "pending"
progress = 0.0
# 提取主题和分区信息(简化实现)
parts = line.split()
topic = "unknown"
partition = 0
for i, part in enumerate(parts):
if part.endswith(':'):
topic_partition = part[:-1]
if '-' in topic_partition:
topic, partition_str = topic_partition.rsplit('-', 1)
try:
partition = int(partition_str)
except ValueError:
pass
break
return ReassignmentProgress(
topic=topic,
partition=partition,
current_replicas=[],
target_replicas=[],
status=status,
progress_percent=progress
)
except Exception:
return None
# 使用示例
if __name__ == "__main__":
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 创建扩缩容管理器
scaler = KafkaClusterScaler(
kafka_home="/opt/kafka",
bootstrap_servers="localhost:9092"
)
print("=== Kafka集群扩缩容 ===")
# 扩容示例
print("\n1. 规划扩容")
scale_out_plan = scaler.plan_scale_out([4, 5]) # 添加broker 4和5
print(f"当前broker: {scale_out_plan.current_brokers}")
print(f"目标broker: {scale_out_plan.target_brokers}")
print(f"受影响主题: {len(scale_out_plan.affected_topics)}")
print(f"预估时间: {scale_out_plan.estimated_duration}分钟")
# 执行扩容(干运行)
print("\n2. 执行扩容(干运行)")
success = scaler.execute_scaling(scale_out_plan, dry_run=True)
print(f"扩容计划验证: {'成功' if success else '失败'}")
# 缩容示例
print("\n3. 规划缩容")
try:
scale_in_plan = scaler.plan_scale_in([3]) # 移除broker 3
print(f"当前broker: {scale_in_plan.current_brokers}")
print(f"目标broker: {scale_in_plan.target_brokers}")
print(f"受影响主题: {len(scale_in_plan.affected_topics)}")
print(f"预估时间: {scale_in_plan.estimated_duration}分钟")
except Exception as e:
print(f"缩容规划失败: {e}")
# 监控重分配进度
print("\n4. 监控重分配进度")
progress = scaler.monitor_reassignment_progress()
if progress:
for p in progress:
print(f" {p.topic}-{p.partition}: {p.status} ({p.progress_percent:.1f}%)")
else:
print(" 当前没有进行中的重分配任务")
6.4 安全配置
6.4.1 SSL/TLS配置
import os
import ssl
import subprocess
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pathlib import Path
import logging
@dataclass
class SSLConfig:
"""SSL配置"""
keystore_path: str
keystore_password: str
truststore_path: str
truststore_password: str
key_password: str
protocol: str = "TLSv1.2"
enabled_protocols: Optional[List[str]] = None
cipher_suites: Optional[List[str]] = None
class KafkaSSLManager:
"""Kafka SSL管理器"""
def __init__(self, kafka_home: str, ssl_dir: str):
self.kafka_home = kafka_home
self.ssl_dir = Path(ssl_dir)
self.ssl_dir.mkdir(parents=True, exist_ok=True)
self.logger = logging.getLogger(__name__)
def generate_ca_certificate(self, ca_name: str = "kafka-ca",
validity_days: int = 365) -> Dict[str, str]:
"""生成CA证书"""
try:
ca_key_path = self.ssl_dir / f"{ca_name}-key.pem"
ca_cert_path = self.ssl_dir / f"{ca_name}-cert.pem"
# 生成CA私钥
cmd_key = [
"openssl", "genrsa",
"-out", str(ca_key_path),
"4096"
]
result = subprocess.run(cmd_key, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"生成CA私钥失败: {result.stderr}")
# 生成CA证书
cmd_cert = [
"openssl", "req", "-new", "-x509",
"-key", str(ca_key_path),
"-out", str(ca_cert_path),
"-days", str(validity_days),
"-subj", f"/CN={ca_name}/O=Kafka/C=US"
]
result = subprocess.run(cmd_cert, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"生成CA证书失败: {result.stderr}")
self.logger.info(f"CA证书生成成功: {ca_cert_path}")
return {
"ca_key": str(ca_key_path),
"ca_cert": str(ca_cert_path)
}
except Exception as e:
self.logger.error(f"生成CA证书失败: {e}")
raise
def generate_server_certificate(self, hostname: str, ca_cert: str, ca_key: str,
validity_days: int = 365) -> Dict[str, str]:
"""生成服务器证书"""
try:
server_key_path = self.ssl_dir / f"{hostname}-key.pem"
server_csr_path = self.ssl_dir / f"{hostname}-csr.pem"
server_cert_path = self.ssl_dir / f"{hostname}-cert.pem"
# 生成服务器私钥
cmd_key = [
"openssl", "genrsa",
"-out", str(server_key_path),
"2048"
]
result = subprocess.run(cmd_key, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"生成服务器私钥失败: {result.stderr}")
# 生成证书签名请求
cmd_csr = [
"openssl", "req", "-new",
"-key", str(server_key_path),
"-out", str(server_csr_path),
"-subj", f"/CN={hostname}/O=Kafka/C=US"
]
result = subprocess.run(cmd_csr, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"生成CSR失败: {result.stderr}")
# 使用CA签名生成服务器证书
cmd_sign = [
"openssl", "x509", "-req",
"-in", str(server_csr_path),
"-CA", ca_cert,
"-CAkey", ca_key,
"-CAcreateserial",
"-out", str(server_cert_path),
"-days", str(validity_days),
"-extensions", "v3_req"
]
result = subprocess.run(cmd_sign, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"签名服务器证书失败: {result.stderr}")
# 清理CSR文件
server_csr_path.unlink()
self.logger.info(f"服务器证书生成成功: {server_cert_path}")
return {
"server_key": str(server_key_path),
"server_cert": str(server_cert_path)
}
except Exception as e:
self.logger.error(f"生成服务器证书失败: {e}")
raise
def create_keystore(self, hostname: str, server_key: str, server_cert: str,
ca_cert: str, keystore_password: str) -> str:
"""创建Java Keystore"""
try:
keystore_path = self.ssl_dir / f"{hostname}.keystore.jks"
p12_path = self.ssl_dir / f"{hostname}.p12"
# 创建PKCS12文件
cmd_p12 = [
"openssl", "pkcs12", "-export",
"-in", server_cert,
"-inkey", server_key,
"-out", str(p12_path),
"-name", hostname,
"-password", f"pass:{keystore_password}"
]
result = subprocess.run(cmd_p12, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"创建PKCS12文件失败: {result.stderr}")
# 转换为JKS格式
cmd_jks = [
"keytool", "-importkeystore",
"-deststorepass", keystore_password,
"-destkeypass", keystore_password,
"-destkeystore", str(keystore_path),
"-srckeystore", str(p12_path),
"-srcstoretype", "PKCS12",
"-srcstorepass", keystore_password,
"-alias", hostname,
"-noprompt"
]
result = subprocess.run(cmd_jks, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"创建Keystore失败: {result.stderr}")
# 导入CA证书
cmd_ca = [
"keytool", "-import",
"-trustcacerts",
"-alias", "CARoot",
"-file", ca_cert,
"-keystore", str(keystore_path),
"-storepass", keystore_password,
"-noprompt"
]
result = subprocess.run(cmd_ca, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
self.logger.warning(f"导入CA证书失败: {result.stderr}")
# 清理临时文件
p12_path.unlink()
self.logger.info(f"Keystore创建成功: {keystore_path}")
return str(keystore_path)
except Exception as e:
self.logger.error(f"创建Keystore失败: {e}")
raise
def create_truststore(self, ca_cert: str, truststore_password: str) -> str:
"""创建Truststore"""
try:
truststore_path = self.ssl_dir / "kafka.truststore.jks"
cmd = [
"keytool", "-import",
"-trustcacerts",
"-alias", "CARoot",
"-file", ca_cert,
"-keystore", str(truststore_path),
"-storepass", truststore_password,
"-noprompt"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise Exception(f"创建Truststore失败: {result.stderr}")
self.logger.info(f"Truststore创建成功: {truststore_path}")
return str(truststore_path)
except Exception as e:
self.logger.error(f"创建Truststore失败: {e}")
raise
def generate_ssl_config(self, hostname: str, keystore_password: str,
truststore_password: str) -> SSLConfig:
"""生成完整的SSL配置"""
try:
# 生成CA证书
ca_files = self.generate_ca_certificate()
# 生成服务器证书
server_files = self.generate_server_certificate(
hostname, ca_files["ca_cert"], ca_files["ca_key"]
)
# 创建Keystore
keystore_path = self.create_keystore(
hostname,
server_files["server_key"],
server_files["server_cert"],
ca_files["ca_cert"],
keystore_password
)
# 创建Truststore
truststore_path = self.create_truststore(
ca_files["ca_cert"], truststore_password
)
return SSLConfig(
keystore_path=keystore_path,
keystore_password=keystore_password,
truststore_path=truststore_path,
truststore_password=truststore_password,
key_password=keystore_password,
enabled_protocols=["TLSv1.2", "TLSv1.3"],
cipher_suites=[
"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_RSA_WITH_AES_256_GCM_SHA384",
"TLS_RSA_WITH_AES_128_GCM_SHA256"
]
)
except Exception as e:
self.logger.error(f"生成SSL配置失败: {e}")
raise
def generate_broker_ssl_properties(self, ssl_config: SSLConfig) -> str:
"""生成Broker SSL配置"""
properties = f"""
# SSL配置
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.keystore.location={ssl_config.keystore_path}
ssl.keystore.password={ssl_config.keystore_password}
ssl.key.password={ssl_config.key_password}
ssl.truststore.location={ssl_config.truststore_path}
ssl.truststore.password={ssl_config.truststore_password}
ssl.client.auth=required
ssl.protocol={ssl_config.protocol}
ssl.enabled.protocols={','.join(ssl_config.enabled_protocols or [ssl_config.protocol])}
ssl.cipher.suites={','.join(ssl_config.cipher_suites or [])}
ssl.endpoint.identification.algorithm=HTTPS
"""
return properties
def generate_client_ssl_properties(self, ssl_config: SSLConfig) -> str:
"""生成客户端SSL配置"""
properties = f"""
# 客户端SSL配置
security.protocol=SSL
ssl.truststore.location={ssl_config.truststore_path}
ssl.truststore.password={ssl_config.truststore_password}
ssl.keystore.location={ssl_config.keystore_path}
ssl.keystore.password={ssl_config.keystore_password}
ssl.key.password={ssl_config.key_password}
ssl.protocol={ssl_config.protocol}
ssl.enabled.protocols={','.join(ssl_config.enabled_protocols or [ssl_config.protocol])}
ssl.endpoint.identification.algorithm=HTTPS
"""
return properties
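上面生成的JKS主要供Broker与Java客户端使用;Python客户端可以直接使用前面生成的PEM文件。下面以kafka-python为例给出连接示意(该依赖属于额外假设,需pip install kafka-python;证书路径沿用上文的/opt/kafka/ssl目录,客户端证书此处复用服务器证书仅作演示):
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka-broker-1:9093",                  # SSL监听端口
    security_protocol="SSL",
    ssl_cafile="/opt/kafka/ssl/kafka-ca-cert.pem",            # CA证书
    ssl_certfile="/opt/kafka/ssl/kafka-broker-1-cert.pem",    # 客户端证书(演示用)
    ssl_keyfile="/opt/kafka/ssl/kafka-broker-1-key.pem"       # 客户端私钥
)
producer.send("test-topic", b"hello over TLS")
producer.flush()
producer.close()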
6.4.2 SASL认证配置
class KafkaSASLManager:
"""Kafka SASL管理器"""
def __init__(self, kafka_home: str, config_dir: str):
self.kafka_home = kafka_home
self.config_dir = Path(config_dir)
self.config_dir.mkdir(parents=True, exist_ok=True)
self.logger = logging.getLogger(__name__)
def create_jaas_config(self, mechanism: str = "PLAIN",
users: Dict[str, str] = None) -> str:
"""创建JAAS配置文件"""
try:
jaas_file = self.config_dir / "kafka_server_jaas.conf"
if mechanism == "PLAIN":
# PLAIN机制配置
users = users or {"admin": "admin-secret", "user1": "user1-secret"}
user_entries = [f'user_{username}="{password}"' for username, password in users.items()]
jaas_content = f"""
KafkaServer {{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
{' '.join(user_entries)};
}};
Client {{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
}};
"""
elif mechanism == "SCRAM-SHA-256":
# SCRAM-SHA-256机制配置
jaas_content = """
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required;
};
Client {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
"""
elif mechanism == "GSSAPI":
# Kerberos配置
jaas_content = """
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/kafka-broker@EXAMPLE.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka.client.keytab"
principal="kafka-client@EXAMPLE.COM";
};
"""
else:
raise ValueError(f"不支持的SASL机制: {mechanism}")
with open(jaas_file, 'w', encoding='utf-8') as f:
f.write(jaas_content)
self.logger.info(f"JAAS配置文件创建成功: {jaas_file}")
return str(jaas_file)
except Exception as e:
self.logger.error(f"创建JAAS配置失败: {e}")
raise
def create_scram_users(self, users: Dict[str, str],
bootstrap_servers: str = "localhost:9092") -> bool:
"""创建SCRAM用户"""
try:
for username, password in users.items():
cmd = [
f"{self.kafka_home}/bin/kafka-configs.sh",
"--bootstrap-server", bootstrap_servers,
"--alter",
"--add-config", f"SCRAM-SHA-256=[password={password}]",
"--entity-type", "users",
"--entity-name", username
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
self.logger.info(f"SCRAM用户创建成功: {username}")
else:
self.logger.error(f"SCRAM用户创建失败 {username}: {result.stderr}")
return False
return True
except Exception as e:
self.logger.error(f"创建SCRAM用户失败: {e}")
return False
def generate_sasl_properties(self, mechanism: str, jaas_file: str) -> str:
"""生成SASL配置属性"""
properties = f"""
# SASL配置
listeners=SASL_PLAINTEXT://localhost:9092,SASL_SSL://localhost:9093
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol={mechanism}
sasl.enabled.mechanisms={mechanism}
# JAAS配置文件(注意:java.security.auth.login.config是JVM系统属性,
# 通常通过KAFKA_OPTS="-Djava.security.auth.login.config=..."传入broker进程,而不是直接写入server.properties)
java.security.auth.login.config={jaas_file}
"""
if mechanism == "GSSAPI":
properties += """
# Kerberos配置
sasl.kerberos.service.name=kafka
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//
"""
return properties
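启用SASL之后,客户端同样需要携带认证信息。下面是kafka-python消费者使用SASL/PLAIN连接的示意(依赖与账号均为假设,账号对应上文JAAS中的user1;生产环境中密码应通过环境变量或密钥管理系统注入,不要硬编码):
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test-topic",
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="user1",
    sasl_plain_password="user1-secret",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000        # 10秒无新消息则退出循环,便于演示
)
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)
consumer.close()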
6.4.3 ACL权限管理
class KafkaACLManager:
"""Kafka ACL管理器"""
def __init__(self, kafka_home: str, bootstrap_servers: str):
self.kafka_home = kafka_home
self.bootstrap_servers = bootstrap_servers
self.logger = logging.getLogger(__name__)
def add_acl(self, principal: str, operation: str, resource_type: str,
resource_name: str, permission: str = "Allow") -> bool:
"""添加ACL规则"""
try:
# 资源类型映射为kafka-acls.sh的参数名;--cluster不需要资源名,TransactionalId对应--transactional-id
resource_flag = {
"topic": "--topic",
"group": "--group",
"cluster": "--cluster",
"transactionalid": "--transactional-id"
}.get(resource_type.lower(), f"--{resource_type.lower()}")
principal_flag = "--deny-principal" if permission == "Deny" else "--allow-principal"
cmd = [
f"{self.kafka_home}/bin/kafka-acls.sh",
"--bootstrap-server", self.bootstrap_servers,
"--add",
principal_flag, principal,
"--operation", operation,
resource_flag
]
if resource_type.lower() != "cluster":
cmd.append(resource_name)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
self.logger.info(f"ACL规则添加成功: {principal} {operation} {resource_type}:{resource_name}")
return True
else:
self.logger.error(f"ACL规则添加失败: {result.stderr}")
return False
except Exception as e:
self.logger.error(f"添加ACL规则失败: {e}")
return False
def remove_acl(self, principal: str, operation: str, resource_type: str,
resource_name: str) -> bool:
"""删除ACL规则"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-acls.sh",
"--bootstrap-server", self.bootstrap_servers,
"--remove",
"--allow-principal", principal,
"--operation", operation,
f"--{resource_type.lower()}", resource_name,
"--force"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
self.logger.info(f"ACL规则删除成功: {principal} {operation} {resource_type}:{resource_name}")
return True
else:
self.logger.error(f"ACL规则删除失败: {result.stderr}")
return False
except Exception as e:
self.logger.error(f"删除ACL规则失败: {e}")
return False
def list_acls(self, resource_type: Optional[str] = None,
resource_name: Optional[str] = None) -> List[Dict[str, str]]:
"""列出ACL规则"""
try:
cmd = [
f"{self.kafka_home}/bin/kafka-acls.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
if resource_type and resource_name:
cmd.extend([f"--{resource_type.lower()}", resource_name])
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
self.logger.error(f"列出ACL规则失败: {result.stderr}")
return []
acls = []
for line in result.stdout.split('\n'):
if line.strip() and 'Current ACLs' not in line:
# 解析ACL行(简化实现)
if 'principal' in line.lower():
acls.append({"rule": line.strip()})
return acls
except Exception as e:
self.logger.error(f"列出ACL规则失败: {e}")
return []
def setup_basic_acls(self, admin_user: str = "User:admin") -> bool:
"""设置基本ACL规则"""
try:
# 管理员权限
admin_rules = [
("All", "Topic", "*"),
("All", "Group", "*"),
("All", "Cluster", "kafka-cluster"),
("All", "TransactionalId", "*")
]
for operation, resource_type, resource_name in admin_rules:
success = self.add_acl(admin_user, operation, resource_type, resource_name)
if not success:
return False
self.logger.info("基本ACL规则设置完成")
return True
except Exception as e:
self.logger.error(f"设置基本ACL规则失败: {e}")
return False
# 使用示例
if __name__ == "__main__":
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
print("=== Kafka安全配置 ===")
# SSL配置
print("\n1. SSL配置")
ssl_manager = KafkaSSLManager(
kafka_home="/opt/kafka",
ssl_dir="/opt/kafka/ssl"
)
try:
ssl_config = ssl_manager.generate_ssl_config(
hostname="kafka-broker-1",
keystore_password="keystore-password",
truststore_password="truststore-password"
)
print(f"SSL配置生成成功")
print(f"Keystore: {ssl_config.keystore_path}")
print(f"Truststore: {ssl_config.truststore_path}")
# 生成配置文件
broker_props = ssl_manager.generate_broker_ssl_properties(ssl_config)
client_props = ssl_manager.generate_client_ssl_properties(ssl_config)
print("\nBroker SSL配置:")
print(broker_props[:200] + "...")
except Exception as e:
print(f"SSL配置失败: {e}")
# SASL配置
print("\n2. SASL配置")
sasl_manager = KafkaSASLManager(
kafka_home="/opt/kafka",
config_dir="/opt/kafka/config"
)
try:
jaas_file = sasl_manager.create_jaas_config(
mechanism="PLAIN",
users={"admin": "admin-secret", "user1": "user1-secret"}
)
print(f"JAAS配置文件创建成功: {jaas_file}")
sasl_props = sasl_manager.generate_sasl_properties("PLAIN", jaas_file)
print("\nSASL配置:")
print(sasl_props[:200] + "...")
except Exception as e:
print(f"SASL配置失败: {e}")
# ACL配置
print("\n3. ACL配置")
acl_manager = KafkaACLManager(
kafka_home="/opt/kafka",
bootstrap_servers="localhost:9092"
)
# 设置基本ACL规则
success = acl_manager.setup_basic_acls("User:admin")
print(f"基本ACL规则设置: {'成功' if success else '失败'}")
# 列出ACL规则
acls = acl_manager.list_acls()
print(f"\n当前ACL规则数量: {len(acls)}")
6.5 版本升级
6.5.1 滚动升级
import time
import subprocess
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import logging
@dataclass
class UpgradeConfig:
"""升级配置"""
current_version: str
target_version: str
kafka_home: str
backup_dir: str
brokers: List[Dict[str, Any]]
upgrade_timeout: int = 300 # 秒
health_check_interval: int = 30 # 秒
@dataclass
class UpgradeStatus:
"""升级状态"""
broker_id: int
status: str # pending, upgrading, completed, failed
start_time: Optional[datetime]
end_time: Optional[datetime]
error_message: Optional[str]
class KafkaUpgradeManager:
"""Kafka升级管理器"""
def __init__(self, upgrade_config: UpgradeConfig):
self.config = upgrade_config
self.logger = logging.getLogger(__name__)
self.upgrade_statuses = {
broker["id"]: UpgradeStatus(
broker_id=broker["id"],
status="pending",
start_time=None,
end_time=None,
error_message=None
)
for broker in upgrade_config.brokers
}
def validate_upgrade(self) -> bool:
"""验证升级前提条件"""
try:
self.logger.info("开始验证升级前提条件")
# 检查版本兼容性
if not self._check_version_compatibility():
return False
# 检查集群健康状态
if not self._check_cluster_health():
return False
# 检查磁盘空间
if not self._check_disk_space():
return False
# 检查备份
if not self._check_backup_availability():
return False
self.logger.info("升级前提条件验证通过")
return True
except Exception as e:
self.logger.error(f"验证升级前提条件失败: {e}")
return False
def perform_rolling_upgrade(self) -> bool:
"""执行滚动升级"""
try:
self.logger.info(f"开始滚动升级: {self.config.current_version} -> {self.config.target_version}")
# 验证前提条件
if not self.validate_upgrade():
return False
# 按顺序升级每个broker
for broker in self.config.brokers:
broker_id = broker["id"]
self.logger.info(f"开始升级Broker {broker_id}")
success = self._upgrade_single_broker(broker)
if success:
self.logger.info(f"Broker {broker_id} 升级成功")
else:
self.logger.error(f"Broker {broker_id} 升级失败")
return False
# 等待集群稳定
if not self._wait_for_cluster_stability():
self.logger.error("集群不稳定,停止升级")
return False
self.logger.info("滚动升级完成")
return True
except Exception as e:
self.logger.error(f"滚动升级失败: {e}")
return False
def _upgrade_single_broker(self, broker: Dict[str, Any]) -> bool:
"""升级单个broker"""
broker_id = broker["id"]
status = self.upgrade_statuses[broker_id]
try:
status.status = "upgrading"
status.start_time = datetime.now()
# 1. 停止broker
self.logger.info(f"停止Broker {broker_id}")
if not self._stop_broker(broker):
raise Exception("停止broker失败")
# 2. 备份配置
self.logger.info(f"备份Broker {broker_id} 配置")
if not self._backup_broker_config(broker):
raise Exception("备份配置失败")
# 3. 更新Kafka二进制文件
self.logger.info(f"更新Broker {broker_id} 二进制文件")
if not self._update_kafka_binaries(broker):
raise Exception("更新二进制文件失败")
# 4. 更新配置文件
self.logger.info(f"更新Broker {broker_id} 配置")
if not self._update_broker_config(broker):
raise Exception("更新配置失败")
# 5. 启动broker
self.logger.info(f"启动Broker {broker_id}")
if not self._start_broker(broker):
raise Exception("启动broker失败")
# 6. 健康检查
self.logger.info(f"检查Broker {broker_id} 健康状态")
if not self._health_check_broker(broker):
raise Exception("健康检查失败")
status.status = "completed"
status.end_time = datetime.now()
return True
except Exception as e:
status.status = "failed"
status.end_time = datetime.now()
status.error_message = str(e)
self.logger.error(f"升级Broker {broker_id} 失败: {e}")
return False
def _stop_broker(self, broker: Dict[str, Any]) -> bool:
"""停止broker"""
try:
if broker.get("deployment_type") == "systemd":
cmd = ["sudo", "systemctl", "stop", f"kafka-{broker['id']}"]
elif broker.get("deployment_type") == "docker":
cmd = ["docker", "stop", f"kafka-{broker['id']}"]
else:
# 默认使用脚本停止
cmd = [f"{self.config.kafka_home}/bin/kafka-server-stop.sh"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
return result.returncode == 0
except Exception as e:
self.logger.error(f"停止broker失败: {e}")
return False
def _start_broker(self, broker: Dict[str, Any]) -> bool:
"""启动broker"""
try:
if broker.get("deployment_type") == "systemd":
cmd = ["sudo", "systemctl", "start", f"kafka-{broker['id']}"]
elif broker.get("deployment_type") == "docker":
cmd = ["docker", "start", f"kafka-{broker['id']}"]
else:
# 默认使用脚本启动
config_file = broker.get("config_file", f"{self.config.kafka_home}/config/server.properties")
cmd = [
f"{self.config.kafka_home}/bin/kafka-server-start.sh",
"-daemon",
config_file
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
return result.returncode == 0
except Exception as e:
self.logger.error(f"启动broker失败: {e}")
return False
def _backup_broker_config(self, broker: Dict[str, Any]) -> bool:
"""备份broker配置"""
try:
import shutil
from pathlib import Path
backup_dir = Path(self.config.backup_dir) / f"broker-{broker['id']}"
backup_dir.mkdir(parents=True, exist_ok=True)
config_file = broker.get("config_file", f"{self.config.kafka_home}/config/server.properties")
backup_file = backup_dir / f"server.properties.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
shutil.copy2(config_file, backup_file)
self.logger.info(f"配置备份完成: {backup_file}")
return True
except Exception as e:
self.logger.error(f"备份配置失败: {e}")
return False
def _update_kafka_binaries(self, broker: Dict[str, Any]) -> bool:
"""更新Kafka二进制文件"""
try:
# 这里应该实现实际的二进制文件更新逻辑
# 例如:下载新版本、解压、创建符号链接等
self.logger.info(f"模拟更新Kafka二进制文件到版本 {self.config.target_version}")
# 模拟更新过程
time.sleep(2)
return True
except Exception as e:
self.logger.error(f"更新二进制文件失败: {e}")
return False
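    # 说明:上面的 _update_kafka_binaries 只是模拟。下面是一个示意性的实现草图,
    # 假设以官方tar包方式部署、通过符号链接切换版本;下载地址、Scala版本、
    # 安装目录和方法名均为示例假设,请按自身部署方式调整。
    def _update_kafka_binaries_sketch(self, broker: Dict[str, Any]) -> bool:
        """示意:下载目标版本tar包并切换符号链接"""
        try:
            version = self.config.target_version
            scala_version = "2.13"  # 假设使用的Scala构建版本
            url = (f"https://archive.apache.org/dist/kafka/{version}/"
                   f"kafka_{scala_version}-{version}.tgz")
            tarball = f"/tmp/kafka_{scala_version}-{version}.tgz"
            install_dir = f"/opt/kafka_{scala_version}-{version}"
            # 1. 下载并解压新版本到/opt
            subprocess.run(["curl", "-fsSL", "-o", tarball, url], check=True, timeout=600)
            subprocess.run(["tar", "-xzf", tarball, "-C", "/opt"], check=True, timeout=300)
            # 2. 将kafka_home符号链接原子地切换到新版本目录
            subprocess.run(["ln", "-sfn", install_dir, self.config.kafka_home], check=True, timeout=10)
            return True
        except Exception as e:
            self.logger.error(f"更新二进制文件失败: {e}")
            return False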
def _update_broker_config(self, broker: Dict[str, Any]) -> bool:
"""更新broker配置"""
try:
# 这里应该实现配置文件的更新逻辑
# 例如:添加新版本的配置项、移除废弃的配置等
# 注意:按照官方滚动升级流程,逐台替换二进制期间应把
# inter.broker.protocol.version 固定为当前(旧)版本,
# 待所有broker升级完成且集群稳定后,再统一提升到目标版本并滚动重启
config_file = broker.get("config_file", f"{self.config.kafka_home}/config/server.properties")
# 读取现有配置
with open(config_file, 'r', encoding='utf-8') as f:
config_content = f.read()
# 协议版本取 major.minor 形式,例如 "3.2.0" -> "3.2"
protocol_version = ".".join(self.config.current_version.split(".")[:2])
if "inter.broker.protocol.version" not in config_content:
config_content += f"\n# 升级期间固定协议版本\ninter.broker.protocol.version={protocol_version}\n"
# 写回配置文件
with open(config_file, 'w', encoding='utf-8') as f:
f.write(config_content)
self.logger.info(f"配置文件更新完成: {config_file}")
return True
except Exception as e:
self.logger.error(f"更新配置失败: {e}")
return False
def _health_check_broker(self, broker: Dict[str, Any]) -> bool:
"""健康检查broker"""
try:
broker_id = broker["id"]
host = broker.get("host", "localhost")
port = broker.get("port", 9092)
# 检查broker是否在线
cmd = [
f"{self.config.kafka_home}/bin/kafka-broker-api-versions.sh",
"--bootstrap-server", f"{host}:{port}"
]
max_retries = 10
for attempt in range(max_retries):
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
self.logger.info(f"Broker {broker_id} 健康检查通过")
return True
self.logger.info(f"Broker {broker_id} 健康检查未通过 ({attempt + 1}/{max_retries})")
if attempt < max_retries - 1:
time.sleep(self.config.health_check_interval)
return False
except Exception as e:
self.logger.error(f"健康检查失败: {e}")
return False
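    # 说明:kafka-broker-api-versions.sh 每次调用都会启动一个JVM,开销较大。
    # 下面是一个轻量补充探测的草图:先用TCP连接确认端口已在监听,
    # 再执行完整的API版本检查;方法名为示例自拟。
    def _port_is_open(self, broker: Dict[str, Any], timeout_s: float = 3.0) -> bool:
        """示意:检查broker端口是否可建立TCP连接"""
        import socket
        host = broker.get("host", "localhost")
        port = broker.get("port", 9092)
        try:
            with socket.create_connection((host, port), timeout=timeout_s):
                return True
        except OSError:
            return False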
def _check_version_compatibility(self) -> bool:
"""检查版本兼容性"""
try:
# 这里应该实现版本兼容性检查逻辑
# 例如:检查是否支持从当前版本升级到目标版本
compatible_upgrades = {
"2.8.0": ["3.0.0", "3.1.0", "3.2.0"],
"3.0.0": ["3.1.0", "3.2.0", "3.3.0"],
"3.1.0": ["3.2.0", "3.3.0", "3.4.0"],
"3.2.0": ["3.3.0", "3.4.0", "3.5.0"]
}
if self.config.current_version in compatible_upgrades:
if self.config.target_version in compatible_upgrades[self.config.current_version]:
return True
self.logger.error(f"不支持从 {self.config.current_version} 升级到 {self.config.target_version}")
return False
except Exception as e:
self.logger.error(f"检查版本兼容性失败: {e}")
return False
def _check_cluster_health(self) -> bool:
"""检查集群健康状态"""
try:
# 检查所有broker是否在线
for broker in self.config.brokers:
if not self._health_check_broker(broker):
self.logger.error(f"Broker {broker['id']} 不健康")
return False
# 检查是否有正在进行的分区重分配(bootstrap地址取第一个broker,避免硬编码localhost)
first_broker = self.config.brokers[0]
bootstrap = f"{first_broker.get('host', 'localhost')}:{first_broker.get('port', 9092)}"
cmd = [
f"{self.config.kafka_home}/bin/kafka-reassign-partitions.sh",
"--bootstrap-server", bootstrap,
"--list"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
# 不同Kafka版本的提示文案略有差异,这里做宽松匹配
if "No partition reassignment" not in result.stdout:
self.logger.error("有正在进行的分区重分配,请等待完成后再升级")
return False
return True
except Exception as e:
self.logger.error(f"检查集群健康状态失败: {e}")
return False
def _check_disk_space(self) -> bool:
"""检查磁盘空间"""
try:
import shutil
# 检查Kafka日志目录的磁盘空间
for broker in self.config.brokers:
log_dirs = broker.get("log_dirs", ["/var/kafka-logs"])
for log_dir in log_dirs:
try:
total, used, free = shutil.disk_usage(log_dir)
free_percent = (free / total) * 100
if free_percent < 20: # 至少保留20%的空间
self.logger.error(f"Broker {broker['id']} 磁盘空间不足: {log_dir} ({free_percent:.1f}% 可用)")
return False
except FileNotFoundError:
self.logger.warning(f"日志目录不存在: {log_dir}")
return True
except Exception as e:
self.logger.error(f"检查磁盘空间失败: {e}")
return False
def _check_backup_availability(self) -> bool:
"""检查备份可用性"""
try:
from pathlib import Path
backup_dir = Path(self.config.backup_dir)
if not backup_dir.exists():
backup_dir.mkdir(parents=True, exist_ok=True)
# 检查备份目录是否可写
test_file = backup_dir / "test_write"
try:
test_file.write_text("test")
test_file.unlink()
except Exception:
self.logger.error(f"备份目录不可写: {backup_dir}")
return False
return True
except Exception as e:
self.logger.error(f"检查备份可用性失败: {e}")
return False
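    # 说明:除配置文件外,升级前通常还会留存一份集群元数据快照备查。
    # 下面是一个示意草图,用 kafka-topics.sh --describe 把topic元数据导出到备份目录;
    # 方法名、输出文件名和bootstrap地址(取第一个broker)均为示例假设。
    def _backup_cluster_metadata(self) -> bool:
        """示意:导出topic元数据到备份目录"""
        try:
            from pathlib import Path
            first = self.config.brokers[0]
            bootstrap = f"{first.get('host', 'localhost')}:{first.get('port', 9092)}"
            cmd = [
                f"{self.config.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", bootstrap,
                "--describe"
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
            if result.returncode != 0:
                self.logger.error(f"导出topic元数据失败: {result.stderr}")
                return False
            out_file = Path(self.config.backup_dir) / f"topics-{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
            out_file.write_text(result.stdout, encoding="utf-8")
            self.logger.info(f"topic元数据已备份到: {out_file}")
            return True
        except Exception as e:
            self.logger.error(f"备份集群元数据失败: {e}")
            return False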
def _wait_for_cluster_stability(self) -> bool:
"""等待集群稳定"""
try:
self.logger.info("等待集群稳定...")
stable_checks = 0
required_stable_checks = 3
while stable_checks < required_stable_checks:
# 检查所有broker是否健康
all_healthy = True
for broker in self.config.brokers:
if not self._health_check_broker(broker):
all_healthy = False
break
if all_healthy:
stable_checks += 1
self.logger.info(f"集群稳定检查 {stable_checks}/{required_stable_checks}")
else:
stable_checks = 0
self.logger.info("集群不稳定,重新开始检查")
time.sleep(self.config.health_check_interval)
self.logger.info("集群已稳定")
return True
except Exception as e:
self.logger.error(f"等待集群稳定失败: {e}")
return False
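    # 说明:仅凭broker能否响应API请求来判断"稳定"比较粗糙。
    # 下面是一个补充检查的草图:利用 kafka-topics.sh 的 --under-replicated-partitions
    # 选项确认没有欠副本分区;方法名为示例自拟,bootstrap地址取第一个broker。
    def _check_no_under_replicated_partitions(self) -> bool:
        """示意:检查集群是否存在欠副本分区"""
        try:
            first = self.config.brokers[0]
            bootstrap = f"{first.get('host', 'localhost')}:{first.get('port', 9092)}"
            cmd = [
                f"{self.config.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", bootstrap,
                "--describe", "--under-replicated-partitions"
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
            # 输出为空表示当前没有欠副本分区
            return result.returncode == 0 and result.stdout.strip() == ""
        except Exception as e:
            self.logger.error(f"检查欠副本分区失败: {e}")
            return False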
def get_upgrade_status(self) -> Dict[int, UpgradeStatus]:
"""获取升级状态"""
return self.upgrade_statuses.copy()
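    # 说明:按照Kafka官方滚动升级流程,所有broker都换上新二进制并稳定运行后,
    # 还需要把 inter.broker.protocol.version 提升到目标版本并再做一轮滚动重启。
    # 下面是一个示意草图;方法名为示例自拟,协议版本取"major.minor"形式。
    def finalize_protocol_version(self) -> bool:
        """示意:升级完成后提升inter.broker.protocol.version并滚动重启"""
        protocol_version = ".".join(self.config.target_version.split(".")[:2])  # 如 "3.3.0" -> "3.3"
        for broker in self.config.brokers:
            config_file = broker.get("config_file", f"{self.config.kafka_home}/config/server.properties")
            try:
                with open(config_file, "r", encoding="utf-8") as f:
                    lines = f.readlines()
                # 移除旧的协议版本配置行,再写入目标版本
                lines = [line for line in lines if not line.startswith("inter.broker.protocol.version")]
                lines.append(f"inter.broker.protocol.version={protocol_version}\n")
                with open(config_file, "w", encoding="utf-8") as f:
                    f.writelines(lines)
            except Exception as e:
                self.logger.error(f"更新Broker {broker['id']} 协议版本失败: {e}")
                return False
            # 逐台滚动重启使新的协议版本生效
            if not (self._stop_broker(broker) and self._start_broker(broker)
                    and self._health_check_broker(broker)):
                return False
        return self._wait_for_cluster_stability()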
# 使用示例
if __name__ == "__main__":
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 升级配置
upgrade_config = UpgradeConfig(
current_version="3.2.0",
target_version="3.3.0",
kafka_home="/opt/kafka",
backup_dir="/opt/kafka/backups",
brokers=[
{
"id": 1,
"host": "kafka-1",
"port": 9092,
"deployment_type": "systemd",
"config_file": "/opt/kafka/config/server-1.properties",
"log_dirs": ["/var/kafka-logs-1"]
},
{
"id": 2,
"host": "kafka-2",
"port": 9092,
"deployment_type": "systemd",
"config_file": "/opt/kafka/config/server-2.properties",
"log_dirs": ["/var/kafka-logs-2"]
},
{
"id": 3,
"host": "kafka-3",
"port": 9092,
"deployment_type": "systemd",
"config_file": "/opt/kafka/config/server-3.properties",
"log_dirs": ["/var/kafka-logs-3"]
}
]
)
print("=== Kafka滚动升级 ===")
# 创建升级管理器
upgrade_manager = KafkaUpgradeManager(upgrade_config)
# 验证升级前提条件
print("\n1. 验证升级前提条件")
if upgrade_manager.validate_upgrade():
print("升级前提条件验证通过")
# 执行滚动升级
print("\n2. 执行滚动升级")
success = upgrade_manager.perform_rolling_upgrade()
if success:
print("滚动升级完成")
else:
print("滚动升级失败")
# 显示升级状态
print("\n3. 升级状态")
statuses = upgrade_manager.get_upgrade_status()
for broker_id, status in statuses.items():
duration = ""
if status.start_time and status.end_time:
duration = f" ({(status.end_time - status.start_time).total_seconds():.1f}s)"
print(f" Broker {broker_id}: {status.status}{duration}")
if status.error_message:
print(f" 错误: {status.error_message}")
else:
print("升级前提条件验证失败,无法进行升级")
6.6 本章总结
6.6.1 核心知识点
集群架构设计
- 容量规划和硬件选择
- 网络架构和拓扑设计
- 配置管理和标准化
自动化部署
- Ansible自动化部署
- Docker容器化部署
- Kubernetes云原生部署
集群管理
- 状态监控和指标收集
- 备份与恢复策略
- 扩容缩容操作
安全配置
- SSL/TLS加密通信
- SASL认证机制
- ACL权限管理
版本升级
- 滚动升级策略
- 兼容性检查
- 风险控制和回滚
6.6.2 最佳实践
部署规划
- 根据业务需求进行容量规划
- 选择合适的硬件配置
- 设计冗余和高可用架构
自动化运维
- 使用基础设施即代码(IaC)
- 实现配置管理标准化
- 建立CI/CD部署流水线
安全加固
- 启用SSL/TLS加密
- 配置SASL认证
- 实施细粒度ACL控制
- 定期更新证书和密钥
运维管理
- 建立完善的监控体系
- 制定备份恢复策略
- 规划扩容缩容流程
- 建立应急响应机制
升级维护
- 制定升级计划和时间窗口
- 执行充分的测试验证
- 实施滚动升级策略
- 准备回滚方案
6.6.3 练习题
基础练习
- 设计一个3节点Kafka集群的硬件配置
- 编写Ansible playbook部署Kafka集群
- 配置SSL加密和SASL认证
进阶练习
- 实现Kafka集群的自动化扩容
- 设计完整的备份恢复方案
- 编写滚动升级脚本
实战项目
- 部署生产级Kafka集群
- 实现集群监控和告警
- 建立运维管理平台
通过本章的学习,你应该掌握了Kafka集群的部署、管理和运维技能,能够在生产环境中稳定运行Kafka集群。