## 3.1 主题管理
### 主题操作工具

```python
import subprocess
import json
import re
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class TopicConfig:
"""主题配置"""
name: str
partitions: int = 1
replication_factor: int = 1
    configs: Optional[Dict[str, str]] = None
def __post_init__(self):
if self.configs is None:
self.configs = {}
class KafkaTopicManager:
"""Kafka主题管理器"""
def __init__(self, bootstrap_servers: str = "localhost:9092",
kafka_home: str = "/opt/kafka"):
self.bootstrap_servers = bootstrap_servers
self.kafka_home = kafka_home
self.bin_dir = f"{kafka_home}/bin"
def create_topic(self, topic_config: TopicConfig) -> Dict[str, any]:
"""创建主题"""
cmd = [
f"{self.bin_dir}/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--create",
"--topic", topic_config.name,
"--partitions", str(topic_config.partitions),
"--replication-factor", str(topic_config.replication_factor)
]
# 添加配置参数
for key, value in topic_config.configs.items():
cmd.extend(["--config", f"{key}={value}"])
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
return {
"success": True,
"message": f"主题 {topic_config.name} 创建成功",
"output": result.stdout
}
except subprocess.CalledProcessError as e:
return {
"success": False,
"message": f"创建主题失败: {e.stderr}",
"error": e.stderr
}
def list_topics(self) -> List[str]:
"""列出所有主题"""
cmd = [
f"{self.bin_dir}/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
topics = [line.strip() for line in result.stdout.split('\n') if line.strip()]
return topics
except subprocess.CalledProcessError as e:
print(f"获取主题列表失败: {e.stderr}")
return []
def describe_topic(self, topic_name: str) -> Dict[str, any]:
"""描述主题详情"""
cmd = [
f"{self.bin_dir}/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe",
"--topic", topic_name
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
return self._parse_topic_description(result.stdout)
except subprocess.CalledProcessError as e:
return {
"success": False,
"error": e.stderr
}
def _parse_topic_description(self, output: str) -> Dict[str, any]:
"""解析主题描述输出"""
lines = output.strip().split('\n')
if not lines:
return {"success": False, "error": "无输出"}
# 解析主题基本信息
topic_line = lines[0]
topic_match = re.search(r'Topic: (\S+)\s+TopicId: (\S+)\s+PartitionCount: (\d+)\s+ReplicationFactor: (\d+)', topic_line)
if not topic_match:
return {"success": False, "error": "解析主题信息失败"}
topic_name = topic_match.group(1)
topic_id = topic_match.group(2)
partition_count = int(topic_match.group(3))
replication_factor = int(topic_match.group(4))
# 解析配置信息
configs = {}
config_match = re.search(r'Configs: (.+)', topic_line)
if config_match:
config_str = config_match.group(1)
for config_pair in config_str.split(','):
if '=' in config_pair:
key, value = config_pair.strip().split('=', 1)
configs[key] = value
# 解析分区信息
partitions = []
for line in lines[1:]:
if line.strip().startswith('Partition:'):
partition_match = re.search(
r'Partition: (\d+)\s+Leader: (\d+)\s+Replicas: ([\d,]+)\s+Isr: ([\d,]+)',
line
)
if partition_match:
partition_info = {
"partition_id": int(partition_match.group(1)),
"leader": int(partition_match.group(2)),
"replicas": [int(x) for x in partition_match.group(3).split(',')],
"isr": [int(x) for x in partition_match.group(4).split(',')]
}
partitions.append(partition_info)
return {
"success": True,
"topic_name": topic_name,
"topic_id": topic_id,
"partition_count": partition_count,
"replication_factor": replication_factor,
"configs": configs,
"partitions": partitions
}
def delete_topic(self, topic_name: str) -> Dict[str, any]:
"""删除主题"""
cmd = [
f"{self.bin_dir}/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--delete",
"--topic", topic_name
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
return {
"success": True,
"message": f"主题 {topic_name} 删除成功",
"output": result.stdout
}
except subprocess.CalledProcessError as e:
return {
"success": False,
"message": f"删除主题失败: {e.stderr}",
"error": e.stderr
}
def alter_topic(self, topic_name: str, configs: Dict[str, str]) -> Dict[str, any]:
"""修改主题配置"""
cmd = [
f"{self.bin_dir}/kafka-configs.sh",
"--bootstrap-server", self.bootstrap_servers,
"--entity-type", "topics",
"--entity-name", topic_name,
"--alter"
]
# 构建配置字符串
config_pairs = [f"{key}={value}" for key, value in configs.items()]
config_string = ",".join(config_pairs)
cmd.extend(["--add-config", config_string])
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
return {
"success": True,
"message": f"主题 {topic_name} 配置修改成功",
"output": result.stdout
}
except subprocess.CalledProcessError as e:
return {
"success": False,
"message": f"修改主题配置失败: {e.stderr}",
"error": e.stderr
}
def increase_partitions(self, topic_name: str, new_partition_count: int) -> Dict[str, any]:
"""增加分区数"""
cmd = [
f"{self.bin_dir}/kafka-topics.sh",
"--bootstrap-server", self.bootstrap_servers,
"--alter",
"--topic", topic_name,
"--partitions", str(new_partition_count)
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
return {
"success": True,
"message": f"主题 {topic_name} 分区数已增加到 {new_partition_count}",
"output": result.stdout
}
except subprocess.CalledProcessError as e:
return {
"success": False,
"message": f"增加分区失败: {e.stderr}",
"error": e.stderr
}
def get_topic_configs(self, topic_name: str) -> Dict[str, any]:
"""获取主题配置"""
cmd = [
f"{self.bin_dir}/kafka-configs.sh",
"--bootstrap-server", self.bootstrap_servers,
"--entity-type", "topics",
"--entity-name", topic_name,
"--describe"
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
return self._parse_topic_configs(result.stdout)
except subprocess.CalledProcessError as e:
return {
"success": False,
"error": e.stderr
}
def _parse_topic_configs(self, output: str) -> Dict[str, any]:
"""解析主题配置输出"""
configs = {}
for line in output.split('\n'):
if '=' in line and not line.strip().startswith('Dynamic configs'):
# 解析配置行
config_match = re.search(r'(\S+)=(\S+)', line)
if config_match:
key = config_match.group(1)
value = config_match.group(2)
configs[key] = value
return {
"success": True,
"configs": configs
}
# 使用示例
if __name__ == "__main__":
topic_manager = KafkaTopicManager()
# 创建主题
topic_config = TopicConfig(
name="test-topic",
partitions=3,
replication_factor=1,
configs={
"retention.ms": "604800000", # 7天
"cleanup.policy": "delete"
}
)
result = topic_manager.create_topic(topic_config)
print(f"创建主题: {result}")
# 列出主题
topics = topic_manager.list_topics()
print(f"主题列表: {topics}")
# 描述主题
description = topic_manager.describe_topic("test-topic")
print(f"主题描述: {description}")
```

### 主题配置最佳实践

```python
class TopicConfigOptimizer:
    """主题配置优化器(注意:模板中同时包含主题级与生产者级配置,应用时需按配置级别分别下发)"""
def __init__(self):
self.config_templates = {
"high_throughput": {
"compression.type": "snappy",
"batch.size": "65536",
"linger.ms": "10",
"buffer.memory": "67108864",
"acks": "1"
},
"low_latency": {
"compression.type": "none",
"batch.size": "1024",
"linger.ms": "0",
"acks": "1"
},
"high_durability": {
"acks": "all",
"min.insync.replicas": "2",
"unclean.leader.election.enable": "false",
"retries": "2147483647",
"max.in.flight.requests.per.connection": "1"
},
"log_aggregation": {
"retention.ms": "259200000", # 3天
"segment.ms": "86400000", # 1天
"cleanup.policy": "delete",
"compression.type": "gzip"
},
"event_streaming": {
"retention.ms": "604800000", # 7天
"segment.ms": "3600000", # 1小时
"cleanup.policy": "delete",
"compression.type": "snappy"
},
"cdc": {
"cleanup.policy": "compact",
"min.cleanable.dirty.ratio": "0.1",
"segment.ms": "86400000", # 1天
"delete.retention.ms": "86400000" # 1天
}
}
def get_optimized_config(self, use_case: str,
custom_configs: Dict[str, str] = None) -> Dict[str, str]:
"""获取优化配置"""
if use_case not in self.config_templates:
raise ValueError(f"不支持的用例: {use_case}")
config = self.config_templates[use_case].copy()
if custom_configs:
config.update(custom_configs)
return config
def calculate_partitions(self, target_throughput_mb: float,
message_size_kb: float,
producer_count: int = 1) -> int:
"""计算建议的分区数"""
# 假设每个分区可以处理10MB/s的吞吐量
partition_throughput_mb = 10
# 基于吞吐量计算
partitions_by_throughput = max(1, int(target_throughput_mb / partition_throughput_mb))
# 基于生产者数量计算(每个生产者至少一个分区)
partitions_by_producers = producer_count
# 取较大值
recommended_partitions = max(partitions_by_throughput, partitions_by_producers)
# 限制最大分区数
max_partitions = 100
return min(recommended_partitions, max_partitions)
def calculate_retention(self, daily_volume_gb: float,
max_storage_gb: float) -> int:
"""计算保留时间(毫秒)"""
if daily_volume_gb <= 0:
return 604800000 # 默认7天
# 计算可以保留多少天的数据
retention_days = max_storage_gb / daily_volume_gb
# 限制在1-30天之间
retention_days = max(1, min(retention_days, 30))
# 转换为毫秒
return int(retention_days * 24 * 60 * 60 * 1000)
def validate_config(self, config: Dict[str, str]) -> Dict[str, List[str]]:
"""验证配置"""
issues = []
warnings = []
# 检查必要配置
if "acks" in config:
acks = config["acks"]
if acks == "0":
warnings.append("acks=0 可能导致数据丢失")
elif acks == "all" and "min.insync.replicas" not in config:
warnings.append("使用acks=all时建议设置min.insync.replicas")
# 检查压缩配置
if "compression.type" in config:
compression = config["compression.type"]
if compression not in ["none", "gzip", "snappy", "lz4", "zstd"]:
issues.append(f"不支持的压缩类型: {compression}")
# 检查清理策略
if "cleanup.policy" in config:
policy = config["cleanup.policy"]
if policy not in ["delete", "compact", "compact,delete"]:
issues.append(f"不支持的清理策略: {policy}")
# 检查保留时间
if "retention.ms" in config:
try:
retention_ms = int(config["retention.ms"])
if retention_ms < 60000: # 小于1分钟
warnings.append("保留时间过短可能导致数据过早删除")
elif retention_ms > 2592000000: # 大于30天
warnings.append("保留时间过长可能占用大量存储空间")
except ValueError:
issues.append("retention.ms必须是数字")
return {
"issues": issues,
"warnings": warnings,
"valid": len(issues) == 0
}
def generate_topic_plan(self, requirements: Dict[str, any]) -> Dict[str, any]:
"""生成主题规划"""
use_case = requirements.get("use_case", "event_streaming")
target_throughput = requirements.get("target_throughput_mb", 10)
message_size = requirements.get("message_size_kb", 1)
producer_count = requirements.get("producer_count", 1)
daily_volume = requirements.get("daily_volume_gb", 10)
max_storage = requirements.get("max_storage_gb", 100)
replication_factor = requirements.get("replication_factor", 3)
# 计算分区数
partitions = self.calculate_partitions(
target_throughput, message_size, producer_count
)
# 计算保留时间
retention_ms = self.calculate_retention(daily_volume, max_storage)
# 获取基础配置
base_config = self.get_optimized_config(use_case)
# 添加计算出的配置
base_config["retention.ms"] = str(retention_ms)
# 创建主题配置
topic_config = TopicConfig(
name=requirements.get("topic_name", "new-topic"),
partitions=partitions,
replication_factor=replication_factor,
configs=base_config
)
# 验证配置
validation = self.validate_config(base_config)
return {
"topic_config": topic_config,
"validation": validation,
"recommendations": self._generate_recommendations(requirements, topic_config)
}
def _generate_recommendations(self, requirements: Dict[str, any],
topic_config: TopicConfig) -> List[str]:
"""生成建议"""
recommendations = []
# 分区建议
if topic_config.partitions > 50:
recommendations.append("分区数较多,考虑是否可以拆分为多个主题")
elif topic_config.partitions == 1:
recommendations.append("单分区限制了并行度,考虑增加分区数")
# 副本建议
if topic_config.replication_factor < 3:
recommendations.append("建议使用至少3个副本以保证高可用性")
# 存储建议
daily_volume = requirements.get("daily_volume_gb", 0)
if daily_volume > 100:
recommendations.append("数据量较大,考虑使用数据压缩和分层存储")
# 性能建议
target_throughput = requirements.get("target_throughput_mb", 0)
if target_throughput > 100:
recommendations.append("高吞吐量场景,建议优化生产者批处理配置")
return recommendations
# 使用示例
if __name__ == "__main__":
optimizer = TopicConfigOptimizer()
# 定义需求
requirements = {
"topic_name": "user-events",
"use_case": "event_streaming",
"target_throughput_mb": 50,
"message_size_kb": 2,
"producer_count": 5,
"daily_volume_gb": 20,
"max_storage_gb": 200,
"replication_factor": 3
}
# 生成主题规划
plan = optimizer.generate_topic_plan(requirements)
print("=== 主题规划 ===")
print(f"主题名称: {plan['topic_config'].name}")
print(f"分区数: {plan['topic_config'].partitions}")
print(f"副本因子: {plan['topic_config'].replication_factor}")
print(f"配置: {plan['topic_config'].configs}")
if plan['validation']['valid']:
print("\n✓ 配置验证通过")
else:
print("\n✗ 配置存在问题:")
for issue in plan['validation']['issues']:
print(f" - {issue}")
if plan['validation']['warnings']:
print("\n⚠ 警告:")
for warning in plan['validation']['warnings']:
print(f" - {warning}")
if plan['recommendations']:
print("\n💡 建议:")
for rec in plan['recommendations']:
print(f" - {rec}")
```

## 3.2 消息生产
### 生产者客户端

```python
from kafka import KafkaProducer
import json
import time
import threading
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
class SerializationType(Enum):
STRING = "string"
JSON = "json"
AVRO = "avro"
BINARY = "binary"
@dataclass
class ProducerConfig:
"""生产者配置"""
bootstrap_servers: str = "localhost:9092"
acks: str = "1" # 0, 1, all
retries: int = 3
batch_size: int = 16384
linger_ms: int = 0
buffer_memory: int = 33554432
compression_type: str = "none" # none, gzip, snappy, lz4, zstd
max_in_flight_requests: int = 5
request_timeout_ms: int = 30000
delivery_timeout_ms: int = 120000
def to_kafka_config(self) -> Dict[str, any]:
"""转换为Kafka配置"""
return {
"bootstrap_servers": self.bootstrap_servers.split(","),
"acks": self.acks,
"retries": self.retries,
"batch_size": self.batch_size,
"linger_ms": self.linger_ms,
"buffer_memory": self.buffer_memory,
"compression_type": self.compression_type,
"max_in_flight_requests_per_connection": self.max_in_flight_requests,
"request_timeout_ms": self.request_timeout_ms,
"delivery_timeout_ms": self.delivery_timeout_ms
}
class KafkaMessageProducer:
"""Kafka消息生产者"""
def __init__(self, config: ProducerConfig):
self.config = config
self.producer = None
self.metrics = {
"messages_sent": 0,
"messages_failed": 0,
"bytes_sent": 0,
"send_rate": 0,
"error_rate": 0
}
self._lock = threading.Lock()
self._start_time = time.time()
def connect(self):
"""连接到Kafka"""
try:
kafka_config = self.config.to_kafka_config()
# 设置序列化器
kafka_config["key_serializer"] = self._serialize_key
kafka_config["value_serializer"] = self._serialize_value
self.producer = KafkaProducer(**kafka_config)
print("✓ 已连接到Kafka")
except Exception as e:
print(f"✗ 连接Kafka失败: {e}")
raise
def _serialize_key(self, key):
"""序列化键"""
if key is None:
return None
if isinstance(key, str):
return key.encode('utf-8')
return str(key).encode('utf-8')
def _serialize_value(self, value):
"""序列化值"""
if value is None:
return None
if isinstance(value, str):
return value.encode('utf-8')
if isinstance(value, dict):
return json.dumps(value).encode('utf-8')
return str(value).encode('utf-8')
def send_message(self, topic: str, message: any, key: any = None,
partition: Optional[int] = None,
headers: Optional[Dict[str, str]] = None,
callback: Optional[Callable] = None) -> bool:
"""发送单条消息"""
if not self.producer:
raise RuntimeError("生产者未连接")
try:
# 准备headers
kafka_headers = None
if headers:
kafka_headers = [(k, v.encode('utf-8')) for k, v in headers.items()]
# 发送消息
future = self.producer.send(
topic=topic,
value=message,
key=key,
partition=partition,
headers=kafka_headers
)
# 添加回调
if callback:
future.add_callback(callback)
future.add_errback(self._on_send_error)
# 更新指标
with self._lock:
self.metrics["messages_sent"] += 1
if isinstance(message, str):
self.metrics["bytes_sent"] += len(message.encode('utf-8'))
elif isinstance(message, dict):
self.metrics["bytes_sent"] += len(json.dumps(message).encode('utf-8'))
return True
except Exception as e:
print(f"发送消息失败: {e}")
with self._lock:
self.metrics["messages_failed"] += 1
return False
def send_batch(self, topic: str, messages: List[Dict[str, any]],
batch_size: int = 100) -> Dict[str, int]:
"""批量发送消息"""
if not self.producer:
raise RuntimeError("生产者未连接")
sent_count = 0
failed_count = 0
# 分批发送
for i in range(0, len(messages), batch_size):
batch = messages[i:i + batch_size]
for msg_data in batch:
success = self.send_message(
topic=topic,
message=msg_data.get("value"),
key=msg_data.get("key"),
partition=msg_data.get("partition"),
headers=msg_data.get("headers")
)
if success:
sent_count += 1
else:
failed_count += 1
# 刷新缓冲区
self.producer.flush()
return {
"sent": sent_count,
"failed": failed_count,
"total": len(messages)
}
def _on_send_error(self, exception):
"""发送错误回调"""
print(f"消息发送错误: {exception}")
with self._lock:
self.metrics["messages_failed"] += 1
def flush(self, timeout: Optional[float] = None):
"""刷新缓冲区"""
if self.producer:
self.producer.flush(timeout)
def close(self):
"""关闭生产者"""
if self.producer:
self.producer.close()
self.producer = None
print("生产者已关闭")
def get_metrics(self) -> Dict[str, any]:
"""获取指标"""
with self._lock:
current_time = time.time()
elapsed_time = current_time - self._start_time
if elapsed_time > 0:
self.metrics["send_rate"] = self.metrics["messages_sent"] / elapsed_time
total_messages = self.metrics["messages_sent"] + self.metrics["messages_failed"]
if total_messages > 0:
self.metrics["error_rate"] = self.metrics["messages_failed"] / total_messages
return self.metrics.copy()
def reset_metrics(self):
"""重置指标"""
with self._lock:
self.metrics = {
"messages_sent": 0,
"messages_failed": 0,
"bytes_sent": 0,
"send_rate": 0,
"error_rate": 0
}
self._start_time = time.time()
class MessageGenerator:
"""消息生成器"""
def __init__(self):
self.templates = {
"user_event": {
"user_id": "user_{}",
"event_type": ["login", "logout", "purchase", "view", "click"],
"timestamp": None, # 自动生成
"properties": {}
},
"order_event": {
"order_id": "order_{}",
"user_id": "user_{}",
"product_id": "product_{}",
"amount": None, # 随机生成
"status": ["created", "paid", "shipped", "delivered", "cancelled"],
"timestamp": None
},
"log_event": {
"level": ["DEBUG", "INFO", "WARN", "ERROR"],
"logger": "com.example.{}",
"message": "Sample log message {}",
"timestamp": None,
"thread": "thread-{}"
}
}
def generate_message(self, template_name: str, index: int = 0) -> Dict[str, any]:
"""生成单条消息"""
if template_name not in self.templates:
raise ValueError(f"未知模板: {template_name}")
template = self.templates[template_name].copy()
message = {}
for key, value in template.items():
if value is None:
# 自动生成值
if key == "timestamp":
message[key] = int(time.time() * 1000)
elif key == "amount":
import random
message[key] = round(random.uniform(10.0, 1000.0), 2)
else:
message[key] = f"auto_{index}"
elif isinstance(value, list):
# 从列表中随机选择
import random
message[key] = random.choice(value)
elif isinstance(value, str) and "{}" in value:
# 格式化字符串
message[key] = value.format(index)
else:
message[key] = value
return message
def generate_batch(self, template_name: str, count: int,
start_index: int = 0) -> List[Dict[str, any]]:
"""生成批量消息"""
messages = []
for i in range(count):
message = self.generate_message(template_name, start_index + i)
messages.append({
"value": message,
"key": str(start_index + i)
})
return messages
# 使用示例
if __name__ == "__main__":
# 配置生产者
config = ProducerConfig(
bootstrap_servers="localhost:9092",
acks="1",
batch_size=32768,
linger_ms=10,
compression_type="snappy"
)
# 创建生产者
producer = KafkaMessageProducer(config)
producer.connect()
try:
# 发送单条消息
success = producer.send_message(
topic="test-topic",
message={"hello": "world", "timestamp": int(time.time())},
key="test-key",
headers={"source": "demo"}
)
print(f"单条消息发送: {'成功' if success else '失败'}")
# 生成并发送批量消息
generator = MessageGenerator()
messages = generator.generate_batch("user_event", 100)
result = producer.send_batch("user-events", messages)
print(f"批量发送结果: {result}")
# 刷新缓冲区
producer.flush()
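        # 也可以给 send_message 传入回调,在发送成功后拿到消息元数据(分区、偏移量)——示意用法:
        def on_send_success(record_metadata):
            print(f"已写入 {record_metadata.topic} 分区 {record_metadata.partition} "
                  f"偏移量 {record_metadata.offset}")

        producer.send_message(
            topic="test-topic",
            message={"hello": "callback"},
            key="callback-key",
            callback=on_send_success
        )
        producer.flush()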
# 查看指标
metrics = producer.get_metrics()
print(f"生产者指标: {metrics}")
finally:
producer.close()
```

### 高级生产者功能

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
import hashlib
from typing import Union
class AdvancedKafkaProducer(KafkaMessageProducer):
    """高级Kafka生产者(继承基础生产者,复用其连接与发送能力)"""
    def __init__(self, config: ProducerConfig):
        super().__init__(config)
        self.partitioner = None
        self.interceptors = []
        self.metrics_collector = None
        self.circuit_breaker = None
def add_interceptor(self, interceptor: Callable):
"""添加拦截器"""
self.interceptors.append(interceptor)
def set_partitioner(self, partitioner: Callable):
"""设置分区器"""
self.partitioner = partitioner
def _apply_interceptors(self, topic: str, key: any, value: any,
headers: Dict[str, str]) -> tuple:
"""应用拦截器"""
for interceptor in self.interceptors:
try:
key, value, headers = interceptor(topic, key, value, headers)
except Exception as e:
print(f"拦截器执行失败: {e}")
return key, value, headers
def _select_partition(self, topic: str, key: any,
partition_count: int) -> Optional[int]:
"""选择分区"""
if self.partitioner:
return self.partitioner(topic, key, partition_count)
# 默认分区策略
if key is None:
return None # 轮询
# 基于key的哈希分区
key_str = str(key)
hash_value = int(hashlib.md5(key_str.encode()).hexdigest(), 16)
return hash_value % partition_count
async def send_async(self, topic: str, message: any, key: any = None,
headers: Optional[Dict[str, str]] = None) -> bool:
"""异步发送消息"""
loop = asyncio.get_event_loop()
# 在线程池中执行同步发送
with ThreadPoolExecutor() as executor:
future = loop.run_in_executor(
executor,
self.send_message,
topic, message, key, None, headers
)
return await future
def send_with_transaction(self, topic: str, messages: List[Dict[str, any]],
transaction_id: str) -> bool:
"""事务性发送"""
if not self.producer:
raise RuntimeError("生产者未连接")
try:
# 开始事务
self.producer.begin_transaction()
# 发送所有消息
for msg_data in messages:
self.send_message(
topic=topic,
message=msg_data.get("value"),
key=msg_data.get("key"),
partition=msg_data.get("partition"),
headers=msg_data.get("headers")
)
# 提交事务
self.producer.commit_transaction()
return True
except Exception as e:
print(f"事务发送失败: {e}")
try:
self.producer.abort_transaction()
except:
pass
return False
def send_with_retry(self, topic: str, message: any, key: any = None,
max_retries: int = 3, retry_delay: float = 1.0) -> bool:
"""带重试的发送"""
for attempt in range(max_retries + 1):
try:
success = self.send_message(topic, message, key)
if success:
return True
except Exception as e:
print(f"发送尝试 {attempt + 1} 失败: {e}")
if attempt < max_retries:
time.sleep(retry_delay * (2 ** attempt)) # 指数退避
return False
def send_with_schema_validation(self, topic: str, message: Dict[str, any],
schema: Dict[str, any]) -> bool:
"""带模式验证的发送"""
# 简单的模式验证
if not self._validate_schema(message, schema):
print("消息不符合模式")
return False
return self.send_message(topic, message)
def _validate_schema(self, message: Dict[str, any],
schema: Dict[str, any]) -> bool:
"""验证消息模式"""
required_fields = schema.get("required", [])
properties = schema.get("properties", {})
# 检查必需字段
for field in required_fields:
if field not in message:
print(f"缺少必需字段: {field}")
return False
# 检查字段类型
for field, value in message.items():
if field in properties:
expected_type = properties[field].get("type")
if expected_type and not self._check_type(value, expected_type):
print(f"字段 {field} 类型不匹配")
return False
return True
def _check_type(self, value: any, expected_type: str) -> bool:
"""检查类型"""
type_mapping = {
"string": str,
"integer": int,
"number": (int, float),
"boolean": bool,
"array": list,
"object": dict
}
expected_python_type = type_mapping.get(expected_type)
if expected_python_type:
return isinstance(value, expected_python_type)
return True
class ProducerInterceptors:
"""生产者拦截器集合"""
@staticmethod
def add_timestamp(topic: str, key: any, value: any,
headers: Dict[str, str]) -> tuple:
"""添加时间戳"""
if headers is None:
headers = {}
headers["timestamp"] = str(int(time.time() * 1000))
return key, value, headers
@staticmethod
def add_source_info(source: str):
"""添加来源信息"""
def interceptor(topic: str, key: any, value: any,
headers: Dict[str, str]) -> tuple:
if headers is None:
headers = {}
headers["source"] = source
return key, value, headers
return interceptor
@staticmethod
def add_correlation_id(topic: str, key: any, value: any,
headers: Dict[str, str]) -> tuple:
"""添加关联ID"""
import uuid
if headers is None:
headers = {}
headers["correlation_id"] = str(uuid.uuid4())
return key, value, headers
@staticmethod
def log_message(topic: str, key: any, value: any,
headers: Dict[str, str]) -> tuple:
"""记录消息日志"""
print(f"发送消息到主题 {topic}: key={key}, headers={headers}")
return key, value, headers
class CustomPartitioners:
"""自定义分区器集合"""
@staticmethod
def round_robin_partitioner(topic: str, key: any, partition_count: int) -> int:
"""轮询分区器"""
if not hasattr(CustomPartitioners, '_round_robin_counter'):
CustomPartitioners._round_robin_counter = 0
partition = CustomPartitioners._round_robin_counter % partition_count
CustomPartitioners._round_robin_counter += 1
return partition
@staticmethod
def sticky_partitioner(topic: str, key: any, partition_count: int) -> int:
"""粘性分区器(Kafka 2.4+默认)"""
if key is not None:
# 有key时使用哈希分区
key_str = str(key)
hash_value = int(hashlib.md5(key_str.encode()).hexdigest(), 16)
return hash_value % partition_count
# 无key时使用粘性分区
if not hasattr(CustomPartitioners, '_sticky_partition'):
import random
CustomPartitioners._sticky_partition = random.randint(0, partition_count - 1)
return CustomPartitioners._sticky_partition
@staticmethod
def geographic_partitioner(region_mapping: Dict[str, int]):
"""地理位置分区器"""
def partitioner(topic: str, key: any, partition_count: int) -> int:
if key and str(key) in region_mapping:
return region_mapping[str(key)] % partition_count
return 0 # 默认分区
return partitioner
# 使用示例
if __name__ == "__main__":
# 创建高级生产者
config = ProducerConfig(
bootstrap_servers="localhost:9092",
acks="all",
retries=5
)
producer = AdvancedKafkaProducer(config)
# 添加拦截器
producer.add_interceptor(ProducerInterceptors.add_timestamp)
producer.add_interceptor(ProducerInterceptors.add_source_info("demo-app"))
producer.add_interceptor(ProducerInterceptors.add_correlation_id)
# 设置分区器
producer.set_partitioner(CustomPartitioners.sticky_partitioner)
# 连接并发送消息
producer.connect()
try:
# 带重试的发送
success = producer.send_with_retry(
topic="test-topic",
message={"data": "test"},
key="test-key",
max_retries=3
)
print(f"重试发送: {'成功' if success else '失败'}")
# 带模式验证的发送
schema = {
"required": ["user_id", "event_type"],
"properties": {
"user_id": {"type": "string"},
"event_type": {"type": "string"},
"timestamp": {"type": "integer"}
}
}
message = {
"user_id": "user123",
"event_type": "login",
"timestamp": int(time.time())
}
success = producer.send_with_schema_validation(
topic="user-events",
message=message,
schema=schema
)
print(f"模式验证发送: {'成功' if success else '失败'}")
finally:
producer.close()
```

## 3.3 消息消费
### 消费者客户端

```python
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import time
import threading
from typing import Dict, List, Optional, Callable, Set
from dataclasses import dataclass
from enum import Enum
class OffsetResetStrategy(Enum):
EARLIEST = "earliest"
LATEST = "latest"
NONE = "none"
@dataclass
class ConsumerConfig:
"""消费者配置"""
bootstrap_servers: str = "localhost:9092"
group_id: str = "default-group"
auto_offset_reset: str = "latest"
enable_auto_commit: bool = True
auto_commit_interval_ms: int = 5000
max_poll_records: int = 500
max_poll_interval_ms: int = 300000
session_timeout_ms: int = 10000
heartbeat_interval_ms: int = 3000
fetch_min_bytes: int = 1
fetch_max_wait_ms: int = 500
max_partition_fetch_bytes: int = 1048576
def to_kafka_config(self) -> Dict[str, any]:
"""转换为Kafka配置"""
return {
"bootstrap_servers": self.bootstrap_servers.split(","),
"group_id": self.group_id,
"auto_offset_reset": self.auto_offset_reset,
"enable_auto_commit": self.enable_auto_commit,
"auto_commit_interval_ms": self.auto_commit_interval_ms,
"max_poll_records": self.max_poll_records,
"max_poll_interval_ms": self.max_poll_interval_ms,
"session_timeout_ms": self.session_timeout_ms,
"heartbeat_interval_ms": self.heartbeat_interval_ms,
"fetch_min_bytes": self.fetch_min_bytes,
"fetch_max_wait_ms": self.fetch_max_wait_ms,
"max_partition_fetch_bytes": self.max_partition_fetch_bytes
}
class KafkaMessageConsumer:
"""Kafka消息消费者"""
def __init__(self, config: ConsumerConfig):
self.config = config
self.consumer = None
self.subscribed_topics: Set[str] = set()
self.message_handlers: Dict[str, Callable] = {}
self.running = False
self.metrics = {
"messages_consumed": 0,
"messages_processed": 0,
"messages_failed": 0,
"bytes_consumed": 0,
"consume_rate": 0,
"processing_time_avg": 0
}
self._lock = threading.Lock()
self._start_time = time.time()
self._processing_times = []
def connect(self):
"""连接到Kafka"""
try:
kafka_config = self.config.to_kafka_config()
# 设置反序列化器
kafka_config["key_deserializer"] = self._deserialize_key
kafka_config["value_deserializer"] = self._deserialize_value
self.consumer = KafkaConsumer(**kafka_config)
print(f"✓ 消费者已连接,组ID: {self.config.group_id}")
except Exception as e:
print(f"✗ 连接Kafka失败: {e}")
raise
def _deserialize_key(self, key_bytes):
"""反序列化键"""
if key_bytes is None:
return None
return key_bytes.decode('utf-8')
def _deserialize_value(self, value_bytes):
"""反序列化值"""
if value_bytes is None:
return None
try:
# 尝试JSON反序列化
return json.loads(value_bytes.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError):
# 回退到字符串
try:
return value_bytes.decode('utf-8')
except UnicodeDecodeError:
# 回退到字节
return value_bytes
def subscribe(self, topics: List[str]):
"""订阅主题"""
if not self.consumer:
raise RuntimeError("消费者未连接")
self.consumer.subscribe(topics)
self.subscribed_topics.update(topics)
print(f"已订阅主题: {topics}")
def add_message_handler(self, topic: str, handler: Callable):
"""添加消息处理器"""
self.message_handlers[topic] = handler
def poll_messages(self, timeout_ms: int = 1000) -> List[Dict[str, any]]:
"""拉取消息"""
if not self.consumer:
raise RuntimeError("消费者未连接")
try:
message_batch = self.consumer.poll(timeout_ms=timeout_ms)
messages = []
for topic_partition, records in message_batch.items():
for record in records:
message = {
"topic": record.topic,
"partition": record.partition,
"offset": record.offset,
"key": record.key,
"value": record.value,
"timestamp": record.timestamp,
"headers": dict(record.headers) if record.headers else {}
}
messages.append(message)
# 更新指标
with self._lock:
self.metrics["messages_consumed"] += 1
if isinstance(record.value, (str, bytes)):
self.metrics["bytes_consumed"] += len(str(record.value))
return messages
except KafkaError as e:
print(f"拉取消息失败: {e}")
return []
def process_message(self, message: Dict[str, any]) -> bool:
"""处理单条消息"""
start_time = time.time()
try:
topic = message["topic"]
# 查找处理器
handler = self.message_handlers.get(topic)
if handler:
handler(message)
else:
# 默认处理器
self._default_message_handler(message)
# 更新指标
processing_time = time.time() - start_time
with self._lock:
self.metrics["messages_processed"] += 1
self._processing_times.append(processing_time)
# 保持最近100次处理时间
if len(self._processing_times) > 100:
self._processing_times.pop(0)
self.metrics["processing_time_avg"] = sum(self._processing_times) / len(self._processing_times)
return True
except Exception as e:
print(f"处理消息失败: {e}")
with self._lock:
self.metrics["messages_failed"] += 1
return False
def _default_message_handler(self, message: Dict[str, any]):
"""默认消息处理器"""
print(f"收到消息 - 主题: {message['topic']}, 分区: {message['partition']}, "
f"偏移量: {message['offset']}, 值: {message['value']}")
def start_consuming(self, auto_commit: bool = True):
"""开始消费"""
if not self.consumer:
raise RuntimeError("消费者未连接")
if not self.subscribed_topics:
raise RuntimeError("未订阅任何主题")
self.running = True
print("开始消费消息...")
try:
while self.running:
messages = self.poll_messages()
for message in messages:
if not self.running:
break
self.process_message(message)
# 手动提交offset
if not auto_commit and messages:
self.commit_offsets()
except KeyboardInterrupt:
print("收到中断信号,停止消费")
except Exception as e:
print(f"消费过程中出错: {e}")
finally:
self.running = False
def stop_consuming(self):
"""停止消费"""
self.running = False
print("停止消费")
def commit_offsets(self, offsets: Optional[Dict] = None):
"""提交offset"""
if not self.consumer:
return
try:
if offsets:
self.consumer.commit(offsets)
else:
self.consumer.commit()
except Exception as e:
print(f"提交offset失败: {e}")
def seek_to_offset(self, topic: str, partition: int, offset: int):
"""跳转到指定offset"""
if not self.consumer:
raise RuntimeError("消费者未连接")
from kafka import TopicPartition
tp = TopicPartition(topic, partition)
self.consumer.seek(tp, offset)
print(f"已跳转到 {topic}:{partition}:{offset}")
def seek_to_beginning(self, topic: str, partition: int):
"""跳转到分区开始"""
if not self.consumer:
raise RuntimeError("消费者未连接")
from kafka import TopicPartition
tp = TopicPartition(topic, partition)
self.consumer.seek_to_beginning(tp)
print(f"已跳转到 {topic}:{partition} 开始位置")
def seek_to_end(self, topic: str, partition: int):
"""跳转到分区末尾"""
if not self.consumer:
raise RuntimeError("消费者未连接")
from kafka import TopicPartition
tp = TopicPartition(topic, partition)
self.consumer.seek_to_end(tp)
print(f"已跳转到 {topic}:{partition} 末尾位置")
def get_current_offsets(self) -> Dict[str, Dict[int, int]]:
"""获取当前offset"""
if not self.consumer:
return {}
offsets = {}
for tp in self.consumer.assignment():
topic = tp.topic
partition = tp.partition
offset = self.consumer.position(tp)
if topic not in offsets:
offsets[topic] = {}
offsets[topic][partition] = offset
return offsets
def close(self):
"""关闭消费者"""
self.running = False
if self.consumer:
self.consumer.close()
self.consumer = None
print("消费者已关闭")
def get_metrics(self) -> Dict[str, any]:
"""获取指标"""
with self._lock:
current_time = time.time()
elapsed_time = current_time - self._start_time
if elapsed_time > 0:
self.metrics["consume_rate"] = self.metrics["messages_consumed"] / elapsed_time
return self.metrics.copy()
def reset_metrics(self):
"""重置指标"""
with self._lock:
self.metrics = {
"messages_consumed": 0,
"messages_processed": 0,
"messages_failed": 0,
"bytes_consumed": 0,
"consume_rate": 0,
"processing_time_avg": 0
}
self._start_time = time.time()
self._processing_times = []
class MessageProcessor:
"""消息处理器基类"""
def __init__(self):
self.filters = []
self.transformers = []
self.validators = []
def add_filter(self, filter_func: Callable) -> 'MessageProcessor':
"""添加过滤器"""
self.filters.append(filter_func)
return self
def add_transformer(self, transformer_func: Callable) -> 'MessageProcessor':
"""添加转换器"""
self.transformers.append(transformer_func)
return self
def add_validator(self, validator_func: Callable) -> 'MessageProcessor':
"""添加验证器"""
self.validators.append(validator_func)
return self
def process(self, message: Dict[str, any]) -> Optional[Dict[str, any]]:
"""处理消息"""
# 应用过滤器
for filter_func in self.filters:
if not filter_func(message):
return None # 消息被过滤
# 应用转换器
for transformer in self.transformers:
message = transformer(message)
if message is None:
return None
# 应用验证器
for validator in self.validators:
if not validator(message):
raise ValueError("消息验证失败")
return message
class BatchMessageProcessor:
"""批量消息处理器"""
def __init__(self, batch_size: int = 100, flush_interval: float = 5.0):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.batch = []
self.last_flush = time.time()
self.batch_handlers = {}
def add_batch_handler(self, topic: str, handler: Callable):
"""添加批量处理器"""
self.batch_handlers[topic] = handler
def add_message(self, message: Dict[str, any]):
"""添加消息到批次"""
self.batch.append(message)
# 检查是否需要刷新
current_time = time.time()
if (len(self.batch) >= self.batch_size or
current_time - self.last_flush >= self.flush_interval):
self.flush_batch()
def flush_batch(self):
"""刷新批次"""
if not self.batch:
return
# 按主题分组
topic_batches = {}
for message in self.batch:
topic = message["topic"]
if topic not in topic_batches:
topic_batches[topic] = []
topic_batches[topic].append(message)
# 处理每个主题的批次
for topic, messages in topic_batches.items():
handler = self.batch_handlers.get(topic)
if handler:
try:
handler(messages)
except Exception as e:
print(f"批量处理失败 - 主题: {topic}, 错误: {e}")
else:
self._default_batch_handler(topic, messages)
# 清空批次
self.batch = []
self.last_flush = time.time()
def _default_batch_handler(self, topic: str, messages: List[Dict[str, any]]):
"""默认批量处理器"""
print(f"批量处理 {len(messages)} 条消息 - 主题: {topic}")
# 使用示例
if __name__ == "__main__":
# 配置消费者
config = ConsumerConfig(
bootstrap_servers="localhost:9092",
group_id="demo-consumer-group",
auto_offset_reset="earliest",
enable_auto_commit=False
)
# 创建消费者
consumer = KafkaMessageConsumer(config)
consumer.connect()
# 定义消息处理器
def user_event_handler(message):
print(f"处理用户事件: {message['value']}")
def order_event_handler(message):
print(f"处理订单事件: {message['value']}")
# 添加处理器
consumer.add_message_handler("user-events", user_event_handler)
consumer.add_message_handler("order-events", order_event_handler)
# 订阅主题
consumer.subscribe(["user-events", "order-events", "test-topic"])
try:
# 手动拉取和处理消息
for i in range(10):
messages = consumer.poll_messages(timeout_ms=1000)
print(f"拉取到 {len(messages)} 条消息")
for message in messages:
consumer.process_message(message)
# 手动提交offset
if messages:
consumer.commit_offsets()
time.sleep(1)
# 查看指标
metrics = consumer.get_metrics()
print(f"消费者指标: {metrics}")
# 查看当前offset
offsets = consumer.get_current_offsets()
print(f"当前offset: {offsets}")
finally:
consumer.close()
```

### 高级消费者功能

```python
class AdvancedKafkaConsumer(KafkaMessageConsumer):
    """高级Kafka消费者(继承基础消费者,复用其连接与消息处理能力)"""
    def __init__(self, config: ConsumerConfig):
        super().__init__(config)
        self.rebalance_listener = None
        self.error_handler = None
        self.retry_policy = None
        self.dead_letter_queue = None
def set_rebalance_listener(self, listener):
"""设置重平衡监听器"""
self.rebalance_listener = listener
def set_error_handler(self, handler):
"""设置错误处理器"""
self.error_handler = handler
def set_retry_policy(self, policy):
"""设置重试策略"""
self.retry_policy = policy
def consume_with_retry(self, message: Dict[str, any],
max_retries: int = 3) -> bool:
"""带重试的消费"""
for attempt in range(max_retries + 1):
try:
# 处理消息
self.process_message(message)
return True
except Exception as e:
print(f"处理失败,尝试 {attempt + 1}/{max_retries + 1}: {e}")
if attempt == max_retries:
# 最后一次尝试失败,发送到死信队列
if self.dead_letter_queue:
self.send_to_dlq(message, str(e))
return False
# 等待后重试
time.sleep(2 ** attempt)
return False
def send_to_dlq(self, message: Dict[str, any], error: str):
"""发送到死信队列"""
dlq_message = {
"original_message": message,
"error": error,
"timestamp": int(time.time() * 1000),
"retry_count": message.get("retry_count", 0)
}
# 这里应该发送到实际的死信队列主题
print(f"发送到死信队列: {dlq_message}")
class ConsumerRebalanceListener:
"""消费者重平衡监听器"""
def __init__(self, consumer):
self.consumer = consumer
self.current_offsets = {}
def on_partitions_revoked(self, revoked):
"""分区被撤销时调用"""
print(f"分区被撤销: {revoked}")
# 保存当前offset
for tp in revoked:
offset = self.consumer.position(tp)
self.current_offsets[tp] = offset
# 提交offset
if self.current_offsets:
self.consumer.commit(self.current_offsets)
def on_partitions_assigned(self, assigned):
"""分区被分配时调用"""
print(f"分区被分配: {assigned}")
# 可以在这里设置特定的offset位置
for tp in assigned:
# 例如:跳转到特定位置
# self.consumer.seek(tp, specific_offset)
pass
class MessageFilters:
"""消息过滤器集合"""
@staticmethod
def by_timestamp(start_time: int, end_time: int = None):
"""按时间戳过滤"""
def filter_func(message):
timestamp = message.get("timestamp", 0)
if timestamp < start_time:
return False
if end_time and timestamp > end_time:
return False
return True
return filter_func
@staticmethod
def by_key_pattern(pattern: str):
"""按键模式过滤"""
import re
regex = re.compile(pattern)
def filter_func(message):
key = message.get("key")
if key is None:
return False
return bool(regex.match(str(key)))
return filter_func
@staticmethod
def by_value_field(field: str, expected_value: any):
"""按值字段过滤"""
def filter_func(message):
value = message.get("value", {})
if isinstance(value, dict):
return value.get(field) == expected_value
return False
return filter_func
@staticmethod
def by_header(header_key: str, header_value: str = None):
"""按头部过滤"""
def filter_func(message):
headers = message.get("headers", {})
if header_key not in headers:
return False
if header_value is not None:
return headers[header_key] == header_value
return True
return filter_func
class MessageTransformers:
"""消息转换器集合"""
@staticmethod
def add_processing_timestamp(message):
"""添加处理时间戳"""
if isinstance(message.get("value"), dict):
message["value"]["processing_timestamp"] = int(time.time() * 1000)
return message
@staticmethod
def extract_json_field(field_name: str):
"""提取JSON字段"""
def transformer(message):
value = message.get("value", {})
if isinstance(value, dict) and field_name in value:
message["extracted_field"] = value[field_name]
return message
return transformer
@staticmethod
def normalize_timestamp(timestamp_field: str = "timestamp"):
"""标准化时间戳"""
def transformer(message):
value = message.get("value", {})
if isinstance(value, dict) and timestamp_field in value:
timestamp = value[timestamp_field]
# 转换为毫秒时间戳
if isinstance(timestamp, str):
from datetime import datetime
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
value[timestamp_field] = int(dt.timestamp() * 1000)
elif len(str(timestamp)) == 10: # 秒级时间戳
value[timestamp_field] = timestamp * 1000
return message
return transformer
@staticmethod
def enrich_with_metadata(metadata: Dict[str, any]):
"""丰富元数据"""
def transformer(message):
if isinstance(message.get("value"), dict):
message["value"]["metadata"] = metadata
return message
return transformer
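# 组合用法示意:把过滤器与转换器装配到 MessageProcessor,再作为某个主题的处理器挂到消费者上
# (消息结构沿用本章定义,主题名 user-events 仅为示例)
login_processor = (
    MessageProcessor()
    .add_filter(MessageFilters.by_value_field("event_type", "login"))
    .add_transformer(MessageTransformers.add_processing_timestamp)
    .add_transformer(MessageTransformers.enrich_with_metadata({"pipeline": "demo"}))
)

def login_event_handler(message):
    processed = login_processor.process(message)
    if processed is not None:  # 返回 None 表示消息被过滤
        print(f"登录事件: {processed['value']}")

# consumer.add_message_handler("user-events", login_event_handler)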
```

## 3.4 消费者组管理
### 消费者组操作
```python
class KafkaConsumerGroupManager:
"""Kafka消费者组管理器"""
def __init__(self, bootstrap_servers: str = "localhost:9092",
kafka_home: str = "/opt/kafka"):
self.bootstrap_servers = bootstrap_servers
self.kafka_home = kafka_home
self.bin_dir = f"{kafka_home}/bin"
def list_consumer_groups(self) -> List[Dict[str, str]]:
"""列出所有消费者组"""
cmd = [
f"{self.bin_dir}/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--list"
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
groups = []
for line in result.stdout.strip().split('\n'):
if line.strip():
groups.append({"group_id": line.strip()})
return groups
except subprocess.CalledProcessError as e:
print(f"获取消费者组列表失败: {e.stderr}")
return []
def describe_consumer_group(self, group_id: str) -> Dict[str, any]:
"""描述消费者组详情"""
cmd = [
f"{self.bin_dir}/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--describe",
"--group", group_id
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
return self._parse_group_description(result.stdout)
except subprocess.CalledProcessError as e:
return {
"success": False,
"error": e.stderr
}
def _parse_group_description(self, output: str) -> Dict[str, any]:
"""解析消费者组描述输出"""
lines = output.strip().split('\n')
if len(lines) < 2:
return {"success": False, "error": "无效输出"}
        # 跳过标题行(注意:不同Kafka版本--describe的输出列顺序不同,较新版本首列为GROUP,解析前请核对表头)
        header_line = lines[0]
        data_lines = lines[1:]
members = []
for line in data_lines:
if line.strip():
parts = line.split()
if len(parts) >= 6:
member = {
"topic": parts[0],
"partition": int(parts[1]),
"current_offset": int(parts[2]) if parts[2] != "-" else None,
"log_end_offset": int(parts[3]) if parts[3] != "-" else None,
"lag": int(parts[4]) if parts[4] != "-" else None,
"consumer_id": parts[5] if len(parts) > 5 else None,
"host": parts[6] if len(parts) > 6 else None,
"client_id": parts[7] if len(parts) > 7 else None
}
members.append(member)
return {
"success": True,
"members": members
}
def reset_consumer_group_offsets(self, group_id: str, topic: str,
reset_type: str = "earliest") -> Dict[str, any]:
"""重置消费者组offset"""
cmd = [
f"{self.bin_dir}/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--group", group_id,
"--topic", topic,
"--reset-offsets",
f"--to-{reset_type}",
"--execute"
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
return {
"success": True,
"message": f"消费者组 {group_id} 的offset已重置",
"output": result.stdout
}
except subprocess.CalledProcessError as e:
return {
"success": False,
"message": f"重置offset失败: {e.stderr}",
"error": e.stderr
}
def delete_consumer_group(self, group_id: str) -> Dict[str, any]:
"""删除消费者组"""
cmd = [
f"{self.bin_dir}/kafka-consumer-groups.sh",
"--bootstrap-server", self.bootstrap_servers,
"--delete",
"--group", group_id
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True
)
return {
"success": True,
"message": f"消费者组 {group_id} 已删除",
"output": result.stdout
}
except subprocess.CalledProcessError as e:
return {
"success": False,
"message": f"删除消费者组失败: {e.stderr}",
"error": e.stderr
}
def get_consumer_group_lag(self, group_id: str) -> Dict[str, any]:
"""获取消费者组延迟"""
description = self.describe_consumer_group(group_id)
if not description.get("success"):
return description
total_lag = 0
partition_lags = {}
for member in description["members"]:
topic = member["topic"]
partition = member["partition"]
lag = member["lag"]
if lag is not None:
total_lag += lag
if topic not in partition_lags:
partition_lags[topic] = {}
partition_lags[topic][partition] = lag
return {
"success": True,
"group_id": group_id,
"total_lag": total_lag,
"partition_lags": partition_lags
}
# 使用示例
if __name__ == "__main__":
group_manager = KafkaConsumerGroupManager()
# 列出消费者组
groups = group_manager.list_consumer_groups()
print(f"消费者组列表: {groups}")
# 描述消费者组
if groups:
group_id = groups[0]["group_id"]
description = group_manager.describe_consumer_group(group_id)
print(f"消费者组详情: {description}")
# 获取延迟信息
lag_info = group_manager.get_consumer_group_lag(group_id)
print(f"消费者组延迟: {lag_info}")
```

## 3.5 本章总结
### 核心知识点
**主题管理**
- 主题创建、删除、修改
- 分区和副本配置
- 主题配置优化
- 分区数量计算
**消息生产**
- 生产者配置和优化
- 消息序列化
- 分区策略
- 批量发送和异步发送
- 事务性发送
**消息消费**
- 消费者配置和优化
- 消息反序列化
- Offset管理
- 消费者组和重平衡
- 消息处理和错误处理
**消费者组管理**
- 消费者组监控
- Offset重置
- 延迟监控
- 消费者组删除
### 最佳实践
**性能优化**(配置示例见下)
- 合理设置批处理大小
- 选择合适的压缩算法
- 优化分区数量
- 调整缓冲区大小
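
结合上述要点,下面给出一个偏吞吐量的生产者配置草图(沿用本章的 ProducerConfig 与 KafkaMessageProducer,参数取值仅为示例,应以实际压测结果为准):

```python
# 吞吐量优先:增大批次、适当攒批、启用压缩(具体取值需结合压测调整)
high_throughput_config = ProducerConfig(
    bootstrap_servers="localhost:9092",
    acks="1",
    batch_size=65536,          # 64KB 批次
    linger_ms=10,              # 最多等待10ms以攒满批次
    compression_type="snappy",
    buffer_memory=67108864     # 64MB 发送缓冲区
)
producer = KafkaMessageProducer(high_throughput_config)
# producer.connect() 之后即可按本章前文方式发送消息
```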
**可靠性保证**(配置示例见下)
- 设置合适的acks参数
- 启用重试机制
- 实现幂等性
- 使用事务保证一致性
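
下面是一个与上述可靠性要点对应的配置草图(仅为示意,沿用本章的 ProducerConfig 与 KafkaTopicManager;其中 min.insync.replicas、unclean.leader.election.enable 属于主题级配置,需要单独下发,user-events 为示例主题名):

```python
# 可靠性优先的生产者配置(示意):acks=all + 限制在途请求,配合重试
reliable_producer_config = ProducerConfig(
    bootstrap_servers="localhost:9092",
    acks="all",                  # 等待所有 ISR 副本确认
    retries=5,                   # 失败重试
    max_in_flight_requests=1     # 避免重试导致消息乱序
)

# min.insync.replicas 等是主题级配置,通过 kafka-configs.sh 下发(本章 alter_topic 已封装该命令)
topic_manager = KafkaTopicManager()
result = topic_manager.alter_topic("user-events", {
    "min.insync.replicas": "2",
    "unclean.leader.election.enable": "false"
})
print(result["message"])
```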
**监控和运维**
- 监控生产和消费速率
- 跟踪消费者延迟
- 监控错误率
- 定期检查主题配置
**错误处理**(调用示例见下)
- 实现重试机制
- 使用死信队列
- 记录详细日志
- 设置告警机制
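
上述错误处理要点在本章的 AdvancedKafkaConsumer 中已有对应实现,下面给出一个简单的调用草图(仅为示意,user-events 与 user-events-dlq 为示例主题名,重试次数等参数可按需调整):

```python
# 带重试与死信队列的消费示意
advanced_consumer = AdvancedKafkaConsumer(ConsumerConfig(group_id="retry-demo"))
advanced_consumer.dead_letter_queue = "user-events-dlq"   # 实际应指向死信主题
advanced_consumer.connect()
advanced_consumer.subscribe(["user-events"])

for message in advanced_consumer.poll_messages(timeout_ms=1000):
    ok = advanced_consumer.consume_with_retry(message, max_retries=3)
    if not ok:
        print(f"消息最终处理失败,已转入死信队列: offset={message['offset']}")

advanced_consumer.close()
```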
### 练习题
**基础练习**
- 创建一个包含3个分区、副本因子为2的主题
- 编写生产者发送100条测试消息
- 编写消费者消费这些消息并打印
- 查看消费者组的延迟情况
**进阶练习**
- 实现自定义分区器,按用户ID进行分区
- 实现消息过滤器,只处理特定类型的消息
- 实现批量消息处理器
- 添加消息拦截器记录处理时间
**项目练习**
- 构建一个日志收集系统
- 实现一个事件驱动的订单处理系统
- 创建一个实时数据分析管道
- 构建一个消息重试和死信队列机制
通过本章的学习,你应该已经掌握了Kafka的基本操作,包括主题管理、消息生产与消费,以及消费者组管理。这些是使用Kafka的基础技能,为后续学习高级功能打下坚实的基础。