2.1 Environment Setup

System Requirements

import platform
import psutil
import subprocess
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class SystemRequirement:
    """System requirements configuration."""
    min_memory_gb: int = 4
    min_disk_gb: int = 20
    min_cpu_cores: int = 2
    java_version: str = "8+"
    supported_os: Optional[List[str]] = None

    def __post_init__(self):
        if self.supported_os is None:
            self.supported_os = ["Linux", "macOS", "Windows"]

class SystemChecker:
    """System environment checker."""

    def __init__(self):
        self.requirements = SystemRequirement()
    
    def check_all(self) -> Dict[str, bool]:
        """Run every system check and print a summary."""
        results = {
            "Operating system": self.check_os(),
            "Memory": self.check_memory(),
            "Disk space": self.check_disk(),
            "CPU cores": self.check_cpu(),
            "Java runtime": self.check_java()
        }

        print("=== System Check Results ===")
        for item, passed in results.items():
            status = "✓ pass" if passed else "✗ fail"
            print(f"{item}: {status}")

        return results

    def check_os(self) -> bool:
        """Check the operating system."""
        current_os = platform.system()
        supported = current_os in self.requirements.supported_os
        print(f"Current OS: {current_os} {platform.release()}")
        return supported
    
    def check_memory(self) -> bool:
        """Check total system memory."""
        memory_gb = psutil.virtual_memory().total / (1024**3)
        print(f"System memory: {memory_gb:.1f} GB")
        return memory_gb >= self.requirements.min_memory_gb

    def check_disk(self) -> bool:
        """Check free disk space on the root filesystem."""
        disk_usage = psutil.disk_usage('/')
        free_gb = disk_usage.free / (1024**3)
        print(f"Free disk space: {free_gb:.1f} GB")
        return free_gb >= self.requirements.min_disk_gb

    def check_cpu(self) -> bool:
        """Check the number of physical CPU cores."""
        # cpu_count(logical=False) may return None on some platforms;
        # fall back to the logical core count in that case
        cpu_count = psutil.cpu_count(logical=False) or psutil.cpu_count() or 0
        print(f"Physical CPU cores: {cpu_count}")
        return cpu_count >= self.requirements.min_cpu_cores
    
    def check_java(self) -> bool:
        """Check the Java runtime."""
        try:
            # `java -version` writes to stderr, so merge it into stdout.
            # Note: capture_output=True cannot be combined with stderr=...
            result = subprocess.run(
                ['java', '-version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True
            )

            if result.returncode == 0:
                version_info = result.stdout
                # Typical first line: openjdk version "11.0.20" ...
                print(f"Java version: {version_info.split()[2]}")
                return True
            else:
                print("Java is not installed or not on PATH")
                return False

        except FileNotFoundError:
            print("Java is not installed")
            return False
    
    def get_recommendations(self) -> List[str]:
        """Return environment tuning recommendations."""
        recommendations = [
            "Use SSDs to improve I/O performance",
            "Keep network latency below 10 ms (cluster deployments)",
            "Raise the file descriptor limit (ulimit -n 65536)",
            "Size the JVM heap appropriately (25-50% of system memory)",
            "Disable swap to avoid performance problems",
            "Run the service under a dedicated kafka user"
        ]
        return recommendations

# Usage example
if __name__ == "__main__":
    checker = SystemChecker()
    results = checker.check_all()

    if all(results.values()):
        print("\n✓ The system meets Kafka's requirements")
    else:
        print("\n✗ The system needs adjustments")
        print("\nRecommendations:")
        for rec in checker.get_recommendations():
            print(f"- {rec}")

Java Environment Configuration

# Install OpenJDK on Ubuntu/Debian
sudo apt update
sudo apt install openjdk-11-jdk

# Install OpenJDK on CentOS/RHEL
sudo yum install java-11-openjdk-devel

# Install on macOS with Homebrew
brew install openjdk@11

# Verify the Java installation
java -version
javac -version

# Set the JAVA_HOME environment variable
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH

# Persist the environment variables in the shell profile
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64' >> ~/.bashrc
echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc
source ~/.bashrc
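
If several JDKs coexist on a Debian-family system, the default can be switched interactively with update-alternatives; a quick sketch:

# Choose the default java and javac among the installed versions
sudo update-alternatives --config java
sudo update-alternatives --config javac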

2.2 Standalone Installation

Download and Installation

import os
import tarfile
from pathlib import Path

import wget  # third-party package: pip install wget

class KafkaInstaller:
    """Kafka installer."""

    def __init__(self, version: str = "2.8.2", scala_version: str = "2.13"):
        self.version = version
        self.scala_version = scala_version
        self.kafka_name = f"kafka_{scala_version}-{version}"
        # Note: only current releases stay on downloads.apache.org;
        # older versions move to https://archive.apache.org/dist/kafka/
        self.download_url = f"https://downloads.apache.org/kafka/{version}/{self.kafka_name}.tgz"
        self.install_dir = "/opt/kafka"
        self.data_dir = "/var/kafka-logs"
        self.zk_data_dir = "/var/zookeeper"
    
    def download(self, download_dir: str = "/tmp") -> str:
        """Download the Kafka archive."""
        download_path = os.path.join(download_dir, f"{self.kafka_name}.tgz")

        if os.path.exists(download_path):
            print(f"File already exists: {download_path}")
            return download_path

        print(f"Downloading Kafka {self.version}...")
        try:
            wget.download(self.download_url, download_path)
            print(f"\nDownload complete: {download_path}")
            return download_path
        except Exception as e:
            print(f"Download failed: {e}")
            raise

    def extract(self, archive_path: str, extract_to: str = "/opt") -> str:
        """Extract the Kafka archive."""
        print(f"Extracting to {extract_to}...")

        with tarfile.open(archive_path, 'r:gz') as tar:
            tar.extractall(extract_to)

        extracted_path = os.path.join(extract_to, self.kafka_name)

        # Create (or refresh) the /opt/kafka symlink
        if os.path.islink(self.install_dir):
            os.unlink(self.install_dir)

        os.symlink(extracted_path, self.install_dir)
        print(f"Kafka installed at: {self.install_dir}")

        return extracted_path
    
    def create_directories(self):
        """Create the required directories."""
        directories = [
            self.data_dir,
            self.zk_data_dir,
            "/var/log/kafka",
            "/etc/kafka"
        ]

        for directory in directories:
            Path(directory).mkdir(parents=True, exist_ok=True)
            print(f"Created directory: {directory}")

    def setup_user(self):
        """Create the kafka system user."""
        try:
            # Create the kafka user and group
            os.system("sudo groupadd -r kafka")
            os.system("sudo useradd -r -g kafka -d /opt/kafka -s /bin/bash kafka")

            # Set directory ownership
            os.system(f"sudo chown -R kafka:kafka {self.install_dir}")
            os.system(f"sudo chown -R kafka:kafka {self.data_dir}")
            os.system(f"sudo chown -R kafka:kafka {self.zk_data_dir}")
            os.system("sudo chown -R kafka:kafka /var/log/kafka")

            print("Kafka user created")
        except Exception as e:
            print(f"Failed to create user: {e}")
    
    def install(self) -> bool:
        """Full installation workflow."""
        try:
            # 1. Download
            archive_path = self.download()

            # 2. Extract
            self.extract(archive_path)

            # 3. Create directories
            self.create_directories()

            # 4. Set up the user
            self.setup_user()

            # 5. Set environment variables
            self.setup_environment()

            print("\n✓ Kafka installation complete")
            return True

        except Exception as e:
            print(f"\n✗ Installation failed: {e}")
            return False

    def setup_environment(self):
        """Configure environment variables."""
        env_content = f"""
# Kafka Environment
export KAFKA_HOME={self.install_dir}
export PATH=$KAFKA_HOME/bin:$PATH
"""

        # /etc/profile.d is sourced by login shells, so `export` lines belong here
        with open("/etc/profile.d/kafka.sh", "w") as f:
            f.write(env_content)

        print("Environment variables configured")

# Usage example
if __name__ == "__main__":
    installer = KafkaInstaller()
    success = installer.install()

    if success:
        print("\nKafka is ready to be configured")
    else:
        print("\nCheck the error messages and retry")

Basic Configuration

import os
from typing import Dict, Any

class KafkaConfigManager:
    """Kafka configuration manager."""

    def __init__(self, config_dir: str = "/opt/kafka/config"):
        self.config_dir = config_dir
        self.server_properties = os.path.join(config_dir, "server.properties")
        self.zk_properties = os.path.join(config_dir, "zookeeper.properties")
    
    def generate_server_config(self, broker_id: int = 0, 
                              custom_configs: Dict[str, Any] = None) -> str:
        """Generate server.properties content."""
        
        default_config = {
            # Basic broker settings
            "broker.id": broker_id,
            "listeners": "PLAINTEXT://localhost:9092",
            "advertised.listeners": "PLAINTEXT://localhost:9092",
            "num.network.threads": 3,
            "num.io.threads": 8,
            
            # Log settings
            "log.dirs": "/var/kafka-logs",
            "num.partitions": 1,
            "default.replication.factor": 1,
            "min.insync.replicas": 1,
            
            # Log retention policy
            "log.retention.hours": 168,  # 7 days
            "log.retention.bytes": 1073741824,  # 1 GB
            "log.segment.bytes": 1073741824,  # 1 GB
            "log.retention.check.interval.ms": 300000,  # 5 minutes
            
            # ZooKeeper settings
            "zookeeper.connect": "localhost:2181",
            "zookeeper.connection.timeout.ms": 18000,
            
            # Miscellaneous
            "group.initial.rebalance.delay.ms": 0,
            "offsets.topic.replication.factor": 1,
            "transaction.state.log.replication.factor": 1,
            "transaction.state.log.min.isr": 1,
            
            # Performance tuning
            "socket.send.buffer.bytes": 102400,
            "socket.receive.buffer.bytes": 102400,
            "socket.request.max.bytes": 104857600,
            "replica.fetch.max.bytes": 1048576,
            "message.max.bytes": 1000000
        }
        
        # Merge custom overrides
        if custom_configs:
            default_config.update(custom_configs)
        
        # Build the configuration file contents
        config_lines = []
        config_lines.append("# Kafka Server Configuration")
        config_lines.append("# Generated by KafkaConfigManager")
        config_lines.append("")
        
        # Group settings by category
        categories = {
            "Broker Settings": [
                "broker.id", "listeners", "advertised.listeners",
                "num.network.threads", "num.io.threads"
            ],
            "Log Settings": [
                "log.dirs", "num.partitions", "default.replication.factor",
                "min.insync.replicas", "log.retention.hours",
                "log.retention.bytes", "log.segment.bytes",
                "log.retention.check.interval.ms"
            ],
            "ZooKeeper Settings": [
                "zookeeper.connect", "zookeeper.connection.timeout.ms"
            ],
            "Performance Settings": [
                "socket.send.buffer.bytes", "socket.receive.buffer.bytes",
                "socket.request.max.bytes", "replica.fetch.max.bytes",
                "message.max.bytes"
            ]
        }
        
        for category, keys in categories.items():
            config_lines.append(f"# {category}")
            for key in keys:
                if key in default_config:
                    config_lines.append(f"{key}={default_config[key]}")
            config_lines.append("")
        
        # Append any remaining settings
        other_configs = {k: v for k, v in default_config.items() 
                        if k not in sum(categories.values(), [])}
        
        if other_configs:
            config_lines.append("# Other Settings")
            for key, value in other_configs.items():
                config_lines.append(f"{key}={value}")
        
        return "\n".join(config_lines)
    
    def generate_zk_config(self, data_dir: str = "/var/zookeeper",
                          client_port: int = 2181) -> str:
        """Generate zookeeper.properties content."""
        
        config = {
            "dataDir": data_dir,
            "clientPort": client_port,
            "maxClientCnxns": 0,
            "admin.enableServer": "false",
            "tickTime": 2000,
            "initLimit": 10,
            "syncLimit": 5
        }
        
        config_lines = [
            "# ZooKeeper Configuration",
            "# Generated by KafkaConfigManager",
            ""
        ]
        
        for key, value in config.items():
            config_lines.append(f"{key}={value}")
        
        return "\n".join(config_lines)
    
    def write_configs(self, broker_id: int = 0, 
                     custom_configs: Dict[str, Any] = None):
        """Write both configuration files."""
        # Generate server.properties
        server_config = self.generate_server_config(broker_id, custom_configs)
        with open(self.server_properties, 'w') as f:
            f.write(server_config)
        print(f"Wrote: {self.server_properties}")
        
        # Generate zookeeper.properties
        zk_config = self.generate_zk_config()
        with open(self.zk_properties, 'w') as f:
            f.write(zk_config)
        print(f"Wrote: {self.zk_properties}")
    
    def validate_config(self, config_file: str) -> Dict[str, Any]:
        """Validate a configuration file."""
        issues = []
        warnings = []
        
        try:
            with open(config_file, 'r') as f:
                content = f.read()
            
            # Check required settings
            required_configs = [
                "broker.id", "listeners", "log.dirs", "zookeeper.connect"
            ]
            
            for config in required_configs:
                if f"{config}=" not in content:
                    issues.append(f"Missing required setting: {config}")
            
            # Check for common pitfalls
            if "log.dirs=/tmp" in content:
                warnings.append("Avoid using /tmp as the log directory")
            
            if "num.partitions=1" in content:
                warnings.append("Consider a higher default partition count for parallelism")
            
            return {
                "valid": len(issues) == 0,
                "issues": issues,
                "warnings": warnings
            }
            
        except Exception as e:
            return {
                "valid": False,
                "issues": [f"Failed to read configuration file: {e}"],
                "warnings": []
            }
    
    def optimize_for_environment(self, memory_gb: int, 
                               cpu_cores: int) -> Dict[str, Any]:
        """Derive tuned settings from the host's resources."""
        optimized_config = {}
        
        # Scale socket buffers with available memory
        if memory_gb >= 8:
            optimized_config["socket.send.buffer.bytes"] = 1048576
            optimized_config["socket.receive.buffer.bytes"] = 1048576
            optimized_config["replica.fetch.max.bytes"] = 10485760
        
        # Scale thread pools with CPU cores
        optimized_config["num.network.threads"] = min(cpu_cores, 8)
        optimized_config["num.io.threads"] = cpu_cores * 2
        
        # Production-grade defaults on larger hosts
        if memory_gb >= 16:
            optimized_config["num.partitions"] = cpu_cores
            optimized_config["default.replication.factor"] = 3
            optimized_config["min.insync.replicas"] = 2
        
        return optimized_config

# Usage example
if __name__ == "__main__":
    config_manager = KafkaConfigManager()
    
    # Generate tuned settings
    optimized = config_manager.optimize_for_environment(memory_gb=8, cpu_cores=4)
    
    # Write the configuration files
    config_manager.write_configs(broker_id=0, custom_configs=optimized)
    
    # Validate the result
    validation = config_manager.validate_config(config_manager.server_properties)
    
    if validation["valid"]:
        print("✓ Configuration file is valid")
    else:
        print("✗ Configuration file has problems:")
        for issue in validation["issues"]:
            print(f"  - {issue}")
    
    if validation["warnings"]:
        print("⚠ Warnings:")
        for warning in validation["warnings"]:
            print(f"  - {warning}")

2.3 Starting and Stopping

Service Management Scripts

import subprocess
import time
import signal
import os
from typing import Optional, Dict

class KafkaServiceManager:
    """Kafka service manager."""
    
    def __init__(self, kafka_home: str = "/opt/kafka"):
        self.kafka_home = kafka_home
        self.bin_dir = os.path.join(kafka_home, "bin")
        self.config_dir = os.path.join(kafka_home, "config")
        self.log_dir = "/var/log/kafka"
        
        # PID files
        self.zk_pid_file = "/var/run/zookeeper.pid"
        self.kafka_pid_file = "/var/run/kafka.pid"
    
    def start_zookeeper(self, config_file: Optional[str] = None) -> bool:
        """Start ZooKeeper."""
        if config_file is None:
            config_file = os.path.join(self.config_dir, "zookeeper.properties")
        
        if self.is_zookeeper_running():
            print("ZooKeeper is already running")
            return True
        
        print("Starting ZooKeeper...")
        
        try:
            zk_script = os.path.join(self.bin_dir, "zookeeper-server-start.sh")
            log_file = os.path.join(self.log_dir, "zookeeper.log")
            
            # Make sure the log directory exists
            os.makedirs(self.log_dir, exist_ok=True)
            
            # Launch the process in its own session
            with open(log_file, "w") as log:
                process = subprocess.Popen(
                    [zk_script, config_file],
                    stdout=log,
                    stderr=subprocess.STDOUT,
                    preexec_fn=os.setsid
                )
            
            # Record the PID
            with open(self.zk_pid_file, "w") as f:
                f.write(str(process.pid))
            
            # Give it time to start
            time.sleep(5)
            
            if self.is_zookeeper_running():
                print("✓ ZooKeeper started")
                return True
            else:
                print("✗ ZooKeeper failed to start")
                return False
                
        except Exception as e:
            print(f"Error starting ZooKeeper: {e}")
            return False
    
    def start_kafka(self, config_file: Optional[str] = None) -> bool:
        """Start the Kafka broker."""
        if config_file is None:
            config_file = os.path.join(self.config_dir, "server.properties")
        
        if self.is_kafka_running():
            print("Kafka is already running")
            return True
        
        # Make sure ZooKeeper is up first
        if not self.is_zookeeper_running():
            print("ZooKeeper is not running; starting it...")
            if not self.start_zookeeper():
                print("Could not start ZooKeeper; aborting Kafka startup")
                return False
        
        print("Starting Kafka...")
        
        try:
            kafka_script = os.path.join(self.bin_dir, "kafka-server-start.sh")
            log_file = os.path.join(self.log_dir, "kafka.log")
            
            # Launch the process in its own session
            with open(log_file, "w") as log:
                process = subprocess.Popen(
                    [kafka_script, config_file],
                    stdout=log,
                    stderr=subprocess.STDOUT,
                    preexec_fn=os.setsid
                )
            
            # Record the PID
            with open(self.kafka_pid_file, "w") as f:
                f.write(str(process.pid))
            
            # Give it time to start
            time.sleep(10)
            
            if self.is_kafka_running():
                print("✓ Kafka started")
                return True
            else:
                print("✗ Kafka failed to start")
                return False
                
        except Exception as e:
            print(f"Error starting Kafka: {e}")
            return False
    
    def stop_kafka(self) -> bool:
        """Stop the Kafka broker."""
        if not self.is_kafka_running():
            print("Kafka is not running")
            return True
        
        print("Stopping Kafka...")
        
        try:
            # Graceful shutdown via the bundled script
            kafka_stop_script = os.path.join(self.bin_dir, "kafka-server-stop.sh")
            subprocess.run([kafka_stop_script], timeout=30)
            
            # Give it time to stop
            time.sleep(5)
            
            # Check whether it stopped
            if not self.is_kafka_running():
                print("✓ Kafka stopped")
                if os.path.exists(self.kafka_pid_file):
                    os.remove(self.kafka_pid_file)
                return True
            
            # Force-stop as a last resort
            print("Force-stopping Kafka...")
            self._force_stop_by_pid(self.kafka_pid_file)
            
            return not self.is_kafka_running()
            
        except Exception as e:
            print(f"Error stopping Kafka: {e}")
            return False
    
    def stop_zookeeper(self) -> bool:
        """Stop ZooKeeper."""
        if not self.is_zookeeper_running():
            print("ZooKeeper is not running")
            return True
        
        print("Stopping ZooKeeper...")
        
        try:
            # Graceful shutdown via the bundled script
            zk_stop_script = os.path.join(self.bin_dir, "zookeeper-server-stop.sh")
            subprocess.run([zk_stop_script], timeout=30)
            
            # Give it time to stop
            time.sleep(3)
            
            # Check whether it stopped
            if not self.is_zookeeper_running():
                print("✓ ZooKeeper stopped")
                if os.path.exists(self.zk_pid_file):
                    os.remove(self.zk_pid_file)
                return True
            
            # Force-stop as a last resort
            print("Force-stopping ZooKeeper...")
            self._force_stop_by_pid(self.zk_pid_file)
            
            return not self.is_zookeeper_running()
            
        except Exception as e:
            print(f"Error stopping ZooKeeper: {e}")
            return False
    
    def _force_stop_by_pid(self, pid_file: str):
        """Force-stop a process via its PID file."""
        try:
            if os.path.exists(pid_file):
                with open(pid_file, "r") as f:
                    pid = int(f.read().strip())
                
                os.kill(pid, signal.SIGTERM)
                time.sleep(3)
                
                # If it is still alive, escalate to SIGKILL
                try:
                    os.kill(pid, 0)  # signal 0 only checks existence
                    os.kill(pid, signal.SIGKILL)
                    print(f"Killed process {pid}")
                except ProcessLookupError:
                    pass  # already gone
                
                os.remove(pid_file)
                
        except Exception as e:
            print(f"Failed to force-stop process: {e}")
    
    def is_zookeeper_running(self) -> bool:
        """Check whether ZooKeeper is running."""
        # Match ZooKeeper's main class rather than "kafka", which would
        # also match a ZooKeeper started from /opt/kafka
        return self._is_process_running(self.zk_pid_file, "QuorumPeerMain")
    
    def is_kafka_running(self) -> bool:
        """Check whether the Kafka broker is running."""
        return self._is_process_running(self.kafka_pid_file, "kafka.Kafka")
    
    def _is_process_running(self, pid_file: str, process_pattern: str) -> bool:
        """Check for a live process via its PID file, falling back to pgrep."""
        try:
            # Prefer the PID file
            if os.path.exists(pid_file):
                with open(pid_file, "r") as f:
                    pid = int(f.read().strip())
                
                # Signal 0 only checks that the process exists
                try:
                    os.kill(pid, 0)
                    return True
                except ProcessLookupError:
                    # Stale PID file; remove it
                    os.remove(pid_file)
                    return False
            
            # Fall back to matching the command line
            result = subprocess.run(
                ["pgrep", "-f", process_pattern],
                capture_output=True,
                text=True
            )
            
            return result.returncode == 0 and bool(result.stdout.strip())
            
        except Exception:
            return False
    
    def restart_kafka(self) -> bool:
        """Restart the Kafka broker."""
        print("Restarting Kafka...")
        
        # Stop Kafka
        if not self.stop_kafka():
            print("Failed to stop Kafka")
            return False
        
        # Start Kafka
        return self.start_kafka()
    
    def restart_all(self) -> bool:
        """Restart all services."""
        print("Restarting all services...")
        
        # Stop the services
        self.stop_kafka()
        self.stop_zookeeper()
        
        # Start the services
        if not self.start_zookeeper():
            return False
        
        return self.start_kafka()
    
    def status(self) -> Dict[str, bool]:
        """Report service status."""
        zk_running = self.is_zookeeper_running()
        kafka_running = self.is_kafka_running()
        
        print("=== Service Status ===")
        print(f"ZooKeeper: {'running' if zk_running else 'stopped'}")
        print(f"Kafka: {'running' if kafka_running else 'stopped'}")
        
        return {
            "zookeeper": zk_running,
            "kafka": kafka_running
        }
    
    def get_logs(self, service: str, lines: int = 50) -> str:
        """Return the tail of a service's log."""
        log_files = {
            "zookeeper": os.path.join(self.log_dir, "zookeeper.log"),
            "kafka": os.path.join(self.log_dir, "kafka.log")
        }
        
        if service not in log_files:
            return f"Unknown service: {service}"
        
        log_file = log_files[service]
        
        if not os.path.exists(log_file):
            return f"Log file does not exist: {log_file}"
        
        try:
            result = subprocess.run(
                ["tail", "-n", str(lines), log_file],
                capture_output=True,
                text=True
            )
            return result.stdout
        except Exception as e:
            return f"Failed to read log: {e}"

# Usage example
if __name__ == "__main__":
    service_manager = KafkaServiceManager()
    
    # Start all services
    print("Starting the Kafka stack...")
    if service_manager.start_zookeeper():
        if service_manager.start_kafka():
            print("\n✓ All services started")
        else:
            print("\n✗ Kafka failed to start")
    else:
        print("\n✗ ZooKeeper failed to start")
    
    # Check status
    service_manager.status()

Systemd Service Configuration

# Create the ZooKeeper service unit
sudo tee /etc/systemd/system/zookeeper.service > /dev/null <<EOF
[Unit]
Description=Apache ZooKeeper
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=forking
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
TimeoutSec=60
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

# Create the Kafka service unit
sudo tee /etc/systemd/system/kafka.service > /dev/null <<EOF
[Unit]
Description=Apache Kafka
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=forking
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
TimeoutSec=60
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

# Reload the systemd configuration
sudo systemctl daemon-reload

# Enable the services at boot
sudo systemctl enable zookeeper
sudo systemctl enable kafka

# Start the services
sudo systemctl start zookeeper
sudo systemctl start kafka

# Check their status
sudo systemctl status zookeeper
sudo systemctl status kafka

# Follow the logs
sudo journalctl -u zookeeper -f
sudo journalctl -u kafka -f
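
With both units active, a quick end-to-end smoke test with the bundled console tools confirms the broker accepts traffic (the topic name smoke-test is arbitrary):

# Create a test topic, write one message, and read it back
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic smoke-test --partitions 1 --replication-factor 1
echo "hello kafka" | /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic smoke-test
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic smoke-test --from-beginning --max-messages 1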

2.4 Cluster Deployment

Multi-Node Configuration

import os
import ipaddress
from typing import List, Dict

class KafkaClusterDeployer:
    """Kafka cluster deployer."""
    
    def __init__(self):
        self.nodes = []
        self.zk_ensemble = []
    
    def add_node(self, node_id: int, host: str, kafka_port: int = 9092, 
                 zk_port: int = 2181, zk_peer_port: int = 2888, 
                 zk_leader_port: int = 3888):
        """Add a cluster node."""
        node = {
            "id": node_id,
            "host": host,
            "kafka_port": kafka_port,
            "zk_port": zk_port,
            "zk_peer_port": zk_peer_port,
            "zk_leader_port": zk_leader_port
        }
        self.nodes.append(node)
        self.zk_ensemble.append(f"{host}:{zk_port}")
    
    def generate_kafka_config(self, node_id: int) -> str:
        """Generate the Kafka configuration for a given node."""
        node = next((n for n in self.nodes if n["id"] == node_id), None)
        if not node:
            raise ValueError(f"Node {node_id} does not exist")
        
        zk_connect = ",".join(self.zk_ensemble)
        
        config = f"""
# Kafka Server Configuration for Node {node_id}
# Generated by KafkaClusterDeployer

############################# Server Basics #############################
broker.id={node_id}

############################# Socket Server Settings #############################
listeners=PLAINTEXT://{node["host"]}:{node["kafka_port"]}
advertised.listeners=PLAINTEXT://{node["host"]}:{node["kafka_port"]}

# Network threads
num.network.threads=3

# I/O threads
num.io.threads=8

# Socket buffer sizes
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################
log.dirs=/var/kafka-logs

# Default partition count
num.partitions=3

# Recovery threads per data directory
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

############################# Log Retention Policy #############################
# Retention time (hours)
log.retention.hours=168

# Log segment size
log.segment.bytes=1073741824

# Retention check interval
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect={zk_connect}
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

############################# Replica Settings #############################
default.replication.factor=3
min.insync.replicas=2

# Replica fetch size
replica.fetch.max.bytes=1048576

# Maximum replica lag
replica.lag.time.max.ms=30000

############################# Compression #############################
# Note: fetch.min.bytes and fetch.max.wait.ms are consumer-side settings
# and do not belong in server.properties, so they are omitted here
compression.type=snappy
"""
        return config
    
    def generate_zk_config(self, node_id: int) -> str:
        """Generate the ZooKeeper configuration for a given node."""
        node = next((n for n in self.nodes if n["id"] == node_id), None)
        if not node:
            raise ValueError(f"Node {node_id} does not exist")
        
        config_lines = [
            f"# ZooKeeper Configuration for Node {node_id}",
            "# Generated by KafkaClusterDeployer",
            "",
            "# Basic settings",
            "dataDir=/var/zookeeper",
            f"clientPort={node['zk_port']}",
            "maxClientCnxns=0",
            "admin.enableServer=false",
            "",
            "# Ensemble settings",
            "tickTime=2000",
            "initLimit=10",
            "syncLimit=5",
            "autopurge.snapRetainCount=3",
            "autopurge.purgeInterval=24",
            "",
            "# Ensemble members"
        ]
        
        # Append one server line per node
        for n in self.nodes:
            config_lines.append(
                f"server.{n['id']}={n['host']}:{n['zk_peer_port']}:{n['zk_leader_port']}"
            )
        
        return "\n".join(config_lines)
    
    def generate_myid_file(self, node_id: int) -> str:
        """Generate the contents of the ZooKeeper myid file."""
        return str(node_id)
    
    def generate_deployment_script(self, node_id: int) -> str:
        """Generate the deployment script for a node."""
        node = next((n for n in self.nodes if n["id"] == node_id), None)
        if not node:
            raise ValueError(f"Node {node_id} does not exist")
        
        # The shebang must be the very first line of the script
        script = f"""#!/bin/bash
# Deployment script for Kafka cluster node {node_id}
# Host: {node['host']}

set -e

echo "Deploying Kafka cluster node {node_id}..."

# Create required directories
sudo mkdir -p /var/kafka-logs
sudo mkdir -p /var/zookeeper
sudo mkdir -p /var/log/kafka
sudo mkdir -p /etc/kafka

# Set ownership
sudo chown -R kafka:kafka /var/kafka-logs
sudo chown -R kafka:kafka /var/zookeeper
sudo chown -R kafka:kafka /var/log/kafka
sudo chown -R kafka:kafka /etc/kafka

# Create the ZooKeeper myid file
echo "{node_id}" | sudo tee /var/zookeeper/myid
sudo chown kafka:kafka /var/zookeeper/myid

# Copy configuration files
sudo cp server.properties /opt/kafka/config/
sudo cp zookeeper.properties /opt/kafka/config/
sudo chown kafka:kafka /opt/kafka/config/server.properties
sudo chown kafka:kafka /opt/kafka/config/zookeeper.properties

# Open firewall ports
sudo ufw allow {node['kafka_port']}/tcp
sudo ufw allow {node['zk_port']}/tcp
sudo ufw allow {node['zk_peer_port']}/tcp
sudo ufw allow {node['zk_leader_port']}/tcp

# Start the services
sudo systemctl enable zookeeper
sudo systemctl enable kafka
sudo systemctl start zookeeper

# Wait for ZooKeeper to come up
sleep 10

sudo systemctl start kafka

# Check service status
sudo systemctl status zookeeper
sudo systemctl status kafka

echo "Node {node_id} deployed!"
echo "Kafka address: {node['host']}:{node['kafka_port']}"
echo "ZooKeeper address: {node['host']}:{node['zk_port']}"
"""
        return script
    
    def validate_cluster_config(self) -> Dict[str, List[str]]:
        """Validate the cluster configuration."""
        issues = []
        warnings = []
        
        # Check the node count
        if len(self.nodes) < 3:
            warnings.append("Use at least 3 nodes for high availability")
        
        if len(self.nodes) % 2 == 0:
            warnings.append("Use an odd number of nodes to avoid split-brain scenarios")
        
        # Check for duplicate node IDs
        node_ids = [n["id"] for n in self.nodes]
        if len(set(node_ids)) != len(node_ids):
            issues.append("Duplicate node IDs found")
        
        # Check for duplicate hosts
        hosts = [n["host"] for n in self.nodes]
        if len(set(hosts)) != len(hosts):
            issues.append("Duplicate host addresses found")
        
        # Check for port conflicts within a node
        for node in self.nodes:
            ports = [node["kafka_port"], node["zk_port"], 
                    node["zk_peer_port"], node["zk_leader_port"]]
            if len(set(ports)) != len(ports):
                issues.append(f"Node {node['id']} has conflicting ports")
        
        # Sanity-check host addresses (simplified)
        for node in self.nodes:
            try:
                ipaddress.ip_address(node["host"])
            except ValueError:
                # Probably a hostname; skip detailed validation
                pass
        
        return {
            "issues": issues,
            "warnings": warnings,
            "valid": len(issues) == 0
        }
    
    def export_configs(self, output_dir: str = "./kafka-cluster"):
        """Export all configuration files."""
        os.makedirs(output_dir, exist_ok=True)
        
        for node in self.nodes:
            node_id = node["id"]
            node_dir = os.path.join(output_dir, f"node-{node_id}")
            os.makedirs(node_dir, exist_ok=True)
            
            # Kafka configuration
            kafka_config = self.generate_kafka_config(node_id)
            with open(os.path.join(node_dir, "server.properties"), "w") as f:
                f.write(kafka_config)
            
            # ZooKeeper configuration
            zk_config = self.generate_zk_config(node_id)
            with open(os.path.join(node_dir, "zookeeper.properties"), "w") as f:
                f.write(zk_config)
            
            # myid file
            myid_content = self.generate_myid_file(node_id)
            with open(os.path.join(node_dir, "myid"), "w") as f:
                f.write(myid_content)
            
            # Deployment script
            deploy_script = self.generate_deployment_script(node_id)
            script_path = os.path.join(node_dir, "deploy.sh")
            with open(script_path, "w") as f:
                f.write(deploy_script)
            os.chmod(script_path, 0o755)
            
            print(f"Node {node_id} configuration exported to: {node_dir}")
        
        # Write a cluster summary file
        cluster_info = self._generate_cluster_info()
        with open(os.path.join(output_dir, "cluster-info.txt"), "w") as f:
            f.write(cluster_info)
        
        print(f"\nCluster configuration exported to: {output_dir}")
    
    def _generate_cluster_info(self) -> str:
        """Generate the cluster summary."""
        info_lines = [
            "Kafka Cluster Configuration",
            "=" * 50,
            "",
            f"Node count: {len(self.nodes)}",
            f"ZooKeeper ensemble: {','.join(self.zk_ensemble)}",
            "",
            "Node details:"
        ]
        
        for node in self.nodes:
            info_lines.extend([
                f"  Node {node['id']}:",
                f"    Host: {node['host']}",
                f"    Kafka port: {node['kafka_port']}",
                f"    ZooKeeper port: {node['zk_port']}",
                f"    ZK peer port: {node['zk_peer_port']}",
                f"    ZK leader-election port: {node['zk_leader_port']}",
                ""
            ])
        
        info_lines.extend([
            "Deployment steps:",
            "1. Copy each node's configuration files to the target host",
            "2. Run deploy.sh to perform the deployment",
            "3. Verify the cluster state",
            "",
            "Verification commands:",
            "kafka-topics.sh --bootstrap-server <any-broker> --list",
            "kafka-broker-api-versions.sh --bootstrap-server <any-broker>"
        ])
        
        return "\n".join(info_lines)

# Usage example
if __name__ == "__main__":
    # Build a 3-node cluster
    deployer = KafkaClusterDeployer()
    
    # Add the nodes
    deployer.add_node(1, "192.168.1.101")
    deployer.add_node(2, "192.168.1.102")
    deployer.add_node(3, "192.168.1.103")
    
    # Validate the configuration
    validation = deployer.validate_cluster_config()
    
    if validation["valid"]:
        print("✓ Cluster configuration is valid")
        
        # Export the configuration
        deployer.export_configs()
        
    else:
        print("✗ Cluster configuration has problems:")
        for issue in validation["issues"]:
            print(f"  - {issue}")
    
    if validation["warnings"]:
        print("⚠ Warnings:")
        for warning in validation["warnings"]:
            print(f"  - {warning}")

2.5 Chapter Summary

Key Takeaways

  1. Environment Setup

    • System requirements checks
    • Java environment configuration
    • User and directory setup
  2. Standalone Installation

    • Downloading and extracting Kafka
    • Generating a base configuration
    • Starting and stopping the services
  3. Cluster Deployment

    • Multi-node configuration
    • ZooKeeper ensemble setup
    • Network and firewall configuration
  4. Service Management

    • Systemd unit configuration
    • Log management
    • Status monitoring

Best Practices

  1. Production Recommendations

    • Run Kafka under a dedicated user
    • Configure appropriate JVM parameters (see the sketch after this list)
    • Set up a log rotation policy
    • Monitor disk space usage
  2. Security Configuration

    • Configure firewall rules
    • Restrict network access
    • Back up configuration files regularly
  3. Performance Tuning

    • Adjust parameters to match your hardware
    • Store logs on SSDs
    • Optimize network settings
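
Kafka's start scripts read the broker JVM heap from the KAFKA_HEAP_OPTS environment variable. A minimal sketch for a 16 GB host, following the 25-50% guideline from earlier in this chapter (the 6 GB figure is an illustrative choice, not a fixed rule):

# Set the heap before launching the broker; kafka-server-start.sh picks this up
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties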

Exercises

  1. Basics

    • Install standalone Kafka in a local environment
    • Configure and start ZooKeeper and Kafka
    • Verify that the services are running
  2. Intermediate

    • Set up a 3-node Kafka cluster
    • Manage the services with systemd
    • Implement a rolling restart of the cluster
  3. Hands-On

    • Design a production deployment plan
    • Write automated deployment scripts
    • Set up monitoring and alerting

In the next chapter we will cover Kafka's basic operations, including topic management and producing and consuming messages.