5.1 性能调优
5.1.1 生产者性能优化
Kafka生产者的性能直接影响整个系统的吞吐量,通过合理的配置和优化策略可以显著提升性能。
import time
import threading
import queue
import statistics
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import json
import asyncio
from concurrent.futures import ThreadPoolExecutor
class CompressionType(Enum):
"""压缩类型"""
NONE = "none"
GZIP = "gzip"
SNAPPY = "snappy"
LZ4 = "lz4"
ZSTD = "zstd"
class AcksConfig(Enum):
"""确认配置"""
NONE = 0 # 不等待确认
LEADER = 1 # 等待leader确认
ALL = -1 # 等待所有副本确认
@dataclass
class ProducerMetrics:
"""生产者指标"""
total_records_sent: int = 0
total_bytes_sent: int = 0
total_record_errors: int = 0
total_record_retries: int = 0
avg_batch_size: float = 0.0
avg_request_latency_ms: float = 0.0
avg_record_queue_time_ms: float = 0.0
throughput_records_per_sec: float = 0.0
throughput_bytes_per_sec: float = 0.0
compression_ratio: float = 0.0
buffer_pool_wait_time_ms: float = 0.0
# 时间窗口统计
latency_samples: List[float] = field(default_factory=list)
throughput_samples: List[float] = field(default_factory=list)
start_time: float = field(default_factory=time.time)
@dataclass
class ProducerConfig:
"""生产者配置"""
# 性能相关配置
batch_size: int = 16384 # 16KB
linger_ms: int = 0 # 批次延迟时间
buffer_memory: int = 33554432 # 32MB
compression_type: CompressionType = CompressionType.NONE
acks: AcksConfig = AcksConfig.ALL
retries: int = 2147483647 # 最大重试次数
max_in_flight_requests_per_connection: int = 5
request_timeout_ms: int = 30000
delivery_timeout_ms: int = 120000
# 分区和序列化
partitioner_class: str = "default"
key_serializer: str = "string"
value_serializer: str = "string"
# 网络配置
send_buffer_bytes: int = 131072 # 128KB
receive_buffer_bytes: int = 65536 # 64KB
max_request_size: int = 1048576 # 1MB
# 安全配置
security_protocol: str = "PLAINTEXT"
enable_idempotence: bool = False
class RecordBatch:
"""记录批次"""
def __init__(self, topic: str, partition: int, max_size: int):
self.topic = topic
self.partition = partition
self.max_size = max_size
self.records = []
self.size_bytes = 0
self.created_time = time.time()
self.last_append_time = time.time()
self.closed = False
def try_append(self, key: Any, value: Any, timestamp: Optional[int] = None) -> bool:
"""尝试添加记录到批次"""
if self.closed:
return False
# 估算记录大小
record_size = self._estimate_record_size(key, value)
if self.size_bytes + record_size > self.max_size and self.records:
return False # 批次已满
record = {
"key": key,
"value": value,
"timestamp": timestamp or int(time.time() * 1000),
"offset": len(self.records)
}
self.records.append(record)
self.size_bytes += record_size
self.last_append_time = time.time()
return True
def close(self):
"""关闭批次"""
self.closed = True
def is_full(self) -> bool:
"""检查批次是否已满"""
return self.size_bytes >= self.max_size
def is_expired(self, linger_ms: int) -> bool:
"""检查批次是否过期"""
if not self.records:
return False
return (time.time() - self.created_time) * 1000 >= linger_ms
def _estimate_record_size(self, key: Any, value: Any) -> int:
"""估算记录大小"""
key_size = len(str(key).encode('utf-8')) if key else 0
value_size = len(str(value).encode('utf-8')) if value else 0
return key_size + value_size + 50 # 加上元数据开销
class PartitionerStrategy:
"""分区策略"""
@staticmethod
def default_partitioner(key: Any, num_partitions: int) -> int:
"""默认分区器"""
if key is None:
return hash(time.time()) % num_partitions
return hash(str(key)) % num_partitions
@staticmethod
def round_robin_partitioner(key: Any, num_partitions: int,
counter: Dict[str, int]) -> int:
"""轮询分区器"""
topic_key = "round_robin_counter"
counter[topic_key] = counter.get(topic_key, 0) + 1
return counter[topic_key] % num_partitions
@staticmethod
def sticky_partitioner(key: Any, num_partitions: int,
sticky_partition: Dict[str, int]) -> int:
"""粘性分区器(提高批次效率)"""
if key is not None:
return hash(str(key)) % num_partitions
# 对于null key,使用粘性分区
topic_key = "sticky_partition"
if topic_key not in sticky_partition:
sticky_partition[topic_key] = hash(time.time()) % num_partitions
return sticky_partition[topic_key]
class CompressionEngine:
"""压缩引擎"""
@staticmethod
def compress(data: bytes, compression_type: CompressionType) -> bytes:
"""压缩数据"""
if compression_type == CompressionType.NONE:
return data
elif compression_type == CompressionType.GZIP:
import gzip
return gzip.compress(data)
elif compression_type == CompressionType.LZ4:
# 模拟LZ4压缩
return data # 实际应使用lz4库
elif compression_type == CompressionType.SNAPPY:
# 模拟Snappy压缩
return data # 实际应使用snappy库
elif compression_type == CompressionType.ZSTD:
# 模拟ZSTD压缩
return data # 实际应使用zstd库
else:
return data
@staticmethod
def get_compression_ratio(original_size: int, compressed_size: int) -> float:
"""计算压缩比"""
if original_size == 0:
return 1.0
return compressed_size / original_size
class OptimizedKafkaProducer:
"""优化的Kafka生产者"""
def __init__(self, config: ProducerConfig):
self.config = config
self.metrics = ProducerMetrics()
self.batches = {} # {topic-partition: RecordBatch}
self.send_queue = queue.Queue()
self.result_futures = {}
# 分区相关
self.partition_counter = {}
self.sticky_partitions = {}
self.topic_metadata = {} # {topic: {partitions: int}}
# 线程池和控制
self.sender_thread = None
self.batch_expiry_thread = None
self.running = False
self.lock = threading.RLock()
# 缓冲区管理
self.buffer_pool = BufferPool(config.buffer_memory)
# 网络客户端
self.network_client = NetworkClient(config)
def start(self):
"""启动生产者"""
self.running = True
self.sender_thread = threading.Thread(target=self._sender_loop)
self.sender_thread.daemon = True
self.sender_thread.start()
self.batch_expiry_thread = threading.Thread(target=self._batch_expiry_loop)
self.batch_expiry_thread.daemon = True
self.batch_expiry_thread.start()
print("优化的Kafka生产者已启动")
def send(self, topic: str, key: Any = None, value: Any = None,
partition: Optional[int] = None, timestamp: Optional[int] = None,
callback: Optional[Callable] = None) -> 'ProducerFuture':
"""发送消息"""
if not self.running:
raise RuntimeError("生产者未启动")
# 获取主题元数据
if topic not in self.topic_metadata:
self.topic_metadata[topic] = {"partitions": 3} # 默认3个分区
# 确定分区
if partition is None:
num_partitions = self.topic_metadata[topic]["partitions"]
if self.config.partitioner_class == "round_robin":
partition = PartitionerStrategy.round_robin_partitioner(
key, num_partitions, self.partition_counter
)
elif self.config.partitioner_class == "sticky":
partition = PartitionerStrategy.sticky_partitioner(
key, num_partitions, self.sticky_partitions
)
else:
partition = PartitionerStrategy.default_partitioner(key, num_partitions)
# 创建Future
future = ProducerFuture()
if callback:
future.add_callback(callback)
# 添加到批次
with self.lock:
batch_key = f"{topic}-{partition}"
if batch_key not in self.batches:
self.batches[batch_key] = RecordBatch(
topic, partition, self.config.batch_size
)
batch = self.batches[batch_key]
# 尝试添加到现有批次
if not batch.try_append(key, value, timestamp):
# 当前批次已满,发送并创建新批次
self._send_batch(batch)
# 创建新批次
self.batches[batch_key] = RecordBatch(
topic, partition, self.config.batch_size
)
new_batch = self.batches[batch_key]
new_batch.try_append(key, value, timestamp)
batch = new_batch
# 记录Future
record_id = f"{batch_key}-{len(batch.records)-1}"
self.result_futures[record_id] = future
# 如果批次已满或不等待,立即发送
if batch.is_full() or self.config.linger_ms == 0:
self._send_batch(batch)
del self.batches[batch_key]
return future
def _send_batch(self, batch: RecordBatch):
"""发送批次"""
batch.close()
self.send_queue.put(batch)
def _sender_loop(self):
"""发送线程循环"""
while self.running:
try:
batch = self.send_queue.get(timeout=1.0)
self._process_batch(batch)
except queue.Empty:
continue
except Exception as e:
print(f"发送线程错误: {e}")
def _process_batch(self, batch: RecordBatch):
"""处理批次"""
start_time = time.time()
try:
# 序列化数据
serialized_data = self._serialize_batch(batch)
# 压缩数据
original_size = len(serialized_data)
compressed_data = CompressionEngine.compress(
serialized_data, self.config.compression_type
)
compressed_size = len(compressed_data)
# 发送到网络
response = self.network_client.send_batch(
batch.topic, batch.partition, compressed_data
)
# 更新指标
self._update_metrics(batch, original_size, compressed_size, start_time)
# 完成Futures
self._complete_batch_futures(batch, response)
except Exception as e:
print(f"处理批次失败: {e}")
self._fail_batch_futures(batch, e)
def _serialize_batch(self, batch: RecordBatch) -> bytes:
"""序列化批次"""
# 简化的序列化实现
data = {
"topic": batch.topic,
"partition": batch.partition,
"records": batch.records
}
return json.dumps(data).encode('utf-8')
def _update_metrics(self, batch: RecordBatch, original_size: int,
compressed_size: int, start_time: float):
"""更新指标"""
end_time = time.time()
latency = (end_time - start_time) * 1000 # 转换为毫秒
with self.lock:
self.metrics.total_records_sent += len(batch.records)
self.metrics.total_bytes_sent += compressed_size
self.metrics.latency_samples.append(latency)
# 计算平均值
if self.metrics.latency_samples:
self.metrics.avg_request_latency_ms = statistics.mean(
self.metrics.latency_samples[-100:] # 最近100个样本
)
# 计算压缩比
if original_size > 0:
compression_ratio = CompressionEngine.get_compression_ratio(
original_size, compressed_size
)
self.metrics.compression_ratio = compression_ratio
# 计算吞吐量
elapsed_time = end_time - self.metrics.start_time
if elapsed_time > 0:
self.metrics.throughput_records_per_sec = (
self.metrics.total_records_sent / elapsed_time
)
self.metrics.throughput_bytes_per_sec = (
self.metrics.total_bytes_sent / elapsed_time
)
def _complete_batch_futures(self, batch: RecordBatch, response: Dict[str, Any]):
"""完成批次的Futures"""
batch_key = f"{batch.topic}-{batch.partition}"
for i, record in enumerate(batch.records):
record_id = f"{batch_key}-{i}"
if record_id in self.result_futures:
future = self.result_futures[record_id]
# 创建记录元数据
metadata = {
"topic": batch.topic,
"partition": batch.partition,
"offset": response.get("base_offset", 0) + i,
"timestamp": record["timestamp"]
}
future.complete(metadata)
del self.result_futures[record_id]
def _fail_batch_futures(self, batch: RecordBatch, error: Exception):
"""失败批次的Futures"""
batch_key = f"{batch.topic}-{batch.partition}"
for i in range(len(batch.records)):
record_id = f"{batch_key}-{i}"
if record_id in self.result_futures:
future = self.result_futures[record_id]
future.fail(error)
del self.result_futures[record_id]
def _batch_expiry_loop(self):
"""批次过期检查循环"""
while self.running:
try:
with self.lock:
expired_batches = []
for batch_key, batch in self.batches.items():
if batch.is_expired(self.config.linger_ms):
expired_batches.append((batch_key, batch))
# 发送过期批次
for batch_key, batch in expired_batches:
self._send_batch(batch)
del self.batches[batch_key]
time.sleep(0.1) # 100ms检查间隔
except Exception as e:
print(f"批次过期检查错误: {e}")
def flush(self, timeout_ms: Optional[int] = None):
"""刷新所有待发送的消息"""
start_time = time.time()
with self.lock:
# 发送所有待处理的批次
for batch_key, batch in list(self.batches.items()):
if batch.records:
self._send_batch(batch)
del self.batches[batch_key]
# 等待发送完成
while self.result_futures:
if timeout_ms and (time.time() - start_time) * 1000 > timeout_ms:
break
time.sleep(0.01)
def get_metrics(self) -> ProducerMetrics:
"""获取生产者指标"""
with self.lock:
return self.metrics
def close(self, timeout_ms: int = 30000):
"""关闭生产者"""
print("正在关闭生产者...")
# 刷新待发送消息
self.flush(timeout_ms)
# 停止线程
self.running = False
if self.sender_thread:
self.sender_thread.join(timeout=timeout_ms/1000)
if self.batch_expiry_thread:
self.batch_expiry_thread.join(timeout=timeout_ms/1000)
print("生产者已关闭")
class ProducerFuture:
"""生产者Future"""
def __init__(self):
self.completed = False
self.result = None
self.error = None
self.callbacks = []
self.lock = threading.Lock()
def add_callback(self, callback: Callable):
"""添加回调"""
with self.lock:
if self.completed:
self._invoke_callback(callback)
else:
self.callbacks.append(callback)
def complete(self, result: Any):
"""完成Future"""
with self.lock:
if self.completed:
return
self.completed = True
self.result = result
for callback in self.callbacks:
self._invoke_callback(callback)
def fail(self, error: Exception):
"""失败Future"""
with self.lock:
if self.completed:
return
self.completed = True
self.error = error
for callback in self.callbacks:
self._invoke_callback(callback)
def _invoke_callback(self, callback: Callable):
"""调用回调"""
try:
if self.error:
callback(self.error, None)
else:
callback(None, self.result)
except Exception as e:
print(f"回调执行错误: {e}")
def get(self, timeout_ms: Optional[int] = None) -> Any:
"""获取结果"""
start_time = time.time()
while not self.completed:
if timeout_ms and (time.time() - start_time) * 1000 > timeout_ms:
raise TimeoutError("等待结果超时")
time.sleep(0.001)
if self.error:
raise self.error
return self.result
class BufferPool:
"""缓冲区池"""
def __init__(self, total_memory: int):
self.total_memory = total_memory
self.available_memory = total_memory
self.lock = threading.RLock()
self.waiters = []
def allocate(self, size: int, timeout_ms: Optional[int] = None) -> bool:
"""分配内存"""
start_time = time.time()
with self.lock:
while self.available_memory < size:
if timeout_ms and (time.time() - start_time) * 1000 > timeout_ms:
return False
# 等待内存释放
time.sleep(0.001)
self.available_memory -= size
return True
def deallocate(self, size: int):
"""释放内存"""
with self.lock:
self.available_memory += size
self.available_memory = min(self.available_memory, self.total_memory)
class NetworkClient:
"""网络客户端"""
def __init__(self, config: ProducerConfig):
self.config = config
self.connection_pool = {}
self.request_id = 0
self.lock = threading.RLock()
def send_batch(self, topic: str, partition: int, data: bytes) -> Dict[str, Any]:
"""发送批次数据"""
# 模拟网络发送
time.sleep(0.001) # 模拟网络延迟
with self.lock:
self.request_id += 1
# 模拟响应
response = {
"topic": topic,
"partition": partition,
"base_offset": self.request_id * 100,
"timestamp": int(time.time() * 1000),
"error_code": 0
}
return response
# 使用示例
if __name__ == "__main__":
# 创建优化配置
config = ProducerConfig(
batch_size=32768, # 32KB批次
linger_ms=10, # 10ms延迟
compression_type=CompressionType.LZ4,
acks=AcksConfig.ALL,
enable_idempotence=True,
max_in_flight_requests_per_connection=1 # 保证顺序
)
# 创建生产者
producer = OptimizedKafkaProducer(config)
producer.start()
try:
# 发送消息
futures = []
for i in range(1000):
future = producer.send(
topic="performance-test",
key=f"key-{i}",
value={"id": i, "data": f"message-{i}", "timestamp": time.time()}
)
futures.append(future)
if i % 100 == 0:
print(f"已发送 {i} 条消息")
# 等待所有消息发送完成
print("等待消息发送完成...")
for future in futures:
try:
result = future.get(timeout_ms=5000)
# print(f"消息发送成功: {result}")
except Exception as e:
print(f"消息发送失败: {e}")
# 获取性能指标
metrics = producer.get_metrics()
print(f"\n=== 性能指标 ===")
print(f"总发送记录数: {metrics.total_records_sent}")
print(f"总发送字节数: {metrics.total_bytes_sent}")
print(f"平均延迟: {metrics.avg_request_latency_ms:.2f} ms")
print(f"吞吐量: {metrics.throughput_records_per_sec:.2f} records/sec")
print(f"吞吐量: {metrics.throughput_bytes_per_sec:.2f} bytes/sec")
print(f"压缩比: {metrics.compression_ratio:.2f}")
finally:
producer.close()
5.1.2 消费者性能优化
消费者性能优化主要关注拉取效率、处理并发和偏移量管理。
import threading
import queue
import time
from typing import Dict, List, Any, Optional, Callable, Set
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor, Future
import statistics
@dataclass
class ConsumerMetrics:
"""消费者指标"""
total_records_consumed: int = 0
total_bytes_consumed: int = 0
total_fetch_requests: int = 0
avg_fetch_latency_ms: float = 0.0
avg_record_processing_time_ms: float = 0.0
throughput_records_per_sec: float = 0.0
throughput_bytes_per_sec: float = 0.0
fetch_size_avg: float = 0.0
fetch_size_max: int = 0
lag_sum: int = 0
# 时间窗口统计
fetch_latency_samples: List[float] = field(default_factory=list)
processing_time_samples: List[float] = field(default_factory=list)
start_time: float = field(default_factory=time.time)
@dataclass
class ConsumerConfig:
"""消费者配置"""
# 拉取配置
fetch_min_bytes: int = 1024 # 1KB
fetch_max_bytes: int = 52428800 # 50MB
fetch_max_wait_ms: int = 500
max_partition_fetch_bytes: int = 1048576 # 1MB
max_poll_records: int = 500
# 会话配置
session_timeout_ms: int = 10000
heartbeat_interval_ms: int = 3000
max_poll_interval_ms: int = 300000
# 偏移量配置
auto_offset_reset: str = "latest" # earliest, latest, none
enable_auto_commit: bool = True
auto_commit_interval_ms: int = 5000
# 性能配置
receive_buffer_bytes: int = 65536 # 64KB
send_buffer_bytes: int = 131072 # 128KB
check_crcs: bool = True
# 并发配置
max_processing_threads: int = 4
processing_queue_size: int = 1000
class FetchRequest:
"""拉取请求"""
def __init__(self, topic_partitions: List[str], max_bytes: int):
self.topic_partitions = topic_partitions
self.max_bytes = max_bytes
self.timestamp = time.time()
self.id = id(self)
class FetchResponse:
"""拉取响应"""
def __init__(self, records: List[Dict[str, Any]], error_code: int = 0):
self.records = records
self.error_code = error_code
self.timestamp = time.time()
self.size_bytes = sum(len(str(record).encode()) for record in records)
class PartitionAssignment:
"""分区分配"""
def __init__(self, topic: str, partition: int):
self.topic = topic
self.partition = partition
self.current_offset = 0
self.high_water_mark = 0
self.lag = 0
self.last_fetch_time = 0
def update_lag(self):
"""更新延迟"""
self.lag = max(0, self.high_water_mark - self.current_offset)
def __str__(self):
return f"{self.topic}-{self.partition}"
class RecordProcessor:
"""记录处理器"""
def __init__(self, processor_func: Callable[[Dict[str, Any]], Any]):
self.processor_func = processor_func
self.processed_count = 0
self.error_count = 0
self.total_processing_time = 0.0
def process(self, record: Dict[str, Any]) -> Any:
"""处理记录"""
start_time = time.time()
try:
result = self.processor_func(record)
self.processed_count += 1
return result
except Exception as e:
self.error_count += 1
raise e
finally:
self.total_processing_time += time.time() - start_time
def get_avg_processing_time(self) -> float:
"""获取平均处理时间"""
if self.processed_count == 0:
return 0.0
return (self.total_processing_time / self.processed_count) * 1000 # 转换为毫秒
class OptimizedKafkaConsumer:
"""优化的Kafka消费者"""
def __init__(self, group_id: str, config: ConsumerConfig):
self.group_id = group_id
self.config = config
self.metrics = ConsumerMetrics()
# 分区分配
self.assignments: Dict[str, PartitionAssignment] = {}
self.subscribed_topics: Set[str] = set()
# 拉取管理
self.fetch_queue = queue.Queue()
self.record_queue = queue.Queue(maxsize=config.processing_queue_size)
self.pending_offsets = {} # {topic-partition: offset}
# 线程管理
self.fetcher_thread = None
self.processor_executor = ThreadPoolExecutor(
max_workers=config.max_processing_threads
)
self.running = False
self.lock = threading.RLock()
# 偏移量管理
self.offset_manager = OffsetManager(group_id, config)
# 处理器
self.record_processor = None
def subscribe(self, topics: List[str]):
"""订阅主题"""
self.subscribed_topics.update(topics)
# 模拟分区分配
for topic in topics:
for partition in range(3): # 假设每个主题3个分区
partition_key = f"{topic}-{partition}"
self.assignments[partition_key] = PartitionAssignment(topic, partition)
print(f"订阅主题: {topics},分配分区: {list(self.assignments.keys())}")
def set_record_processor(self, processor_func: Callable[[Dict[str, Any]], Any]):
"""设置记录处理器"""
self.record_processor = RecordProcessor(processor_func)
def start(self):
"""启动消费者"""
self.running = True
# 启动拉取线程
self.fetcher_thread = threading.Thread(target=self._fetcher_loop)
self.fetcher_thread.daemon = True
self.fetcher_thread.start()
# 启动偏移量提交线程
if self.config.enable_auto_commit:
self.offset_manager.start_auto_commit()
print(f"优化的Kafka消费者已启动,组ID: {self.group_id}")
def poll(self, timeout_ms: int = 1000) -> List[Dict[str, Any]]:
"""拉取消息"""
records = []
start_time = time.time()
while (time.time() - start_time) * 1000 < timeout_ms:
try:
record = self.record_queue.get(timeout=0.1)
records.append(record)
if len(records) >= self.config.max_poll_records:
break
except queue.Empty:
continue
# 更新指标
if records:
self._update_consumption_metrics(records)
return records
def _fetcher_loop(self):
"""拉取线程循环"""
while self.running:
try:
self._fetch_records()
time.sleep(0.01) # 短暂休眠
except Exception as e:
print(f"拉取线程错误: {e}")
time.sleep(1)
def _fetch_records(self):
"""拉取记录"""
if not self.assignments:
return
# 创建拉取请求
topic_partitions = list(self.assignments.keys())
fetch_request = FetchRequest(
topic_partitions=topic_partitions,
max_bytes=self.config.fetch_max_bytes
)
# 执行拉取
start_time = time.time()
response = self._execute_fetch(fetch_request)
fetch_latency = (time.time() - start_time) * 1000
# 更新拉取指标
self._update_fetch_metrics(response, fetch_latency)
# 处理响应
if response.error_code == 0:
self._process_fetch_response(response)
def _execute_fetch(self, request: FetchRequest) -> FetchResponse:
"""执行拉取请求"""
# 模拟网络拉取
time.sleep(0.01) # 模拟网络延迟
# 生成模拟记录
records = []
for tp in request.topic_partitions:
topic, partition = tp.split('-')
assignment = self.assignments[tp]
# 生成一些记录
for i in range(10): # 每个分区10条记录
record = {
"topic": topic,
"partition": int(partition),
"offset": assignment.current_offset + i,
"key": f"key-{assignment.current_offset + i}",
"value": {
"id": assignment.current_offset + i,
"data": f"message-{assignment.current_offset + i}",
"timestamp": int(time.time() * 1000)
},
"timestamp": int(time.time() * 1000)
}
records.append(record)
# 更新偏移量
assignment.current_offset += 10
assignment.high_water_mark = assignment.current_offset + 100
assignment.update_lag()
return FetchResponse(records)
def _process_fetch_response(self, response: FetchResponse):
"""处理拉取响应"""
for record in response.records:
try:
# 如果有处理器,异步处理记录
if self.record_processor:
future = self.processor_executor.submit(
self._process_record_async, record
)
else:
# 直接放入队列
self.record_queue.put(record, timeout=1.0)
# 更新待提交偏移量
tp_key = f"{record['topic']}-{record['partition']}"
self.pending_offsets[tp_key] = record['offset'] + 1
except queue.Full:
print("记录队列已满,丢弃记录")
break
def _process_record_async(self, record: Dict[str, Any]):
"""异步处理记录"""
try:
start_time = time.time()
# 处理记录
result = self.record_processor.process(record)
# 更新处理时间指标
processing_time = (time.time() - start_time) * 1000
with self.lock:
self.metrics.processing_time_samples.append(processing_time)
if len(self.metrics.processing_time_samples) > 1000:
self.metrics.processing_time_samples = self.metrics.processing_time_samples[-500:]
# 将处理后的记录放入队列
processed_record = record.copy()
processed_record['processed_result'] = result
processed_record['processing_time_ms'] = processing_time
self.record_queue.put(processed_record, timeout=1.0)
except Exception as e:
print(f"异步处理记录失败: {e}")
# 即使处理失败,也要将原记录放入队列
error_record = record.copy()
error_record['processing_error'] = str(e)
self.record_queue.put(error_record, timeout=1.0)
def _update_fetch_metrics(self, response: FetchResponse, latency: float):
"""更新拉取指标"""
with self.lock:
self.metrics.total_fetch_requests += 1
self.metrics.fetch_latency_samples.append(latency)
if len(self.metrics.fetch_latency_samples) > 1000:
self.metrics.fetch_latency_samples = self.metrics.fetch_latency_samples[-500:]
# 计算平均拉取延迟
if self.metrics.fetch_latency_samples:
self.metrics.avg_fetch_latency_ms = statistics.mean(
self.metrics.fetch_latency_samples
)
# 更新拉取大小统计
self.metrics.fetch_size_max = max(
self.metrics.fetch_size_max, response.size_bytes
)
# 计算平均拉取大小
if self.metrics.total_fetch_requests > 0:
total_fetch_bytes = sum(
len(str(record).encode()) for record in response.records
)
self.metrics.fetch_size_avg = (
(self.metrics.fetch_size_avg * (self.metrics.total_fetch_requests - 1) +
total_fetch_bytes) / self.metrics.total_fetch_requests
)
def _update_consumption_metrics(self, records: List[Dict[str, Any]]):
"""更新消费指标"""
with self.lock:
self.metrics.total_records_consumed += len(records)
total_bytes = sum(len(str(record).encode()) for record in records)
self.metrics.total_bytes_consumed += total_bytes
# 计算吞吐量
elapsed_time = time.time() - self.metrics.start_time
if elapsed_time > 0:
self.metrics.throughput_records_per_sec = (
self.metrics.total_records_consumed / elapsed_time
)
self.metrics.throughput_bytes_per_sec = (
self.metrics.total_bytes_consumed / elapsed_time
)
# 计算平均处理时间
if self.metrics.processing_time_samples:
self.metrics.avg_record_processing_time_ms = statistics.mean(
self.metrics.processing_time_samples[-100:]
)
# 计算总延迟
self.metrics.lag_sum = sum(
assignment.lag for assignment in self.assignments.values()
)
def commit_sync(self, offsets: Optional[Dict[str, int]] = None):
"""同步提交偏移量"""
commit_offsets = offsets or self.pending_offsets.copy()
self.offset_manager.commit_offsets(commit_offsets)
# 清除已提交的偏移量
for tp in commit_offsets:
if tp in self.pending_offsets:
del self.pending_offsets[tp]
def commit_async(self, offsets: Optional[Dict[str, int]] = None,
callback: Optional[Callable] = None):
"""异步提交偏移量"""
def commit_task():
try:
self.commit_sync(offsets)
if callback:
callback(None, offsets or self.pending_offsets)
except Exception as e:
if callback:
callback(e, None)
thread = threading.Thread(target=commit_task)
thread.daemon = True
thread.start()
def seek_to_beginning(self, topic_partitions: Optional[List[str]] = None):
"""跳转到分区开始"""
partitions = topic_partitions or list(self.assignments.keys())
for tp in partitions:
if tp in self.assignments:
self.assignments[tp].current_offset = 0
print(f"跳转到开始位置: {partitions}")
def seek_to_end(self, topic_partitions: Optional[List[str]] = None):
"""跳转到分区末尾"""
partitions = topic_partitions or list(self.assignments.keys())
for tp in partitions:
if tp in self.assignments:
assignment = self.assignments[tp]
assignment.current_offset = assignment.high_water_mark
print(f"跳转到末尾位置: {partitions}")
def get_metrics(self) -> ConsumerMetrics:
"""获取消费者指标"""
with self.lock:
return self.metrics
def get_assignment_info(self) -> Dict[str, Dict[str, Any]]:
"""获取分区分配信息"""
info = {}
for tp, assignment in self.assignments.items():
info[tp] = {
"current_offset": assignment.current_offset,
"high_water_mark": assignment.high_water_mark,
"lag": assignment.lag,
"last_fetch_time": assignment.last_fetch_time
}
return info
def close(self, timeout_ms: int = 30000):
"""关闭消费者"""
print("正在关闭消费者...")
# 提交待处理的偏移量
if self.pending_offsets:
self.commit_sync()
# 停止线程
self.running = False
if self.fetcher_thread:
self.fetcher_thread.join(timeout=timeout_ms/1000)
# 关闭线程池
self.processor_executor.shutdown(wait=True, timeout=timeout_ms/1000)
# 关闭偏移量管理器
self.offset_manager.close()
print("消费者已关闭")
class OffsetManager:
"""偏移量管理器"""
def __init__(self, group_id: str, config: ConsumerConfig):
self.group_id = group_id
self.config = config
self.committed_offsets = {} # {topic-partition: offset}
self.auto_commit_thread = None
self.running = False
self.lock = threading.RLock()
def start_auto_commit(self):
"""启动自动提交"""
if not self.config.enable_auto_commit:
return
self.running = True
self.auto_commit_thread = threading.Thread(target=self._auto_commit_loop)
self.auto_commit_thread.daemon = True
self.auto_commit_thread.start()
def _auto_commit_loop(self):
"""自动提交循环"""
while self.running:
try:
time.sleep(self.config.auto_commit_interval_ms / 1000)
# 这里应该自动提交偏移量
# 由于是模拟,暂时跳过实际提交
except Exception as e:
print(f"自动提交错误: {e}")
def commit_offsets(self, offsets: Dict[str, int]):
"""提交偏移量"""
with self.lock:
self.committed_offsets.update(offsets)
print(f"提交偏移量: {offsets}")
def get_committed_offsets(self, topic_partitions: List[str]) -> Dict[str, int]:
"""获取已提交的偏移量"""
with self.lock:
return {tp: self.committed_offsets.get(tp, 0) for tp in topic_partitions}
def close(self):
"""关闭偏移量管理器"""
self.running = False
if self.auto_commit_thread:
self.auto_commit_thread.join()
# 使用示例
if __name__ == "__main__":
# 创建优化配置
config = ConsumerConfig(
fetch_min_bytes=10240, # 10KB
fetch_max_bytes=1048576, # 1MB
fetch_max_wait_ms=100,
max_poll_records=1000,
max_processing_threads=8,
enable_auto_commit=True,
auto_commit_interval_ms=1000
)
# 创建消费者
consumer = OptimizedKafkaConsumer("performance-test-group", config)
# 设置记录处理器
def process_record(record):
# 模拟处理逻辑
time.sleep(0.001) # 1ms处理时间
return f"processed-{record['value']['id']}"
consumer.set_record_processor(process_record)
# 订阅主题
consumer.subscribe(["performance-test"])
consumer.start()
try:
# 消费消息
total_consumed = 0
start_time = time.time()
while total_consumed < 1000:
records = consumer.poll(timeout_ms=1000)
if records:
total_consumed += len(records)
print(f"消费了 {len(records)} 条消息,总计: {total_consumed}")
# 处理记录
for record in records:
if 'processed_result' in record:
# print(f"处理结果: {record['processed_result']}")
pass
elif 'processing_error' in record:
print(f"处理错误: {record['processing_error']}")
# 手动提交偏移量
if total_consumed % 100 == 0:
consumer.commit_sync()
# 获取性能指标
metrics = consumer.get_metrics()
assignment_info = consumer.get_assignment_info()
print(f"\n=== 消费者性能指标 ===")
print(f"总消费记录数: {metrics.total_records_consumed}")
print(f"总消费字节数: {metrics.total_bytes_consumed}")
print(f"平均拉取延迟: {metrics.avg_fetch_latency_ms:.2f} ms")
print(f"平均处理时间: {metrics.avg_record_processing_time_ms:.2f} ms")
print(f"吞吐量: {metrics.throughput_records_per_sec:.2f} records/sec")
print(f"吞吐量: {metrics.throughput_bytes_per_sec:.2f} bytes/sec")
print(f"总延迟: {metrics.lag_sum}")
print(f"\n=== 分区分配信息 ===")
for tp, info in assignment_info.items():
print(f"{tp}: offset={info['current_offset']}, lag={info['lag']}")
finally:
consumer.close()
5.1.3 Broker性能优化
Broker是Kafka集群的核心组件,其性能直接影响整个集群的表现。
import os
import psutil
import threading
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import json
class LogCleanupPolicy(Enum):
"""日志清理策略"""
DELETE = "delete"
COMPACT = "compact"
COMPACT_DELETE = "compact,delete"
class CompressionType(Enum):
"""压缩类型"""
PRODUCER = "producer"
GZIP = "gzip"
SNAPPY = "snappy"
LZ4 = "lz4"
ZSTD = "zstd"
UNCOMPRESSED = "uncompressed"
@dataclass
class BrokerConfig:
"""Broker配置"""
# 基础配置
broker_id: int = 0
listeners: str = "PLAINTEXT://localhost:9092"
log_dirs: List[str] = field(default_factory=lambda: ["/tmp/kafka-logs"])
# 网络配置
num_network_threads: int = 8
num_io_threads: int = 8
socket_send_buffer_bytes: int = 102400 # 100KB
socket_receive_buffer_bytes: int = 102400 # 100KB
socket_request_max_bytes: int = 104857600 # 100MB
# 日志配置
num_partitions: int = 1
num_recovery_threads_per_data_dir: int = 1
offsets_topic_replication_factor: int = 1
transaction_state_log_replication_factor: int = 1
transaction_state_log_min_isr: int = 1
# 日志保留配置
log_retention_hours: int = 168 # 7天
log_retention_bytes: int = 1073741824 # 1GB
log_segment_bytes: int = 1073741824 # 1GB
log_retention_check_interval_ms: int = 300000 # 5分钟
# 日志清理配置
log_cleanup_policy: LogCleanupPolicy = LogCleanupPolicy.DELETE
log_cleaner_enable: bool = True
log_cleaner_threads: int = 1
log_cleaner_io_max_bytes_per_second: int = 1048576 # 1MB/s
log_cleaner_dedupe_buffer_size: int = 134217728 # 128MB
# 压缩配置
compression_type: CompressionType = CompressionType.PRODUCER
# 副本配置
default_replication_factor: int = 1
min_insync_replicas: int = 1
# ZooKeeper配置
zookeeper_connect: str = "localhost:2181"
zookeeper_connection_timeout_ms: int = 18000
zookeeper_session_timeout_ms: int = 18000
# 性能调优配置
replica_fetch_max_bytes: int = 1048576 # 1MB
replica_fetch_wait_max_ms: int = 500
replica_high_watermark_checkpoint_interval_ms: int = 5000
replica_lag_time_max_ms: int = 30000
# 生产者配置
producer_purgatory_purge_interval_requests: int = 1000
# 消费者配置
fetch_purgatory_purge_interval_requests: int = 1000
# JVM配置
heap_opts: str = "-Xmx1G -Xms1G"
jvm_performance_opts: str = "-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
@dataclass
class BrokerMetrics:
"""Broker指标"""
# 消息指标
messages_in_per_sec: float = 0.0
bytes_in_per_sec: float = 0.0
bytes_out_per_sec: float = 0.0
# 请求指标
request_handler_avg_idle_percent: float = 0.0
network_processor_avg_idle_percent: float = 0.0
request_queue_size: int = 0
response_queue_size: int = 0
# 日志指标
log_flush_rate_and_time_ms: float = 0.0
log_size_bytes: int = 0
log_segments_count: int = 0
# 副本指标
under_replicated_partitions: int = 0
under_min_isr_partition_count: int = 0
offline_partitions_count: int = 0
# 系统指标
cpu_usage_percent: float = 0.0
memory_usage_percent: float = 0.0
disk_usage_percent: float = 0.0
network_io_rate: float = 0.0
# GC指标
gc_time_ms: float = 0.0
gc_count: int = 0
# 时间戳
timestamp: float = field(default_factory=time.time)
class SystemMonitor:
"""系统监控器"""
def __init__(self):
self.process = psutil.Process()
def get_cpu_usage(self) -> float:
"""获取CPU使用率"""
return self.process.cpu_percent(interval=1)
def get_memory_usage(self) -> float:
"""获取内存使用率"""
memory_info = self.process.memory_info()
total_memory = psutil.virtual_memory().total
return (memory_info.rss / total_memory) * 100
def get_disk_usage(self, path: str) -> float:
"""获取磁盘使用率"""
disk_usage = psutil.disk_usage(path)
return (disk_usage.used / disk_usage.total) * 100
def get_network_io(self) -> Dict[str, float]:
"""获取网络IO"""
net_io = psutil.net_io_counters()
return {
"bytes_sent": net_io.bytes_sent,
"bytes_recv": net_io.bytes_recv,
"packets_sent": net_io.packets_sent,
"packets_recv": net_io.packets_recv
}
def get_disk_io(self) -> Dict[str, float]:
"""获取磁盘IO"""
disk_io = psutil.disk_io_counters()
return {
"read_bytes": disk_io.read_bytes,
"write_bytes": disk_io.write_bytes,
"read_count": disk_io.read_count,
"write_count": disk_io.write_count
}
class LogManager:
"""日志管理器"""
def __init__(self, config: BrokerConfig):
self.config = config
self.log_dirs = config.log_dirs
self.segments = {} # {topic-partition: [LogSegment]}
self.cleaner_thread = None
self.running = False
def start(self):
"""启动日志管理器"""
self.running = True
if self.config.log_cleaner_enable:
self.cleaner_thread = threading.Thread(target=self._log_cleaner_loop)
self.cleaner_thread.daemon = True
self.cleaner_thread.start()
print("日志管理器已启动")
def _log_cleaner_loop(self):
"""日志清理循环"""
while self.running:
try:
self._clean_logs()
time.sleep(self.config.log_retention_check_interval_ms / 1000)
except Exception as e:
print(f"日志清理错误: {e}")
def _clean_logs(self):
"""清理日志"""
current_time = time.time() * 1000 # 转换为毫秒
retention_ms = self.config.log_retention_hours * 3600 * 1000
for topic_partition, segments in self.segments.items():
# 按时间清理
segments_to_remove = []
for segment in segments:
if current_time - segment.created_time > retention_ms:
segments_to_remove.append(segment)
# 按大小清理
total_size = sum(segment.size for segment in segments)
if total_size > self.config.log_retention_bytes:
# 删除最老的段
segments.sort(key=lambda s: s.created_time)
while total_size > self.config.log_retention_bytes and segments:
segment = segments.pop(0)
segments_to_remove.append(segment)
total_size -= segment.size
# 执行删除
for segment in segments_to_remove:
self._delete_segment(segment)
if segment in segments:
segments.remove(segment)
def _delete_segment(self, segment):
"""删除段文件"""
print(f"删除日志段: {segment.file_path}")
# 实际实现中会删除文件
def get_log_size(self) -> int:
"""获取日志总大小"""
total_size = 0
for segments in self.segments.values():
total_size += sum(segment.size for segment in segments)
return total_size
def get_segment_count(self) -> int:
"""获取段总数"""
total_count = 0
for segments in self.segments.values():
total_count += len(segments)
return total_count
def stop(self):
"""停止日志管理器"""
self.running = False
if self.cleaner_thread:
self.cleaner_thread.join()
class LogSegment:
"""日志段"""
def __init__(self, file_path: str, base_offset: int):
self.file_path = file_path
self.base_offset = base_offset
self.size = 0
self.created_time = time.time() * 1000
self.last_modified_time = self.created_time
def append(self, data: bytes):
"""追加数据"""
self.size += len(data)
self.last_modified_time = time.time() * 1000
class ReplicationManager:
"""副本管理器"""
def __init__(self, config: BrokerConfig):
self.config = config
self.replicas = {} # {topic-partition: ReplicaInfo}
self.under_replicated_partitions = set()
self.offline_partitions = set()
def get_under_replicated_count(self) -> int:
"""获取副本不足的分区数"""
return len(self.under_replicated_partitions)
def get_offline_partitions_count(self) -> int:
"""获取离线分区数"""
return len(self.offline_partitions)
def check_replica_health(self):
"""检查副本健康状态"""
# 模拟副本健康检查
pass
class RequestHandler:
"""请求处理器"""
def __init__(self, config: BrokerConfig):
self.config = config
self.request_queue = []
self.response_queue = []
self.handler_threads = []
self.running = False
self.total_requests = 0
self.total_processing_time = 0.0
def start(self):
"""启动请求处理器"""
self.running = True
# 启动处理线程
for i in range(self.config.num_io_threads):
thread = threading.Thread(target=self._request_handler_loop)
thread.daemon = True
thread.start()
self.handler_threads.append(thread)
print(f"请求处理器已启动,{self.config.num_io_threads}个处理线程")
def _request_handler_loop(self):
"""请求处理循环"""
while self.running:
try:
if self.request_queue:
request = self.request_queue.pop(0)
self._process_request(request)
else:
time.sleep(0.001) # 短暂休眠
except Exception as e:
print(f"请求处理错误: {e}")
def _process_request(self, request):
"""处理请求"""
start_time = time.time()
# 模拟请求处理
time.sleep(0.001) # 1ms处理时间
processing_time = time.time() - start_time
self.total_requests += 1
self.total_processing_time += processing_time
# 生成响应
response = {"request_id": request.get("id"), "status": "success"}
self.response_queue.append(response)
def get_avg_processing_time(self) -> float:
"""获取平均处理时间"""
if self.total_requests == 0:
return 0.0
return (self.total_processing_time / self.total_requests) * 1000 # 转换为毫秒
def get_queue_sizes(self) -> Dict[str, int]:
"""获取队列大小"""
return {
"request_queue_size": len(self.request_queue),
"response_queue_size": len(self.response_queue)
}
def stop(self):
"""停止请求处理器"""
self.running = False
for thread in self.handler_threads:
thread.join()
class OptimizedKafkaBroker:
"""优化的Kafka Broker"""
def __init__(self, config: BrokerConfig):
self.config = config
self.metrics = BrokerMetrics()
# 组件初始化
self.system_monitor = SystemMonitor()
self.log_manager = LogManager(config)
self.replication_manager = ReplicationManager(config)
self.request_handler = RequestHandler(config)
# 监控线程
self.metrics_thread = None
self.running = False
self.lock = threading.RLock()
# 性能计数器
self.message_count = 0
self.bytes_in_count = 0
self.bytes_out_count = 0
self.last_metrics_time = time.time()
def start(self):
"""启动Broker"""
print(f"启动Kafka Broker {self.config.broker_id}...")
self.running = True
# 启动组件
self.log_manager.start()
self.request_handler.start()
# 启动指标收集线程
self.metrics_thread = threading.Thread(target=self._metrics_collection_loop)
self.metrics_thread.daemon = True
self.metrics_thread.start()
print(f"Kafka Broker {self.config.broker_id} 已启动")
def _metrics_collection_loop(self):
"""指标收集循环"""
while self.running:
try:
self._collect_metrics()
time.sleep(10) # 每10秒收集一次指标
except Exception as e:
print(f"指标收集错误: {e}")
def _collect_metrics(self):
"""收集指标"""
current_time = time.time()
time_diff = current_time - self.last_metrics_time
with self.lock:
# 计算速率指标
if time_diff > 0:
self.metrics.messages_in_per_sec = self.message_count / time_diff
self.metrics.bytes_in_per_sec = self.bytes_in_count / time_diff
self.metrics.bytes_out_per_sec = self.bytes_out_count / time_diff
# 重置计数器
self.message_count = 0
self.bytes_in_count = 0
self.bytes_out_count = 0
self.last_metrics_time = current_time
# 系统指标
self.metrics.cpu_usage_percent = self.system_monitor.get_cpu_usage()
self.metrics.memory_usage_percent = self.system_monitor.get_memory_usage()
if self.config.log_dirs:
self.metrics.disk_usage_percent = self.system_monitor.get_disk_usage(
self.config.log_dirs[0]
)
# 日志指标
self.metrics.log_size_bytes = self.log_manager.get_log_size()
self.metrics.log_segments_count = self.log_manager.get_segment_count()
# 副本指标
self.metrics.under_replicated_partitions = (
self.replication_manager.get_under_replicated_count()
)
self.metrics.offline_partitions_count = (
self.replication_manager.get_offline_partitions_count()
)
# 请求处理指标
queue_sizes = self.request_handler.get_queue_sizes()
self.metrics.request_queue_size = queue_sizes["request_queue_size"]
self.metrics.response_queue_size = queue_sizes["response_queue_size"]
# 更新时间戳
self.metrics.timestamp = current_time
def produce_message(self, topic: str, partition: int, message: bytes):
"""生产消息"""
with self.lock:
self.message_count += 1
self.bytes_in_count += len(message)
def consume_message(self, topic: str, partition: int, message_size: int):
"""消费消息"""
with self.lock:
self.bytes_out_count += message_size
def get_metrics(self) -> BrokerMetrics:
"""获取Broker指标"""
with self.lock:
return self.metrics
def get_config_recommendations(self) -> Dict[str, Any]:
"""获取配置建议"""
recommendations = []
# CPU使用率建议
if self.metrics.cpu_usage_percent > 80:
recommendations.append({
"type": "cpu",
"message": "CPU使用率过高,建议增加num.io.threads或num.network.threads",
"current_value": self.metrics.cpu_usage_percent,
"suggested_config": {
"num.io.threads": self.config.num_io_threads * 2,
"num.network.threads": self.config.num_network_threads * 2
}
})
# 内存使用率建议
if self.metrics.memory_usage_percent > 85:
recommendations.append({
"type": "memory",
"message": "内存使用率过高,建议增加JVM堆内存",
"current_value": self.metrics.memory_usage_percent,
"suggested_config": {
"heap_opts": "-Xmx2G -Xms2G" # 增加到2GB
}
})
# 磁盘使用率建议
if self.metrics.disk_usage_percent > 80:
recommendations.append({
"type": "disk",
"message": "磁盘使用率过高,建议调整日志保留策略",
"current_value": self.metrics.disk_usage_percent,
"suggested_config": {
"log.retention.hours": self.config.log_retention_hours // 2,
"log.retention.bytes": self.config.log_retention_bytes // 2
}
})
# 副本不足建议
if self.metrics.under_replicated_partitions > 0:
recommendations.append({
"type": "replication",
"message": "存在副本不足的分区,检查集群健康状态",
"current_value": self.metrics.under_replicated_partitions,
"suggested_action": "检查Broker状态和网络连接"
})
return {
"timestamp": time.time(),
"broker_id": self.config.broker_id,
"recommendations": recommendations
}
def stop(self):
"""停止Broker"""
print(f"正在停止Kafka Broker {self.config.broker_id}...")
self.running = False
# 停止组件
self.log_manager.stop()
self.request_handler.stop()
# 停止指标收集线程
if self.metrics_thread:
self.metrics_thread.join()
print(f"Kafka Broker {self.config.broker_id} 已停止")
# 使用示例
if __name__ == "__main__":
# 创建优化配置
config = BrokerConfig(
broker_id=1,
num_network_threads=16,
num_io_threads=16,
log_retention_hours=72, # 3天
log_segment_bytes=536870912, # 512MB
compression_type=CompressionType.LZ4,
heap_opts="-Xmx2G -Xms2G"
)
# 创建Broker
broker = OptimizedKafkaBroker(config)
broker.start()
try:
# 模拟消息生产和消费
for i in range(1000):
# 模拟生产消息
message = f"test-message-{i}".encode('utf-8')
broker.produce_message("test-topic", 0, message)
# 模拟消费消息
broker.consume_message("test-topic", 0, len(message))
if i % 100 == 0:
print(f"处理了 {i} 条消息")
# 获取指标
metrics = broker.get_metrics()
print(f"消息速率: {metrics.messages_in_per_sec:.2f} msg/sec")
print(f"字节输入速率: {metrics.bytes_in_per_sec:.2f} bytes/sec")
print(f"CPU使用率: {metrics.cpu_usage_percent:.2f}%")
print(f"内存使用率: {metrics.memory_usage_percent:.2f}%")
# 获取配置建议
recommendations = broker.get_config_recommendations()
if recommendations["recommendations"]:
print("\n=== 配置建议 ===")
for rec in recommendations["recommendations"]:
print(f"- {rec['message']}")
print("-" * 50)
time.sleep(0.01) # 10ms间隔
finally:
broker.stop()
5.2 监控与告警
5.2.1 JMX监控
Kafka通过JMX(Java Management Extensions)暴露大量的监控指标。
import json
import time
import threading
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import statistics
class MetricType(Enum):
"""指标类型"""
COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"
TIMER = "timer"
class AlertLevel(Enum):
"""告警级别"""
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class JMXMetric:
"""JMX指标"""
name: str
value: float
metric_type: MetricType
timestamp: float = field(default_factory=time.time)
tags: Dict[str, str] = field(default_factory=dict)
description: str = ""
@dataclass
class AlertRule:
"""告警规则"""
name: str
metric_name: str
condition: str # ">", "<", ">=", "<=", "==", "!="
threshold: float
level: AlertLevel
duration_seconds: int = 0 # 持续时间
enabled: bool = True
description: str = ""
@dataclass
class Alert:
"""告警"""
rule_name: str
metric_name: str
current_value: float
threshold: float
level: AlertLevel
message: str
timestamp: float = field(default_factory=time.time)
resolved: bool = False
resolved_timestamp: Optional[float] = None
class JMXMetricsCollector:
"""JMX指标收集器"""
def __init__(self):
self.metrics = {} # {metric_name: [JMXMetric]}
self.collection_interval = 10 # 10秒
self.max_history = 1000 # 最大历史记录数
self.running = False
self.collection_thread = None
self.lock = threading.RLock()
def start(self):
"""启动指标收集"""
self.running = True
self.collection_thread = threading.Thread(target=self._collection_loop)
self.collection_thread.daemon = True
self.collection_thread.start()
print("JMX指标收集器已启动")
def _collection_loop(self):
"""收集循环"""
while self.running:
try:
self._collect_kafka_metrics()
time.sleep(self.collection_interval)
except Exception as e:
print(f"指标收集错误: {e}")
def _collect_kafka_metrics(self):
"""收集Kafka指标"""
# 模拟收集各种Kafka JMX指标
current_time = time.time()
# Broker指标
broker_metrics = [
JMXMetric("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
self._simulate_metric_value(100, 1000), MetricType.COUNTER),
JMXMetric("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec",
self._simulate_metric_value(10000, 100000), MetricType.COUNTER),
JMXMetric("kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec",
self._simulate_metric_value(10000, 100000), MetricType.COUNTER),
JMXMetric("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions",
self._simulate_metric_value(0, 5), MetricType.GAUGE),
JMXMetric("kafka.server:type=ReplicaManager,name=OfflinePartitionsCount",
self._simulate_metric_value(0, 2), MetricType.GAUGE),
]
# 网络指标
network_metrics = [
JMXMetric("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce",
self._simulate_metric_value(50, 500), MetricType.COUNTER),
JMXMetric("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer",
self._simulate_metric_value(30, 300), MetricType.COUNTER),
JMXMetric("kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce",
self._simulate_metric_value(1, 50), MetricType.TIMER),
JMXMetric("kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer",
self._simulate_metric_value(1, 30), MetricType.TIMER),
]
# 日志指标
log_metrics = [
JMXMetric("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs",
self._simulate_metric_value(1, 10), MetricType.TIMER),
JMXMetric("kafka.log:type=LogSize,name=Size",
self._simulate_metric_value(1000000, 10000000), MetricType.GAUGE),
]
# JVM指标
jvm_metrics = [
JMXMetric("java.lang:type=Memory,name=HeapMemoryUsage.used",
self._simulate_metric_value(500000000, 1500000000), MetricType.GAUGE),
JMXMetric("java.lang:type=GarbageCollector,name=G1YoungGeneration,CollectionCount",
self._simulate_metric_value(100, 1000), MetricType.COUNTER),
JMXMetric("java.lang:type=GarbageCollector,name=G1YoungGeneration,CollectionTime",
self._simulate_metric_value(1000, 10000), MetricType.COUNTER),
]
# 存储所有指标
all_metrics = broker_metrics + network_metrics + log_metrics + jvm_metrics
with self.lock:
for metric in all_metrics:
if metric.name not in self.metrics:
self.metrics[metric.name] = []
self.metrics[metric.name].append(metric)
# 限制历史记录数量
if len(self.metrics[metric.name]) > self.max_history:
self.metrics[metric.name] = self.metrics[metric.name][-self.max_history//2:]
def _simulate_metric_value(self, min_val: float, max_val: float) -> float:
"""模拟指标值"""
import random
return random.uniform(min_val, max_val)
def get_metric_history(self, metric_name: str,
duration_seconds: int = 3600) -> List[JMXMetric]:
"""获取指标历史"""
with self.lock:
if metric_name not in self.metrics:
return []
cutoff_time = time.time() - duration_seconds
return [
metric for metric in self.metrics[metric_name]
if metric.timestamp >= cutoff_time
]
def get_latest_metric(self, metric_name: str) -> Optional[JMXMetric]:
"""获取最新指标值"""
with self.lock:
if metric_name not in self.metrics or not self.metrics[metric_name]:
return None
return self.metrics[metric_name][-1]
def get_all_metrics(self) -> Dict[str, List[JMXMetric]]:
"""获取所有指标"""
with self.lock:
return self.metrics.copy()
def stop(self):
"""停止指标收集"""
self.running = False
if self.collection_thread:
self.collection_thread.join()
print("JMX指标收集器已停止")
class AlertManager:
"""告警管理器"""
def __init__(self, metrics_collector: JMXMetricsCollector):
self.metrics_collector = metrics_collector
self.rules = {} # {rule_name: AlertRule}
self.active_alerts = {} # {rule_name: Alert}
self.alert_history = [] # List[Alert]
self.alert_callbacks = [] # List[Callable[[Alert], None]]
self.evaluation_interval = 30 # 30秒
self.running = False
self.evaluation_thread = None
self.lock = threading.RLock()
def add_rule(self, rule: AlertRule):
"""添加告警规则"""
with self.lock:
self.rules[rule.name] = rule
print(f"添加告警规则: {rule.name}")
def remove_rule(self, rule_name: str):
"""移除告警规则"""
with self.lock:
if rule_name in self.rules:
del self.rules[rule_name]
if rule_name in self.active_alerts:
del self.active_alerts[rule_name]
print(f"移除告警规则: {rule_name}")
def add_alert_callback(self, callback: Callable[[Alert], None]):
"""添加告警回调"""
self.alert_callbacks.append(callback)
def start(self):
"""启动告警管理器"""
self.running = True
self.evaluation_thread = threading.Thread(target=self._evaluation_loop)
self.evaluation_thread.daemon = True
self.evaluation_thread.start()
print("告警管理器已启动")
def _evaluation_loop(self):
"""告警评估循环"""
while self.running:
try:
self._evaluate_rules()
time.sleep(self.evaluation_interval)
except Exception as e:
print(f"告警评估错误: {e}")
def _evaluate_rules(self):
"""评估告警规则"""
with self.lock:
for rule_name, rule in self.rules.items():
if not rule.enabled:
continue
try:
self._evaluate_single_rule(rule)
except Exception as e:
print(f"评估规则 {rule_name} 失败: {e}")
def _evaluate_single_rule(self, rule: AlertRule):
"""评估单个告警规则"""
# 获取最新指标值
metric = self.metrics_collector.get_latest_metric(rule.metric_name)
if not metric:
return
# 检查条件
condition_met = self._check_condition(metric.value, rule.condition, rule.threshold)
if condition_met:
# 条件满足,检查是否需要触发告警
if rule.name not in self.active_alerts:
# 新告警
alert = Alert(
rule_name=rule.name,
metric_name=rule.metric_name,
current_value=metric.value,
threshold=rule.threshold,
level=rule.level,
message=f"{rule.description or rule.name}: {metric.value} {rule.condition} {rule.threshold}"
)
self.active_alerts[rule.name] = alert
self.alert_history.append(alert)
# 触发回调
self._trigger_alert_callbacks(alert)
print(f"触发告警: {alert.message}")
else:
# 条件不满足,检查是否需要解除告警
if rule.name in self.active_alerts:
alert = self.active_alerts[rule.name]
alert.resolved = True
alert.resolved_timestamp = time.time()
del self.active_alerts[rule.name]
print(f"解除告警: {alert.message}")
def _check_condition(self, value: float, condition: str, threshold: float) -> bool:
"""检查条件"""
if condition == ">":
return value > threshold
elif condition == "<":
return value < threshold
elif condition == ">=":
return value >= threshold
elif condition == "<=":
return value <= threshold
elif condition == "==":
return value == threshold
elif condition == "!=":
return value != threshold
else:
return False
def _trigger_alert_callbacks(self, alert: Alert):
"""触发告警回调"""
for callback in self.alert_callbacks:
try:
callback(alert)
except Exception as e:
print(f"告警回调执行失败: {e}")
def get_active_alerts(self) -> List[Alert]:
"""获取活跃告警"""
with self.lock:
return list(self.active_alerts.values())
def get_alert_history(self, hours: int = 24) -> List[Alert]:
"""获取告警历史"""
cutoff_time = time.time() - (hours * 3600)
return [
alert for alert in self.alert_history
if alert.timestamp >= cutoff_time
]
def stop(self):
"""停止告警管理器"""
self.running = False
if self.evaluation_thread:
self.evaluation_thread.join()
print("告警管理器已停止")
class MetricsAnalyzer:
"""指标分析器"""
def __init__(self, metrics_collector: JMXMetricsCollector):
self.metrics_collector = metrics_collector
def analyze_throughput_trend(self, metric_name: str,
duration_hours: int = 1) -> Dict[str, Any]:
"""分析吞吐量趋势"""
metrics = self.metrics_collector.get_metric_history(
metric_name, duration_hours * 3600
)
if len(metrics) < 2:
return {"trend": "insufficient_data", "metrics_count": len(metrics)}
values = [m.value for m in metrics]
timestamps = [m.timestamp for m in metrics]
# 计算趋势
if len(values) >= 10:
# 使用线性回归计算趋势
x = list(range(len(values)))
slope = self._calculate_slope(x, values)
if slope > 0.1:
trend = "increasing"
elif slope < -0.1:
trend = "decreasing"
else:
trend = "stable"
else:
# 简单比较
if values[-1] > values[0] * 1.1:
trend = "increasing"
elif values[-1] < values[0] * 0.9:
trend = "decreasing"
else:
trend = "stable"
return {
"trend": trend,
"current_value": values[-1],
"avg_value": statistics.mean(values),
"min_value": min(values),
"max_value": max(values),
"std_dev": statistics.stdev(values) if len(values) > 1 else 0,
"metrics_count": len(metrics),
"time_range": {
"start": timestamps[0],
"end": timestamps[-1]
}
}
def _calculate_slope(self, x: List[float], y: List[float]) -> float:
"""计算斜率"""
n = len(x)
if n < 2:
return 0
sum_x = sum(x)
sum_y = sum(y)
sum_xy = sum(x[i] * y[i] for i in range(n))
sum_x2 = sum(x[i] * x[i] for i in range(n))
denominator = n * sum_x2 - sum_x * sum_x
if denominator == 0:
return 0
return (n * sum_xy - sum_x * sum_y) / denominator
def detect_anomalies(self, metric_name: str,
duration_hours: int = 24) -> List[Dict[str, Any]]:
"""检测异常"""
metrics = self.metrics_collector.get_metric_history(
metric_name, duration_hours * 3600
)
if len(metrics) < 10:
return []
values = [m.value for m in metrics]
mean_val = statistics.mean(values)
std_val = statistics.stdev(values)
# 使用3-sigma规则检测异常
threshold = 3 * std_val
anomalies = []
for metric in metrics:
if abs(metric.value - mean_val) > threshold:
anomalies.append({
"timestamp": metric.timestamp,
"value": metric.value,
"expected_range": [mean_val - threshold, mean_val + threshold],
"deviation": abs(metric.value - mean_val) / std_val
})
return anomalies
def generate_health_report(self) -> Dict[str, Any]:
"""生成健康报告"""
report = {
"timestamp": time.time(),
"overall_health": "healthy",
"metrics_analysis": {},
"recommendations": []
}
# 关键指标分析
key_metrics = [
"kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
"kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions",
"kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce",
"java.lang:type=Memory,name=HeapMemoryUsage.used"
]
health_issues = 0
for metric_name in key_metrics:
latest_metric = self.metrics_collector.get_latest_metric(metric_name)
if not latest_metric:
continue
analysis = self.analyze_throughput_trend(metric_name, 1)
anomalies = self.detect_anomalies(metric_name, 24)
report["metrics_analysis"][metric_name] = {
"current_value": latest_metric.value,
"trend": analysis["trend"],
"anomalies_count": len(anomalies)
}
# 健康检查
if "UnderReplicatedPartitions" in metric_name and latest_metric.value > 0:
health_issues += 1
report["recommendations"].append(
"检测到副本不足的分区,请检查集群状态"
)
if "TotalTimeMs" in metric_name and latest_metric.value > 100:
health_issues += 1
report["recommendations"].append(
"请求处理时间过长,考虑优化配置或增加资源"
)
if "HeapMemoryUsage" in metric_name and latest_metric.value > 1200000000: # 1.2GB
health_issues += 1
report["recommendations"].append(
"JVM堆内存使用率过高,考虑增加堆内存或优化GC"
)
# 确定整体健康状态
if health_issues == 0:
report["overall_health"] = "healthy"
elif health_issues <= 2:
report["overall_health"] = "warning"
else:
report["overall_health"] = "critical"
return report
# 使用示例
if __name__ == "__main__":
# 创建指标收集器
metrics_collector = JMXMetricsCollector()
metrics_collector.start()
# 创建告警管理器
alert_manager = AlertManager(metrics_collector)
# 添加告警规则
rules = [
AlertRule(
name="high_under_replicated_partitions",
metric_name="kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions",
condition=">",
threshold=0,
level=AlertLevel.CRITICAL,
description="副本不足的分区数过多"
),
AlertRule(
name="high_request_latency",
metric_name="kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce",
condition=">",
threshold=50,
level=AlertLevel.WARNING,
description="生产请求延迟过高"
),
AlertRule(
name="high_memory_usage",
metric_name="java.lang:type=Memory,name=HeapMemoryUsage.used",
condition=">",
threshold=1200000000, # 1.2GB
level=AlertLevel.WARNING,
description="JVM堆内存使用率过高"
)
]
for rule in rules:
alert_manager.add_rule(rule)
# 添加告警回调
def alert_callback(alert: Alert):
print(f"\n🚨 告警通知: [{alert.level.value.upper()}] {alert.message}")
print(f" 时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(alert.timestamp))}")
print(f" 当前值: {alert.current_value}")
print(f" 阈值: {alert.threshold}")
alert_manager.add_alert_callback(alert_callback)
alert_manager.start()
# 创建指标分析器
analyzer = MetricsAnalyzer(metrics_collector)
try:
# 运行监控
for i in range(10):
time.sleep(30) # 等待30秒
print(f"\n=== 监控报告 #{i+1} ===")
# 获取活跃告警
active_alerts = alert_manager.get_active_alerts()
if active_alerts:
print(f"活跃告警数: {len(active_alerts)}")
for alert in active_alerts:
print(f" - {alert.message}")
else:
print("无活跃告警")
# 生成健康报告
health_report = analyzer.generate_health_report()
print(f"整体健康状态: {health_report['overall_health']}")
if health_report['recommendations']:
print("建议:")
for rec in health_report['recommendations']:
print(f" - {rec}")
# 分析关键指标趋势
throughput_analysis = analyzer.analyze_throughput_trend(
"kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"
)
print(f"消息吞吐量趋势: {throughput_analysis['trend']}")
print(f"当前值: {throughput_analysis.get('current_value', 'N/A')}")
finally:
alert_manager.stop()
metrics_collector.stop()
5.2.2 集群监控
集群级别的监控需要关注整体健康状态和各个组件的协调工作。
import socket
import subprocess
import json
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import time
import threading
class BrokerStatus(Enum):
"""Broker状态"""
ONLINE = "online"
OFFLINE = "offline"
RECOVERING = "recovering"
UNKNOWN = "unknown"
class PartitionStatus(Enum):
"""分区状态"""
HEALTHY = "healthy"
UNDER_REPLICATED = "under_replicated"
OFFLINE = "offline"
LEADER_ELECTION = "leader_election"
@dataclass
class BrokerInfo:
"""Broker信息"""
broker_id: int
host: str
port: int
status: BrokerStatus
last_seen: float = field(default_factory=time.time)
version: str = ""
rack: Optional[str] = None
# 性能指标
cpu_usage: float = 0.0
memory_usage: float = 0.0
disk_usage: float = 0.0
network_in_rate: float = 0.0
network_out_rate: float = 0.0
# Kafka指标
partition_count: int = 0
leader_count: int = 0
replica_count: int = 0
under_replicated_partitions: int = 0
@dataclass
class TopicInfo:
"""主题信息"""
name: str
partition_count: int
replication_factor: int
config: Dict[str, str] = field(default_factory=dict)
# 分区详情
partitions: List[Dict[str, Any]] = field(default_factory=list)
# 性能指标
messages_per_sec: float = 0.0
bytes_per_sec: float = 0.0
total_size_bytes: int = 0
@dataclass
class ConsumerGroupInfo:
"""消费者组信息"""
group_id: str
state: str
protocol: str
protocol_type: str
members: List[Dict[str, Any]] = field(default_factory=list)
# 延迟信息
total_lag: int = 0
partition_lags: Dict[str, int] = field(default_factory=dict)
class ZooKeeperMonitor:
"""ZooKeeper监控器"""
def __init__(self, zk_hosts: List[str]):
self.zk_hosts = zk_hosts
self.connection_timeout = 5
def check_zk_health(self) -> Dict[str, Any]:
"""检查ZooKeeper健康状态"""
results = {}
for host in self.zk_hosts:
try:
host_port = host.split(':')
hostname = host_port[0]
port = int(host_port[1]) if len(host_port) > 1 else 2181
# 检查连接
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.connection_timeout)
result = sock.connect_ex((hostname, port))
sock.close()
if result == 0:
# 发送ruok命令
status = self._send_zk_command(hostname, port, "ruok")
results[host] = {
"status": "healthy" if status == "imok" else "unhealthy",
"response": status,
"timestamp": time.time()
}
else:
results[host] = {
"status": "unreachable",
"error": f"Connection failed: {result}",
"timestamp": time.time()
}
except Exception as e:
results[host] = {
"status": "error",
"error": str(e),
"timestamp": time.time()
}
return results
def _send_zk_command(self, host: str, port: int, command: str) -> str:
"""发送ZooKeeper命令"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.connection_timeout)
sock.connect((host, port))
sock.send(command.encode())
response = sock.recv(1024).decode().strip()
sock.close()
return response
except Exception as e:
return f"error: {e}"
def get_zk_stats(self) -> Dict[str, Any]:
"""获取ZooKeeper统计信息"""
stats = {}
for host in self.zk_hosts:
try:
host_port = host.split(':')
hostname = host_port[0]
port = int(host_port[1]) if len(host_port) > 1 else 2181
# 获取统计信息
stat_response = self._send_zk_command(hostname, port, "stat")
if not stat_response.startswith("error:"):
stats[host] = self._parse_zk_stats(stat_response)
else:
stats[host] = {"error": stat_response}
except Exception as e:
stats[host] = {"error": str(e)}
return stats
def _parse_zk_stats(self, stat_response: str) -> Dict[str, Any]:
"""解析ZooKeeper统计信息"""
stats = {}
lines = stat_response.split('\n')
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
key = key.strip()
value = value.strip()
# 尝试转换为数字
try:
if '.' in value:
stats[key] = float(value)
else:
stats[key] = int(value)
except ValueError:
stats[key] = value
return stats
class KafkaClusterMonitor:
"""Kafka集群监控器"""
def __init__(self, bootstrap_servers: List[str], zk_hosts: List[str]):
self.bootstrap_servers = bootstrap_servers
self.zk_monitor = ZooKeeperMonitor(zk_hosts)
self.brokers = {} # {broker_id: BrokerInfo}
self.topics = {} # {topic_name: TopicInfo}
self.consumer_groups = {} # {group_id: ConsumerGroupInfo}
self.monitoring_interval = 30 # 30秒
self.running = False
self.monitor_thread = None
self.lock = threading.RLock()
def start(self):
"""启动集群监控"""
self.running = True
self.monitor_thread = threading.Thread(target=self._monitoring_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
print("Kafka集群监控器已启动")
def _monitoring_loop(self):
"""监控循环"""
while self.running:
try:
self._collect_cluster_info()
time.sleep(self.monitoring_interval)
except Exception as e:
print(f"集群监控错误: {e}")
def _collect_cluster_info(self):
"""收集集群信息"""
# 模拟收集集群信息
self._update_broker_info()
self._update_topic_info()
self._update_consumer_group_info()
def _update_broker_info(self):
"""更新Broker信息"""
# 模拟Broker信息
broker_data = [
{"id": 1, "host": "kafka-1", "port": 9092, "status": BrokerStatus.ONLINE},
{"id": 2, "host": "kafka-2", "port": 9092, "status": BrokerStatus.ONLINE},
{"id": 3, "host": "kafka-3", "port": 9092, "status": BrokerStatus.ONLINE}
]
with self.lock:
for data in broker_data:
broker_info = BrokerInfo(
broker_id=data["id"],
host=data["host"],
port=data["port"],
status=data["status"],
cpu_usage=self._simulate_metric(20, 80),
memory_usage=self._simulate_metric(30, 70),
disk_usage=self._simulate_metric(10, 60),
partition_count=self._simulate_metric(50, 200, int),
leader_count=self._simulate_metric(20, 80, int),
under_replicated_partitions=self._simulate_metric(0, 5, int)
)
self.brokers[data["id"]] = broker_info
def _update_topic_info(self):
"""更新主题信息"""
# 模拟主题信息
topic_data = [
{"name": "user-events", "partitions": 12, "replication": 3},
{"name": "order-events", "partitions": 6, "replication": 3},
{"name": "system-logs", "partitions": 3, "replication": 2}
]
with self.lock:
for data in topic_data:
topic_info = TopicInfo(
name=data["name"],
partition_count=data["partitions"],
replication_factor=data["replication"],
messages_per_sec=self._simulate_metric(100, 1000),
bytes_per_sec=self._simulate_metric(10000, 100000),
total_size_bytes=self._simulate_metric(1000000, 10000000, int)
)
self.topics[data["name"]] = topic_info
def _update_consumer_group_info(self):
"""更新消费者组信息"""
# 模拟消费者组信息
group_data = [
{"id": "user-service", "state": "Stable", "lag": self._simulate_metric(0, 1000, int)},
{"id": "order-service", "state": "Stable", "lag": self._simulate_metric(0, 500, int)},
{"id": "analytics", "state": "Rebalancing", "lag": self._simulate_metric(100, 2000, int)}
]
with self.lock:
for data in group_data:
group_info = ConsumerGroupInfo(
group_id=data["id"],
state=data["state"],
protocol="range",
protocol_type="consumer",
total_lag=data["lag"]
)
self.consumer_groups[data["id"]] = group_info
def _simulate_metric(self, min_val: float, max_val: float, value_type=float):
"""模拟指标值"""
import random
value = random.uniform(min_val, max_val)
return value_type(value)
def get_cluster_health(self) -> Dict[str, Any]:
"""获取集群健康状态"""
with self.lock:
# 检查ZooKeeper健康状态
zk_health = self.zk_monitor.check_zk_health()
zk_healthy = all(info.get("status") == "healthy" for info in zk_health.values())
# 检查Broker健康状态
online_brokers = sum(1 for broker in self.brokers.values()
if broker.status == BrokerStatus.ONLINE)
total_brokers = len(self.brokers)
# 检查副本状态
total_under_replicated = sum(broker.under_replicated_partitions
for broker in self.brokers.values())
# 检查消费者组状态
stable_groups = sum(1 for group in self.consumer_groups.values()
if group.state == "Stable")
total_groups = len(self.consumer_groups)
# 计算总体延迟
total_lag = sum(group.total_lag for group in self.consumer_groups.values())
# 确定健康状态
health_score = 100
issues = []
if not zk_healthy:
health_score -= 30
issues.append("ZooKeeper集群不健康")
if online_brokers < total_brokers:
health_score -= 25
issues.append(f"有{total_brokers - online_brokers}个Broker离线")
if total_under_replicated > 0:
health_score -= 20
issues.append(f"有{total_under_replicated}个分区副本不足")
if total_lag > 10000:
health_score -= 15
issues.append(f"消费者总延迟过高: {total_lag}")
if stable_groups < total_groups:
health_score -= 10
issues.append(f"有{total_groups - stable_groups}个消费者组不稳定")
# 确定健康等级
if health_score >= 90:
health_level = "excellent"
elif health_score >= 75:
health_level = "good"
elif health_score >= 60:
health_level = "warning"
else:
health_level = "critical"
return {
"timestamp": time.time(),
"health_score": health_score,
"health_level": health_level,
"issues": issues,
"summary": {
"zookeeper_healthy": zk_healthy,
"online_brokers": f"{online_brokers}/{total_brokers}",
"under_replicated_partitions": total_under_replicated,
"stable_consumer_groups": f"{stable_groups}/{total_groups}",
"total_consumer_lag": total_lag
},
"details": {
"zookeeper": zk_health,
"brokers": {bid: {
"status": broker.status.value,
"cpu_usage": broker.cpu_usage,
"memory_usage": broker.memory_usage,
"under_replicated_partitions": broker.under_replicated_partitions
} for bid, broker in self.brokers.items()},
"topics": {name: {
"partition_count": topic.partition_count,
"replication_factor": topic.replication_factor,
"messages_per_sec": topic.messages_per_sec
} for name, topic in self.topics.items()},
"consumer_groups": {gid: {
"state": group.state,
"total_lag": group.total_lag
} for gid, group in self.consumer_groups.items()}
}
}
def get_performance_summary(self) -> Dict[str, Any]:
"""获取性能摘要"""
with self.lock:
# 计算集群级别的性能指标
total_messages_per_sec = sum(topic.messages_per_sec for topic in self.topics.values())
total_bytes_per_sec = sum(topic.bytes_per_sec for topic in self.topics.values())
avg_cpu_usage = sum(broker.cpu_usage for broker in self.brokers.values()) / len(self.brokers) if self.brokers else 0
avg_memory_usage = sum(broker.memory_usage for broker in self.brokers.values()) / len(self.brokers) if self.brokers else 0
total_partitions = sum(topic.partition_count for topic in self.topics.values())
total_replicas = sum(topic.partition_count * topic.replication_factor for topic in self.topics.values())
return {
"timestamp": time.time(),
"throughput": {
"messages_per_sec": total_messages_per_sec,
"bytes_per_sec": total_bytes_per_sec,
"mb_per_sec": total_bytes_per_sec / (1024 * 1024)
},
"resource_usage": {
"avg_cpu_usage": avg_cpu_usage,
"avg_memory_usage": avg_memory_usage,
"broker_count": len(self.brokers),
"online_brokers": sum(1 for b in self.brokers.values() if b.status == BrokerStatus.ONLINE)
},
"data_distribution": {
"topic_count": len(self.topics),
"total_partitions": total_partitions,
"total_replicas": total_replicas,
"avg_partitions_per_topic": total_partitions / len(self.topics) if self.topics else 0
},
"consumer_health": {
"consumer_group_count": len(self.consumer_groups),
"total_lag": sum(group.total_lag for group in self.consumer_groups.values()),
"stable_groups": sum(1 for g in self.consumer_groups.values() if g.state == "Stable")
}
}
def stop(self):
"""停止集群监控"""
self.running = False
if self.monitor_thread:
self.monitor_thread.join()
print("Kafka集群监控器已停止")
# 使用示例
if __name__ == "__main__":
# 创建集群监控器
cluster_monitor = KafkaClusterMonitor(
bootstrap_servers=["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
zk_hosts=["zk-1:2181", "zk-2:2181", "zk-3:2181"]
)
cluster_monitor.start()
try:
# 运行监控
for i in range(5):
time.sleep(60) # 等待1分钟
print(f"\n=== 集群监控报告 #{i+1} ===")
# 获取集群健康状态
health = cluster_monitor.get_cluster_health()
print(f"集群健康评分: {health['health_score']}/100 ({health['health_level']})")
if health['issues']:
print("发现问题:")
for issue in health['issues']:
print(f" - {issue}")
else:
print("集群运行正常")
# 获取性能摘要
performance = cluster_monitor.get_performance_summary()
print(f"\n性能摘要:")
print(f" 吞吐量: {performance['throughput']['messages_per_sec']:.0f} msg/sec, {performance['throughput']['mb_per_sec']:.2f} MB/sec")
print(f" 资源使用: CPU {performance['resource_usage']['avg_cpu_usage']:.1f}%, 内存 {performance['resource_usage']['avg_memory_usage']:.1f}%")
print(f" 在线Broker: {performance['resource_usage']['online_brokers']}/{performance['resource_usage']['broker_count']}")
print(f" 消费者延迟: {performance['consumer_health']['total_lag']}")
finally:
cluster_monitor.stop()
5.3 故障排查
5.3.1 常见问题诊断
import re
import os
import subprocess
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import time
class IssueType(Enum):
"""问题类型"""
PERFORMANCE = "performance"
CONNECTIVITY = "connectivity"
DATA_LOSS = "data_loss"
REPLICATION = "replication"
CONSUMER_LAG = "consumer_lag"
DISK_SPACE = "disk_space"
MEMORY = "memory"
NETWORK = "network"
CONFIGURATION = "configuration"
class Severity(Enum):
"""严重程度"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DiagnosticResult:
"""诊断结果"""
issue_type: IssueType
severity: Severity
title: str
description: str
symptoms: List[str] = field(default_factory=list)
possible_causes: List[str] = field(default_factory=list)
solutions: List[str] = field(default_factory=list)
commands: List[str] = field(default_factory=list)
timestamp: float = field(default_factory=time.time)
class KafkaDiagnosticTool:
"""Kafka诊断工具"""
def __init__(self, kafka_home: str, bootstrap_servers: str):
self.kafka_home = kafka_home
self.bootstrap_servers = bootstrap_servers
self.kafka_topics_cmd = os.path.join(kafka_home, "bin", "kafka-topics.sh")
self.kafka_consumer_groups_cmd = os.path.join(kafka_home, "bin", "kafka-consumer-groups.sh")
self.kafka_log_dirs_cmd = os.path.join(kafka_home, "bin", "kafka-log-dirs.sh")
def diagnose_all(self) -> List[DiagnosticResult]:
"""执行全面诊断"""
results = []
# 执行各种诊断检查
results.extend(self.check_broker_connectivity())
results.extend(self.check_topic_health())
results.extend(self.check_consumer_lag())
results.extend(self.check_disk_usage())
results.extend(self.check_replication_issues())
results.extend(self.check_performance_issues())
return results
def check_broker_connectivity(self) -> List[DiagnosticResult]:
"""检查Broker连接性"""
results = []
try:
# 尝试列出主题来测试连接
cmd = [self.kafka_topics_cmd, "--bootstrap-server", self.bootstrap_servers, "--list"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
results.append(DiagnosticResult(
issue_type=IssueType.CONNECTIVITY,
severity=Severity.CRITICAL,
title="无法连接到Kafka集群",
description="无法连接到指定的Kafka bootstrap服务器",
symptoms=[
"kafka-topics命令失败",
f"错误输出: {result.stderr}"
],
possible_causes=[
"Broker服务未启动",
"网络连接问题",
"防火墙阻止连接",
"错误的bootstrap服务器地址",
"SSL/SASL配置问题"
],
solutions=[
"检查Broker服务状态",
"验证网络连接",
"检查防火墙设置",
"确认bootstrap服务器地址",
"检查安全配置"
],
commands=[
"systemctl status kafka",
"netstat -tlnp | grep 9092",
"telnet <broker-host> 9092"
]
))
except subprocess.TimeoutExpired:
results.append(DiagnosticResult(
issue_type=IssueType.CONNECTIVITY,
severity=Severity.HIGH,
title="连接超时",
description="连接Kafka集群超时",
symptoms=["命令执行超时"],
possible_causes=[
"网络延迟过高",
"Broker响应缓慢",
"负载过高"
],
solutions=[
"检查网络延迟",
"检查Broker负载",
"增加超时时间"
]
))
except Exception as e:
results.append(DiagnosticResult(
issue_type=IssueType.CONNECTIVITY,
severity=Severity.HIGH,
title="连接检查失败",
description=f"执行连接检查时发生错误: {e}",
symptoms=[f"异常: {e}"],
possible_causes=["系统配置问题", "权限问题"],
solutions=["检查系统配置", "验证执行权限"]
))
return results
def check_topic_health(self) -> List[DiagnosticResult]:
"""检查主题健康状态"""
results = []
try:
# 获取主题列表
cmd = [self.kafka_topics_cmd, "--bootstrap-server", self.bootstrap_servers, "--describe"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
# 解析主题信息
topic_info = self._parse_topic_describe_output(result.stdout)
for topic_name, info in topic_info.items():
# 检查副本不足的分区
under_replicated = info.get('under_replicated_partitions', [])
if under_replicated:
results.append(DiagnosticResult(
issue_type=IssueType.REPLICATION,
severity=Severity.HIGH,
title=f"主题 {topic_name} 存在副本不足的分区",
description=f"分区 {under_replicated} 的副本数量不足",
symptoms=[
f"副本不足的分区: {under_replicated}",
"数据可能面临丢失风险"
],
possible_causes=[
"Broker节点故障",
"网络分区",
"磁盘空间不足",
"配置错误"
],
solutions=[
"检查所有Broker状态",
"重启故障的Broker",
"检查磁盘空间",
"执行分区重分配"
],
commands=[
f"kafka-topics.sh --bootstrap-server {self.bootstrap_servers} --describe --topic {topic_name}",
"kafka-reassign-partitions.sh --bootstrap-server {self.bootstrap_servers} --generate"
]
))
# 检查离线分区
offline_partitions = info.get('offline_partitions', [])
if offline_partitions:
results.append(DiagnosticResult(
issue_type=IssueType.DATA_LOSS,
severity=Severity.CRITICAL,
title=f"主题 {topic_name} 存在离线分区",
description=f"分区 {offline_partitions} 完全离线",
symptoms=[
f"离线分区: {offline_partitions}",
"无法读写数据"
],
possible_causes=[
"所有副本都不可用",
"严重的硬件故障",
"数据损坏"
],
solutions=[
"立即检查所有相关Broker",
"尝试恢复故障的Broker",
"考虑从备份恢复数据",
"如果数据无法恢复,考虑重置分区"
],
commands=[
f"kafka-topics.sh --bootstrap-server {self.bootstrap_servers} --describe --topic {topic_name}",
"kafka-log-dirs.sh --bootstrap-server {self.bootstrap_servers} --describe"
]
))
except Exception as e:
results.append(DiagnosticResult(
issue_type=IssueType.CONFIGURATION,
severity=Severity.MEDIUM,
title="无法检查主题健康状态",
description=f"检查主题时发生错误: {e}",
symptoms=[f"异常: {e}"],
possible_causes=["权限问题", "配置错误"],
solutions=["检查执行权限", "验证配置"]
))
return results
def check_consumer_lag(self) -> List[DiagnosticResult]:
"""检查消费者延迟"""
results = []
try:
# 获取消费者组列表
cmd = [self.kafka_consumer_groups_cmd, "--bootstrap-server", self.bootstrap_servers, "--list"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
groups = result.stdout.strip().split('\n')
for group in groups:
if group.strip():
lag_info = self._check_group_lag(group.strip())
if lag_info['total_lag'] > 10000: # 高延迟阈值
results.append(DiagnosticResult(
issue_type=IssueType.CONSUMER_LAG,
severity=Severity.HIGH,
title=f"消费者组 {group} 延迟过高",
description=f"总延迟: {lag_info['total_lag']} 条消息",
symptoms=[
f"总延迟: {lag_info['total_lag']}",
f"最大分区延迟: {lag_info['max_lag']}",
"消费速度跟不上生产速度"
],
possible_causes=[
"消费者处理速度慢",
"消费者实例数量不足",
"网络问题",
"消费者配置不当",
"下游系统瓶颈"
],
solutions=[
"增加消费者实例数量",
"优化消费者处理逻辑",
"调整消费者配置",
"检查下游系统性能",
"考虑增加分区数量"
],
commands=[
f"kafka-consumer-groups.sh --bootstrap-server {self.bootstrap_servers} --describe --group {group}",
f"kafka-consumer-groups.sh --bootstrap-server {self.bootstrap_servers} --reset-offsets --group {group} --to-latest --execute"
]
))
elif lag_info['total_lag'] > 1000: # 中等延迟阈值
results.append(DiagnosticResult(
issue_type=IssueType.CONSUMER_LAG,
severity=Severity.MEDIUM,
title=f"消费者组 {group} 存在延迟",
description=f"总延迟: {lag_info['total_lag']} 条消息",
symptoms=[f"总延迟: {lag_info['total_lag']}"],
possible_causes=["消费速度略慢", "临时负载增加"],
solutions=["监控延迟趋势", "考虑优化消费者"]
))
except Exception as e:
results.append(DiagnosticResult(
issue_type=IssueType.CONSUMER_LAG,
severity=Severity.MEDIUM,
title="无法检查消费者延迟",
description=f"检查消费者延迟时发生错误: {e}",
symptoms=[f"异常: {e}"],
possible_causes=["权限问题", "配置错误"],
solutions=["检查执行权限", "验证配置"]
))
return results
def check_disk_usage(self) -> List[DiagnosticResult]:
"""检查磁盘使用情况"""
results = []
try:
# 检查Kafka日志目录的磁盘使用情况
# 这里模拟检查,实际实现需要根据具体的日志目录配置
log_dirs = ["/var/kafka-logs", "/data/kafka-logs"]
for log_dir in log_dirs:
if os.path.exists(log_dir):
usage = self._get_disk_usage(log_dir)
if usage > 90: # 磁盘使用率超过90%
results.append(DiagnosticResult(
issue_type=IssueType.DISK_SPACE,
severity=Severity.CRITICAL,
title=f"磁盘空间严重不足: {log_dir}",
description=f"磁盘使用率: {usage}%",
symptoms=[
f"磁盘使用率: {usage}%",
"可能导致Broker停止工作",
"新消息无法写入"
],
possible_causes=[
"日志保留时间过长",
"日志段大小配置不当",
"清理策略未生效",
"磁盘容量不足"
],
solutions=[
"立即清理旧日志",
"调整日志保留策略",
"增加磁盘容量",
"迁移部分数据到其他磁盘"
],
commands=[
f"df -h {log_dir}",
f"du -sh {log_dir}/*",
"kafka-log-dirs.sh --bootstrap-server {self.bootstrap_servers} --describe"
]
))
elif usage > 80: # 磁盘使用率超过80%
results.append(DiagnosticResult(
issue_type=IssueType.DISK_SPACE,
severity=Severity.HIGH,
title=f"磁盘空间不足警告: {log_dir}",
description=f"磁盘使用率: {usage}%",
symptoms=[f"磁盘使用率: {usage}%"],
possible_causes=["日志积累过多", "清理不及时"],
solutions=[
"检查日志保留策略",
"清理不必要的日志",
"监控磁盘使用趋势"
]
))
except Exception as e:
results.append(DiagnosticResult(
issue_type=IssueType.DISK_SPACE,
severity=Severity.MEDIUM,
title="无法检查磁盘使用情况",
description=f"检查磁盘使用时发生错误: {e}",
symptoms=[f"异常: {e}"],
possible_causes=["权限问题", "路径不存在"],
solutions=["检查路径和权限"]
))
return results
def check_replication_issues(self) -> List[DiagnosticResult]:
"""检查副本相关问题"""
results = []
# 这里可以添加更详细的副本检查逻辑
# 例如检查ISR收缩、Leader选举频繁等问题
return results
def check_performance_issues(self) -> List[DiagnosticResult]:
"""检查性能相关问题"""
results = []
# 这里可以添加性能检查逻辑
# 例如检查JVM GC、网络延迟、请求处理时间等
return results
def _parse_topic_describe_output(self, output: str) -> Dict[str, Dict[str, Any]]:
"""解析主题描述输出"""
topics = {}
current_topic = None
for line in output.split('\n'):
line = line.strip()
if line.startswith('Topic:'):
# 解析主题行
parts = line.split()
topic_name = parts[1]
current_topic = topic_name
topics[topic_name] = {
'partitions': [],
'under_replicated_partitions': [],
'offline_partitions': []
}
elif line.startswith('Partition:') and current_topic:
# 解析分区行
if 'Isr:' in line:
# 检查ISR是否完整
replicas_match = re.search(r'Replicas: \[([^\]]+)\]', line)
isr_match = re.search(r'Isr: \[([^\]]+)\]', line)
if replicas_match and isr_match:
replicas = set(replicas_match.group(1).split(','))
isr = set(isr_match.group(1).split(','))
partition_match = re.search(r'Partition: (\d+)', line)
if partition_match:
partition_id = int(partition_match.group(1))
if len(isr) < len(replicas):
topics[current_topic]['under_replicated_partitions'].append(partition_id)
if len(isr) == 0:
topics[current_topic]['offline_partitions'].append(partition_id)
return topics
def _check_group_lag(self, group_id: str) -> Dict[str, int]:
"""检查消费者组延迟"""
try:
cmd = [self.kafka_consumer_groups_cmd, "--bootstrap-server", self.bootstrap_servers,
"--describe", "--group", group_id]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
return self._parse_consumer_group_output(result.stdout)
else:
return {'total_lag': 0, 'max_lag': 0}
except Exception:
return {'total_lag': 0, 'max_lag': 0}
def _parse_consumer_group_output(self, output: str) -> Dict[str, int]:
"""解析消费者组输出"""
total_lag = 0
max_lag = 0
for line in output.split('\n'):
if 'LAG' in line and not line.startswith('TOPIC'):
parts = line.split()
if len(parts) >= 5:
try:
lag = int(parts[4]) if parts[4] != '-' else 0
total_lag += lag
max_lag = max(max_lag, lag)
except ValueError:
pass
return {'total_lag': total_lag, 'max_lag': max_lag}
def _get_disk_usage(self, path: str) -> float:
"""获取磁盘使用率"""
try:
result = subprocess.run(['df', path], capture_output=True, text=True)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
if len(lines) >= 2:
parts = lines[1].split()
if len(parts) >= 5:
usage_str = parts[4].rstrip('%')
return float(usage_str)
return 0.0
except Exception:
return 0.0
def generate_report(self, results: List[DiagnosticResult]) -> str:
"""生成诊断报告"""
if not results:
return "✅ 未发现问题,Kafka集群运行正常。"
report = ["\n🔍 Kafka集群诊断报告"]
report.append("=" * 50)
report.append(f"诊断时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"发现问题数量: {len(results)}")
# 按严重程度分组
by_severity = {}
for result in results:
if result.severity not in by_severity:
by_severity[result.severity] = []
by_severity[result.severity].append(result)
# 按严重程度输出
severity_order = [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM, Severity.LOW]
severity_icons = {
Severity.CRITICAL: "🚨",
Severity.HIGH: "⚠️",
Severity.MEDIUM: "⚡",
Severity.LOW: "ℹ️"
}
for severity in severity_order:
if severity in by_severity:
report.append(f"\n{severity_icons[severity]} {severity.value.upper()} 级别问题:")
report.append("-" * 30)
for i, result in enumerate(by_severity[severity], 1):
report.append(f"\n{i}. {result.title}")
report.append(f" 类型: {result.issue_type.value}")
report.append(f" 描述: {result.description}")
if result.symptoms:
report.append(" 症状:")
for symptom in result.symptoms:
report.append(f" • {symptom}")
if result.possible_causes:
report.append(" 可能原因:")
for cause in result.possible_causes:
report.append(f" • {cause}")
if result.solutions:
report.append(" 解决方案:")
for solution in result.solutions:
report.append(f" • {solution}")
if result.commands:
report.append(" 相关命令:")
for command in result.commands:
report.append(f" $ {command}")
return "\n".join(report)
# 使用示例
if __name__ == "__main__":
# 创建诊断工具
diagnostic_tool = KafkaDiagnosticTool(
kafka_home="/opt/kafka",
bootstrap_servers="localhost:9092"
)
print("开始Kafka集群诊断...")
# 执行诊断
results = diagnostic_tool.diagnose_all()
# 生成报告
report = diagnostic_tool.generate_report(results)
print(report)
# 保存报告到文件
timestamp = time.strftime('%Y%m%d_%H%M%S')
report_file = f"kafka_diagnostic_report_{timestamp}.txt"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report)
print(f"\n诊断报告已保存到: {report_file}")
5.4 本章总结
核心知识点
性能调优
- 生产者优化:批处理、压缩、异步发送
- 消费者优化:拉取配置、并发处理、偏移量管理
- Broker优化:网络线程、IO线程、日志配置
监控体系
- JMX指标收集和分析
- 集群健康状态监控
- 告警规则和通知机制
故障排查
- 常见问题诊断方法
- 系统化的问题分析流程
- 自动化诊断工具
最佳实践
性能优化
- 根据业务场景选择合适的配置参数
- 定期进行性能测试和调优
- 监控关键性能指标
监控告警
- 建立完善的监控体系
- 设置合理的告警阈值
- 实现自动化的问题检测
故障处理
- 建立标准化的故障处理流程
- 定期进行故障演练
- 维护详细的运维文档
练习题
性能调优实践
- 设计一个高吞吐量的生产者配置方案
- 优化一个消费延迟过高的消费者组
- 调优Broker配置以提高整体性能
监控系统设计
- 设计一个完整的Kafka监控方案
- 实现自定义的告警规则
- 开发性能分析工具
故障排查演练
- 模拟各种故障场景并进行排查
- 开发自动化的诊断脚本
- 建立故障处理知识库
通过本章的学习,你应该能够: - 掌握Kafka性能调优的方法和技巧 - 建立完善的监控和告警体系 - 具备快速诊断和解决问题的能力 - 实现Kafka集群的高效运维管理 “`