1.1 What Is Apache Kafka

Apache Kafka is a distributed streaming platform, originally developed at LinkedIn and now a top-level project of the Apache Software Foundation. Kafka is used primarily to build real-time data pipelines and streaming applications.

Core Features

  • High throughput: handles millions of messages per second
  • Low latency: millisecond-level message delivery
  • Durability: messages are persisted to disk
  • Scalability: scales out horizontally
  • Fault tolerance: the distributed architecture provides high availability
  • Real-time: supports real-time stream processing (a minimal client sketch follows this list)
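
To make these features concrete, here is a minimal producer/consumer sketch. It is only a preview and rests on two assumptions not covered until later chapters: the third-party kafka-python client is installed, and a broker is listening on localhost:9092. (This real client is unrelated to the simulation classes defined later in this chapter, even though some class names coincide.)

from kafka import KafkaConsumer, KafkaProducer

# Produce one message; send() is asynchronous, flush() blocks until delivery
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("demo-topic", b"hello kafka")
producer.flush()

# Consume from the beginning of the topic
consumer = KafkaConsumer(
    "demo-topic",
    bootstrap_servers="localhost:9092",
    group_id="demo-group",
    auto_offset_reset="earliest",
)
for record in consumer:
    print(record.partition, record.offset, record.value)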

Application Scenarios

# Examples of common application scenarios
class KafkaUseCases:
    def __init__(self):
        self.use_cases = {
            "Message queue": "Decouple system components, process asynchronously",
            "Log aggregation": "Collect and process application logs centrally",
            "Stream processing": "Analyze and process data streams in real time",
            "Event sourcing": "Record events describing changes in system state",
            "Data integration": "Connect different systems and data sources",
            "Activity tracking": "Monitor user behavior and system activity"
        }
    
    def get_scenario_example(self, scenario):
        examples = {
            "E-commerce": {
                "Order processing": "Order created -> inventory deducted -> payment processed -> shipment dispatched",
                "User behavior": "Page views -> product clicks -> conversion analysis",
                "Real-time recommendations": "Behavior data -> real-time recommendation algorithm -> personalized display"
            },
            "Finance": {
                "Risk control": "Transaction data -> real-time risk assessment -> anomaly alerts",
                "Data synchronization": "Core systems -> data warehouse -> reporting systems"
            },
            "IoT": {
                "Device monitoring": "Sensor data -> real-time analysis -> alerting system",
                "Data collection": "Device data -> data cleansing -> storage and analysis"
            }
        }
        return examples.get(scenario, {})

1.2 Kafka Core Concepts

Basic Terminology

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class MessageType(Enum):
    TEXT = "text"
    JSON = "json"
    AVRO = "avro"
    BINARY = "binary"

@dataclass
class KafkaMessage:
    """Kafka消息结构"""
    key: str = None
    value: str = None
    headers: Dict[str, str] = None
    timestamp: int = None
    partition: int = None
    offset: int = None
    
    def __post_init__(self):
        if self.headers is None:
            self.headers = {}

@dataclass
class Topic:
    """主题配置"""
    name: str
    partitions: int = 1
    replication_factor: int = 1
    retention_ms: int = 604800000  # 7天
    cleanup_policy: str = "delete"  # delete 或 compact
    
    def validate(self):
        if self.partitions < 1:
            raise ValueError("分区数必须大于0")
        if self.replication_factor < 1:
            raise ValueError("副本因子必须大于0")

@dataclass
class Partition:
    """分区信息"""
    topic: str
    partition_id: int
    leader: int
    replicas: List[int]
    isr: List[int]  # In-Sync Replicas
    
    def is_healthy(self):
        """检查分区健康状态"""
        return len(self.isr) >= len(self.replicas) // 2 + 1

class KafkaCluster:
    """Kafka集群管理"""
    
    def __init__(self, brokers: List[str]):
        self.brokers = brokers
        self.topics: Dict[str, Topic] = {}
        self.partitions: Dict[str, List[Partition]] = {}
    
    def create_topic(self, topic: Topic):
        """创建主题"""
        topic.validate()
        self.topics[topic.name] = topic
        
        # Create partitions
        partitions = []
        for i in range(topic.partitions):
            # Simplified replica assignment logic
            replicas = self._assign_replicas(i, topic.replication_factor)
            partition = Partition(
                topic=topic.name,
                partition_id=i,
                leader=replicas[0],
                replicas=replicas,
                isr=replicas.copy()
            )
            partitions.append(partition)
        
        self.partitions[topic.name] = partitions
        return f"主题 {topic.name} 创建成功,包含 {topic.partitions} 个分区"
    
    def _assign_replicas(self, partition_id: int, replication_factor: int) -> List[int]:
        """分配副本到不同的broker"""
        broker_count = len(self.brokers)
        replicas = []
        
        for i in range(replication_factor):
            broker_id = (partition_id + i) % broker_count
            replicas.append(broker_id)
        
        return replicas
    
    def get_topic_info(self, topic_name: str) -> Dict:
        """获取主题信息"""
        if topic_name not in self.topics:
            return {"error": f"主题 {topic_name} 不存在"}
        
        topic = self.topics[topic_name]
        partitions = self.partitions.get(topic_name, [])
        
        return {
            "topic": topic.name,
            "partitions": topic.partitions,
            "replication_factor": topic.replication_factor,
            "retention_ms": topic.retention_ms,
            "partition_details": [
                {
                    "partition_id": p.partition_id,
                    "leader": p.leader,
                    "replicas": p.replicas,
                    "isr": p.isr,
                    "healthy": p.is_healthy()
                }
                for p in partitions
            ]
        }

Message Flow

import hashlib
import json
from typing import Optional

class KafkaProducer:
    """Kafka生产者模拟"""
    
    def __init__(self, cluster: KafkaCluster):
        self.cluster = cluster
        self.message_buffer = []
    
    def send(self, topic: str, message: KafkaMessage, 
             partition: Optional[int] = None) -> Dict:
        """发送消息"""
        if topic not in self.cluster.topics:
            return {"error": f"主题 {topic} 不存在"}
        
        topic_config = self.cluster.topics[topic]
        
        # Determine the target partition
        if partition is None:
            partition = self._select_partition(topic, message.key, topic_config.partitions)
        
        # Validate the partition
        if partition >= topic_config.partitions:
            return {"error": f"Partition {partition} is out of range"}
        
        # Simulate sending the message
        message.partition = partition
        message.offset = self._get_next_offset(topic, partition)
        
        result = {
            "topic": topic,
            "partition": partition,
            "offset": message.offset,
            "timestamp": message.timestamp,
            "status": "success"
        }
        
        # Add to the buffer (simulated)
        self.message_buffer.append((topic, message))
        
        return result
    
    def _select_partition(self, topic: str, key: Optional[str], 
                         partition_count: int) -> int:
        """选择分区"""
        if key is None:
            # 轮询分区
            return len(self.message_buffer) % partition_count
        else:
            # 基于key的哈希分区
            hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
            return hash_value % partition_count
    
    def _get_next_offset(self, topic: str, partition: int) -> int:
        """获取下一个offset(模拟)"""
        # 简化实现,实际中由broker管理
        existing_messages = [
            msg for t, msg in self.message_buffer 
            if t == topic and msg.partition == partition
        ]
        return len(existing_messages)

class KafkaConsumer:
    """Kafka消费者模拟"""
    
    def __init__(self, cluster: KafkaCluster, group_id: str):
        self.cluster = cluster
        self.group_id = group_id
        self.subscribed_topics = set()
        self.partition_offsets = {}  # {(topic, partition): offset}
    
    def subscribe(self, topics: List[str]):
        """订阅主题"""
        for topic in topics:
            if topic not in self.cluster.topics:
                raise ValueError(f"主题 {topic} 不存在")
            self.subscribed_topics.add(topic)
        
        # Initialize partition offsets
        for topic in topics:
            topic_config = self.cluster.topics[topic]
            for partition_id in range(topic_config.partitions):
                self.partition_offsets[(topic, partition_id)] = 0
    
    def poll(self, timeout_ms: int = 1000) -> List[KafkaMessage]:
        """拉取消息"""
        messages = []
        
        # Simulate fetching messages from each partition
        for (topic, partition), current_offset in self.partition_offsets.items():
            if topic in self.subscribed_topics:
                # A real client would read from the broker's message store;
                # for demonstration we return synthetic messages
                message = KafkaMessage(
                    key=f"key-{current_offset}",
                    value=f"message payload {current_offset}",
                    partition=partition,
                    offset=current_offset,
                    headers={"source": "demo"}
                )
                messages.append(message)
                
                # Advance the offset
                self.partition_offsets[(topic, partition)] = current_offset + 1
        
        return messages
    
    def commit(self):
        """提交offset"""
        # 在实际实现中,这会将offset提交到Kafka
        print(f"消费者组 {self.group_id} 提交offset: {self.partition_offsets}")
    
    def seek(self, topic: str, partition: int, offset: int):
        """设置消费位置"""
        if (topic, partition) in self.partition_offsets:
            self.partition_offsets[(topic, partition)] = offset
        else:
            raise ValueError(f"未订阅分区 {topic}-{partition}")

1.3 Kafka Architecture Components

Broker Architecture

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class KafkaBroker:
    """Kafka Broker模拟"""
    
    def __init__(self, broker_id: int, host: str, port: int):
        self.broker_id = broker_id
        self.host = host
        self.port = port
        self.is_running = False
        self.topics = {}  # topic -> partitions
        self.message_store = {}  # (topic, partition) -> messages
        self.metadata = {}
        self.thread_pool = ThreadPoolExecutor(max_workers=10)
    
    def start(self):
        """启动Broker"""
        self.is_running = True
        print(f"Broker {self.broker_id} 在 {self.host}:{self.port} 启动")
        
        # 启动后台任务
        threading.Thread(target=self._background_tasks, daemon=True).start()
    
    def stop(self):
        """停止Broker"""
        self.is_running = False
        self.thread_pool.shutdown(wait=True)
        print(f"Broker {self.broker_id} 已停止")
    
    def _background_tasks(self):
        """后台任务"""
        while self.is_running:
            try:
                # 日志清理
                self._cleanup_logs()
                # 副本同步检查
                self._check_replica_sync()
                # 健康检查
                self._health_check()
                
                time.sleep(30)  # 30秒执行一次
            except Exception as e:
                print(f"后台任务执行错误: {e}")
    
    def create_partition(self, topic: str, partition_id: int, 
                        is_leader: bool = False):
        """创建分区"""
        if topic not in self.topics:
            self.topics[topic] = {}
        
        self.topics[topic][partition_id] = {
            "is_leader": is_leader,
            "messages": [],
            "high_water_mark": 0,
            "log_end_offset": 0
        }
        
        # Initialize the message store
        self.message_store[(topic, partition_id)] = []
    
    def append_message(self, topic: str, partition_id: int, 
                      message: KafkaMessage) -> bool:
        """追加消息到分区"""
        if (topic, partition_id) not in self.message_store:
            return False
        
        # Assign the offset
        message.offset = len(self.message_store[(topic, partition_id)])
        message.timestamp = int(time.time() * 1000)
        
        # Store the message
        self.message_store[(topic, partition_id)].append(message)
        
        # Update partition metadata
        if topic in self.topics and partition_id in self.topics[topic]:
            partition_info = self.topics[topic][partition_id]
            partition_info["log_end_offset"] = message.offset + 1
            if partition_info["is_leader"]:
                partition_info["high_water_mark"] = message.offset + 1
        
        return True
    
    def fetch_messages(self, topic: str, partition_id: int, 
                      offset: int, max_bytes: int = 1024*1024) -> List[KafkaMessage]:
        """获取消息"""
        if (topic, partition_id) not in self.message_store:
            return []
        
        messages = self.message_store[(topic, partition_id)]
        
        # Collect messages starting from the given offset
        result = []
        current_bytes = 0
        
        for i in range(offset, len(messages)):
            message = messages[i]
            message_size = len(str(message.value).encode('utf-8'))
            
            if current_bytes + message_size > max_bytes and result:
                break
            
            result.append(message)
            current_bytes += message_size
        
        return result
    
    def get_partition_metadata(self, topic: str, partition_id: int) -> Dict:
        """获取分区元数据"""
        if topic not in self.topics or partition_id not in self.topics[topic]:
            return {}
        
        partition_info = self.topics[topic][partition_id]
        return {
            "topic": topic,
            "partition": partition_id,
            "leader": self.broker_id if partition_info["is_leader"] else None,
            "high_water_mark": partition_info["high_water_mark"],
            "log_end_offset": partition_info["log_end_offset"],
            "message_count": len(self.message_store.get((topic, partition_id), []))
        }
    
    def _cleanup_logs(self):
        """日志清理"""
        # 根据保留策略清理过期消息
        current_time = int(time.time() * 1000)
        retention_ms = 7 * 24 * 60 * 60 * 1000  # 7天
        
        for (topic, partition_id), messages in self.message_store.items():
            # Keep only messages that have not expired
            valid_messages = [
                msg for msg in messages 
                if current_time - msg.timestamp < retention_ms
            ]
            
            if len(valid_messages) < len(messages):
                self.message_store[(topic, partition_id)] = valid_messages
                print(f"清理了 {len(messages) - len(valid_messages)} 条过期消息")
    
    def _check_replica_sync(self):
        """检查副本同步状态"""
        # 模拟副本同步检查
        for topic, partitions in self.topics.items():
            for partition_id, info in partitions.items():
                if info["is_leader"]:
                    # The leader would check the ISR state here
                    pass
    
    def _health_check(self):
        """健康检查"""
        # 检查磁盘空间、内存使用等
        pass
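
A brief sketch of how the simulated broker above might be exercised. It runs purely in memory; KafkaMessage is the dataclass defined earlier in this chapter, and the topic name and port are illustrative only.

broker = KafkaBroker(broker_id=0, host="localhost", port=9092)
broker.start()
broker.create_partition("orders", partition_id=0, is_leader=True)

broker.append_message("orders", 0, KafkaMessage(key="user-42", value="order created"))
broker.append_message("orders", 0, KafkaMessage(key="user-43", value="order paid"))

for msg in broker.fetch_messages("orders", 0, offset=0):
    print(msg.offset, msg.value)

print(broker.get_partition_metadata("orders", 0))
broker.stop()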

ZooKeeper Integration

import json
import time
from typing import Dict, List, Optional

class ZooKeeperClient:
    """ZooKeeper客户端模拟"""
    
    def __init__(self, hosts: str):
        self.hosts = hosts
        self.connected = False
        self.data_store = {}  # simulated ZooKeeper data store
        self.watchers = {}  # path watchers
    
    def connect(self):
        """连接ZooKeeper"""
        self.connected = True
        # 初始化Kafka相关路径
        self._init_kafka_paths()
        print(f"已连接到ZooKeeper: {self.hosts}")
    
    def _init_kafka_paths(self):
        """初始化Kafka在ZK中的路径结构"""
        paths = [
            "/kafka",
            "/kafka/brokers",
            "/kafka/brokers/ids",
            "/kafka/brokers/topics",
            "/kafka/consumers",
            "/kafka/config",
            "/kafka/config/topics",
            "/kafka/config/clients",
            "/kafka/admin",
            "/kafka/controller"
        ]
        
        for path in paths:
            self.data_store[path] = {"data": None, "children": []}
    
    def create(self, path: str, data: str, ephemeral: bool = False):
        """创建节点"""
        if not self.connected:
            raise Exception("未连接到ZooKeeper")
        
        self.data_store[path] = {
            "data": data,
            "children": [],
            "ephemeral": ephemeral
        }
        
        # Update the parent node's children list
        parent_path = "/".join(path.split("/")[:-1]) or "/"
        if parent_path in self.data_store:
            child_name = path.split("/")[-1]
            if child_name not in self.data_store[parent_path]["children"]:
                self.data_store[parent_path]["children"].append(child_name)
    
    def get(self, path: str) -> Optional[str]:
        """获取节点数据"""
        if path in self.data_store:
            return self.data_store[path]["data"]
        return None
    
    def set(self, path: str, data: str):
        """设置节点数据"""
        if path in self.data_store:
            self.data_store[path]["data"] = data
        else:
            self.create(path, data)
    
    def get_children(self, path: str) -> List[str]:
        """获取子节点列表"""
        if path in self.data_store:
            return self.data_store[path]["children"]
        return []
    
    def delete(self, path: str):
        """删除节点"""
        if path in self.data_store:
            del self.data_store[path]
            
            # Remove from the parent node's children list
            parent_path = "/".join(path.split("/")[:-1]) or "/"
            if parent_path in self.data_store:
                child_name = path.split("/")[-1]
                if child_name in self.data_store[parent_path]["children"]:
                    self.data_store[parent_path]["children"].remove(child_name)

class KafkaZooKeeperManager:
    """Kafka ZooKeeper管理器"""
    
    def __init__(self, zk_client: ZooKeeperClient):
        self.zk = zk_client
    
    def register_broker(self, broker_id: int, host: str, port: int):
        """注册Broker到ZooKeeper"""
        broker_info = {
            "version": 1,
            "host": host,
            "port": port,
            "jmx_port": port + 1000
        }
        
        broker_path = f"/kafka/brokers/ids/{broker_id}"
        self.zk.create(broker_path, json.dumps(broker_info), ephemeral=True)
        print(f"Broker {broker_id} 已注册到ZooKeeper")
    
    def create_topic_in_zk(self, topic: str, partitions: int, 
                          replication_factor: int, broker_ids: List[int]):
        """在ZooKeeper中创建主题元数据"""
        # 分配分区到broker
        partition_assignment = {}
        for partition_id in range(partitions):
            # Simple round-robin assignment
            replicas = []
            for i in range(replication_factor):
                broker_idx = (partition_id + i) % len(broker_ids)
                replicas.append(broker_ids[broker_idx])
            partition_assignment[str(partition_id)] = replicas
        
        topic_config = {
            "version": 1,
            "partitions": partition_assignment
        }
        
        topic_path = f"/kafka/brokers/topics/{topic}"
        self.zk.create(topic_path, json.dumps(topic_config))
        
        # Create the topic configuration
        config_path = f"/kafka/config/topics/{topic}"
        topic_config_data = {
            "version": 1,
            "config": {
                "retention.ms": "604800000",  # 7天
                "cleanup.policy": "delete"
            }
        }
        self.zk.create(config_path, json.dumps(topic_config_data))
        
        print(f"主题 {topic} 元数据已创建")
    
    def get_broker_list(self) -> List[Dict]:
        """获取活跃的Broker列表"""
        broker_ids = self.zk.get_children("/kafka/brokers/ids")
        brokers = []
        
        for broker_id in broker_ids:
            broker_path = f"/kafka/brokers/ids/{broker_id}"
            broker_data = self.zk.get(broker_path)
            if broker_data:
                broker_info = json.loads(broker_data)
                broker_info["id"] = int(broker_id)
                brokers.append(broker_info)
        
        return brokers
    
    def get_topic_metadata(self, topic: str) -> Optional[Dict]:
        """获取主题元数据"""
        topic_path = f"/kafka/brokers/topics/{topic}"
        topic_data = self.zk.get(topic_path)
        
        if topic_data:
            return json.loads(topic_data)
        return None
    
    def elect_controller(self, broker_id: int) -> bool:
        """选举控制器"""
        controller_path = "/kafka/controller"
        
        try:
            # Try to create an ephemeral node
            controller_data = {
                "version": 1,
                "brokerid": broker_id,
                "timestamp": str(int(time.time() * 1000))
            }
            
            self.zk.create(controller_path, json.dumps(controller_data), ephemeral=True)
            print(f"Broker {broker_id} 成为控制器")
            return True
        except:
            # 节点已存在,选举失败
            return False
    
    def get_controller(self) -> Optional[int]:
        """获取当前控制器"""
        controller_path = "/kafka/controller"
        controller_data = self.zk.get(controller_path)
        
        if controller_data:
            controller_info = json.loads(controller_data)
            return controller_info["brokerid"]
        return None
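
The following sketch shows how the simulated ZooKeeper client and manager above fit together. The hostnames, ports and topic name are illustrative assumptions; everything runs against the in-memory data store, not a real ZooKeeper ensemble.

zk = ZooKeeperClient("localhost:2181")
zk.connect()

manager = KafkaZooKeeperManager(zk)
manager.register_broker(0, "localhost", 9092)
manager.register_broker(1, "localhost", 9093)
manager.create_topic_in_zk("orders", partitions=3, replication_factor=2, broker_ids=[0, 1])

print(manager.get_broker_list())
print(manager.get_topic_metadata("orders"))

if manager.elect_controller(0):
    print("Current controller:", manager.get_controller())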

1.4 Message Storage Mechanism

Log Storage Structure

import os
import struct
import time
from typing import BinaryIO, List, Optional

class LogSegment:
    """日志段"""
    
    def __init__(self, base_offset: int, log_dir: str):
        self.base_offset = base_offset
        self.log_dir = log_dir
        self.log_file_path = os.path.join(log_dir, f"{base_offset:020d}.log")
        self.index_file_path = os.path.join(log_dir, f"{base_offset:020d}.index")
        self.timeindex_file_path = os.path.join(log_dir, f"{base_offset:020d}.timeindex")
        
        self.log_file: Optional[BinaryIO] = None
        self.index_file: Optional[BinaryIO] = None
        self.size = 0
        self.last_offset = base_offset - 1
    
    def open(self):
        """打开日志段文件"""
        os.makedirs(self.log_dir, exist_ok=True)
        
        self.log_file = open(self.log_file_path, "ab+")
        self.index_file = open(self.index_file_path, "ab+")
        
        # Determine the current file size
        self.log_file.seek(0, 2)  # move to the end of the file
        self.size = self.log_file.tell()
    
    def append(self, message: KafkaMessage) -> int:
        """追加消息到日志段"""
        if not self.log_file:
            self.open()
        
        # Serialize the message
        serialized_message = self._serialize_message(message)
        
        # Write to the log file
        position = self.log_file.tell()
        self.log_file.write(serialized_message)
        self.log_file.flush()
        
        # Update the index
        self._update_index(message.offset, position)
        
        self.last_offset = message.offset
        self.size += len(serialized_message)
        
        return position
    
    def _serialize_message(self, message: KafkaMessage) -> bytes:
        """序列化消息"""
        # 简化的消息格式:
        # [offset(8)] [timestamp(8)] [key_length(4)] [key] [value_length(4)] [value]
        
        key_bytes = message.key.encode('utf-8') if message.key else b''
        value_bytes = message.value.encode('utf-8') if message.value else b''
        
        # Build the message
        message_data = struct.pack(
            '>QQI',  # offset, timestamp, key_length
            message.offset,
            message.timestamp or 0,
            len(key_bytes)
        )
        message_data += key_bytes
        message_data += struct.pack('>I', len(value_bytes))  # value_length
        message_data += value_bytes
        
        return message_data
    
    def _update_index(self, offset: int, position: int):
        """更新索引文件"""
        # 索引格式:[relative_offset(4)] [position(4)]
        relative_offset = offset - self.base_offset
        index_entry = struct.pack('>II', relative_offset, position)
        
        self.index_file.write(index_entry)
        self.index_file.flush()
    
    def read(self, start_offset: int, max_bytes: int = 1024*1024) -> List[KafkaMessage]:
        """从日志段读取消息"""
        if not self.log_file:
            self.open()
        
        messages = []
        position = self._find_position(start_offset)
        
        if position is None:
            return messages
        
        self.log_file.seek(position)
        bytes_read = 0
        
        while bytes_read < max_bytes:
            try:
                message = self._deserialize_message()
                if message is None:
                    break
                
                if message.offset >= start_offset:
                    messages.append(message)
                    bytes_read += len(str(message.value).encode('utf-8'))
                
            except Exception as e:
                print(f"读取消息时出错: {e}")
                break
        
        return messages
    
    def _find_position(self, offset: int) -> Optional[int]:
        """通过索引查找消息位置"""
        if offset < self.base_offset:
            return None
        
        # Read the index file
        self.index_file.seek(0)
        index_data = self.index_file.read()
        
        if not index_data:
            return 0  # empty index, read from the start of the file
        
        # Binary search
        entry_size = 8  # 4 bytes for offset + 4 bytes for position
        entry_count = len(index_data) // entry_size
        
        left, right = 0, entry_count - 1
        target_relative_offset = offset - self.base_offset
        
        while left <= right:
            mid = (left + right) // 2
            entry_offset = mid * entry_size
            
            relative_offset, position = struct.unpack(
                '>II', 
                index_data[entry_offset:entry_offset + entry_size]
            )
            
            if relative_offset == target_relative_offset:
                return position
            elif relative_offset < target_relative_offset:
                left = mid + 1
            else:
                right = mid - 1
        
        # Return the closest preceding position
        if right >= 0:
            entry_offset = right * entry_size
            _, position = struct.unpack(
                '>II', 
                index_data[entry_offset:entry_offset + entry_size]
            )
            return position
        
        return 0
    
    def _deserialize_message(self) -> Optional[KafkaMessage]:
        """反序列化消息"""
        try:
            # Read the message header
            header_data = self.log_file.read(20)  # 8+8+4 bytes
            if len(header_data) < 20:
                return None
            
            offset, timestamp, key_length = struct.unpack('>QQI', header_data)
            
            # Read the key
            key = None
            if key_length > 0:
                key_data = self.log_file.read(key_length)
                key = key_data.decode('utf-8')
            
            # Read the value length
            value_length_data = self.log_file.read(4)
            if len(value_length_data) < 4:
                return None
            
            value_length = struct.unpack('>I', value_length_data)[0]
            
            # Read the value
            value = None
            if value_length > 0:
                value_data = self.log_file.read(value_length)
                value = value_data.decode('utf-8')
            
            return KafkaMessage(
                key=key,
                value=value,
                offset=offset,
                timestamp=timestamp
            )
            
        except Exception as e:
            print(f"反序列化消息失败: {e}")
            return None
    
    def close(self):
        """关闭文件"""
        if self.log_file:
            self.log_file.close()
        if self.index_file:
            self.index_file.close()
    
    def should_roll(self, max_segment_size: int = 1024*1024*1024) -> bool:
        """判断是否需要滚动到新段"""
        return self.size >= max_segment_size

class TopicPartitionLog:
    """主题分区日志管理"""
    
    def __init__(self, topic: str, partition: int, log_dir: str):
        self.topic = topic
        self.partition = partition
        self.log_dir = os.path.join(log_dir, f"{topic}-{partition}")
        self.segments: List[LogSegment] = []
        self.active_segment: Optional[LogSegment] = None
        self.next_offset = 0
        
        self._load_segments()
    
    def _load_segments(self):
        """加载现有的日志段"""
        if not os.path.exists(self.log_dir):
            os.makedirs(self.log_dir, exist_ok=True)
            # Create the first segment
            self.active_segment = LogSegment(0, self.log_dir)
            self.segments.append(self.active_segment)
            return
        
        # Scan the log files
        log_files = [f for f in os.listdir(self.log_dir) if f.endswith('.log')]
        log_files.sort()
        
        for log_file in log_files:
            base_offset = int(log_file.split('.')[0])
            segment = LogSegment(base_offset, self.log_dir)
            self.segments.append(segment)
            
            # Compute the next offset
            segment.open()
            if segment.last_offset >= 0:
                self.next_offset = max(self.next_offset, segment.last_offset + 1)
            segment.close()
        
        # Set the active segment
        if self.segments:
            self.active_segment = self.segments[-1]
        else:
            self.active_segment = LogSegment(0, self.log_dir)
            self.segments.append(self.active_segment)
    
    def append(self, message: KafkaMessage) -> int:
        """追加消息"""
        message.offset = self.next_offset
        message.timestamp = message.timestamp or int(time.time() * 1000)
        
        # Check whether the active segment needs to roll
        if self.active_segment.should_roll():
            self._roll_segment()
        
        # Append to the active segment
        position = self.active_segment.append(message)
        self.next_offset += 1
        
        return message.offset
    
    def _roll_segment(self):
        """滚动到新的日志段"""
        # 关闭当前活跃段
        if self.active_segment:
            self.active_segment.close()
        
        # Create a new segment
        new_segment = LogSegment(self.next_offset, self.log_dir)
        self.segments.append(new_segment)
        self.active_segment = new_segment
        
        print(f"滚动到新段: {self.next_offset}")
    
    def read(self, start_offset: int, max_bytes: int = 1024*1024) -> List[KafkaMessage]:
        """读取消息"""
        messages = []
        
        # Find the segments that contain start_offset
        for segment in self.segments:
            if (segment.base_offset <= start_offset <= segment.last_offset or 
                segment == self.active_segment):
                
                segment_messages = segment.read(start_offset, max_bytes)
                messages.extend(segment_messages)
                
                # Stop once the byte limit has been reached
                total_bytes = sum(len(str(msg.value).encode('utf-8')) for msg in messages)
                if total_bytes >= max_bytes:
                    break
        
        return messages
    
    def get_log_end_offset(self) -> int:
        """获取日志结束offset"""
        return self.next_offset
    
    def get_log_start_offset(self) -> int:
        """获取日志开始offset"""
        if self.segments:
            return self.segments[0].base_offset
        return 0
    
    def cleanup(self, retention_ms: int):
        """清理过期日志段"""
        current_time = int(time.time() * 1000)
        segments_to_remove = []
        
        for segment in self.segments[:-1]:  # keep the active segment
            # Check whether the segment has expired (simplified)
            segment_age = current_time - (segment.base_offset * 1000)  # rough proxy: treat offset as elapsed time
            if segment_age > retention_ms:
                segments_to_remove.append(segment)
        
        # Delete expired segments
        for segment in segments_to_remove:
            segment.close()
            try:
                os.remove(segment.log_file_path)
                os.remove(segment.index_file_path)
                if os.path.exists(segment.timeindex_file_path):
                    os.remove(segment.timeindex_file_path)
                
                self.segments.remove(segment)
                print(f"删除过期段: {segment.base_offset}")
            except Exception as e:
                print(f"删除段失败: {e}")

1.5 Chapter Summary

Key Takeaways

  1. Kafka fundamentals

    • Distributed streaming platform
    • High throughput, low latency, durable storage
    • Topics, partitions, replicas, consumer groups
  2. Architecture components

    • Broker: node that stores and serves messages
    • ZooKeeper: metadata management and coordination
    • Producer: message producer
    • Consumer: message consumer
  3. Storage mechanism

    • Log segments
    • Index files
    • Message serialization and persistence
  4. Partitioning

    • Horizontal scaling and parallel processing
    • Partition assignment and load balancing
    • Replication for high availability

Best Practices

  1. Topic design

    • Plan the number of partitions carefully
    • Consider message-ordering requirements
    • Set an appropriate replication factor
  2. Performance optimization (see the sketch after this list)

    • Send messages in batches
    • Size buffers appropriately
    • Monitor key metrics
  3. Operations

    • Clean up expired logs regularly
    • Monitor cluster health
    • Back up important configuration
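
As a concrete illustration of the performance items above, here is a hedged sketch of producer batching settings using the third-party kafka-python client. The broker address and the specific values are assumptions for illustration, not tuned recommendations, and this real client is unrelated to the simulated KafkaProducer class earlier in this chapter.

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    acks="all",                       # wait for all in-sync replicas to acknowledge
    batch_size=32 * 1024,             # batch up to 32 KB per partition before sending
    linger_ms=10,                     # wait up to 10 ms so batches can fill
    buffer_memory=64 * 1024 * 1024,   # total memory for records not yet sent
    compression_type="gzip",          # compress batches on the wire
)

for i in range(1000):
    producer.send("orders", key=str(i).encode("utf-8"), value=b"payload")
producer.flush()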

Exercises

  1. Basics

    • Explain the relationship between topics, partitions, and replicas in Kafka
    • Describe how a message is stored in Kafka
    • Explain the role of ZooKeeper in Kafka
  2. Intermediate

    • Design a Kafka topic layout for an e-commerce system
    • Analyze the pros and cons of different partitioning strategies
    • Implement a simple message serializer
  3. Hands-on

    • Set up a three-node Kafka cluster
    • Write producer and consumer programs
    • Monitor cluster performance metrics

In the next chapter we will cover Kafka installation and environment setup, including single-node deployment, cluster setup, and basic configuration tuning.