2.1 Environment Preparation

System Requirements

Before installing Kafka, verify that the host meets the minimum requirements. The checker below automates this:

import platform
import subprocess
from dataclasses import dataclass, field
from typing import Dict, List

import psutil  # third-party: pip install psutil

@dataclass
class SystemRequirement:
    """System requirement configuration."""
    min_memory_gb: int = 4
    min_disk_gb: int = 20
    min_cpu_cores: int = 2
    java_version: str = "8+"
    # Note: platform.system() reports macOS as "Darwin"
    supported_os: List[str] = field(
        default_factory=lambda: ["Linux", "Darwin", "Windows"]
    )

class SystemChecker:
    """System environment checker."""

    def __init__(self):
        self.requirements = SystemRequirement()

    def check_all(self) -> Dict[str, bool]:
        """Run every system requirement check."""
        results = {
            "Operating system": self.check_os(),
            "Memory": self.check_memory(),
            "Disk space": self.check_disk(),
            "CPU cores": self.check_cpu(),
            "Java environment": self.check_java(),
        }
        print("=== System check results ===")
        for item, passed in results.items():
            status = "✓ pass" if passed else "✗ fail"
            print(f"{item}: {status}")
        return results

    def check_os(self) -> bool:
        """Check the operating system."""
        current_os = platform.system()
        supported = current_os in self.requirements.supported_os
        print(f"Current OS: {current_os} {platform.release()}")
        return supported

    def check_memory(self) -> bool:
        """Check the amount of memory."""
        memory_gb = psutil.virtual_memory().total / (1024 ** 3)
        print(f"System memory: {memory_gb:.1f} GB")
        return memory_gb >= self.requirements.min_memory_gb

    def check_disk(self) -> bool:
        """Check free disk space."""
        disk_usage = psutil.disk_usage("/")
        free_gb = disk_usage.free / (1024 ** 3)
        print(f"Free disk space: {free_gb:.1f} GB")
        return free_gb >= self.requirements.min_disk_gb

    def check_cpu(self) -> bool:
        """Check the number of CPU cores."""
        # cpu_count(logical=False) can return None on some platforms;
        # fall back to the logical core count.
        cpu_count = psutil.cpu_count(logical=False) or psutil.cpu_count() or 0
        print(f"Physical CPU cores: {cpu_count}")
        return cpu_count >= self.requirements.min_cpu_cores

    def check_java(self) -> bool:
        """Check the Java environment."""
        try:
            # `java -version` writes to stderr, so merge stderr into stdout
            # (capture_output=True cannot be combined with stderr=...)
            result = subprocess.run(
                ["java", "-version"],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            if result.returncode == 0:
                # First line looks like: openjdk version "11.0.21" ...
                print(f"Java version: {result.stdout.splitlines()[0]}")
                return True
            print("Java is not installed or not on PATH")
            return False
        except FileNotFoundError:
            print("Java is not installed")
            return False

    def get_recommendations(self) -> List[str]:
        """Return environment configuration recommendations."""
        return [
            "Use SSDs for better I/O performance",
            "Keep network latency below 10 ms (cluster deployments)",
            "Raise the file descriptor limit (ulimit -n 65536)",
            "Size the JVM heap at roughly 25-50% of system memory",
            "Disable swap to avoid latency spikes",
            "Run the service under a dedicated kafka user",
        ]

# Usage example
if __name__ == "__main__":
    checker = SystemChecker()
    results = checker.check_all()
    if all(results.values()):
        print("\n✓ The system meets Kafka's requirements")
    else:
        print("\n✗ The system needs adjustment")
    print("\nRecommendations:")
    for rec in checker.get_recommendations():
        print(f"- {rec}")
Java Environment Configuration

Kafka requires a Java runtime (Java 8 or newer); the commands below install OpenJDK 11 and configure JAVA_HOME:

# Install OpenJDK on Ubuntu/Debian
sudo apt update
sudo apt install openjdk-11-jdk

# Install OpenJDK on CentOS/RHEL
sudo yum install java-11-openjdk-devel

# Install with Homebrew on macOS
brew install openjdk@11

# Verify the Java installation
java -version
javac -version

# Set the JAVA_HOME environment variable
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH

# Persist the variables in your shell profile
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64' >> ~/.bashrc
echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc
source ~/.bashrc
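The JAVA_HOME path above assumes the Ubuntu amd64 package layout. If you are unsure where your distribution put the JDK, one common sketch on Linux is to resolve it from the java binary itself:

# Derive JAVA_HOME from the resolved location of the java binary
export JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))
echo $JAVA_HOME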
2.2 Single-Node Installation

Download and Installation

The installer below downloads a Kafka release, unpacks it under /opt, and prepares the directories and service user:

import os
import tarfile
from pathlib import Path

import wget  # third-party: pip install wget

class KafkaInstaller:
    """Kafka installer."""

    def __init__(self, version: str = "2.8.2", scala_version: str = "2.13"):
        self.version = version
        self.scala_version = scala_version
        self.kafka_name = f"kafka_{scala_version}-{version}"
        # downloads.apache.org only hosts current releases; older versions
        # such as 2.8.2 live under https://archive.apache.org/dist/kafka/
        self.download_url = (
            f"https://archive.apache.org/dist/kafka/{version}/{self.kafka_name}.tgz"
        )
        self.install_dir = "/opt/kafka"
        self.data_dir = "/var/kafka-logs"
        self.zk_data_dir = "/var/zookeeper"

    def download(self, download_dir: str = "/tmp") -> str:
        """Download Kafka."""
        download_path = os.path.join(download_dir, f"{self.kafka_name}.tgz")
        if os.path.exists(download_path):
            print(f"File already exists: {download_path}")
            return download_path
        print(f"Downloading Kafka {self.version}...")
        try:
            wget.download(self.download_url, download_path)
            print(f"\nDownload complete: {download_path}")
            return download_path
        except Exception as e:
            print(f"Download failed: {e}")
            raise

    def extract(self, archive_path: str, extract_to: str = "/opt") -> str:
        """Extract the Kafka archive."""
        print(f"Extracting to {extract_to}...")
        with tarfile.open(archive_path, "r:gz") as tar:
            tar.extractall(extract_to)
        extracted_path = os.path.join(extract_to, self.kafka_name)
        # Point /opt/kafka at this version via a symlink, replacing any
        # symlink left over from a previous version
        if os.path.islink(self.install_dir):
            os.remove(self.install_dir)
        os.symlink(extracted_path, self.install_dir)
        print(f"Kafka installed at: {self.install_dir}")
        return extracted_path

    def create_directories(self):
        """Create the required directories."""
        directories = [
            self.data_dir,
            self.zk_data_dir,
            "/var/log/kafka",
            "/etc/kafka",
        ]
        for directory in directories:
            Path(directory).mkdir(parents=True, exist_ok=True)
            print(f"Created directory: {directory}")

    def setup_user(self):
        """Create the kafka user."""
        try:
            # Create the kafka group and user (errors are ignored if they
            # already exist)
            os.system("sudo groupadd -r kafka")
            os.system("sudo useradd -r -g kafka -d /opt/kafka -s /bin/bash kafka")
            # Set directory ownership
            os.system(f"sudo chown -R kafka:kafka {self.install_dir}")
            os.system(f"sudo chown -R kafka:kafka {self.data_dir}")
            os.system(f"sudo chown -R kafka:kafka {self.zk_data_dir}")
            os.system("sudo chown -R kafka:kafka /var/log/kafka")
            print("Kafka user created")
        except Exception as e:
            print(f"Failed to create user: {e}")

    def install(self) -> bool:
        """Full installation workflow."""
        try:
            # 1. Download
            archive_path = self.download()
            # 2. Extract
            self.extract(archive_path)
            # 3. Create directories
            self.create_directories()
            # 4. Set up the user
            self.setup_user()
            # 5. Set environment variables
            self.setup_environment()
            print("\n✓ Kafka installation complete")
            return True
        except Exception as e:
            print(f"\n✗ Installation failed: {e}")
            return False

    def setup_environment(self):
        """Set environment variables."""
        env_content = (
            "# Kafka Environment\n"
            f"export KAFKA_HOME={self.install_dir}\n"
            "export PATH=$KAFKA_HOME/bin:$PATH\n"
        )
        # /etc/profile.d scripts use shell syntax (export ...); writing
        # here requires root privileges
        with open("/etc/profile.d/kafka.sh", "w") as f:
            f.write(env_content)
        print("Environment variables configured")

# Usage example
if __name__ == "__main__":
    installer = KafkaInstaller()
    success = installer.install()
    if success:
        print("\nKafka is ready to be configured")
    else:
        print("\nCheck the error messages and retry")
Basic Configuration

The configuration manager below generates server.properties and zookeeper.properties, validates them, and derives tuned settings from the host's resources:

import os
from typing import Any, Dict, Optional

class KafkaConfigManager:
    """Kafka configuration manager."""

    def __init__(self, config_dir: str = "/opt/kafka/config"):
        self.config_dir = config_dir
        self.server_properties = os.path.join(config_dir, "server.properties")
        self.zk_properties = os.path.join(config_dir, "zookeeper.properties")

    def generate_server_config(self, broker_id: int = 0,
                               custom_configs: Optional[Dict[str, Any]] = None) -> str:
        """Generate the server.properties content."""
        default_config = {
            # Basic broker settings
            "broker.id": broker_id,
            "listeners": "PLAINTEXT://localhost:9092",
            "advertised.listeners": "PLAINTEXT://localhost:9092",
            "num.network.threads": 3,
            "num.io.threads": 8,
            # Log settings
            "log.dirs": "/var/kafka-logs",
            "num.partitions": 1,
            "default.replication.factor": 1,
            "min.insync.replicas": 1,
            # Log retention policy
            "log.retention.hours": 168,                 # 7 days
            "log.retention.bytes": 1073741824,          # 1 GB
            "log.segment.bytes": 1073741824,            # 1 GB
            "log.retention.check.interval.ms": 300000,  # 5 minutes
            # ZooKeeper settings
            "zookeeper.connect": "localhost:2181",
            "zookeeper.connection.timeout.ms": 18000,
            # Miscellaneous
            "group.initial.rebalance.delay.ms": 0,
            "offsets.topic.replication.factor": 1,
            "transaction.state.log.replication.factor": 1,
            "transaction.state.log.min.isr": 1,
            # Performance tuning
            "socket.send.buffer.bytes": 102400,
            "socket.receive.buffer.bytes": 102400,
            "socket.request.max.bytes": 104857600,
            "replica.fetch.max.bytes": 1048576,
            "message.max.bytes": 1000000,
        }
        # Merge in custom settings
        if custom_configs:
            default_config.update(custom_configs)
        # Build the file content
        config_lines = [
            "# Kafka Server Configuration",
            "# Generated by KafkaConfigManager",
            "",
        ]
        # Group settings by category
        categories = {
            "Broker Settings": [
                "broker.id", "listeners", "advertised.listeners",
                "num.network.threads", "num.io.threads",
            ],
            "Log Settings": [
                "log.dirs", "num.partitions", "default.replication.factor",
                "min.insync.replicas", "log.retention.hours",
                "log.retention.bytes", "log.segment.bytes",
                "log.retention.check.interval.ms",
            ],
            "ZooKeeper Settings": [
                "zookeeper.connect", "zookeeper.connection.timeout.ms",
            ],
            "Performance Settings": [
                "socket.send.buffer.bytes", "socket.receive.buffer.bytes",
                "socket.request.max.bytes", "replica.fetch.max.bytes",
                "message.max.bytes",
            ],
        }
        for category, keys in categories.items():
            config_lines.append(f"# {category}")
            for key in keys:
                if key in default_config:
                    config_lines.append(f"{key}={default_config[key]}")
            config_lines.append("")
        # Append any remaining settings
        categorized = sum(categories.values(), [])
        other_configs = {k: v for k, v in default_config.items()
                         if k not in categorized}
        if other_configs:
            config_lines.append("# Other Settings")
            for key, value in other_configs.items():
                config_lines.append(f"{key}={value}")
        return "\n".join(config_lines)

    def generate_zk_config(self, data_dir: str = "/var/zookeeper",
                           client_port: int = 2181) -> str:
        """Generate the zookeeper.properties content."""
        config = {
            "dataDir": data_dir,
            "clientPort": client_port,
            "maxClientCnxns": 0,
            "admin.enableServer": "false",
            "tickTime": 2000,
            "initLimit": 10,
            "syncLimit": 5,
        }
        config_lines = [
            "# ZooKeeper Configuration",
            "# Generated by KafkaConfigManager",
            "",
        ]
        for key, value in config.items():
            config_lines.append(f"{key}={value}")
        return "\n".join(config_lines)

    def write_configs(self, broker_id: int = 0,
                      custom_configs: Optional[Dict[str, Any]] = None):
        """Write both configuration files."""
        # server.properties
        server_config = self.generate_server_config(broker_id, custom_configs)
        with open(self.server_properties, "w") as f:
            f.write(server_config)
        print(f"Generated: {self.server_properties}")
        # zookeeper.properties
        zk_config = self.generate_zk_config()
        with open(self.zk_properties, "w") as f:
            f.write(zk_config)
        print(f"Generated: {self.zk_properties}")

    def validate_config(self, config_file: str) -> Dict[str, Any]:
        """Validate a configuration file."""
        issues = []
        warnings = []
        try:
            with open(config_file, "r") as f:
                lines = [line.strip() for line in f]
            # Check for required settings; matching whole keys avoids
            # substring false positives such as num.partitions=10
            required_configs = [
                "broker.id", "listeners", "log.dirs", "zookeeper.connect",
            ]
            for config in required_configs:
                if not any(line.startswith(f"{config}=") for line in lines):
                    issues.append(f"Missing required setting: {config}")
            # Check for common problems
            if any(line.startswith("log.dirs=/tmp") for line in lines):
                warnings.append("Avoid using /tmp as the log directory")
            if "num.partitions=1" in lines:
                warnings.append("Consider a higher default partition count "
                                "for better parallelism")
            return {
                "valid": len(issues) == 0,
                "issues": issues,
                "warnings": warnings,
            }
        except Exception as e:
            return {
                "valid": False,
                "issues": [f"Failed to read config file: {e}"],
                "warnings": [],
            }

    def optimize_for_environment(self, memory_gb: int,
                                 cpu_cores: int) -> Dict[str, Any]:
        """Derive tuned settings from the host environment."""
        optimized_config = {}
        # Scale socket buffers with available memory
        if memory_gb >= 8:
            optimized_config["socket.send.buffer.bytes"] = 1048576
            optimized_config["socket.receive.buffer.bytes"] = 1048576
            optimized_config["replica.fetch.max.bytes"] = 10485760
        # Scale thread pools with CPU cores
        optimized_config["num.network.threads"] = min(cpu_cores, 8)
        optimized_config["num.io.threads"] = cpu_cores * 2
        # Production-oriented defaults (replication factors above 1
        # require a multi-broker cluster)
        if memory_gb >= 16:
            optimized_config["num.partitions"] = cpu_cores
            optimized_config["default.replication.factor"] = 3
            optimized_config["min.insync.replicas"] = 2
        return optimized_config

# Usage example
if __name__ == "__main__":
    config_manager = KafkaConfigManager()
    # Generate tuned settings
    optimized = config_manager.optimize_for_environment(memory_gb=8, cpu_cores=4)
    # Write the config files
    config_manager.write_configs(broker_id=0, custom_configs=optimized)
    # Validate the result
    validation = config_manager.validate_config(config_manager.server_properties)
    if validation["valid"]:
        print("✓ Configuration file is valid")
    else:
        print("✗ Configuration file has problems:")
        for issue in validation["issues"]:
            print(f" - {issue}")
    if validation["warnings"]:
        print("⚠ Warnings:")
        for warning in validation["warnings"]:
            print(f" - {warning}")
2.3 Starting and Stopping

Service Management Script

The service manager below starts, stops, and monitors ZooKeeper and Kafka as plain processes, tracking them through PID files:

import os
import signal
import subprocess
import time
from typing import Dict, Optional

class KafkaServiceManager:
    """Kafka service manager."""

    def __init__(self, kafka_home: str = "/opt/kafka"):
        self.kafka_home = kafka_home
        self.bin_dir = os.path.join(kafka_home, "bin")
        self.config_dir = os.path.join(kafka_home, "config")
        self.log_dir = "/var/log/kafka"
        # PID files (writing to /var/run requires root)
        self.zk_pid_file = "/var/run/zookeeper.pid"
        self.kafka_pid_file = "/var/run/kafka.pid"

    def start_zookeeper(self, config_file: Optional[str] = None) -> bool:
        """Start ZooKeeper."""
        if config_file is None:
            config_file = os.path.join(self.config_dir, "zookeeper.properties")
        if self.is_zookeeper_running():
            print("ZooKeeper is already running")
            return True
        print("Starting ZooKeeper...")
        try:
            zk_script = os.path.join(self.bin_dir, "zookeeper-server-start.sh")
            log_file = os.path.join(self.log_dir, "zookeeper.log")
            # Make sure the log directory exists
            os.makedirs(self.log_dir, exist_ok=True)
            # Launch the process in its own session
            with open(log_file, "w") as log:
                process = subprocess.Popen(
                    [zk_script, config_file],
                    stdout=log,
                    stderr=subprocess.STDOUT,
                    preexec_fn=os.setsid,
                )
            # Record the PID
            with open(self.zk_pid_file, "w") as f:
                f.write(str(process.pid))
            # Give it time to start
            time.sleep(5)
            if self.is_zookeeper_running():
                print("✓ ZooKeeper started")
                return True
            print("✗ ZooKeeper failed to start")
            return False
        except Exception as e:
            print(f"Error starting ZooKeeper: {e}")
            return False

    def start_kafka(self, config_file: Optional[str] = None) -> bool:
        """Start Kafka."""
        if config_file is None:
            config_file = os.path.join(self.config_dir, "server.properties")
        if self.is_kafka_running():
            print("Kafka is already running")
            return True
        # Kafka needs a running ZooKeeper
        if not self.is_zookeeper_running():
            print("ZooKeeper is not running; starting it...")
            if not self.start_zookeeper():
                print("Could not start ZooKeeper; aborting Kafka startup")
                return False
        print("Starting Kafka...")
        try:
            kafka_script = os.path.join(self.bin_dir, "kafka-server-start.sh")
            log_file = os.path.join(self.log_dir, "kafka.log")
            # Launch the process in its own session
            with open(log_file, "w") as log:
                process = subprocess.Popen(
                    [kafka_script, config_file],
                    stdout=log,
                    stderr=subprocess.STDOUT,
                    preexec_fn=os.setsid,
                )
            # Record the PID
            with open(self.kafka_pid_file, "w") as f:
                f.write(str(process.pid))
            # Give it time to start
            time.sleep(10)
            if self.is_kafka_running():
                print("✓ Kafka started")
                return True
            print("✗ Kafka failed to start")
            return False
        except Exception as e:
            print(f"Error starting Kafka: {e}")
            return False

    def stop_kafka(self) -> bool:
        """Stop Kafka."""
        if not self.is_kafka_running():
            print("Kafka is not running")
            return True
        print("Stopping Kafka...")
        try:
            # Graceful shutdown first
            kafka_stop_script = os.path.join(self.bin_dir, "kafka-server-stop.sh")
            subprocess.run([kafka_stop_script], timeout=30)
            time.sleep(5)
            if not self.is_kafka_running():
                print("✓ Kafka stopped")
                if os.path.exists(self.kafka_pid_file):
                    os.remove(self.kafka_pid_file)
                return True
            # Fall back to killing by PID
            print("Force-stopping Kafka...")
            self._force_stop_by_pid(self.kafka_pid_file)
            return not self.is_kafka_running()
        except Exception as e:
            print(f"Error stopping Kafka: {e}")
            return False

    def stop_zookeeper(self) -> bool:
        """Stop ZooKeeper."""
        if not self.is_zookeeper_running():
            print("ZooKeeper is not running")
            return True
        print("Stopping ZooKeeper...")
        try:
            # Graceful shutdown first
            zk_stop_script = os.path.join(self.bin_dir, "zookeeper-server-stop.sh")
            subprocess.run([zk_stop_script], timeout=30)
            time.sleep(3)
            if not self.is_zookeeper_running():
                print("✓ ZooKeeper stopped")
                if os.path.exists(self.zk_pid_file):
                    os.remove(self.zk_pid_file)
                return True
            # Fall back to killing by PID
            print("Force-stopping ZooKeeper...")
            self._force_stop_by_pid(self.zk_pid_file)
            return not self.is_zookeeper_running()
        except Exception as e:
            print(f"Error stopping ZooKeeper: {e}")
            return False

    def _force_stop_by_pid(self, pid_file: str):
        """Force-stop a process via its PID file."""
        try:
            if os.path.exists(pid_file):
                with open(pid_file, "r") as f:
                    pid = int(f.read().strip())
                os.kill(pid, signal.SIGTERM)
                time.sleep(3)
                # Escalate to SIGKILL if it is still alive
                try:
                    os.kill(pid, 0)  # signal 0 only checks existence
                    os.kill(pid, signal.SIGKILL)
                    print(f"Killed process {pid}")
                except ProcessLookupError:
                    pass  # already gone
                os.remove(pid_file)
        except Exception as e:
            print(f"Failed to force-stop process: {e}")

    def is_zookeeper_running(self) -> bool:
        """Check whether ZooKeeper is running."""
        # Match the ZooKeeper main class; a bare "zookeeper" pattern would
        # also match the broker's classpath
        return self._is_process_running(self.zk_pid_file, "QuorumPeerMain")

    def is_kafka_running(self) -> bool:
        """Check whether Kafka is running."""
        return self._is_process_running(self.kafka_pid_file, "kafka.Kafka")

    def _is_process_running(self, pid_file: str, process_pattern: str) -> bool:
        """Check whether a process is running."""
        try:
            # Prefer the PID file
            if os.path.exists(pid_file):
                with open(pid_file, "r") as f:
                    pid = int(f.read().strip())
                try:
                    os.kill(pid, 0)
                    return True
                except ProcessLookupError:
                    # Stale PID file; remove it
                    os.remove(pid_file)
                    return False
            # Fall back to matching the full command line
            result = subprocess.run(
                ["pgrep", "-f", process_pattern],
                capture_output=True,
                text=True,
            )
            return result.returncode == 0 and bool(result.stdout.strip())
        except Exception:
            return False

    def restart_kafka(self) -> bool:
        """Restart Kafka."""
        print("Restarting Kafka...")
        if not self.stop_kafka():
            print("Failed to stop Kafka")
            return False
        return self.start_kafka()

    def restart_all(self) -> bool:
        """Restart all services."""
        print("Restarting all services...")
        # Stop everything
        self.stop_kafka()
        self.stop_zookeeper()
        # Start everything
        if not self.start_zookeeper():
            return False
        return self.start_kafka()

    def status(self) -> Dict[str, bool]:
        """Report service status."""
        zk_running = self.is_zookeeper_running()
        kafka_running = self.is_kafka_running()
        print("=== Service status ===")
        print(f"ZooKeeper: {'running' if zk_running else 'stopped'}")
        print(f"Kafka: {'running' if kafka_running else 'stopped'}")
        return {
            "zookeeper": zk_running,
            "kafka": kafka_running,
        }

    def get_logs(self, service: str, lines: int = 50) -> str:
        """Fetch the tail of a service log."""
        log_files = {
            "zookeeper": os.path.join(self.log_dir, "zookeeper.log"),
            "kafka": os.path.join(self.log_dir, "kafka.log"),
        }
        if service not in log_files:
            return f"Unknown service: {service}"
        log_file = log_files[service]
        if not os.path.exists(log_file):
            return f"Log file not found: {log_file}"
        try:
            result = subprocess.run(
                ["tail", "-n", str(lines), log_file],
                capture_output=True,
                text=True,
            )
            return result.stdout
        except Exception as e:
            return f"Failed to read log: {e}"

# Usage example
if __name__ == "__main__":
    service_manager = KafkaServiceManager()
    # Start everything
    print("Starting the Kafka stack...")
    if service_manager.start_zookeeper():
        if service_manager.start_kafka():
            print("\n✓ All services started")
        else:
            print("\n✗ Kafka failed to start")
    else:
        print("\n✗ ZooKeeper failed to start")
    # Check status
    service_manager.status()
Systemd Service Configuration

For production hosts, systemd is a more robust way to manage both services:

# Create the ZooKeeper unit file
sudo tee /etc/systemd/system/zookeeper.service > /dev/null <<EOF
[Unit]
Description=Apache ZooKeeper
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=forking
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
TimeoutSec=60
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

# Create the Kafka unit file
sudo tee /etc/systemd/system/kafka.service > /dev/null <<EOF
[Unit]
Description=Apache Kafka
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=forking
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
TimeoutSec=60
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

# Reload the systemd configuration
sudo systemctl daemon-reload

# Enable the services at boot
sudo systemctl enable zookeeper
sudo systemctl enable kafka

# Start the services
sudo systemctl start zookeeper
sudo systemctl start kafka

# Check their status
sudo systemctl status zookeeper
sudo systemctl status kafka

# Follow the logs
sudo journalctl -u zookeeper -f
sudo journalctl -u kafka -f
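Once both units report active, a quick sanity check is to ask each service to identify itself. This sketch assumes nc (from the netcat package) is installed; on ZooKeeper 3.5+ the srvr four-letter command is on the default 4lw.commands.whitelist:

# Ask ZooKeeper for its status
echo srvr | nc localhost 2181

# Ask the broker which API versions it supports
/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092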
2.4 Cluster Deployment

Multi-Node Configuration

The deployer below generates per-node Kafka and ZooKeeper configurations, deployment scripts, and a cluster summary:

import ipaddress
import os
from typing import Dict, List

class KafkaClusterDeployer:
    """Kafka cluster deployer."""

    def __init__(self):
        self.nodes = []
        self.zk_ensemble = []

    def add_node(self, node_id: int, host: str, kafka_port: int = 9092,
                 zk_port: int = 2181, zk_peer_port: int = 2888,
                 zk_leader_port: int = 3888):
        """Add a cluster node."""
        node = {
            "id": node_id,
            "host": host,
            "kafka_port": kafka_port,
            "zk_port": zk_port,
            "zk_peer_port": zk_peer_port,
            "zk_leader_port": zk_leader_port,
        }
        self.nodes.append(node)
        self.zk_ensemble.append(f"{host}:{zk_port}")

    def generate_kafka_config(self, node_id: int) -> str:
        """Generate the Kafka configuration for a given node."""
        node = next((n for n in self.nodes if n["id"] == node_id), None)
        if not node:
            raise ValueError(f"Node {node_id} does not exist")
        zk_connect = ",".join(self.zk_ensemble)
        config = f"""# Kafka Server Configuration for Node {node_id}
# Generated by KafkaClusterDeployer

############################# Server Basics #############################
broker.id={node_id}

############################# Socket Server Settings #############################
listeners=PLAINTEXT://{node['host']}:{node['kafka_port']}
advertised.listeners=PLAINTEXT://{node['host']}:{node['kafka_port']}

# Network threads
num.network.threads=3
# I/O threads
num.io.threads=8

# Socket buffer sizes
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################
log.dirs=/var/kafka-logs

# Default partition count
num.partitions=3
# Recovery threads per data directory
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

############################# Log Retention Policy #############################
# Retention time in hours
log.retention.hours=168
# Segment size
log.segment.bytes=1073741824
# Retention check interval
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect={zk_connect}
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

############################# Replica Settings #############################
default.replication.factor=3
min.insync.replicas=2
# Replica fetch size
replica.fetch.max.bytes=1048576
# Maximum replica lag
replica.lag.time.max.ms=30000

############################# Topic Defaults #############################
# Broker-wide default compression for topics
compression.type=snappy
# Note: fetch.min.bytes and fetch.max.wait.ms are consumer client
# settings and belong in consumer configs, not in server.properties
"""
        return config

    def generate_zk_config(self, node_id: int) -> str:
        """Generate the ZooKeeper configuration for a given node."""
        node = next((n for n in self.nodes if n["id"] == node_id), None)
        if not node:
            raise ValueError(f"Node {node_id} does not exist")
        config_lines = [
            f"# ZooKeeper Configuration for Node {node_id}",
            "# Generated by KafkaClusterDeployer",
            "",
            "# Basic settings",
            "dataDir=/var/zookeeper",
            f"clientPort={node['zk_port']}",
            "maxClientCnxns=0",
            "admin.enableServer=false",
            "",
            "# Ensemble settings",
            "tickTime=2000",
            "initLimit=10",
            "syncLimit=5",
            "autopurge.snapRetainCount=3",
            "autopurge.purgeInterval=24",
            "",
            "# Ensemble members",
        ]
        # One server.N entry per ensemble member
        for n in self.nodes:
            config_lines.append(
                f"server.{n['id']}={n['host']}:{n['zk_peer_port']}:{n['zk_leader_port']}"
            )
        return "\n".join(config_lines)

    def generate_myid_file(self, node_id: int) -> str:
        """Generate the content of the ZooKeeper myid file."""
        return str(node_id)

    def generate_deployment_script(self, node_id: int) -> str:
        """Generate the deployment script for a given node."""
        node = next((n for n in self.nodes if n["id"] == node_id), None)
        if not node:
            raise ValueError(f"Node {node_id} does not exist")
        # The shebang must be the very first line of the script
        script = f"""#!/bin/bash
# Deployment script for Kafka cluster node {node_id}
# Host: {node['host']}

set -e

echo "Deploying Kafka cluster node {node_id}..."

# Create the required directories
sudo mkdir -p /var/kafka-logs
sudo mkdir -p /var/zookeeper
sudo mkdir -p /var/log/kafka
sudo mkdir -p /etc/kafka

# Set ownership
sudo chown -R kafka:kafka /var/kafka-logs
sudo chown -R kafka:kafka /var/zookeeper
sudo chown -R kafka:kafka /var/log/kafka
sudo chown -R kafka:kafka /etc/kafka

# Create the ZooKeeper myid file
echo "{node_id}" | sudo tee /var/zookeeper/myid
sudo chown kafka:kafka /var/zookeeper/myid

# Copy the configuration files
sudo cp server.properties /opt/kafka/config/
sudo cp zookeeper.properties /opt/kafka/config/
sudo chown kafka:kafka /opt/kafka/config/server.properties
sudo chown kafka:kafka /opt/kafka/config/zookeeper.properties

# Open the firewall ports
sudo ufw allow {node['kafka_port']}/tcp
sudo ufw allow {node['zk_port']}/tcp
sudo ufw allow {node['zk_peer_port']}/tcp
sudo ufw allow {node['zk_leader_port']}/tcp

# Start the services
sudo systemctl enable zookeeper
sudo systemctl enable kafka
sudo systemctl start zookeeper

# Give ZooKeeper time to start
sleep 10

sudo systemctl start kafka

# Check service status
sudo systemctl status zookeeper --no-pager
sudo systemctl status kafka --no-pager

echo "Node {node_id} deployed!"
echo "Kafka address: {node['host']}:{node['kafka_port']}"
echo "ZooKeeper address: {node['host']}:{node['zk_port']}"
"""
        return script

    def validate_cluster_config(self) -> Dict[str, List[str]]:
        """Validate the cluster configuration."""
        issues = []
        warnings = []
        # Node count
        if len(self.nodes) < 3:
            warnings.append("Use at least 3 nodes for high availability")
        if len(self.nodes) % 2 == 0:
            warnings.append("Use an odd number of nodes so the ZooKeeper "
                            "ensemble can avoid split-brain")
        # Duplicate node IDs
        node_ids = [n["id"] for n in self.nodes]
        if len(set(node_ids)) != len(node_ids):
            issues.append("Duplicate node IDs")
        # Duplicate hosts
        hosts = [n["host"] for n in self.nodes]
        if len(set(hosts)) != len(hosts):
            issues.append("Duplicate host addresses")
        # Port clashes within a node
        for node in self.nodes:
            ports = [node["kafka_port"], node["zk_port"],
                     node["zk_peer_port"], node["zk_leader_port"]]
            if len(set(ports)) != len(ports):
                issues.append(f"Node {node['id']} has conflicting ports")
        # Address sanity check (simplified)
        for node in self.nodes:
            try:
                ipaddress.ip_address(node["host"])
            except ValueError:
                # Probably a hostname; no further check here
                pass
        return {
            "issues": issues,
            "warnings": warnings,
            "valid": len(issues) == 0,
        }

    def export_configs(self, output_dir: str = "./kafka-cluster"):
        """Export all configuration files."""
        os.makedirs(output_dir, exist_ok=True)
        for node in self.nodes:
            node_id = node["id"]
            node_dir = os.path.join(output_dir, f"node-{node_id}")
            os.makedirs(node_dir, exist_ok=True)
            # Kafka configuration
            kafka_config = self.generate_kafka_config(node_id)
            with open(os.path.join(node_dir, "server.properties"), "w") as f:
                f.write(kafka_config)
            # ZooKeeper configuration
            zk_config = self.generate_zk_config(node_id)
            with open(os.path.join(node_dir, "zookeeper.properties"), "w") as f:
                f.write(zk_config)
            # myid file
            myid_content = self.generate_myid_file(node_id)
            with open(os.path.join(node_dir, "myid"), "w") as f:
                f.write(myid_content)
            # Deployment script
            deploy_script = self.generate_deployment_script(node_id)
            script_path = os.path.join(node_dir, "deploy.sh")
            with open(script_path, "w") as f:
                f.write(deploy_script)
            os.chmod(script_path, 0o755)
            print(f"Node {node_id} configs exported to: {node_dir}")
        # Cluster summary file
        cluster_info = self._generate_cluster_info()
        with open(os.path.join(output_dir, "cluster-info.txt"), "w") as f:
            f.write(cluster_info)
        print(f"\nCluster configuration exported to: {output_dir}")

    def _generate_cluster_info(self) -> str:
        """Generate a cluster summary."""
        info_lines = [
            "Kafka Cluster Configuration",
            "=" * 50,
            "",
            f"Node count: {len(self.nodes)}",
            f"ZooKeeper ensemble: {','.join(self.zk_ensemble)}",
            "",
            "Node details:",
        ]
        for node in self.nodes:
            info_lines.extend([
                f"  Node {node['id']}:",
                f"    Host: {node['host']}",
                f"    Kafka port: {node['kafka_port']}",
                f"    ZooKeeper port: {node['zk_port']}",
                f"    ZK peer port: {node['zk_peer_port']}",
                f"    ZK leader-election port: {node['zk_leader_port']}",
                "",
            ])
        info_lines.extend([
            "Deployment steps:",
            "1. Copy each node's config files to the target host",
            "2. Run deploy.sh on each host",
            "3. Verify the cluster state",
            "",
            "Verification commands:",
            "kafka-topics.sh --bootstrap-server <any-broker> --list",
            "kafka-broker-api-versions.sh --bootstrap-server <any-broker>",
        ])
        return "\n".join(info_lines)

# Usage example
if __name__ == "__main__":
    # Build a 3-node cluster
    deployer = KafkaClusterDeployer()
    # Add the nodes
    deployer.add_node(1, "192.168.1.101")
    deployer.add_node(2, "192.168.1.102")
    deployer.add_node(3, "192.168.1.103")
    # Validate the configuration
    validation = deployer.validate_cluster_config()
    if validation["valid"]:
        print("✓ Cluster configuration is valid")
        # Export the configs
        deployer.export_configs()
    else:
        print("✗ Cluster configuration has problems:")
        for issue in validation["issues"]:
            print(f" - {issue}")
    if validation["warnings"]:
        print("⚠ Warnings:")
        for warning in validation["warnings"]:
            print(f" - {warning}")
2.5 Chapter Summary

Key Concepts

Environment preparation
- System requirement checks
- Java environment configuration
- User and directory setup

Single-node installation
- Downloading and extracting Kafka
- Generating a base configuration
- Starting and stopping the services

Cluster deployment
- Multi-node configuration
- ZooKeeper ensemble setup
- Network and firewall configuration

Service management
- Systemd service configuration
- Log management
- Status monitoring

Best Practices

Production environments
- Run Kafka under a dedicated user
- Configure appropriate JVM parameters
- Set up a log rotation policy
- Monitor disk space usage

Security
- Configure firewall rules
- Restrict network access
- Back up configuration files regularly

Performance
- Tune parameters to match the hardware
- Store logs on SSDs
- Optimize network settings

Exercises

Basic
- Install single-node Kafka locally
- Configure and start ZooKeeper and Kafka
- Verify that the services are running

Intermediate
- Set up a 3-node Kafka cluster
- Manage the services with systemd
- Perform a rolling restart of the cluster (see the sketch below)
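For the rolling-restart exercise, the key idea is to restart brokers one at a time, waiting for each broker to rejoin and for under-replicated partitions to drain before touching the next node. A minimal sketch, assuming the hosts from section 2.4, passwordless SSH, the systemd units above, and the Kafka CLI tools on the machine running the script:

#!/bin/bash
# Rolling restart: one broker at a time
for host in 192.168.1.101 192.168.1.102 192.168.1.103; do
  echo "Restarting broker on $host..."
  ssh "$host" sudo systemctl restart kafka
  # Wait until the broker answers API requests again
  until /opt/kafka/bin/kafka-broker-api-versions.sh \
      --bootstrap-server "$host:9092" > /dev/null 2>&1; do
    sleep 5
  done
  # Wait for under-replicated partitions to drain cluster-wide
  while [ -n "$(/opt/kafka/bin/kafka-topics.sh \
      --bootstrap-server "$host:9092" \
      --describe --under-replicated-partitions)" ]; do
    sleep 5
  done
done
echo "Rolling restart complete"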
Advanced
- Design a production deployment plan
- Write automated deployment scripts
- Set up monitoring and alerting

In the next chapter we turn to basic Kafka operations, including topic management and the core workflows of producing and consuming messages.