4.1 Kafka Streams
4.1.1 Streams Overview
Kafka Streams is a client library for building real-time stream processing applications on top of Kafka. It provides high-level abstractions such as streams, tables, windows, and joins, so developers can compose complex processing topologies with relatively little code. Kafka Streams itself is a JVM library; the Python code below models its core concepts (records, the DSL, topologies, and stream threads) for illustration.
import json
import time
from typing import Dict, List, Any, Callable, Optional
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta
import threading
import queue
@dataclass
class StreamRecord:
"""流记录"""
key: str
value: Any
timestamp: int
topic: str
partition: int
offset: int
    headers: Optional[Dict[str, str]] = None
def __post_init__(self):
if self.headers is None:
self.headers = {}
class KafkaStreamsConfig:
"""Kafka Streams配置"""
def __init__(self):
self.application_id = "kafka-streams-app"
self.bootstrap_servers = "localhost:9092"
self.default_key_serde = "string"
self.default_value_serde = "string"
self.commit_interval_ms = 30000
self.cache_max_bytes_buffering = 10485760 # 10MB
self.num_stream_threads = 1
self.processing_guarantee = "at_least_once" # at_least_once, exactly_once
self.state_dir = "/tmp/kafka-streams"
self.replication_factor = 1
self.window_store_change_log_additional_retention_ms = 86400000 # 1 day
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"application.id": self.application_id,
"bootstrap.servers": self.bootstrap_servers,
"default.key.serde": self.default_key_serde,
"default.value.serde": self.default_value_serde,
"commit.interval.ms": self.commit_interval_ms,
"cache.max.bytes.buffering": self.cache_max_bytes_buffering,
"num.stream.threads": self.num_stream_threads,
"processing.guarantee": self.processing_guarantee,
"state.dir": self.state_dir,
"replication.factor": self.replication_factor,
"windowstore.changelog.additional.retention.ms": self.window_store_change_log_additional_retention_ms
}
class KStream:
"""Kafka流"""
def __init__(self, topic: str, config: KafkaStreamsConfig):
self.topic = topic
self.config = config
self.processors = []
self.branches = []
self.joined_streams = []
def filter(self, predicate: Callable[[StreamRecord], bool]) -> 'KStream':
"""过滤流"""
new_stream = KStream(f"{self.topic}-filtered", self.config)
new_stream.processors = self.processors.copy()
new_stream.processors.append(('filter', predicate))
return new_stream
def map(self, mapper: Callable[[StreamRecord], StreamRecord]) -> 'KStream':
"""映射流"""
new_stream = KStream(f"{self.topic}-mapped", self.config)
new_stream.processors = self.processors.copy()
new_stream.processors.append(('map', mapper))
return new_stream
def map_values(self, mapper: Callable[[Any], Any]) -> 'KStream':
"""映射值"""
def value_mapper(record: StreamRecord) -> StreamRecord:
new_record = StreamRecord(
key=record.key,
value=mapper(record.value),
timestamp=record.timestamp,
topic=record.topic,
partition=record.partition,
offset=record.offset,
headers=record.headers
)
return new_record
return self.map(value_mapper)
def flat_map(self, mapper: Callable[[StreamRecord], List[StreamRecord]]) -> 'KStream':
"""扁平映射"""
new_stream = KStream(f"{self.topic}-flatmapped", self.config)
new_stream.processors = self.processors.copy()
new_stream.processors.append(('flatmap', mapper))
return new_stream
def group_by_key(self) -> 'KGroupedStream':
"""按键分组"""
return KGroupedStream(self.topic, self.config, self.processors)
def group_by(self, key_selector: Callable[[StreamRecord], str]) -> 'KGroupedStream':
"""按指定键分组"""
def rekey_mapper(record: StreamRecord) -> StreamRecord:
new_key = key_selector(record)
return StreamRecord(
key=new_key,
value=record.value,
timestamp=record.timestamp,
topic=record.topic,
partition=record.partition,
offset=record.offset,
headers=record.headers
)
rekeyed_stream = self.map(rekey_mapper)
return KGroupedStream(f"{self.topic}-rekeyed", self.config, rekeyed_stream.processors)
def join(self, other_stream: 'KStream', joiner: Callable[[Any, Any], Any],
window_size_ms: int = 60000) -> 'KStream':
"""流连接"""
new_stream = KStream(f"{self.topic}-joined", self.config)
new_stream.processors = self.processors.copy()
new_stream.processors.append(('join', other_stream, joiner, window_size_ms))
return new_stream
def branch(self, *predicates: Callable[[StreamRecord], bool]) -> List['KStream']:
"""分支流"""
branches = []
for i, predicate in enumerate(predicates):
branch_stream = KStream(f"{self.topic}-branch-{i}", self.config)
branch_stream.processors = self.processors.copy()
branch_stream.processors.append(('branch', predicate))
branches.append(branch_stream)
return branches
def to(self, topic: str):
"""输出到主题"""
self.processors.append(('to', topic))
def foreach(self, action: Callable[[StreamRecord], None]):
"""对每个记录执行操作"""
self.processors.append(('foreach', action))
class KGroupedStream:
"""分组流"""
def __init__(self, topic: str, config: KafkaStreamsConfig, processors: List):
self.topic = topic
self.config = config
self.processors = processors
def count(self, window_size_ms: int = None) -> 'KTable':
"""计数聚合"""
if window_size_ms:
return self.windowed_by(window_size_ms).count()
table = KTable(f"{self.topic}-count", self.config)
table.processors = self.processors.copy()
table.processors.append(('count', None))
return table
def aggregate(self, initializer: Callable[[], Any],
aggregator: Callable[[Any, StreamRecord], Any],
window_size_ms: int = None) -> 'KTable':
"""自定义聚合"""
if window_size_ms:
return self.windowed_by(window_size_ms).aggregate(initializer, aggregator)
table = KTable(f"{self.topic}-aggregated", self.config)
table.processors = self.processors.copy()
table.processors.append(('aggregate', initializer, aggregator))
return table
def reduce(self, reducer: Callable[[Any, Any], Any],
window_size_ms: int = None) -> 'KTable':
"""归约聚合"""
if window_size_ms:
return self.windowed_by(window_size_ms).reduce(reducer)
table = KTable(f"{self.topic}-reduced", self.config)
table.processors = self.processors.copy()
table.processors.append(('reduce', reducer))
return table
def windowed_by(self, window_size_ms: int) -> 'TimeWindowedKStream':
"""时间窗口"""
return TimeWindowedKStream(self.topic, self.config, self.processors, window_size_ms)
class TimeWindowedKStream:
"""时间窗口流"""
def __init__(self, topic: str, config: KafkaStreamsConfig,
processors: List, window_size_ms: int):
self.topic = topic
self.config = config
self.processors = processors
self.window_size_ms = window_size_ms
def count(self) -> 'KTable':
"""窗口计数"""
table = KTable(f"{self.topic}-windowed-count", self.config)
table.processors = self.processors.copy()
table.processors.append(('windowed_count', self.window_size_ms))
return table
def aggregate(self, initializer: Callable[[], Any],
aggregator: Callable[[Any, StreamRecord], Any]) -> 'KTable':
"""窗口聚合"""
table = KTable(f"{self.topic}-windowed-aggregated", self.config)
table.processors = self.processors.copy()
table.processors.append(('windowed_aggregate', self.window_size_ms, initializer, aggregator))
return table
def reduce(self, reducer: Callable[[Any, Any], Any]) -> 'KTable':
"""窗口归约"""
table = KTable(f"{self.topic}-windowed-reduced", self.config)
table.processors = self.processors.copy()
table.processors.append(('windowed_reduce', self.window_size_ms, reducer))
return table
class KTable:
"""Kafka表"""
def __init__(self, topic: str, config: KafkaStreamsConfig):
self.topic = topic
self.config = config
self.processors = []
def filter(self, predicate: Callable[[str, Any], bool]) -> 'KTable':
"""过滤表"""
new_table = KTable(f"{self.topic}-filtered", self.config)
new_table.processors = self.processors.copy()
new_table.processors.append(('table_filter', predicate))
return new_table
def map_values(self, mapper: Callable[[Any], Any]) -> 'KTable':
"""映射值"""
new_table = KTable(f"{self.topic}-mapped", self.config)
new_table.processors = self.processors.copy()
new_table.processors.append(('table_map_values', mapper))
return new_table
def join(self, other_table: 'KTable', joiner: Callable[[Any, Any], Any]) -> 'KTable':
"""表连接"""
new_table = KTable(f"{self.topic}-joined", self.config)
new_table.processors = self.processors.copy()
new_table.processors.append(('table_join', other_table, joiner))
return new_table
def to_stream(self) -> KStream:
"""转换为流"""
stream = KStream(f"{self.topic}-stream", self.config)
stream.processors = self.processors.copy()
stream.processors.append(('to_stream',))
return stream
def to(self, topic: str):
"""输出到主题"""
self.processors.append(('table_to', topic))
class StreamsBuilder:
"""流构建器"""
def __init__(self, config: KafkaStreamsConfig):
self.config = config
self.streams = {}
self.tables = {}
def stream(self, topic: str) -> KStream:
"""创建流"""
if topic not in self.streams:
self.streams[topic] = KStream(topic, self.config)
return self.streams[topic]
def table(self, topic: str) -> KTable:
"""创建表"""
if topic not in self.tables:
self.tables[topic] = KTable(topic, self.config)
return self.tables[topic]
def build(self) -> 'Topology':
"""构建拓扑"""
return Topology(self.config, self.streams, self.tables)
class Topology:
"""流处理拓扑"""
def __init__(self, config: KafkaStreamsConfig, streams: Dict[str, KStream],
tables: Dict[str, KTable]):
self.config = config
self.streams = streams
self.tables = tables
self.state_stores = {}
self.processors_graph = self._build_processors_graph()
def _build_processors_graph(self) -> Dict[str, List]:
"""构建处理器图"""
graph = {}
# 添加流处理器
for topic, stream in self.streams.items():
graph[topic] = stream.processors
# 添加表处理器
for topic, table in self.tables.items():
graph[topic] = table.processors
return graph
def describe(self) -> str:
"""描述拓扑"""
description = f"Topology for application: {self.config.application_id}\n"
description += "Streams:\n"
for topic, processors in self.processors_graph.items():
description += f" {topic}:\n"
for i, processor in enumerate(processors):
if isinstance(processor, tuple):
description += f" {i}: {processor[0]}\n"
else:
description += f" {i}: {processor}\n"
return description
class KafkaStreams:
"""Kafka Streams应用程序"""
def __init__(self, topology: Topology):
self.topology = topology
self.config = topology.config
self.state = "CREATED"
self.stream_threads = []
self.state_stores = {}
self.metrics = {
"records_processed": 0,
"records_failed": 0,
"processing_rate": 0.0,
"error_rate": 0.0
}
self.running = False
self.exception_handler = None
def set_uncaught_exception_handler(self, handler: Callable[[Exception], None]):
"""设置异常处理器"""
self.exception_handler = handler
def start(self):
"""启动流处理"""
if self.state != "CREATED":
raise RuntimeError(f"Cannot start streams in state: {self.state}")
self.state = "STARTING"
self.running = True
# 创建流处理线程
for i in range(self.config.num_stream_threads):
thread = StreamThread(f"stream-thread-{i}", self)
self.stream_threads.append(thread)
thread.start()
self.state = "RUNNING"
print(f"Kafka Streams应用程序已启动: {self.config.application_id}")
def close(self, timeout_ms: int = 30000):
"""关闭流处理"""
if self.state not in ["RUNNING", "ERROR"]:
return
self.state = "PENDING_SHUTDOWN"
self.running = False
# 等待所有线程结束
for thread in self.stream_threads:
thread.join(timeout_ms / 1000)
self.state = "NOT_RUNNING"
print(f"Kafka Streams应用程序已关闭: {self.config.application_id}")
def get_metrics(self) -> Dict[str, Any]:
"""获取指标"""
return self.metrics.copy()
def process_record(self, record: StreamRecord) -> List[StreamRecord]:
"""处理记录"""
try:
results = [record]
# 获取对应的处理器链
processors = self.topology.processors_graph.get(record.topic, [])
for processor in processors:
new_results = []
for result_record in results:
processed = self._apply_processor(processor, result_record)
if processed:
if isinstance(processed, list):
new_results.extend(processed)
else:
new_results.append(processed)
results = new_results
self.metrics["records_processed"] += 1
return results
except Exception as e:
self.metrics["records_failed"] += 1
if self.exception_handler:
self.exception_handler(e)
else:
print(f"处理记录时发生错误: {e}")
return []
def _apply_processor(self, processor, record: StreamRecord):
"""应用处理器"""
if not isinstance(processor, tuple):
return record
processor_type = processor[0]
if processor_type == "filter":
predicate = processor[1]
return record if predicate(record) else None
elif processor_type == "map":
mapper = processor[1]
return mapper(record)
elif processor_type == "flatmap":
mapper = processor[1]
return mapper(record)
elif processor_type == "foreach":
action = processor[1]
action(record)
return record
elif processor_type == "to":
# 这里应该发送到指定主题
topic = processor[1]
print(f"发送记录到主题 {topic}: {record}")
return record
# 其他处理器类型的实现...
return record
class StreamThread(threading.Thread):
"""流处理线程"""
def __init__(self, name: str, streams_app: KafkaStreams):
super().__init__(name=name)
self.streams_app = streams_app
self.daemon = True
def run(self):
"""运行流处理"""
print(f"流处理线程启动: {self.name}")
# 模拟消费和处理消息
while self.streams_app.running:
try:
# 这里应该从Kafka消费消息
# 为了演示,我们创建一些模拟记录
time.sleep(1)
# 模拟处理记录
sample_record = StreamRecord(
key="test-key",
value={"message": "test message", "timestamp": int(time.time())},
timestamp=int(time.time() * 1000),
topic="input-topic",
partition=0,
offset=1
)
self.streams_app.process_record(sample_record)
except Exception as e:
print(f"流处理线程错误: {e}")
if self.streams_app.exception_handler:
self.streams_app.exception_handler(e)
print(f"流处理线程结束: {self.name}")
# 使用示例
if __name__ == "__main__":
# 配置
config = KafkaStreamsConfig()
config.application_id = "word-count-app"
config.bootstrap_servers = "localhost:9092"
# 构建拓扑
builder = StreamsBuilder(config)
# 创建流处理拓扑
input_stream = builder.stream("input-topic")
# 单词计数示例
word_counts = (input_stream
.map_values(lambda text: text.lower())
.flat_map(lambda record: [
StreamRecord(
key=word,
value=1,
timestamp=record.timestamp,
topic=record.topic,
partition=record.partition,
offset=record.offset
) for word in record.value.split()
])
.group_by_key()
.count())
word_counts.to("word-counts-output")
# 构建并启动应用程序
topology = builder.build()
streams = KafkaStreams(topology)
print("拓扑描述:")
print(topology.describe())
try:
streams.start()
time.sleep(30) # 运行30秒
finally:
streams.close()
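The word-count example above uses an unwindowed count. As a complementary sketch, built only from the simulated DSL classes defined in this section (not the real Kafka Streams API), a per-key count over one-minute tumbling windows can be expressed with group_by(...).windowed_by(...).count(); the topic names and the user_id field are illustrative:

# A minimal windowed-aggregation sketch using the simulated DSL classes above.
# It only constructs the operator chain; running it would reuse KafkaStreams
# exactly as in the word-count example.
config = KafkaStreamsConfig()
config.application_id = "clicks-per-minute-app"

builder = StreamsBuilder(config)
clicks = builder.stream("click-events")

# Count clicks per user within 1-minute tumbling windows.
clicks_per_minute = (clicks
    .group_by(lambda record: str(record.value.get("user_id", "unknown")))
    .windowed_by(60_000)   # 60-second window
    .count())
clicks_per_minute.to("clicks-per-minute-output")

# As in the word-count example, the operator chain lives on the derived table.
print("operator chain:", [p[0] for p in clicks_per_minute.processors])
# -> ['map', 'windowed_count', 'table_to']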
4.1.2 State Stores
import os
import pickle
import threading
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, Iterator, Tuple
class StateStore(ABC):
"""状态存储接口"""
@abstractmethod
def put(self, key: str, value: Any) -> None:
"""存储键值对"""
pass
@abstractmethod
def get(self, key: str) -> Optional[Any]:
"""获取值"""
pass
@abstractmethod
def delete(self, key: str) -> Optional[Any]:
"""删除键值对"""
pass
@abstractmethod
def all(self) -> Iterator[Tuple[str, Any]]:
"""获取所有键值对"""
pass
@abstractmethod
def close(self) -> None:
"""关闭存储"""
pass
@abstractmethod
def flush(self) -> None:
"""刷新到持久化存储"""
pass
class InMemoryKeyValueStore(StateStore):
"""内存键值存储"""
def __init__(self, name: str):
self.name = name
self.store = {}
self.lock = threading.RLock()
def put(self, key: str, value: Any) -> None:
with self.lock:
self.store[key] = value
def get(self, key: str) -> Optional[Any]:
with self.lock:
return self.store.get(key)
def delete(self, key: str) -> Optional[Any]:
with self.lock:
return self.store.pop(key, None)
def all(self) -> Iterator[Tuple[str, Any]]:
with self.lock:
for key, value in self.store.items():
yield key, value
def close(self) -> None:
with self.lock:
self.store.clear()
def flush(self) -> None:
# 内存存储不需要刷新
pass
def size(self) -> int:
with self.lock:
return len(self.store)
class PersistentKeyValueStore(StateStore):
"""持久化键值存储"""
def __init__(self, name: str, state_dir: str):
self.name = name
self.state_dir = state_dir
self.store_path = os.path.join(state_dir, f"{name}.db")
self.memory_store = {}
self.lock = threading.RLock()
# 确保状态目录存在
os.makedirs(state_dir, exist_ok=True)
# 加载现有数据
self._load_from_disk()
def _load_from_disk(self):
"""从磁盘加载数据"""
if os.path.exists(self.store_path):
try:
with open(self.store_path, 'rb') as f:
self.memory_store = pickle.load(f)
except Exception as e:
print(f"加载状态存储失败: {e}")
self.memory_store = {}
def _save_to_disk(self):
"""保存数据到磁盘"""
try:
with open(self.store_path, 'wb') as f:
pickle.dump(self.memory_store, f)
except Exception as e:
print(f"保存状态存储失败: {e}")
def put(self, key: str, value: Any) -> None:
with self.lock:
self.memory_store[key] = value
def get(self, key: str) -> Optional[Any]:
with self.lock:
return self.memory_store.get(key)
def delete(self, key: str) -> Optional[Any]:
with self.lock:
return self.memory_store.pop(key, None)
def all(self) -> Iterator[Tuple[str, Any]]:
with self.lock:
for key, value in self.memory_store.items():
yield key, value
def close(self) -> None:
self.flush()
with self.lock:
self.memory_store.clear()
def flush(self) -> None:
with self.lock:
self._save_to_disk()
def size(self) -> int:
with self.lock:
return len(self.memory_store)
class WindowStore(StateStore):
"""窗口存储"""
def __init__(self, name: str, window_size_ms: int, retention_ms: int):
self.name = name
self.window_size_ms = window_size_ms
self.retention_ms = retention_ms
self.windows = {} # {window_start: {key: value}}
self.lock = threading.RLock()
def _get_window_start(self, timestamp: int) -> int:
"""获取窗口开始时间"""
return (timestamp // self.window_size_ms) * self.window_size_ms
def _cleanup_old_windows(self, current_time: int):
"""清理过期窗口"""
cutoff_time = current_time - self.retention_ms
expired_windows = []
for window_start in self.windows:
if window_start < cutoff_time:
expired_windows.append(window_start)
for window_start in expired_windows:
del self.windows[window_start]
def put(self, key: str, value: Any, timestamp: int = None) -> None:
if timestamp is None:
timestamp = int(time.time() * 1000)
with self.lock:
window_start = self._get_window_start(timestamp)
if window_start not in self.windows:
self.windows[window_start] = {}
self.windows[window_start][key] = value
# 清理过期窗口
self._cleanup_old_windows(timestamp)
def get(self, key: str, timestamp: int = None) -> Optional[Any]:
if timestamp is None:
timestamp = int(time.time() * 1000)
with self.lock:
window_start = self._get_window_start(timestamp)
window_data = self.windows.get(window_start, {})
return window_data.get(key)
def get_window(self, window_start: int) -> Dict[str, Any]:
"""获取整个窗口的数据"""
with self.lock:
return self.windows.get(window_start, {}).copy()
def get_windows(self, key: str, from_time: int, to_time: int) -> Iterator[Tuple[int, Any]]:
"""获取指定时间范围内的窗口数据"""
with self.lock:
for window_start in sorted(self.windows.keys()):
if from_time <= window_start < to_time:
window_data = self.windows[window_start]
if key in window_data:
yield window_start, window_data[key]
def delete(self, key: str, timestamp: int = None) -> Optional[Any]:
if timestamp is None:
timestamp = int(time.time() * 1000)
with self.lock:
window_start = self._get_window_start(timestamp)
window_data = self.windows.get(window_start, {})
return window_data.pop(key, None)
def all(self) -> Iterator[Tuple[str, Any]]:
with self.lock:
for window_start, window_data in self.windows.items():
for key, value in window_data.items():
yield f"{key}@{window_start}", value
def close(self) -> None:
with self.lock:
self.windows.clear()
def flush(self) -> None:
# 窗口存储通常在内存中,不需要刷新
pass
class StateStoreManager:
"""状态存储管理器"""
def __init__(self, state_dir: str):
self.state_dir = state_dir
self.stores = {}
self.lock = threading.RLock()
def create_key_value_store(self, name: str, persistent: bool = True) -> StateStore:
"""创建键值存储"""
with self.lock:
if name in self.stores:
return self.stores[name]
if persistent:
store = PersistentKeyValueStore(name, self.state_dir)
else:
store = InMemoryKeyValueStore(name)
self.stores[name] = store
return store
def create_window_store(self, name: str, window_size_ms: int,
retention_ms: int) -> WindowStore:
"""创建窗口存储"""
with self.lock:
if name in self.stores:
return self.stores[name]
store = WindowStore(name, window_size_ms, retention_ms)
self.stores[name] = store
return store
def get_store(self, name: str) -> Optional[StateStore]:
"""获取存储"""
with self.lock:
return self.stores.get(name)
def close_all(self):
"""关闭所有存储"""
with self.lock:
for store in self.stores.values():
store.close()
self.stores.clear()
def flush_all(self):
"""刷新所有存储"""
with self.lock:
for store in self.stores.values():
store.flush()
# 使用示例
if __name__ == "__main__":
# 创建状态存储管理器
store_manager = StateStoreManager("/tmp/kafka-streams-state")
# 创建键值存储
kv_store = store_manager.create_key_value_store("user-counts")
# 存储数据
kv_store.put("user1", 10)
kv_store.put("user2", 20)
# 读取数据
print(f"user1 count: {kv_store.get('user1')}")
print(f"user2 count: {kv_store.get('user2')}")
# 创建窗口存储
window_store = store_manager.create_window_store(
"hourly-counts",
window_size_ms=3600000, # 1小时
retention_ms=86400000 # 24小时
)
# 存储窗口数据
current_time = int(time.time() * 1000)
window_store.put("event1", 5, current_time)
window_store.put("event2", 3, current_time)
# 读取窗口数据
print(f"event1 in current window: {window_store.get('event1', current_time)}")
# 关闭所有存储
store_manager.close_all()
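As a short follow-up sketch reusing the WindowStore class above, get_windows can read one key across a range of hourly windows; the key name and counts are made up for illustration:

import time

# Query a key across a time range of hourly windows (illustrative values).
hourly = WindowStore("demo-hourly", window_size_ms=3_600_000, retention_ms=86_400_000)

now = int(time.time() * 1000)
one_hour = 3_600_000

# Write the same key into three consecutive hourly windows.
hourly.put("page-views", 120, now - 2 * one_hour)
hourly.put("page-views", 95, now - 1 * one_hour)
hourly.put("page-views", 42, now)

# Read back every window whose start falls inside the last three hours.
for window_start, count in hourly.get_windows("page-views", now - 3 * one_hour, now + 1):
    print(f"window starting at {window_start}: {count} page views")

hourly.close()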
4.2 Kafka Connect
4.2.1 Connect Overview
Kafka Connect is a framework for streaming data between Kafka and other systems in a scalable and reliable way. It makes it simple to define connectors that move large data sets into and out of Kafka.
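In a real deployment a connector is not instantiated directly in code as in the simulation below; its configuration is submitted to a Connect worker over the REST API (port 8083 by default). A minimal sketch, assuming a worker at localhost:8083 and the FileStreamSource connector that ships with Apache Kafka; the file path and topic name are illustrative:

import requests

# Register a file source connector with a Connect worker via its REST API.
# Worker URL, file path and topic name are assumptions for illustration.
connector_request = {
    "name": "demo-file-source",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "file": "/tmp/demo-input.txt",
        "topic": "file-topic"
    }
}

response = requests.post(
    "http://localhost:8083/connectors",
    json=connector_request,
    headers={"Content-Type": "application/json"},
    timeout=10
)
print(response.status_code, response.json())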
import json
import time
import threading
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
class ConnectorType(Enum):
"""连接器类型"""
SOURCE = "source"
SINK = "sink"
class TaskState(Enum):
"""任务状态"""
UNASSIGNED = "UNASSIGNED"
RUNNING = "RUNNING"
PAUSED = "PAUSED"
FAILED = "FAILED"
DESTROYED = "DESTROYED"
@dataclass
class ConnectorConfig:
"""连接器配置"""
name: str
connector_class: str
tasks_max: int = 1
topics: Optional[str] = None
topics_regex: Optional[str] = None
key_converter: str = "org.apache.kafka.connect.json.JsonConverter"
value_converter: str = "org.apache.kafka.connect.json.JsonConverter"
key_converter_schemas_enable: bool = False
value_converter_schemas_enable: bool = False
transforms: Optional[str] = None
errors_tolerance: str = "none" # none, all
errors_deadletterqueue_topic_name: Optional[str] = None
    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary"""
        # Use __dict__ rather than asdict() so that configuration keys attached
        # dynamically in the examples (e.g. file, topic) are included as well
        config = dict(self.__dict__)
        # Drop None values
        return {k: v for k, v in config.items() if v is not None}
@dataclass
class SourceRecord:
"""源记录"""
source_partition: Dict[str, Any]
source_offset: Dict[str, Any]
topic: str
partition: Optional[int] = None
key_schema: Optional[Dict] = None
key: Any = None
value_schema: Optional[Dict] = None
value: Any = None
timestamp: Optional[int] = None
headers: Optional[Dict[str, str]] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = int(time.time() * 1000)
if self.headers is None:
self.headers = {}
@dataclass
class SinkRecord:
"""接收记录"""
topic: str
partition: int
offset: int
key_schema: Optional[Dict] = None
key: Any = None
value_schema: Optional[Dict] = None
value: Any = None
timestamp: Optional[int] = None
headers: Optional[Dict[str, str]] = None
def __post_init__(self):
if self.headers is None:
self.headers = {}
class Connector(ABC):
"""连接器基类"""
def __init__(self):
self.config = {}
self.context = None
@abstractmethod
def version(self) -> str:
"""返回连接器版本"""
pass
@abstractmethod
def start(self, config: Dict[str, str]) -> None:
"""启动连接器"""
pass
@abstractmethod
def task_class(self) -> type:
"""返回任务类"""
pass
@abstractmethod
def task_configs(self, max_tasks: int) -> List[Dict[str, str]]:
"""生成任务配置"""
pass
@abstractmethod
def stop(self) -> None:
"""停止连接器"""
pass
def config_def(self) -> Dict[str, Any]:
"""配置定义"""
return {}
def validate(self, config: Dict[str, str]) -> List[Dict[str, str]]:
"""验证配置"""
return []
class Task(ABC):
"""任务基类"""
def __init__(self):
self.config = {}
self.context = None
@abstractmethod
def version(self) -> str:
"""返回任务版本"""
pass
@abstractmethod
def start(self, config: Dict[str, str]) -> None:
"""启动任务"""
pass
@abstractmethod
def stop(self) -> None:
"""停止任务"""
pass
class SourceTask(Task):
"""源任务"""
@abstractmethod
def poll(self) -> List[SourceRecord]:
"""轮询数据"""
pass
def commit(self) -> None:
"""提交偏移量"""
pass
def commit_record(self, record: SourceRecord, metadata: Dict[str, Any]) -> None:
"""提交单个记录"""
pass
class SinkTask(Task):
"""接收任务"""
@abstractmethod
def put(self, records: List[SinkRecord]) -> None:
"""处理记录"""
pass
def flush(self, offsets: Dict[str, Dict[int, int]]) -> None:
"""刷新数据"""
pass
def preCommit(self, offsets: Dict[str, Dict[int, int]]) -> Dict[str, Dict[int, int]]:
"""预提交"""
return offsets
# 示例:文件源连接器
class FileSourceConnector(Connector):
"""文件源连接器"""
def version(self) -> str:
return "1.0.0"
def start(self, config: Dict[str, str]) -> None:
self.config = config
self.filename = config.get("file")
self.topic = config.get("topic")
if not self.filename:
raise ValueError("file配置项是必需的")
if not self.topic:
raise ValueError("topic配置项是必需的")
def task_class(self) -> type:
return FileSourceTask
def task_configs(self, max_tasks: int) -> List[Dict[str, str]]:
# 文件源连接器通常只需要一个任务
return [{
"file": self.config["file"],
"topic": self.config["topic"]
}]
def stop(self) -> None:
pass
def config_def(self) -> Dict[str, Any]:
return {
"file": {
"type": "string",
"required": True,
"documentation": "要读取的文件路径"
},
"topic": {
"type": "string",
"required": True,
"documentation": "目标Kafka主题"
}
}
class FileSourceTask(SourceTask):
"""文件源任务"""
def __init__(self):
super().__init__()
self.filename = None
self.topic = None
self.file_handle = None
self.position = 0
def version(self) -> str:
return "1.0.0"
def start(self, config: Dict[str, str]) -> None:
self.config = config
self.filename = config["file"]
self.topic = config["topic"]
try:
self.file_handle = open(self.filename, 'r', encoding='utf-8')
# 如果有保存的偏移量,跳转到该位置
if hasattr(self.context, 'offset_storage'):
saved_offset = self.context.offset_storage.get({"filename": self.filename})
if saved_offset and "position" in saved_offset:
self.position = saved_offset["position"]
self.file_handle.seek(self.position)
except IOError as e:
raise RuntimeError(f"无法打开文件 {self.filename}: {e}")
def poll(self) -> List[SourceRecord]:
if not self.file_handle:
return []
records = []
try:
# 读取一行
line = self.file_handle.readline()
if line:
# 更新位置
self.position = self.file_handle.tell()
# 创建源记录
record = SourceRecord(
source_partition={"filename": self.filename},
source_offset={"position": self.position},
topic=self.topic,
key=None,
value=line.strip(),
timestamp=int(time.time() * 1000)
)
records.append(record)
else:
# 文件结束,等待新内容
time.sleep(1)
except IOError as e:
print(f"读取文件错误: {e}")
return records
def stop(self) -> None:
if self.file_handle:
self.file_handle.close()
self.file_handle = None
# 示例:文件接收连接器
class FileSinkConnector(Connector):
"""文件接收连接器"""
def version(self) -> str:
return "1.0.0"
def start(self, config: Dict[str, str]) -> None:
self.config = config
self.filename = config.get("file")
self.topics = config.get("topics", "").split(",")
if not self.filename:
raise ValueError("file配置项是必需的")
if not self.topics or not self.topics[0]:
raise ValueError("topics配置项是必需的")
def task_class(self) -> type:
return FileSinkTask
def task_configs(self, max_tasks: int) -> List[Dict[str, str]]:
return [{
"file": self.config["file"],
"topics": self.config["topics"]
}]
def stop(self) -> None:
pass
def config_def(self) -> Dict[str, Any]:
return {
"file": {
"type": "string",
"required": True,
"documentation": "要写入的文件路径"
},
"topics": {
"type": "string",
"required": True,
"documentation": "要消费的Kafka主题列表(逗号分隔)"
}
}
class FileSinkTask(SinkTask):
"""文件接收任务"""
def __init__(self):
super().__init__()
self.filename = None
self.file_handle = None
def version(self) -> str:
return "1.0.0"
def start(self, config: Dict[str, str]) -> None:
self.config = config
self.filename = config["file"]
try:
self.file_handle = open(self.filename, 'a', encoding='utf-8')
except IOError as e:
raise RuntimeError(f"无法打开文件 {self.filename}: {e}")
def put(self, records: List[SinkRecord]) -> None:
if not self.file_handle:
return
try:
for record in records:
# 写入记录到文件
line = f"{record.timestamp},{record.topic},{record.partition},{record.offset},{record.key},{record.value}\n"
self.file_handle.write(line)
# 刷新到磁盘
self.file_handle.flush()
except IOError as e:
print(f"写入文件错误: {e}")
raise
def flush(self, offsets: Dict[str, Dict[int, int]]) -> None:
if self.file_handle:
self.file_handle.flush()
def stop(self) -> None:
if self.file_handle:
self.file_handle.close()
self.file_handle = None
class ConnectWorker:
"""Connect工作节点"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.connectors = {}
self.tasks = {}
self.running = False
self.worker_thread = None
def start(self):
"""启动工作节点"""
self.running = True
self.worker_thread = threading.Thread(target=self._run_worker)
self.worker_thread.start()
print("Kafka Connect工作节点已启动")
def stop(self):
"""停止工作节点"""
self.running = False
# 停止所有连接器和任务
for connector in self.connectors.values():
connector.stop()
for task in self.tasks.values():
task.stop()
if self.worker_thread:
self.worker_thread.join()
print("Kafka Connect工作节点已停止")
def _run_worker(self):
"""运行工作节点"""
while self.running:
try:
# 处理源任务
for task_id, task in self.tasks.items():
if isinstance(task, SourceTask):
records = task.poll()
if records:
# 这里应该发送到Kafka
print(f"任务 {task_id} 产生了 {len(records)} 条记录")
time.sleep(1)
except Exception as e:
print(f"工作节点错误: {e}")
def create_connector(self, config: ConnectorConfig) -> str:
"""创建连接器"""
connector_name = config.name
# 根据连接器类创建实例
if config.connector_class == "FileSourceConnector":
connector = FileSourceConnector()
elif config.connector_class == "FileSinkConnector":
connector = FileSinkConnector()
else:
raise ValueError(f"未知的连接器类: {config.connector_class}")
# 启动连接器
connector.start(config.to_dict())
self.connectors[connector_name] = connector
# 创建任务
task_configs = connector.task_configs(config.tasks_max)
task_class = connector.task_class()
for i, task_config in enumerate(task_configs):
task_id = f"{connector_name}-{i}"
task = task_class()
task.start(task_config)
self.tasks[task_id] = task
print(f"连接器 {connector_name} 已创建,包含 {len(task_configs)} 个任务")
return connector_name
def delete_connector(self, connector_name: str) -> bool:
"""删除连接器"""
if connector_name not in self.connectors:
return False
# 停止连接器
connector = self.connectors[connector_name]
connector.stop()
del self.connectors[connector_name]
# 停止相关任务
tasks_to_remove = []
for task_id, task in self.tasks.items():
if task_id.startswith(f"{connector_name}-"):
task.stop()
tasks_to_remove.append(task_id)
for task_id in tasks_to_remove:
del self.tasks[task_id]
print(f"连接器 {connector_name} 已删除")
return True
def get_connector_status(self, connector_name: str) -> Dict[str, Any]:
"""获取连接器状态"""
if connector_name not in self.connectors:
return {"error": "连接器不存在"}
# 获取任务状态
task_states = []
for task_id, task in self.tasks.items():
if task_id.startswith(f"{connector_name}-"):
task_states.append({
"id": task_id,
"state": "RUNNING", # 简化状态
"worker_id": "worker-1"
})
return {
"name": connector_name,
"connector": {
"state": "RUNNING",
"worker_id": "worker-1"
},
"tasks": task_states
}
def list_connectors(self) -> List[str]:
"""列出所有连接器"""
return list(self.connectors.keys())
# 使用示例
if __name__ == "__main__":
# 创建Connect工作节点
worker_config = {
"bootstrap.servers": "localhost:9092",
"group.id": "connect-cluster",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
worker = ConnectWorker(worker_config)
worker.start()
try:
# 创建文件源连接器
source_config = ConnectorConfig(
name="file-source",
connector_class="FileSourceConnector",
tasks_max=1
)
source_config.file = "/tmp/test-input.txt"
source_config.topic = "file-topic"
worker.create_connector(source_config)
# 创建文件接收连接器
sink_config = ConnectorConfig(
name="file-sink",
connector_class="FileSinkConnector",
tasks_max=1
)
sink_config.file = "/tmp/test-output.txt"
sink_config.topics = "file-topic"
worker.create_connector(sink_config)
# 查看连接器状态
print("连接器列表:", worker.list_connectors())
print("源连接器状态:", worker.get_connector_status("file-source"))
print("接收连接器状态:", worker.get_connector_status("file-sink"))
# 运行一段时间
time.sleep(30)
finally:
worker.stop()
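The get_connector_status method above mirrors the response shape of the real Connect REST status endpoint. A short sketch of listing connectors and checking their status against an actual worker, assuming it listens on localhost:8083:

import requests

BASE_URL = "http://localhost:8083"   # assumed Connect worker address

# List all connectors registered with the worker.
connectors = requests.get(f"{BASE_URL}/connectors", timeout=10).json()
print("connectors:", connectors)

# Fetch the status (connector state plus per-task state) of each connector.
for name in connectors:
    status = requests.get(f"{BASE_URL}/connectors/{name}/status", timeout=10).json()
    print(name, "->", status["connector"]["state"],
          [task["state"] for task in status.get("tasks", [])])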
4.3 Schema Registry
4.3.1 Schema Management
import json
import hashlib
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass
from enum import Enum
import threading
class SchemaType(Enum):
"""Schema类型"""
AVRO = "AVRO"
JSON = "JSON"
PROTOBUF = "PROTOBUF"
class CompatibilityLevel(Enum):
"""兼容性级别"""
BACKWARD = "BACKWARD"
BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE"
FORWARD = "FORWARD"
FORWARD_TRANSITIVE = "FORWARD_TRANSITIVE"
FULL = "FULL"
FULL_TRANSITIVE = "FULL_TRANSITIVE"
NONE = "NONE"
@dataclass
class Schema:
"""Schema定义"""
id: int
version: int
schema_type: SchemaType
schema_str: str
subject: str
    references: Optional[List[Dict[str, Any]]] = None
def __post_init__(self):
if self.references is None:
self.references = []
def get_hash(self) -> str:
"""获取Schema哈希值"""
content = f"{self.schema_type.value}:{self.schema_str}"
return hashlib.sha256(content.encode()).hexdigest()
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"id": self.id,
"version": self.version,
"schemaType": self.schema_type.value,
"schema": self.schema_str,
"subject": self.subject,
"references": self.references
}
class SchemaCompatibilityChecker:
"""Schema兼容性检查器"""
@staticmethod
def check_compatibility(reader_schema: Schema, writer_schema: Schema,
compatibility_level: CompatibilityLevel) -> bool:
"""检查Schema兼容性"""
if reader_schema.schema_type != writer_schema.schema_type:
return False
if compatibility_level == CompatibilityLevel.NONE:
return True
# 简化的兼容性检查逻辑
if compatibility_level in [CompatibilityLevel.BACKWARD, CompatibilityLevel.BACKWARD_TRANSITIVE]:
return SchemaCompatibilityChecker._check_backward_compatibility(reader_schema, writer_schema)
elif compatibility_level in [CompatibilityLevel.FORWARD, CompatibilityLevel.FORWARD_TRANSITIVE]:
return SchemaCompatibilityChecker._check_forward_compatibility(reader_schema, writer_schema)
elif compatibility_level in [CompatibilityLevel.FULL, CompatibilityLevel.FULL_TRANSITIVE]:
return (SchemaCompatibilityChecker._check_backward_compatibility(reader_schema, writer_schema) and
SchemaCompatibilityChecker._check_forward_compatibility(reader_schema, writer_schema))
return False
@staticmethod
def _check_backward_compatibility(reader_schema: Schema, writer_schema: Schema) -> bool:
"""检查向后兼容性"""
# 简化实现:检查字段是否可以安全读取
if reader_schema.schema_type == SchemaType.JSON:
try:
reader_json = json.loads(reader_schema.schema_str)
writer_json = json.loads(writer_schema.schema_str)
# 检查必需字段是否存在
reader_required = set(reader_json.get("required", []))
writer_properties = set(writer_json.get("properties", {}).keys())
return reader_required.issubset(writer_properties)
except json.JSONDecodeError:
return False
return True # 简化处理
@staticmethod
def _check_forward_compatibility(reader_schema: Schema, writer_schema: Schema) -> bool:
"""检查向前兼容性"""
# 简化实现:检查新字段是否有默认值
if reader_schema.schema_type == SchemaType.JSON:
try:
reader_json = json.loads(reader_schema.schema_str)
writer_json = json.loads(writer_schema.schema_str)
# 检查新增字段是否有默认值
reader_properties = set(reader_json.get("properties", {}).keys())
writer_required = set(writer_json.get("required", []))
new_required = writer_required - reader_properties
return len(new_required) == 0
except json.JSONDecodeError:
return False
return True # 简化处理
class SchemaRegistry:
"""Schema注册中心"""
def __init__(self):
self.schemas = {} # {id: Schema}
self.subjects = {} # {subject: {version: schema_id}}
self.next_id = 1
self.compatibility_levels = {} # {subject: CompatibilityLevel}
self.global_compatibility = CompatibilityLevel.BACKWARD
self.lock = threading.RLock()
def register_schema(self, subject: str, schema_str: str,
schema_type: SchemaType = SchemaType.AVRO,
references: List[Dict[str, Any]] = None) -> int:
"""注册Schema"""
with self.lock:
# 检查是否已存在相同的Schema
existing_id = self._find_existing_schema(subject, schema_str, schema_type)
if existing_id:
return existing_id
# 获取下一个版本号
next_version = self._get_next_version(subject)
# 创建新Schema
schema = Schema(
id=self.next_id,
version=next_version,
schema_type=schema_type,
schema_str=schema_str,
subject=subject,
references=references or []
)
# 兼容性检查
if not self._check_compatibility_for_subject(subject, schema):
raise ValueError(f"Schema与主题 {subject} 的兼容性检查失败")
# 存储Schema
self.schemas[self.next_id] = schema
if subject not in self.subjects:
self.subjects[subject] = {}
self.subjects[subject][next_version] = self.next_id
schema_id = self.next_id
self.next_id += 1
return schema_id
def get_schema_by_id(self, schema_id: int) -> Optional[Schema]:
"""根据ID获取Schema"""
with self.lock:
return self.schemas.get(schema_id)
def get_latest_schema(self, subject: str) -> Optional[Schema]:
"""获取主题的最新Schema"""
with self.lock:
if subject not in self.subjects:
return None
latest_version = max(self.subjects[subject].keys())
schema_id = self.subjects[subject][latest_version]
return self.schemas.get(schema_id)
def get_schema_by_version(self, subject: str, version: int) -> Optional[Schema]:
"""根据版本获取Schema"""
with self.lock:
if subject not in self.subjects or version not in self.subjects[subject]:
return None
schema_id = self.subjects[subject][version]
return self.schemas.get(schema_id)
def get_all_versions(self, subject: str) -> List[int]:
"""获取主题的所有版本"""
with self.lock:
if subject not in self.subjects:
return []
return sorted(self.subjects[subject].keys())
def delete_subject(self, subject: str, permanent: bool = False) -> List[int]:
"""删除主题"""
with self.lock:
if subject not in self.subjects:
return []
deleted_versions = list(self.subjects[subject].keys())
if permanent:
# 永久删除
for version, schema_id in self.subjects[subject].items():
if schema_id in self.schemas:
del self.schemas[schema_id]
del self.subjects[subject]
else:
# 软删除(标记为删除)
for version, schema_id in self.subjects[subject].items():
if schema_id in self.schemas:
self.schemas[schema_id].deleted = True
return deleted_versions
def delete_schema_version(self, subject: str, version: int,
permanent: bool = False) -> Optional[int]:
"""删除特定版本的Schema"""
with self.lock:
if subject not in self.subjects or version not in self.subjects[subject]:
return None
schema_id = self.subjects[subject][version]
if permanent:
# 永久删除
if schema_id in self.schemas:
del self.schemas[schema_id]
del self.subjects[subject][version]
else:
# 软删除
if schema_id in self.schemas:
self.schemas[schema_id].deleted = True
return schema_id
def set_compatibility_level(self, subject: str,
compatibility: CompatibilityLevel) -> None:
"""设置主题兼容性级别"""
with self.lock:
self.compatibility_levels[subject] = compatibility
def get_compatibility_level(self, subject: str) -> CompatibilityLevel:
"""获取主题兼容性级别"""
with self.lock:
return self.compatibility_levels.get(subject, self.global_compatibility)
def test_compatibility(self, subject: str, schema_str: str,
schema_type: SchemaType = SchemaType.AVRO,
version: Optional[int] = None) -> bool:
"""测试Schema兼容性"""
with self.lock:
test_schema = Schema(
id=-1, # 临时ID
version=-1, # 临时版本
schema_type=schema_type,
schema_str=schema_str,
subject=subject
)
if version is None:
# 与最新版本比较
latest_schema = self.get_latest_schema(subject)
if latest_schema is None:
return True # 没有现有Schema,兼容
return self._check_schema_compatibility(latest_schema, test_schema, subject)
else:
# 与指定版本比较
existing_schema = self.get_schema_by_version(subject, version)
if existing_schema is None:
return False # 指定版本不存在
return self._check_schema_compatibility(existing_schema, test_schema, subject)
def list_subjects(self) -> List[str]:
"""列出所有主题"""
with self.lock:
return list(self.subjects.keys())
def _find_existing_schema(self, subject: str, schema_str: str,
schema_type: SchemaType) -> Optional[int]:
"""查找已存在的相同Schema"""
if subject not in self.subjects:
return None
for version, schema_id in self.subjects[subject].items():
schema = self.schemas.get(schema_id)
if (schema and schema.schema_str == schema_str and
schema.schema_type == schema_type):
return schema_id
return None
def _get_next_version(self, subject: str) -> int:
"""获取下一个版本号"""
if subject not in self.subjects or not self.subjects[subject]:
return 1
return max(self.subjects[subject].keys()) + 1
def _check_compatibility_for_subject(self, subject: str, new_schema: Schema) -> bool:
"""检查主题的兼容性"""
latest_schema = self.get_latest_schema(subject)
if latest_schema is None:
return True # 第一个Schema,总是兼容
return self._check_schema_compatibility(latest_schema, new_schema, subject)
def _check_schema_compatibility(self, existing_schema: Schema,
new_schema: Schema, subject: str) -> bool:
"""检查两个Schema的兼容性"""
compatibility_level = self.get_compatibility_level(subject)
return SchemaCompatibilityChecker.check_compatibility(
existing_schema, new_schema, compatibility_level
)
class SchemaRegistryClient:
"""Schema Registry客户端"""
def __init__(self, base_url: str = "http://localhost:8081"):
self.base_url = base_url.rstrip('/')
self.cache = {} # {schema_id: Schema}
self.subject_cache = {} # {subject: {version: schema_id}}
self.lock = threading.RLock()
def register_schema(self, subject: str, schema: Dict[str, Any]) -> int:
"""注册Schema"""
# 这里应该发送HTTP请求到Schema Registry
# 为了演示,我们使用本地注册中心
registry = SchemaRegistry()
schema_type = SchemaType(schema.get("schemaType", "AVRO"))
schema_str = schema["schema"]
references = schema.get("references", [])
return registry.register_schema(subject, schema_str, schema_type, references)
def get_schema_by_id(self, schema_id: int) -> Optional[Dict[str, Any]]:
"""根据ID获取Schema"""
with self.lock:
if schema_id in self.cache:
return self.cache[schema_id].to_dict()
# 这里应该发送HTTP请求
# 为了演示,返回None
return None
def get_latest_schema(self, subject: str) -> Optional[Dict[str, Any]]:
"""获取最新Schema"""
# 这里应该发送HTTP请求
return None
def get_schema_by_version(self, subject: str, version: int) -> Optional[Dict[str, Any]]:
"""根据版本获取Schema"""
# 这里应该发送HTTP请求
return None
def test_compatibility(self, subject: str, schema: Dict[str, Any],
version: Optional[int] = None) -> bool:
"""测试兼容性"""
# 这里应该发送HTTP请求
return True
def set_compatibility_level(self, subject: str, level: str) -> Dict[str, str]:
"""设置兼容性级别"""
# 这里应该发送HTTP请求
return {"compatibility": level}
def get_compatibility_level(self, subject: str) -> Dict[str, str]:
"""获取兼容性级别"""
# 这里应该发送HTTP请求
return {"compatibilityLevel": "BACKWARD"}
# 使用示例
if __name__ == "__main__":
# 创建Schema Registry
registry = SchemaRegistry()
# 定义JSON Schema
user_schema_v1 = {
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"email": {"type": "string"}
},
"required": ["id", "name"]
}
user_schema_v2 = {
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"email": {"type": "string"},
"age": {"type": "integer"}
},
"required": ["id", "name"]
}
# 注册Schema
schema_id_v1 = registry.register_schema(
"user",
json.dumps(user_schema_v1),
SchemaType.JSON
)
print(f"注册Schema v1,ID: {schema_id_v1}")
# 测试兼容性
is_compatible = registry.test_compatibility(
"user",
json.dumps(user_schema_v2),
SchemaType.JSON
)
print(f"Schema v2兼容性: {is_compatible}")
# 注册新版本
if is_compatible:
schema_id_v2 = registry.register_schema(
"user",
json.dumps(user_schema_v2),
SchemaType.JSON
)
print(f"注册Schema v2,ID: {schema_id_v2}")
# 获取Schema信息
latest_schema = registry.get_latest_schema("user")
if latest_schema:
print(f"最新Schema版本: {latest_schema.version}")
print(f"Schema内容: {latest_schema.schema_str}")
# 列出所有版本
versions = registry.get_all_versions("user")
print(f"所有版本: {versions}")
4.4 Transactions
4.4.1 Transactions Overview
Kafka transactions provide atomic writes across multiple topic partitions: a group of messages either all become visible or none of them do. Combined with read_committed consumers, this is the foundation for exactly-once processing.
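For reference, with the confluent-kafka Python client the producer-side flow looks roughly like the sketch below; the broker address, transactional.id and topic names are assumptions. The rest of this section then builds a simplified model of the transaction coordinator and producer to show what happens behind these calls.

from confluent_kafka import Producer

# A minimal transactional-producer sketch with the confluent-kafka client.
# Broker address, transactional.id and topic names are illustrative.
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "order-processor-1",
    "enable.idempotence": True
})

producer.init_transactions()      # fences older producers with the same transactional.id
producer.begin_transaction()
try:
    producer.produce("orders", key="order-1", value='{"id": 1, "amount": 100}')
    producer.produce("payments", key="payment-1", value='{"order_id": 1, "amount": 100}')
    producer.commit_transaction() # both records become visible atomically
except Exception:
    producer.abort_transaction()  # neither record is visible to read_committed consumers
    raise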
import time
import threading
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass
from enum import Enum
import uuid
class TransactionState(Enum):
"""事务状态"""
EMPTY = "Empty"
ONGOING = "Ongoing"
PREPARE_COMMIT = "PrepareCommit"
PREPARE_ABORT = "PrepareAbort"
COMPLETE_COMMIT = "CompleteCommit"
COMPLETE_ABORT = "CompleteAbort"
DEAD = "Dead"
@dataclass
class TransactionMetadata:
"""事务元数据"""
transactional_id: str
producer_id: int
producer_epoch: int
state: TransactionState
timeout_ms: int
last_update_timestamp: int
    partitions: Optional[Set[str]] = None  # set of "topic-partition" strings
def __post_init__(self):
if self.partitions is None:
self.partitions = set()
class TransactionCoordinator:
"""事务协调器"""
def __init__(self):
self.transactions = {} # {transactional_id: TransactionMetadata}
self.producer_ids = {} # {transactional_id: producer_id}
self.next_producer_id = 1000
self.lock = threading.RLock()
self.cleanup_thread = None
self.running = False
def start(self):
"""启动协调器"""
self.running = True
self.cleanup_thread = threading.Thread(target=self._cleanup_expired_transactions)
self.cleanup_thread.daemon = True
self.cleanup_thread.start()
def stop(self):
"""停止协调器"""
self.running = False
if self.cleanup_thread:
self.cleanup_thread.join()
def init_producer_id(self, transactional_id: Optional[str] = None,
timeout_ms: int = 60000) -> Dict[str, int]:
"""初始化生产者ID"""
with self.lock:
if transactional_id:
# 事务性生产者
if transactional_id in self.producer_ids:
producer_id = self.producer_ids[transactional_id]
# 增加epoch以防止僵尸生产者
if transactional_id in self.transactions:
self.transactions[transactional_id].producer_epoch += 1
epoch = self.transactions[transactional_id].producer_epoch
else:
epoch = 0
self.transactions[transactional_id] = TransactionMetadata(
transactional_id=transactional_id,
producer_id=producer_id,
producer_epoch=epoch,
state=TransactionState.EMPTY,
timeout_ms=timeout_ms,
last_update_timestamp=int(time.time() * 1000)
)
else:
producer_id = self.next_producer_id
self.next_producer_id += 1
self.producer_ids[transactional_id] = producer_id
self.transactions[transactional_id] = TransactionMetadata(
transactional_id=transactional_id,
producer_id=producer_id,
producer_epoch=0,
state=TransactionState.EMPTY,
timeout_ms=timeout_ms,
last_update_timestamp=int(time.time() * 1000)
)
return {
"producer_id": producer_id,
"producer_epoch": self.transactions[transactional_id].producer_epoch
}
else:
# 非事务性生产者
producer_id = self.next_producer_id
self.next_producer_id += 1
return {
"producer_id": producer_id,
"producer_epoch": 0
}
def begin_transaction(self, transactional_id: str) -> bool:
"""开始事务"""
with self.lock:
if transactional_id not in self.transactions:
return False
transaction = self.transactions[transactional_id]
if transaction.state != TransactionState.EMPTY:
return False
transaction.state = TransactionState.ONGOING
transaction.last_update_timestamp = int(time.time() * 1000)
transaction.partitions.clear()
return True
def add_partitions_to_transaction(self, transactional_id: str,
partitions: List[str]) -> bool:
"""添加分区到事务"""
with self.lock:
if transactional_id not in self.transactions:
return False
transaction = self.transactions[transactional_id]
if transaction.state != TransactionState.ONGOING:
return False
transaction.partitions.update(partitions)
transaction.last_update_timestamp = int(time.time() * 1000)
return True
def prepare_commit_transaction(self, transactional_id: str) -> bool:
"""准备提交事务"""
with self.lock:
if transactional_id not in self.transactions:
return False
transaction = self.transactions[transactional_id]
if transaction.state != TransactionState.ONGOING:
return False
transaction.state = TransactionState.PREPARE_COMMIT
transaction.last_update_timestamp = int(time.time() * 1000)
return True
def commit_transaction(self, transactional_id: str) -> bool:
"""提交事务"""
with self.lock:
if transactional_id not in self.transactions:
return False
transaction = self.transactions[transactional_id]
if transaction.state != TransactionState.PREPARE_COMMIT:
return False
# 向所有分区发送提交标记
self._send_transaction_markers(transaction, commit=True)
transaction.state = TransactionState.COMPLETE_COMMIT
transaction.last_update_timestamp = int(time.time() * 1000)
# 重置为空状态
transaction.state = TransactionState.EMPTY
transaction.partitions.clear()
return True
def prepare_abort_transaction(self, transactional_id: str) -> bool:
"""准备中止事务"""
with self.lock:
if transactional_id not in self.transactions:
return False
transaction = self.transactions[transactional_id]
if transaction.state not in [TransactionState.ONGOING, TransactionState.PREPARE_COMMIT]:
return False
transaction.state = TransactionState.PREPARE_ABORT
transaction.last_update_timestamp = int(time.time() * 1000)
return True
def abort_transaction(self, transactional_id: str) -> bool:
"""中止事务"""
with self.lock:
if transactional_id not in self.transactions:
return False
transaction = self.transactions[transactional_id]
if transaction.state != TransactionState.PREPARE_ABORT:
return False
# 向所有分区发送中止标记
self._send_transaction_markers(transaction, commit=False)
transaction.state = TransactionState.COMPLETE_ABORT
transaction.last_update_timestamp = int(time.time() * 1000)
# 重置为空状态
transaction.state = TransactionState.EMPTY
transaction.partitions.clear()
return True
def get_transaction_state(self, transactional_id: str) -> Optional[TransactionState]:
"""获取事务状态"""
with self.lock:
if transactional_id in self.transactions:
return self.transactions[transactional_id].state
return None
def _send_transaction_markers(self, transaction: TransactionMetadata, commit: bool):
"""发送事务标记"""
marker_type = "COMMIT" if commit else "ABORT"
for partition in transaction.partitions:
# 这里应该向分区发送事务标记
print(f"发送{marker_type}标记到分区 {partition},事务ID: {transaction.transactional_id}")
def _cleanup_expired_transactions(self):
"""清理过期事务"""
while self.running:
try:
current_time = int(time.time() * 1000)
expired_transactions = []
with self.lock:
for transactional_id, transaction in self.transactions.items():
if (current_time - transaction.last_update_timestamp > transaction.timeout_ms and
transaction.state in [TransactionState.ONGOING, TransactionState.PREPARE_COMMIT]):
expired_transactions.append(transactional_id)
# 中止过期事务
for transactional_id in expired_transactions:
print(f"中止过期事务: {transactional_id}")
self.prepare_abort_transaction(transactional_id)
self.abort_transaction(transactional_id)
time.sleep(10) # 每10秒检查一次
except Exception as e:
print(f"清理过期事务时发生错误: {e}")
class TransactionalProducer:
"""事务性生产者"""
def __init__(self, transactional_id: str, coordinator: TransactionCoordinator):
self.transactional_id = transactional_id
self.coordinator = coordinator
self.producer_id = None
self.producer_epoch = None
self.in_transaction = False
self.transaction_partitions = set()
def init_transactions(self, timeout_ms: int = 60000):
"""初始化事务"""
result = self.coordinator.init_producer_id(self.transactional_id, timeout_ms)
self.producer_id = result["producer_id"]
self.producer_epoch = result["producer_epoch"]
print(f"事务性生产者初始化完成,ID: {self.producer_id}, Epoch: {self.producer_epoch}")
def begin_transaction(self):
"""开始事务"""
if self.in_transaction:
raise RuntimeError("事务已经开始")
success = self.coordinator.begin_transaction(self.transactional_id)
if not success:
raise RuntimeError("开始事务失败")
self.in_transaction = True
self.transaction_partitions.clear()
print(f"事务开始: {self.transactional_id}")
def send(self, topic: str, partition: int, key: Any, value: Any) -> Dict[str, Any]:
"""发送消息"""
if not self.in_transaction:
raise RuntimeError("必须在事务中发送消息")
partition_key = f"{topic}-{partition}"
# 如果是新分区,添加到事务中
if partition_key not in self.transaction_partitions:
success = self.coordinator.add_partitions_to_transaction(
self.transactional_id, [partition_key]
)
if not success:
raise RuntimeError(f"添加分区到事务失败: {partition_key}")
self.transaction_partitions.add(partition_key)
# 模拟发送消息
record_metadata = {
"topic": topic,
"partition": partition,
"offset": int(time.time() * 1000), # 模拟offset
"timestamp": int(time.time() * 1000),
"producer_id": self.producer_id,
"producer_epoch": self.producer_epoch
}
print(f"发送事务消息: {topic}-{partition}, key={key}, value={value}")
return record_metadata
def commit_transaction(self):
"""提交事务"""
if not self.in_transaction:
raise RuntimeError("没有活跃的事务")
# 准备提交
success = self.coordinator.prepare_commit_transaction(self.transactional_id)
if not success:
raise RuntimeError("准备提交事务失败")
# 提交事务
success = self.coordinator.commit_transaction(self.transactional_id)
if not success:
raise RuntimeError("提交事务失败")
self.in_transaction = False
self.transaction_partitions.clear()
print(f"事务提交成功: {self.transactional_id}")
def abort_transaction(self):
"""中止事务"""
if not self.in_transaction:
raise RuntimeError("没有活跃的事务")
# 准备中止
success = self.coordinator.prepare_abort_transaction(self.transactional_id)
if not success:
raise RuntimeError("准备中止事务失败")
# 中止事务
success = self.coordinator.abort_transaction(self.transactional_id)
if not success:
raise RuntimeError("中止事务失败")
self.in_transaction = False
self.transaction_partitions.clear()
print(f"事务中止成功: {self.transactional_id}")
def close(self):
"""关闭生产者"""
if self.in_transaction:
self.abort_transaction()
# 使用示例
if __name__ == "__main__":
# 创建事务协调器
coordinator = TransactionCoordinator()
coordinator.start()
try:
# 创建事务性生产者
producer = TransactionalProducer("order-processor", coordinator)
producer.init_transactions()
# 执行事务
producer.begin_transaction()
try:
# 发送多条消息
producer.send("orders", 0, "order1", {"id": 1, "amount": 100})
producer.send("inventory", 0, "item1", {"id": 1, "quantity": -1})
producer.send("payments", 0, "payment1", {"order_id": 1, "amount": 100})
# 提交事务
producer.commit_transaction()
except Exception as e:
print(f"事务执行失败: {e}")
producer.abort_transaction()
# 另一个事务示例
producer.begin_transaction()
try:
producer.send("orders", 0, "order2", {"id": 2, "amount": 200})
# 模拟错误
raise ValueError("模拟业务错误")
except Exception as e:
print(f"事务执行失败: {e}")
producer.abort_transaction()
producer.close()
finally:
coordinator.stop()
4.4.2 Transactional Consumers
A transactional consumer can be configured to read only messages from committed transactions, which preserves data consistency for downstream processing.
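The decisive setting is the consumer's isolation.level. A minimal sketch with the confluent-kafka client, where the broker address, group id and topic are assumptions:

from confluent_kafka import Consumer

# With isolation.level=read_committed, only messages from committed transactions
# (plus non-transactional messages) are delivered to the application.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "read-committed-demo",
    "isolation.level": "read_committed",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False
})
consumer.subscribe(["orders"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("consumer error:", msg.error())
            continue
        print(msg.topic(), msg.partition(), msg.offset(), msg.value())
        consumer.commit(msg, asynchronous=False)  # commit this message's offset
finally:
    consumer.close()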
class TransactionalConsumer:
"""事务性消费者"""
def __init__(self, group_id: str, isolation_level: str = "read_committed"):
self.group_id = group_id
self.isolation_level = isolation_level # read_committed 或 read_uncommitted
self.consumed_offsets = {} # {topic-partition: offset}
self.committed_offsets = {} # {topic-partition: offset}
self.transaction_markers = {} # {topic-partition: [markers]}
self.lock = threading.RLock()
def subscribe(self, topics: List[str]):
"""订阅主题"""
self.topics = topics
print(f"订阅主题: {topics}, 隔离级别: {self.isolation_level}")
def poll(self, timeout_ms: int = 1000) -> List[Dict[str, Any]]:
"""拉取消息"""
messages = []
# 模拟拉取消息
for topic in self.topics:
for partition in range(3): # 假设每个主题有3个分区
partition_key = f"{topic}-{partition}"
# 模拟消息
mock_messages = [
{
"topic": topic,
"partition": partition,
"offset": 100,
"key": "key1",
"value": {"data": "message1"},
"timestamp": int(time.time() * 1000),
"producer_id": 1001,
"producer_epoch": 0,
"is_transaction": True,
"transaction_committed": True
},
{
"topic": topic,
"partition": partition,
"offset": 101,
"key": "key2",
"value": {"data": "message2"},
"timestamp": int(time.time() * 1000),
"producer_id": 1002,
"producer_epoch": 0,
"is_transaction": True,
"transaction_committed": False # 未提交的事务
},
{
"topic": topic,
"partition": partition,
"offset": 102,
"key": "key3",
"value": {"data": "message3"},
"timestamp": int(time.time() * 1000),
"producer_id": 1003,
"producer_epoch": 0,
"is_transaction": False,
"transaction_committed": None # 非事务消息
}
]
# 根据隔离级别过滤消息
for msg in mock_messages:
if self._should_include_message(msg):
messages.append(msg)
self.consumed_offsets[partition_key] = msg["offset"]
return messages
def _should_include_message(self, message: Dict[str, Any]) -> bool:
"""判断是否应该包含消息"""
if self.isolation_level == "read_uncommitted":
return True
# read_committed 模式
if not message["is_transaction"]:
return True # 非事务消息总是可见
return message["transaction_committed"] # 只返回已提交的事务消息
def commit_sync(self, offsets: Optional[Dict[str, int]] = None):
"""同步提交偏移量"""
with self.lock:
if offsets:
self.committed_offsets.update(offsets)
else:
self.committed_offsets.update(self.consumed_offsets)
print(f"同步提交偏移量: {self.committed_offsets}")
def commit_async(self, offsets: Optional[Dict[str, int]] = None,
callback=None):
"""异步提交偏移量"""
def commit_task():
try:
self.commit_sync(offsets)
if callback:
callback(None, self.committed_offsets)
except Exception as e:
if callback:
callback(e, None)
thread = threading.Thread(target=commit_task)
thread.daemon = True
thread.start()
def seek_to_beginning(self, topic_partitions: List[str]):
"""跳转到分区开始"""
for tp in topic_partitions:
self.consumed_offsets[tp] = 0
print(f"跳转到开始位置: {topic_partitions}")
def seek_to_end(self, topic_partitions: List[str]):
"""跳转到分区末尾"""
for tp in topic_partitions:
self.consumed_offsets[tp] = 999999 # 模拟末尾偏移量
print(f"跳转到末尾位置: {topic_partitions}")
def close(self):
"""关闭消费者"""
print(f"关闭事务性消费者: {self.group_id}")
class ExactlyOnceProcessor:
"""精确一次处理器"""
def __init__(self, consumer_group: str, transactional_id: str):
self.consumer_group = consumer_group
self.transactional_id = transactional_id
self.coordinator = TransactionCoordinator()
self.consumer = None
self.producer = None
self.processed_offsets = {} # {topic-partition: offset}
self.running = False
def start(self):
"""启动处理器"""
self.coordinator.start()
# 创建事务性消费者
self.consumer = TransactionalConsumer(
group_id=self.consumer_group,
isolation_level="read_committed"
)
# 创建事务性生产者
self.producer = TransactionalProducer(
transactional_id=self.transactional_id,
coordinator=self.coordinator
)
self.producer.init_transactions()
self.running = True
print(f"精确一次处理器启动: {self.transactional_id}")
def process_messages(self, input_topics: List[str], output_topic: str):
"""处理消息"""
self.consumer.subscribe(input_topics)
while self.running:
try:
# 拉取消息
messages = self.consumer.poll(timeout_ms=1000)
if not messages:
continue
# 开始事务
self.producer.begin_transaction()
try:
# 处理每条消息
for message in messages:
processed_data = self._process_message(message)
# 发送处理结果
if processed_data:
self.producer.send(
topic=output_topic,
partition=message["partition"],
key=message["key"],
value=processed_data
)
# 提交消费者偏移量到事务
self._send_offsets_to_transaction(messages)
# 提交事务
self.producer.commit_transaction()
print(f"成功处理 {len(messages)} 条消息")
except Exception as e:
print(f"处理消息失败: {e}")
self.producer.abort_transaction()
except Exception as e:
print(f"处理循环错误: {e}")
time.sleep(1)
def _process_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""处理单条消息"""
try:
# 模拟消息处理逻辑
original_data = message["value"]
# 简单的数据转换
processed_data = {
"original": original_data,
"processed_at": int(time.time() * 1000),
"processor_id": self.transactional_id,
"source_topic": message["topic"],
"source_partition": message["partition"],
"source_offset": message["offset"]
}
return processed_data
except Exception as e:
print(f"处理消息失败: {e}")
return None
def _send_offsets_to_transaction(self, messages: List[Dict[str, Any]]):
"""将偏移量发送到事务"""
# 计算每个分区的最新偏移量
partition_offsets = {}
for message in messages:
partition_key = f"{message['topic']}-{message['partition']}"
partition_offsets[partition_key] = max(
partition_offsets.get(partition_key, -1),
message["offset"]
)
# 在实际实现中,这里会将偏移量作为事务的一部分提交
# 确保消息处理和偏移量提交的原子性
self.processed_offsets.update(partition_offsets)
print(f"将偏移量添加到事务: {partition_offsets}")
def stop(self):
"""停止处理器"""
self.running = False
if self.producer:
self.producer.close()
if self.consumer:
self.consumer.close()
if self.coordinator:
self.coordinator.stop()
print(f"精确一次处理器停止: {self.transactional_id}")
# 使用示例
if __name__ == "__main__":
# 事务性消费者示例
print("=== 事务性消费者示例 ===")
consumer = TransactionalConsumer(
group_id="transaction-consumer-group",
isolation_level="read_committed"
)
consumer.subscribe(["input-topic"])
# 拉取消息
messages = consumer.poll(timeout_ms=1000)
print(f"拉取到 {len(messages)} 条消息")
for msg in messages:
print(f"消息: {msg['topic']}-{msg['partition']}-{msg['offset']}, 值: {msg['value']}")
# 提交偏移量
consumer.commit_sync()
consumer.close()
print("\n=== 精确一次处理器示例 ===")
# 精确一次处理器示例
processor = ExactlyOnceProcessor(
consumer_group="exactly-once-group",
transactional_id="message-processor-1"
)
processor.start()
try:
# 模拟处理一批消息
input_topics = ["raw-events"]
output_topic = "processed-events"
# 在实际应用中,这会是一个长期运行的循环
processor.process_messages(input_topics, output_topic)
except KeyboardInterrupt:
print("收到停止信号")
finally:
processor.stop()
Chapter Summary
Key Concepts
Kafka Streams
- A stream processing library supporting both stateless and stateful operations
- Offers a high-level DSL as well as a low-level Processor API
- Supports time windows, joins, and aggregations
- Ships with built-in fault tolerance and state management
Kafka Connect
- A data integration framework linking Kafka with external systems
- Supports Source and Sink connectors
- Provides distributed, scalable data pipelines
- Includes built-in error handling and retry mechanisms
Schema Registry
- A centralized schema management service
- Supports schema evolution and compatibility checking
- Offers multiple compatibility levels
- Works with Avro, JSON Schema, and Protobuf
Transactions
- Provide atomic writes across partitions
- Enable exactly-once semantics (EOS)
- A transaction coordinator manages transaction state
- Consumers can choose their isolation level
Best Practices
Stream Processing Design
- Keep topologies reasonably simple and avoid unnecessary complexity
- Choose window sizes that match the business requirements
- Handle out-of-order and late-arriving data explicitly
- Monitor the performance metrics of streaming applications
Connector Configuration
- Size the number of tasks according to data volume
- Configure an appropriate error-handling strategy
- Route records that cannot be processed to a dead letter queue (see the sketch after this list)
- Monitor connector status regularly
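A hedged sketch of the error-handling options mentioned above, in the form they would take in a sink connector's configuration; the connector class and topic names are placeholders:

# Illustrative sink-connector settings for tolerant error handling with a
# dead letter queue; connector class and topic names are placeholders.
# This dict would be POSTed to the Connect REST API like the source-connector
# example in section 4.2.1.
error_handling_config = {
    "name": "demo-sink-with-dlq",
    "config": {
        "connector.class": "org.example.DemoSinkConnector",      # placeholder
        "topics": "orders",
        "errors.tolerance": "all",                                # keep running on bad records
        "errors.deadletterqueue.topic.name": "orders-dlq",        # route failed records here
        "errors.deadletterqueue.context.headers.enable": "true",  # attach failure context headers
        "errors.log.enable": "true",
        "errors.log.include.messages": "true"
    }
}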
Schema Management
- Establish a schema evolution strategy
- Choose an appropriate compatibility level (see the sketch after this list)
- Periodically clean up schema versions that are no longer used
- Enable schema validation in production environments
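A short sketch of pinning a subject's compatibility level through the Schema Registry REST API; the registry address and subject name are assumptions:

import requests

# Set the compatibility level for one subject (PUT /config/{subject}).
resp = requests.put(
    "http://localhost:8081/config/user-value",
    json={"compatibility": "BACKWARD"},
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    timeout=10
)
print(resp.json())   # e.g. {"compatibility": "BACKWARD"}

# Read it back (GET /config/{subject} returns {"compatibilityLevel": ...}).
print(requests.get("http://localhost:8081/config/user-value", timeout=10).json())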
Using Transactions
- Use transactions only where strong consistency is required
- Set transaction timeouts sensibly
- Monitor transaction coordinator performance
- Handle transaction failures and retries correctly
Exercises
Basic
- Implement a simple Kafka Streams application that counts messages per minute
- Create a file Source connector that imports CSV data into Kafka
- Design a JSON Schema and implement a compatibility check for it
- Write a transactional producer that sends multiple messages atomically
Intermediate
- Implement a more complex streaming application with several operators and state stores
- Develop a custom Sink connector that writes data to a database
- Set up a highly available Schema Registry deployment
- Design a message processing system with exactly-once semantics
Projects
- Build a real-time data pipeline covering ingestion, processing, and storage
- Implement a multi-tenant schema management system
- Develop a transactional order processing system
- Design a microservice architecture that supports schema evolution