1.1 What Is Apache Kafka
Apache Kafka is a distributed streaming platform, originally developed at LinkedIn and now a top-level project of the Apache Software Foundation. Kafka is primarily used to build real-time data pipelines and streaming applications.
Core Features
- High throughput: handles millions of messages per second
- Low latency: millisecond-level message delivery
- Durability: messages are persisted to disk
- Scalability: scales out horizontally
- Fault tolerance: the distributed architecture provides high availability
- Real-time processing: supports real-time stream processing
Application Scenarios
# Examples of common application scenarios
class KafkaUseCases:
    def __init__(self):
        self.use_cases = {
            "Message queue": "Decouple system components, process asynchronously",
            "Log aggregation": "Centrally collect and process application logs",
            "Stream processing": "Analyze and process data streams in real time",
            "Event sourcing": "Record the events that change system state",
            "Data integration": "Connect different systems and data sources",
            "Activity tracking": "Monitor user behavior and system activity"
        }

    def get_scenario_example(self, scenario):
        examples = {
            "E-commerce": {
                "Order processing": "Order created -> inventory deducted -> payment processed -> shipping",
                "User behavior": "Page views -> product clicks -> conversion analysis",
                "Real-time recommendation": "Behavior data -> real-time recommendation algorithm -> personalized display"
            },
            "Finance": {
                "Risk control": "Transaction data -> real-time risk assessment -> anomaly alerts",
                "Data synchronization": "Core systems -> data warehouse -> reporting systems"
            },
            "IoT": {
                "Device monitoring": "Sensor data -> real-time analysis -> alerting system",
                "Data collection": "Device data -> data cleansing -> storage and analysis"
            }
        }
        return examples.get(scenario, {})
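To show how this lookup table is meant to be used, here is a brief usage sketch of the KafkaUseCases class defined above; the scenario names simply match the dictionary keys in the listing:

# Usage sketch for the KafkaUseCases helper defined above
use_cases = KafkaUseCases()
print(use_cases.use_cases["Stream processing"])
print(use_cases.get_scenario_example("E-commerce")["Order processing"])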
1.2 Kafka Core Concepts
Basic Terminology
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum

class MessageType(Enum):
    TEXT = "text"
    JSON = "json"
    AVRO = "avro"
    BINARY = "binary"

@dataclass
class KafkaMessage:
    """Kafka message structure"""
    key: Optional[str] = None
    value: Optional[str] = None
    headers: Optional[Dict[str, str]] = None
    timestamp: Optional[int] = None
    partition: Optional[int] = None
    offset: Optional[int] = None

    def __post_init__(self):
        if self.headers is None:
            self.headers = {}

@dataclass
class Topic:
    """Topic configuration"""
    name: str
    partitions: int = 1
    replication_factor: int = 1
    retention_ms: int = 604800000  # 7 days
    cleanup_policy: str = "delete"  # delete or compact

    def validate(self):
        if self.partitions < 1:
            raise ValueError("Partition count must be greater than 0")
        if self.replication_factor < 1:
            raise ValueError("Replication factor must be greater than 0")

@dataclass
class Partition:
    """Partition metadata"""
    topic: str
    partition_id: int
    leader: int
    replicas: List[int]
    isr: List[int]  # In-Sync Replicas

    def is_healthy(self):
        """Check whether the partition is healthy (a majority of replicas are in sync)"""
        return len(self.isr) >= len(self.replicas) // 2 + 1

class KafkaCluster:
    """Kafka cluster management (simplified simulation)"""

    def __init__(self, brokers: List[str]):
        self.brokers = brokers
        self.topics: Dict[str, Topic] = {}
        self.partitions: Dict[str, List[Partition]] = {}

    def create_topic(self, topic: Topic):
        """Create a topic"""
        topic.validate()
        self.topics[topic.name] = topic

        # Create the partitions
        partitions = []
        for i in range(topic.partitions):
            # Simplified replica-assignment logic
            replicas = self._assign_replicas(i, topic.replication_factor)
            partition = Partition(
                topic=topic.name,
                partition_id=i,
                leader=replicas[0],
                replicas=replicas,
                isr=replicas.copy()
            )
            partitions.append(partition)

        self.partitions[topic.name] = partitions
        return f"Topic {topic.name} created with {topic.partitions} partition(s)"

    def _assign_replicas(self, partition_id: int, replication_factor: int) -> List[int]:
        """Assign replicas to different brokers"""
        broker_count = len(self.brokers)
        replicas = []
        for i in range(replication_factor):
            broker_id = (partition_id + i) % broker_count
            replicas.append(broker_id)
        return replicas

    def get_topic_info(self, topic_name: str) -> Dict:
        """Get topic information"""
        if topic_name not in self.topics:
            return {"error": f"Topic {topic_name} does not exist"}

        topic = self.topics[topic_name]
        partitions = self.partitions.get(topic_name, [])
        return {
            "topic": topic.name,
            "partitions": topic.partitions,
            "replication_factor": topic.replication_factor,
            "retention_ms": topic.retention_ms,
            "partition_details": [
                {
                    "partition_id": p.partition_id,
                    "leader": p.leader,
                    "replicas": p.replicas,
                    "isr": p.isr,
                    "healthy": p.is_healthy()
                }
                for p in partitions
            ]
        }
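The following short sketch shows how the simulated cluster above can be exercised; the broker addresses and the topic name are made up for illustration:

# Usage sketch for the simulated KafkaCluster (broker addresses are placeholders)
cluster = KafkaCluster(brokers=["broker-0:9092", "broker-1:9092", "broker-2:9092"])
orders_topic = Topic(name="orders", partitions=3, replication_factor=2)
print(cluster.create_topic(orders_topic))
print(cluster.get_topic_info("orders"))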
Message Flow
import hashlib
from typing import Dict, List, Optional

# Note: KafkaMessage and KafkaCluster are the classes defined in the previous listing.

class KafkaProducer:
    """Simulated Kafka producer"""

    def __init__(self, cluster: KafkaCluster):
        self.cluster = cluster
        self.message_buffer = []

    def send(self, topic: str, message: KafkaMessage,
             partition: Optional[int] = None) -> Dict:
        """Send a message"""
        if topic not in self.cluster.topics:
            return {"error": f"Topic {topic} does not exist"}

        topic_config = self.cluster.topics[topic]

        # Determine the partition
        if partition is None:
            partition = self._select_partition(topic, message.key, topic_config.partitions)

        # Validate the partition
        if partition >= topic_config.partitions:
            return {"error": f"Partition {partition} is out of range"}

        # Simulate sending the message
        message.partition = partition
        message.offset = self._get_next_offset(topic, partition)

        result = {
            "topic": topic,
            "partition": partition,
            "offset": message.offset,
            "timestamp": message.timestamp,
            "status": "success"
        }

        # Append to the buffer (simulation)
        self.message_buffer.append((topic, message))
        return result

    def _select_partition(self, topic: str, key: Optional[str],
                          partition_count: int) -> int:
        """Select a partition"""
        if key is None:
            # Round-robin partitioning
            return len(self.message_buffer) % partition_count
        else:
            # Hash partitioning based on the key
            hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
            return hash_value % partition_count

    def _get_next_offset(self, topic: str, partition: int) -> int:
        """Get the next offset (simulation); in reality this is managed by the broker"""
        existing_messages = [
            msg for t, msg in self.message_buffer
            if t == topic and msg.partition == partition
        ]
        return len(existing_messages)

class KafkaConsumer:
    """Simulated Kafka consumer"""

    def __init__(self, cluster: KafkaCluster, group_id: str):
        self.cluster = cluster
        self.group_id = group_id
        self.subscribed_topics = set()
        self.partition_offsets = {}  # {(topic, partition): offset}

    def subscribe(self, topics: List[str]):
        """Subscribe to topics"""
        for topic in topics:
            if topic not in self.cluster.topics:
                raise ValueError(f"Topic {topic} does not exist")
            self.subscribed_topics.add(topic)

        # Initialize partition offsets
        for topic in topics:
            topic_config = self.cluster.topics[topic]
            for partition_id in range(topic_config.partitions):
                self.partition_offsets[(topic, partition_id)] = 0

    def poll(self, timeout_ms: int = 1000) -> List[KafkaMessage]:
        """Poll for messages"""
        messages = []
        # Simulate fetching one message from each subscribed partition
        for (topic, partition), current_offset in self.partition_offsets.items():
            if topic in self.subscribed_topics:
                # A real consumer would read from the message store;
                # for demonstration we return synthetic messages.
                message = KafkaMessage(
                    key=f"key-{current_offset}",
                    value=f"message payload {current_offset}",
                    partition=partition,
                    offset=current_offset,
                    headers={"source": "demo"}
                )
                messages.append(message)
                # Advance the offset
                self.partition_offsets[(topic, partition)] = current_offset + 1
        return messages

    def commit(self):
        """Commit offsets"""
        # A real implementation would commit the offsets back to Kafka
        print(f"Consumer group {self.group_id} committed offsets: {self.partition_offsets}")

    def seek(self, topic: str, partition: int, offset: int):
        """Set the consume position"""
        if (topic, partition) in self.partition_offsets:
            self.partition_offsets[(topic, partition)] = offset
        else:
            raise ValueError(f"Not subscribed to partition {topic}-{partition}")
1.3 Kafka Architecture Components
Broker Architecture
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

class KafkaBroker:
    """Simulated Kafka broker"""

    def __init__(self, broker_id: int, host: str, port: int):
        self.broker_id = broker_id
        self.host = host
        self.port = port
        self.is_running = False
        self.topics = {}  # topic -> partitions
        self.message_store = {}  # (topic, partition) -> messages
        self.metadata = {}
        self.thread_pool = ThreadPoolExecutor(max_workers=10)

    def start(self):
        """Start the broker"""
        self.is_running = True
        print(f"Broker {self.broker_id} started on {self.host}:{self.port}")
        # Launch the background maintenance thread
        threading.Thread(target=self._background_tasks, daemon=True).start()

    def stop(self):
        """Stop the broker"""
        self.is_running = False
        self.thread_pool.shutdown(wait=True)
        print(f"Broker {self.broker_id} stopped")

    def _background_tasks(self):
        """Background maintenance tasks"""
        while self.is_running:
            try:
                # Log cleanup
                self._cleanup_logs()
                # Replica-sync check
                self._check_replica_sync()
                # Health check
                self._health_check()
                time.sleep(30)  # run every 30 seconds
            except Exception as e:
                print(f"Background task error: {e}")

    def create_partition(self, topic: str, partition_id: int,
                         is_leader: bool = False):
        """Create a partition"""
        if topic not in self.topics:
            self.topics[topic] = {}

        self.topics[topic][partition_id] = {
            "is_leader": is_leader,
            "messages": [],
            "high_water_mark": 0,
            "log_end_offset": 0
        }
        # Initialize the message store
        self.message_store[(topic, partition_id)] = []

    def append_message(self, topic: str, partition_id: int,
                       message: KafkaMessage) -> bool:
        """Append a message to a partition"""
        if (topic, partition_id) not in self.message_store:
            return False

        # Assign the offset and timestamp
        message.offset = len(self.message_store[(topic, partition_id)])
        message.timestamp = int(time.time() * 1000)

        # Store the message
        self.message_store[(topic, partition_id)].append(message)

        # Update partition metadata
        if topic in self.topics and partition_id in self.topics[topic]:
            partition_info = self.topics[topic][partition_id]
            partition_info["log_end_offset"] = message.offset + 1
            if partition_info["is_leader"]:
                partition_info["high_water_mark"] = message.offset + 1
        return True

    def fetch_messages(self, topic: str, partition_id: int,
                       offset: int, max_bytes: int = 1024 * 1024) -> List[KafkaMessage]:
        """Fetch messages"""
        if (topic, partition_id) not in self.message_store:
            return []

        messages = self.message_store[(topic, partition_id)]
        # Read messages starting from the given offset
        result = []
        current_bytes = 0
        for i in range(offset, len(messages)):
            message = messages[i]
            message_size = len(str(message.value).encode('utf-8'))
            if current_bytes + message_size > max_bytes and result:
                break
            result.append(message)
            current_bytes += message_size
        return result

    def get_partition_metadata(self, topic: str, partition_id: int) -> Dict:
        """Get partition metadata"""
        if topic not in self.topics or partition_id not in self.topics[topic]:
            return {}

        partition_info = self.topics[topic][partition_id]
        return {
            "topic": topic,
            "partition": partition_id,
            "leader": self.broker_id if partition_info["is_leader"] else None,
            "high_water_mark": partition_info["high_water_mark"],
            "log_end_offset": partition_info["log_end_offset"],
            "message_count": len(self.message_store.get((topic, partition_id), []))
        }

    def _cleanup_logs(self):
        """Clean up expired messages according to the retention policy"""
        current_time = int(time.time() * 1000)
        retention_ms = 7 * 24 * 60 * 60 * 1000  # 7 days

        for (topic, partition_id), messages in self.message_store.items():
            # Drop expired messages
            valid_messages = [
                msg for msg in messages
                if current_time - msg.timestamp < retention_ms
            ]
            if len(valid_messages) < len(messages):
                self.message_store[(topic, partition_id)] = valid_messages
                print(f"Removed {len(messages) - len(valid_messages)} expired message(s)")

    def _check_replica_sync(self):
        """Check replica synchronization status"""
        # Simulated replica-sync check
        for topic, partitions in self.topics.items():
            for partition_id, info in partitions.items():
                if info["is_leader"]:
                    # The leader would check the ISR state here
                    pass

    def _health_check(self):
        """Health check"""
        # Check disk space, memory usage, etc.
        pass
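A minimal walk-through of the simulated broker above, assuming the KafkaMessage class from section 1.2; the host, port, and topic names are placeholders:

# Usage sketch for the simulated KafkaBroker
broker = KafkaBroker(broker_id=0, host="localhost", port=9092)
broker.start()
broker.create_partition("orders", partition_id=0, is_leader=True)
broker.append_message("orders", 0, KafkaMessage(key="k1", value="first message"))
print(broker.fetch_messages("orders", 0, offset=0))
print(broker.get_partition_metadata("orders", 0))
broker.stop()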
ZooKeeper Integration
import json
import time
from typing import Dict, List, Optional

class ZooKeeperClient:
    """Simulated ZooKeeper client"""

    def __init__(self, hosts: str):
        self.hosts = hosts
        self.connected = False
        self.data_store = {}  # simulated ZK data store
        self.watchers = {}  # path watchers

    def connect(self):
        """Connect to ZooKeeper"""
        self.connected = True
        # Initialize Kafka-related paths
        self._init_kafka_paths()
        print(f"Connected to ZooKeeper: {self.hosts}")

    def _init_kafka_paths(self):
        """Initialize Kafka's path structure in ZooKeeper"""
        paths = [
            "/kafka",
            "/kafka/brokers",
            "/kafka/brokers/ids",
            "/kafka/brokers/topics",
            "/kafka/consumers",
            "/kafka/config",
            "/kafka/config/topics",
            "/kafka/config/clients",
            "/kafka/admin",
            "/kafka/controller"
        ]
        for path in paths:
            self.data_store[path] = {"data": None, "children": []}

    def create(self, path: str, data: str, ephemeral: bool = False):
        """Create a node; fails if the node already holds data (mimics ZooKeeper's NodeExists error)"""
        if not self.connected:
            raise Exception("Not connected to ZooKeeper")
        if path in self.data_store and self.data_store[path]["data"] is not None:
            raise ValueError(f"Node {path} already exists")

        self.data_store[path] = {
            "data": data,
            "children": [],
            "ephemeral": ephemeral
        }
        # Update the parent node's children list
        parent_path = "/".join(path.split("/")[:-1]) or "/"
        if parent_path in self.data_store:
            child_name = path.split("/")[-1]
            if child_name not in self.data_store[parent_path]["children"]:
                self.data_store[parent_path]["children"].append(child_name)

    def get(self, path: str) -> Optional[str]:
        """Get node data"""
        if path in self.data_store:
            return self.data_store[path]["data"]
        return None

    def set(self, path: str, data: str):
        """Set node data"""
        if path in self.data_store:
            self.data_store[path]["data"] = data
        else:
            self.create(path, data)

    def get_children(self, path: str) -> List[str]:
        """List child nodes"""
        if path in self.data_store:
            return self.data_store[path]["children"]
        return []

    def delete(self, path: str):
        """Delete a node"""
        if path in self.data_store:
            del self.data_store[path]
            # Remove it from the parent's children list
            parent_path = "/".join(path.split("/")[:-1]) or "/"
            if parent_path in self.data_store:
                child_name = path.split("/")[-1]
                if child_name in self.data_store[parent_path]["children"]:
                    self.data_store[parent_path]["children"].remove(child_name)

class KafkaZooKeeperManager:
    """Kafka ZooKeeper manager"""

    def __init__(self, zk_client: ZooKeeperClient):
        self.zk = zk_client

    def register_broker(self, broker_id: int, host: str, port: int):
        """Register a broker in ZooKeeper"""
        broker_info = {
            "version": 1,
            "host": host,
            "port": port,
            "jmx_port": port + 1000
        }
        broker_path = f"/kafka/brokers/ids/{broker_id}"
        self.zk.create(broker_path, json.dumps(broker_info), ephemeral=True)
        print(f"Broker {broker_id} registered in ZooKeeper")

    def create_topic_in_zk(self, topic: str, partitions: int,
                           replication_factor: int, broker_ids: List[int]):
        """Create topic metadata in ZooKeeper"""
        # Assign partitions to brokers
        partition_assignment = {}
        for partition_id in range(partitions):
            # Simple round-robin assignment
            replicas = []
            for i in range(replication_factor):
                broker_idx = (partition_id + i) % len(broker_ids)
                replicas.append(broker_ids[broker_idx])
            partition_assignment[str(partition_id)] = replicas

        topic_config = {
            "version": 1,
            "partitions": partition_assignment
        }
        topic_path = f"/kafka/brokers/topics/{topic}"
        self.zk.create(topic_path, json.dumps(topic_config))

        # Create the topic configuration
        config_path = f"/kafka/config/topics/{topic}"
        topic_config_data = {
            "version": 1,
            "config": {
                "retention.ms": "604800000",  # 7 days
                "cleanup.policy": "delete"
            }
        }
        self.zk.create(config_path, json.dumps(topic_config_data))
        print(f"Metadata for topic {topic} created")

    def get_broker_list(self) -> List[Dict]:
        """Get the list of live brokers"""
        broker_ids = self.zk.get_children("/kafka/brokers/ids")
        brokers = []
        for broker_id in broker_ids:
            broker_path = f"/kafka/brokers/ids/{broker_id}"
            broker_data = self.zk.get(broker_path)
            if broker_data:
                broker_info = json.loads(broker_data)
                broker_info["id"] = int(broker_id)
                brokers.append(broker_info)
        return brokers

    def get_topic_metadata(self, topic: str) -> Optional[Dict]:
        """Get topic metadata"""
        topic_path = f"/kafka/brokers/topics/{topic}"
        topic_data = self.zk.get(topic_path)
        if topic_data:
            return json.loads(topic_data)
        return None

    def elect_controller(self, broker_id: int) -> bool:
        """Attempt to become the controller"""
        controller_path = "/kafka/controller"
        try:
            # Try to create the ephemeral controller node
            controller_data = {
                "version": 1,
                "brokerid": broker_id,
                "timestamp": str(int(time.time() * 1000))
            }
            self.zk.create(controller_path, json.dumps(controller_data), ephemeral=True)
            print(f"Broker {broker_id} became the controller")
            return True
        except Exception:
            # The node already exists; the election failed
            return False

    def get_controller(self) -> Optional[int]:
        """Get the current controller"""
        controller_path = "/kafka/controller"
        controller_data = self.zk.get(controller_path)
        if controller_data:
            controller_info = json.loads(controller_data)
            return controller_info["brokerid"]
        return None
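The sketch below ties the simulated ZooKeeper client and manager together; the connection string, broker addresses, and topic name are illustrative. Note that the second election attempt fails because the controller node already exists:

# Usage sketch for the simulated ZooKeeper integration
zk = ZooKeeperClient("zk1:2181,zk2:2181,zk3:2181")
zk.connect()
manager = KafkaZooKeeperManager(zk)
manager.register_broker(0, "localhost", 9092)
manager.register_broker(1, "localhost", 9093)
manager.create_topic_in_zk("orders", partitions=3, replication_factor=2, broker_ids=[0, 1])
print(manager.get_broker_list())
print(manager.get_topic_metadata("orders"))
print(manager.elect_controller(0))  # True: the ephemeral controller node is created
print(manager.elect_controller(1))  # False: the controller node already exists
print(manager.get_controller())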
1.4 Message Storage Mechanism
Log Storage Structure
import os
import struct
import time
from typing import BinaryIO, List, Optional

class LogSegment:
    """Log segment"""

    def __init__(self, base_offset: int, log_dir: str):
        self.base_offset = base_offset
        self.log_dir = log_dir
        self.log_file_path = os.path.join(log_dir, f"{base_offset:020d}.log")
        self.index_file_path = os.path.join(log_dir, f"{base_offset:020d}.index")
        self.timeindex_file_path = os.path.join(log_dir, f"{base_offset:020d}.timeindex")
        self.log_file: Optional[BinaryIO] = None
        self.index_file: Optional[BinaryIO] = None
        self.size = 0
        self.last_offset = base_offset - 1

    def open(self):
        """Open the segment files"""
        os.makedirs(self.log_dir, exist_ok=True)
        self.log_file = open(self.log_file_path, "ab+")
        self.index_file = open(self.index_file_path, "ab+")
        # Determine the current file size
        self.log_file.seek(0, 2)  # move to the end of the file
        self.size = self.log_file.tell()

    def append(self, message: KafkaMessage) -> int:
        """Append a message to the segment"""
        if not self.log_file:
            self.open()

        # Serialize the message
        serialized_message = self._serialize_message(message)

        # Write to the log file; seek to the end so the recorded position is correct
        # even if the file handle was previously used for reading
        self.log_file.seek(0, 2)
        position = self.log_file.tell()
        self.log_file.write(serialized_message)
        self.log_file.flush()

        # Update the index
        self._update_index(message.offset, position)
        self.last_offset = message.offset
        self.size += len(serialized_message)
        return position

    def _serialize_message(self, message: KafkaMessage) -> bytes:
        """Serialize a message"""
        # Simplified record format:
        # [offset(8)] [timestamp(8)] [key_length(4)] [key] [value_length(4)] [value]
        key_bytes = message.key.encode('utf-8') if message.key else b''
        value_bytes = message.value.encode('utf-8') if message.value else b''

        # Build the record
        message_data = struct.pack(
            '>QQI',  # offset, timestamp, key_length
            message.offset,
            message.timestamp or 0,
            len(key_bytes)
        )
        message_data += key_bytes
        message_data += struct.pack('>I', len(value_bytes))  # value_length
        message_data += value_bytes
        return message_data

    def _update_index(self, offset: int, position: int):
        """Update the index file"""
        # Index entry format: [relative_offset(4)] [position(4)]
        relative_offset = offset - self.base_offset
        index_entry = struct.pack('>II', relative_offset, position)
        self.index_file.write(index_entry)
        self.index_file.flush()

    def read(self, start_offset: int, max_bytes: int = 1024 * 1024) -> List[KafkaMessage]:
        """Read messages from the segment"""
        if not self.log_file:
            self.open()

        messages = []
        position = self._find_position(start_offset)
        if position is None:
            return messages

        self.log_file.seek(position)
        bytes_read = 0
        while bytes_read < max_bytes:
            try:
                message = self._deserialize_message()
                if message is None:
                    break
                if message.offset >= start_offset:
                    messages.append(message)
                    bytes_read += len(str(message.value).encode('utf-8'))
            except Exception as e:
                print(f"Error while reading message: {e}")
                break
        return messages

    def _find_position(self, offset: int) -> Optional[int]:
        """Look up a message's file position via the index"""
        if offset < self.base_offset:
            return None

        # Read the index file
        self.index_file.seek(0)
        index_data = self.index_file.read()
        if not index_data:
            return 0  # empty index: read from the start of the file

        # Binary search
        entry_size = 8  # 4 bytes for offset + 4 bytes for position
        entry_count = len(index_data) // entry_size
        left, right = 0, entry_count - 1
        target_relative_offset = offset - self.base_offset

        while left <= right:
            mid = (left + right) // 2
            entry_offset = mid * entry_size
            relative_offset, position = struct.unpack(
                '>II',
                index_data[entry_offset:entry_offset + entry_size]
            )
            if relative_offset == target_relative_offset:
                return position
            elif relative_offset < target_relative_offset:
                left = mid + 1
            else:
                right = mid - 1

        # Return the closest preceding position
        if right >= 0:
            entry_offset = right * entry_size
            _, position = struct.unpack(
                '>II',
                index_data[entry_offset:entry_offset + entry_size]
            )
            return position
        return 0

    def _deserialize_message(self) -> Optional[KafkaMessage]:
        """Deserialize the next record at the current file position"""
        try:
            # Read the record header
            header_data = self.log_file.read(20)  # 8+8+4 bytes
            if len(header_data) < 20:
                return None
            offset, timestamp, key_length = struct.unpack('>QQI', header_data)

            # Read the key
            key = None
            if key_length > 0:
                key_data = self.log_file.read(key_length)
                key = key_data.decode('utf-8')

            # Read the value length
            value_length_data = self.log_file.read(4)
            if len(value_length_data) < 4:
                return None
            value_length = struct.unpack('>I', value_length_data)[0]

            # Read the value
            value = None
            if value_length > 0:
                value_data = self.log_file.read(value_length)
                value = value_data.decode('utf-8')

            return KafkaMessage(
                key=key,
                value=value,
                offset=offset,
                timestamp=timestamp
            )
        except Exception as e:
            print(f"Failed to deserialize message: {e}")
            return None

    def close(self):
        """Close the segment files"""
        if self.log_file:
            self.log_file.close()
        if self.index_file:
            self.index_file.close()

    def should_roll(self, max_segment_size: int = 1024 * 1024 * 1024) -> bool:
        """Decide whether to roll over to a new segment"""
        return self.size >= max_segment_size

class TopicPartitionLog:
    """Log management for a single topic partition"""

    def __init__(self, topic: str, partition: int, log_dir: str):
        self.topic = topic
        self.partition = partition
        self.log_dir = os.path.join(log_dir, f"{topic}-{partition}")
        self.segments: List[LogSegment] = []
        self.active_segment: Optional[LogSegment] = None
        self.next_offset = 0
        self._load_segments()

    def _load_segments(self):
        """Load existing log segments"""
        if not os.path.exists(self.log_dir):
            os.makedirs(self.log_dir, exist_ok=True)
            # Create the first segment
            self.active_segment = LogSegment(0, self.log_dir)
            self.segments.append(self.active_segment)
            return

        # Scan for log files
        log_files = [f for f in os.listdir(self.log_dir) if f.endswith('.log')]
        log_files.sort()

        for log_file in log_files:
            base_offset = int(log_file.split('.')[0])
            segment = LogSegment(base_offset, self.log_dir)
            self.segments.append(segment)
            # Compute the next offset (simplified; a real log would rescan the segment contents)
            segment.open()
            if segment.last_offset >= 0:
                self.next_offset = max(self.next_offset, segment.last_offset + 1)
            segment.close()

        # Set the active segment
        if self.segments:
            self.active_segment = self.segments[-1]
        else:
            self.active_segment = LogSegment(0, self.log_dir)
            self.segments.append(self.active_segment)

    def append(self, message: KafkaMessage) -> int:
        """Append a message"""
        message.offset = self.next_offset
        message.timestamp = message.timestamp or int(time.time() * 1000)

        # Roll over if the active segment is full
        if self.active_segment.should_roll():
            self._roll_segment()

        # Append to the active segment
        self.active_segment.append(message)
        self.next_offset += 1
        return message.offset

    def _roll_segment(self):
        """Roll over to a new log segment"""
        # Close the current active segment
        if self.active_segment:
            self.active_segment.close()

        # Create the new segment
        new_segment = LogSegment(self.next_offset, self.log_dir)
        self.segments.append(new_segment)
        self.active_segment = new_segment
        print(f"Rolled over to new segment: {self.next_offset}")

    def read(self, start_offset: int, max_bytes: int = 1024 * 1024) -> List[KafkaMessage]:
        """Read messages"""
        messages = []
        # Find the segments that contain start_offset
        for segment in self.segments:
            if (segment.base_offset <= start_offset <= segment.last_offset or
                    segment == self.active_segment):
                segment_messages = segment.read(start_offset, max_bytes)
                messages.extend(segment_messages)

                # Stop once the byte limit has been reached
                total_bytes = sum(len(str(msg.value).encode('utf-8')) for msg in messages)
                if total_bytes >= max_bytes:
                    break
        return messages

    def get_log_end_offset(self) -> int:
        """Get the log end offset"""
        return self.next_offset

    def get_log_start_offset(self) -> int:
        """Get the log start offset"""
        if self.segments:
            return self.segments[0].base_offset
        return 0

    def cleanup(self, retention_ms: int):
        """Delete expired log segments"""
        current_time = int(time.time() * 1000)
        segments_to_remove = []

        for segment in self.segments[:-1]:  # always keep the active segment
            # Check whether the segment has expired (simplified: treats offsets as if they were timestamps)
            segment_age = current_time - (segment.base_offset * 1000)
            if segment_age > retention_ms:
                segments_to_remove.append(segment)

        # Remove the expired segments
        for segment in segments_to_remove:
            segment.close()
            try:
                os.remove(segment.log_file_path)
                os.remove(segment.index_file_path)
                if os.path.exists(segment.timeindex_file_path):
                    os.remove(segment.timeindex_file_path)
                self.segments.remove(segment)
                print(f"Deleted expired segment: {segment.base_offset}")
            except Exception as e:
                print(f"Failed to delete segment: {e}")
1.5 Chapter Summary
Key Takeaways
Kafka fundamentals
- A distributed streaming platform
- High throughput, low latency, durable storage
- Topics, partitions, replicas, and consumer groups
Architecture components
- Broker: the node that stores and serves messages
- ZooKeeper: metadata management and coordination
- Producer: publishes messages
- Consumer: consumes messages
Storage mechanism
- Log segments
- Index files
- Message serialization and persistence
Partitioning
- Horizontal scaling and parallel processing
- Partition assignment and load balancing
- Replication for high availability
Best Practices
Topic design
- Plan the number of partitions carefully
- Take message-ordering requirements into account
- Choose an appropriate replication factor
Performance tuning (see the configuration sketch after this list)
- Send messages in batches
- Size producer buffers appropriately
- Monitor key metrics
Operations
- Clean up expired logs regularly
- Monitor cluster health
- Back up important configuration
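To make the performance items above concrete, here is a hedged sketch of batching-related settings using the kafka-python client (the real client, not the simulated KafkaProducer from section 1.2); the broker addresses are placeholders and the values are starting points to tune against your own workload, not recommendations:

# Illustrative kafka-python producer configuration for batching (values are examples only)
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["broker-0:9092", "broker-1:9092"],
    acks="all",                      # wait for all in-sync replicas to acknowledge
    batch_size=32 * 1024,            # batch up to 32 KB of records per partition
    linger_ms=10,                    # wait up to 10 ms for a batch to fill
    buffer_memory=64 * 1024 * 1024,  # total memory for records awaiting send
    compression_type="gzip",         # compress batches on the wire
)
producer.send("orders", key=b"order-1001", value=b'{"amount": 99.9}')
producer.flush()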
Exercises
Basics
- Explain the relationship between topics, partitions, and replicas in Kafka
- Describe how a message is stored in Kafka
- Explain the role ZooKeeper plays in Kafka
Intermediate
- Design a Kafka topic layout for an e-commerce system
- Compare the trade-offs of different partitioning strategies
- Implement a simple message serializer
Hands-on
- Set up a three-node Kafka cluster
- Write producer and consumer programs
- Monitor the cluster's key performance metrics
In the next chapter we will cover Kafka installation and environment setup, including single-node deployment, cluster setup, and basic configuration tuning.