4.1 Kafka Streams

4.1.1 Streams Overview

Kafka Streams is a client library for building real-time stream processing applications. It provides high-level stream processing abstractions that make it easy for developers to build complex processing topologies.
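
The core model is easy to see in isolation: a topology is an ordered chain of per-record transformations. The minimal, self-contained sketch below (independent of the classes that follow) shows that record-at-a-time model; the simulated DSL in this section builds exactly this kind of processor chain behind a fluent API.

# Each step either transforms the record or drops it by returning None.
pipeline = [
    lambda r: r if r["value"] else None,           # filter: drop empty values
    lambda r: {**r, "value": r["value"].lower()},  # map: lower-case the value
]

def process(record, pipeline):
    for step in pipeline:
        record = step(record)
        if record is None:  # a filter dropped the record
            return None
    return record

print(process({"key": "k1", "value": "Hello"}, pipeline))  # {'key': 'k1', 'value': 'hello'}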

import json
import time
from typing import Dict, List, Any, Callable, Optional
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta
import threading
import queue

@dataclass
class StreamRecord:
    """流记录"""
    key: str
    value: Any
    timestamp: int
    topic: str
    partition: int
    offset: int
    headers: Optional[Dict[str, str]] = None
    
    def __post_init__(self):
        if self.headers is None:
            self.headers = {}

class KafkaStreamsConfig:
    """Kafka Streams配置"""
    
    def __init__(self):
        self.application_id = "kafka-streams-app"
        self.bootstrap_servers = "localhost:9092"
        self.default_key_serde = "string"
        self.default_value_serde = "string"
        self.commit_interval_ms = 30000
        self.cache_max_bytes_buffering = 10485760  # 10MB
        self.num_stream_threads = 1
        self.processing_guarantee = "at_least_once"  # at_least_once, exactly_once
        self.state_dir = "/tmp/kafka-streams"
        self.replication_factor = 1
        self.window_store_change_log_additional_retention_ms = 86400000  # 1 day
    
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            "application.id": self.application_id,
            "bootstrap.servers": self.bootstrap_servers,
            "default.key.serde": self.default_key_serde,
            "default.value.serde": self.default_value_serde,
            "commit.interval.ms": self.commit_interval_ms,
            "cache.max.bytes.buffering": self.cache_max_bytes_buffering,
            "num.stream.threads": self.num_stream_threads,
            "processing.guarantee": self.processing_guarantee,
            "state.dir": self.state_dir,
            "replication.factor": self.replication_factor,
            "windowstore.changelog.additional.retention.ms": self.window_store_change_log_additional_retention_ms
        }

class KStream:
    """Kafka流"""
    
    def __init__(self, topic: str, config: KafkaStreamsConfig):
        self.topic = topic
        self.config = config
        self.processors = []
        self.branches = []
        self.joined_streams = []
    
    def filter(self, predicate: Callable[[StreamRecord], bool]) -> 'KStream':
        """过滤流"""
        new_stream = KStream(f"{self.topic}-filtered", self.config)
        new_stream.processors = self.processors.copy()
        new_stream.processors.append(('filter', predicate))
        return new_stream
    
    def map(self, mapper: Callable[[StreamRecord], StreamRecord]) -> 'KStream':
        """映射流"""
        new_stream = KStream(f"{self.topic}-mapped", self.config)
        new_stream.processors = self.processors.copy()
        new_stream.processors.append(('map', mapper))
        return new_stream
    
    def map_values(self, mapper: Callable[[Any], Any]) -> 'KStream':
        """映射值"""
        def value_mapper(record: StreamRecord) -> StreamRecord:
            new_record = StreamRecord(
                key=record.key,
                value=mapper(record.value),
                timestamp=record.timestamp,
                topic=record.topic,
                partition=record.partition,
                offset=record.offset,
                headers=record.headers
            )
            return new_record
        
        return self.map(value_mapper)
    
    def flat_map(self, mapper: Callable[[StreamRecord], List[StreamRecord]]) -> 'KStream':
        """扁平映射"""
        new_stream = KStream(f"{self.topic}-flatmapped", self.config)
        new_stream.processors = self.processors.copy()
        new_stream.processors.append(('flatmap', mapper))
        return new_stream
    
    def group_by_key(self) -> 'KGroupedStream':
        """Group records by their existing key."""
        return KGroupedStream(self.topic, self.config, self.processors.copy())
    
    def group_by(self, key_selector: Callable[[StreamRecord], str]) -> 'KGroupedStream':
        """按指定键分组"""
        def rekey_mapper(record: StreamRecord) -> StreamRecord:
            new_key = key_selector(record)
            return StreamRecord(
                key=new_key,
                value=record.value,
                timestamp=record.timestamp,
                topic=record.topic,
                partition=record.partition,
                offset=record.offset,
                headers=record.headers
            )
        
        rekeyed_stream = self.map(rekey_mapper)
        return KGroupedStream(f"{self.topic}-rekeyed", self.config, rekeyed_stream.processors)
    
    def join(self, other_stream: 'KStream', joiner: Callable[[Any, Any], Any], 
             window_size_ms: int = 60000) -> 'KStream':
        """流连接"""
        new_stream = KStream(f"{self.topic}-joined", self.config)
        new_stream.processors = self.processors.copy()
        new_stream.processors.append(('join', other_stream, joiner, window_size_ms))
        return new_stream
    
    def branch(self, *predicates: Callable[[StreamRecord], bool]) -> List['KStream']:
        """分支流"""
        branches = []
        for i, predicate in enumerate(predicates):
            branch_stream = KStream(f"{self.topic}-branch-{i}", self.config)
            branch_stream.processors = self.processors.copy()
            branch_stream.processors.append(('branch', predicate))
            branches.append(branch_stream)
        
        return branches
    
    def to(self, topic: str):
        """输出到主题"""
        self.processors.append(('to', topic))
    
    def foreach(self, action: Callable[[StreamRecord], None]):
        """对每个记录执行操作"""
        self.processors.append(('foreach', action))

class KGroupedStream:
    """分组流"""
    
    def __init__(self, topic: str, config: KafkaStreamsConfig, processors: List):
        self.topic = topic
        self.config = config
        self.processors = processors
    
    def count(self, window_size_ms: int = None) -> 'KTable':
        """计数聚合"""
        if window_size_ms:
            return self.windowed_by(window_size_ms).count()
        
        table = KTable(f"{self.topic}-count", self.config)
        table.processors = self.processors.copy()
        table.processors.append(('count', None))
        return table
    
    def aggregate(self, initializer: Callable[[], Any], 
                 aggregator: Callable[[Any, StreamRecord], Any],
                 window_size_ms: int = None) -> 'KTable':
        """自定义聚合"""
        if window_size_ms:
            return self.windowed_by(window_size_ms).aggregate(initializer, aggregator)
        
        table = KTable(f"{self.topic}-aggregated", self.config)
        table.processors = self.processors.copy()
        table.processors.append(('aggregate', initializer, aggregator))
        return table
    
    def reduce(self, reducer: Callable[[Any, Any], Any],
              window_size_ms: int = None) -> 'KTable':
        """归约聚合"""
        if window_size_ms:
            return self.windowed_by(window_size_ms).reduce(reducer)
        
        table = KTable(f"{self.topic}-reduced", self.config)
        table.processors = self.processors.copy()
        table.processors.append(('reduce', reducer))
        return table
    
    def windowed_by(self, window_size_ms: int) -> 'TimeWindowedKStream':
        """Window the grouped stream by fixed-size (tumbling) time windows."""
        return TimeWindowedKStream(self.topic, self.config, self.processors.copy(), window_size_ms)

class TimeWindowedKStream:
    """时间窗口流"""
    
    def __init__(self, topic: str, config: KafkaStreamsConfig, 
                 processors: List, window_size_ms: int):
        self.topic = topic
        self.config = config
        self.processors = processors
        self.window_size_ms = window_size_ms
    
    def count(self) -> 'KTable':
        """窗口计数"""
        table = KTable(f"{self.topic}-windowed-count", self.config)
        table.processors = self.processors.copy()
        table.processors.append(('windowed_count', self.window_size_ms))
        return table
    
    def aggregate(self, initializer: Callable[[], Any], 
                 aggregator: Callable[[Any, StreamRecord], Any]) -> 'KTable':
        """窗口聚合"""
        table = KTable(f"{self.topic}-windowed-aggregated", self.config)
        table.processors = self.processors.copy()
        table.processors.append(('windowed_aggregate', self.window_size_ms, initializer, aggregator))
        return table
    
    def reduce(self, reducer: Callable[[Any, Any], Any]) -> 'KTable':
        """窗口归约"""
        table = KTable(f"{self.topic}-windowed-reduced", self.config)
        table.processors = self.processors.copy()
        table.processors.append(('windowed_reduce', self.window_size_ms, reducer))
        return table

class KTable:
    """Kafka表"""
    
    def __init__(self, topic: str, config: KafkaStreamsConfig):
        self.topic = topic
        self.config = config
        self.processors = []
    
    def filter(self, predicate: Callable[[str, Any], bool]) -> 'KTable':
        """过滤表"""
        new_table = KTable(f"{self.topic}-filtered", self.config)
        new_table.processors = self.processors.copy()
        new_table.processors.append(('table_filter', predicate))
        return new_table
    
    def map_values(self, mapper: Callable[[Any], Any]) -> 'KTable':
        """映射值"""
        new_table = KTable(f"{self.topic}-mapped", self.config)
        new_table.processors = self.processors.copy()
        new_table.processors.append(('table_map_values', mapper))
        return new_table
    
    def join(self, other_table: 'KTable', joiner: Callable[[Any, Any], Any]) -> 'KTable':
        """表连接"""
        new_table = KTable(f"{self.topic}-joined", self.config)
        new_table.processors = self.processors.copy()
        new_table.processors.append(('table_join', other_table, joiner))
        return new_table
    
    def to_stream(self) -> KStream:
        """转换为流"""
        stream = KStream(f"{self.topic}-stream", self.config)
        stream.processors = self.processors.copy()
        stream.processors.append(('to_stream',))
        return stream
    
    def to(self, topic: str):
        """输出到主题"""
        self.processors.append(('table_to', topic))
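
# Stream-table duality, in brief: a KTable is the latest value per key of a
# changelog stream, and replaying a table's changes yields a stream again.
# For example, the change stream [("k", 1), ("k", 3)] materializes to the
# table {"k": 3}, which KTable.to_stream() turns back into change records.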

class StreamsBuilder:
    """流构建器"""
    
    def __init__(self, config: KafkaStreamsConfig):
        self.config = config
        self.streams = {}
        self.tables = {}
    
    def stream(self, topic: str) -> KStream:
        """创建流"""
        if topic not in self.streams:
            self.streams[topic] = KStream(topic, self.config)
        return self.streams[topic]
    
    def table(self, topic: str) -> KTable:
        """创建表"""
        if topic not in self.tables:
            self.tables[topic] = KTable(topic, self.config)
        return self.tables[topic]
    
    def build(self) -> 'Topology':
        """构建拓扑"""
        return Topology(self.config, self.streams, self.tables)

class Topology:
    """流处理拓扑"""
    
    def __init__(self, config: KafkaStreamsConfig, streams: Dict[str, KStream], 
                 tables: Dict[str, KTable]):
        self.config = config
        self.streams = streams
        self.tables = tables
        self.state_stores = {}
        self.processors_graph = self._build_processors_graph()
    
    def _build_processors_graph(self) -> Dict[str, List]:
        """Collect each topic's processor chain."""
        graph = {}
        
        # Add stream processors
        for topic, stream in self.streams.items():
            graph[topic] = stream.processors
        
        # Add table processors
        for topic, table in self.tables.items():
            graph[topic] = table.processors
        
        return graph
    
    def describe(self) -> str:
        """描述拓扑"""
        description = f"Topology for application: {self.config.application_id}\n"
        description += "Streams:\n"
        
        for topic, processors in self.processors_graph.items():
            description += f"  {topic}:\n"
            for i, processor in enumerate(processors):
                if isinstance(processor, tuple):
                    description += f"    {i}: {processor[0]}\n"
                else:
                    description += f"    {i}: {processor}\n"
        
        return description

class KafkaStreams:
    """Kafka Streams应用程序"""
    
    def __init__(self, topology: Topology):
        self.topology = topology
        self.config = topology.config
        self.state = "CREATED"
        self.stream_threads = []
        self.state_stores = {}
        self.metrics = {
            "records_processed": 0,
            "records_failed": 0,
            "processing_rate": 0.0,
            "error_rate": 0.0
        }
        self.running = False
        self.exception_handler = None
    
    def set_uncaught_exception_handler(self, handler: Callable[[Exception], None]):
        """设置异常处理器"""
        self.exception_handler = handler
    
    def start(self):
        """Start the stream processing threads."""
        if self.state != "CREATED":
            raise RuntimeError(f"Cannot start streams in state: {self.state}")
        
        self.state = "STARTING"
        self.running = True
        
        # Spawn the stream processing threads
        for i in range(self.config.num_stream_threads):
            thread = StreamThread(f"stream-thread-{i}", self)
            self.stream_threads.append(thread)
            thread.start()
        
        self.state = "RUNNING"
        print(f"Kafka Streams application started: {self.config.application_id}")
    
    def close(self, timeout_ms: int = 30000):
        """Shut down stream processing."""
        if self.state not in ["RUNNING", "ERROR"]:
            return
        
        self.state = "PENDING_SHUTDOWN"
        self.running = False
        
        # Wait for all threads to finish
        for thread in self.stream_threads:
            thread.join(timeout_ms / 1000)
        
        self.state = "NOT_RUNNING"
        print(f"Kafka Streams application closed: {self.config.application_id}")
    
    def get_metrics(self) -> Dict[str, Any]:
        """获取指标"""
        return self.metrics.copy()
    
    def process_record(self, record: StreamRecord) -> List[StreamRecord]:
        """Run a record through its topic's processor chain."""
        try:
            results = [record]
            
            # Look up the processor chain for this record's topic
            processors = self.topology.processors_graph.get(record.topic, [])
            
            for processor in processors:
                new_results = []
                
                for result_record in results:
                    processed = self._apply_processor(processor, result_record)
                    if processed:
                        if isinstance(processed, list):
                            new_results.extend(processed)
                        else:
                            new_results.append(processed)
                
                results = new_results
            
            self.metrics["records_processed"] += 1
            return results
            
        except Exception as e:
            self.metrics["records_failed"] += 1
            if self.exception_handler:
                self.exception_handler(e)
            else:
                print(f"Error while processing record: {e}")
            return []
    
    def _apply_processor(self, processor, record: StreamRecord):
        """Apply a single processor step to a record."""
        if not isinstance(processor, tuple):
            return record
        
        processor_type = processor[0]
        
        if processor_type == "filter":
            predicate = processor[1]
            return record if predicate(record) else None
        
        elif processor_type == "map":
            mapper = processor[1]
            return mapper(record)
        
        elif processor_type == "flatmap":
            mapper = processor[1]
            return mapper(record)  # returns a list of records
        
        elif processor_type == "foreach":
            action = processor[1]
            action(record)
            return record
        
        elif processor_type == "to":
            # A real implementation would produce to the target topic here
            topic = processor[1]
            print(f"Sending record to topic {topic}: {record}")
            return record
        
        # Implementations for the remaining processor types would follow...
        return record

class StreamThread(threading.Thread):
    """流处理线程"""
    
    def __init__(self, name: str, streams_app: KafkaStreams):
        super().__init__(name=name)
        self.streams_app = streams_app
        self.daemon = True
    
    def run(self):
        """Main loop: consume (simulated) records and process them."""
        print(f"Stream thread started: {self.name}")
        
        # Simulate consuming and processing messages
        while self.streams_app.running:
            try:
                # A real implementation would poll records from Kafka here;
                # for the demo we fabricate one record per second
                time.sleep(1)
                
                sample_record = StreamRecord(
                    key="test-key",
                    value={"message": "test message", "timestamp": int(time.time())},
                    timestamp=int(time.time() * 1000),
                    topic="input-topic",
                    partition=0,
                    offset=1
                )
                
                self.streams_app.process_record(sample_record)
                
            except Exception as e:
                print(f"Stream thread error: {e}")
                if self.streams_app.exception_handler:
                    self.streams_app.exception_handler(e)
        
        print(f"Stream thread stopped: {self.name}")

# Usage example
if __name__ == "__main__":
    # Configuration
    config = KafkaStreamsConfig()
    config.application_id = "word-count-app"
    config.bootstrap_servers = "localhost:9092"
    
    # Build the topology
    builder = StreamsBuilder(config)
    
    # Create the processing pipeline
    input_stream = builder.stream("input-topic")
    
    # Word count example (note: in this simplified simulation, derived
    # streams are not registered back into the builder, so the topology
    # only tracks the source stream's own processors)
    word_counts = (input_stream
                  .map_values(lambda text: text.lower())
                  .flat_map(lambda record: [
                      StreamRecord(
                          key=word,
                          value=1,
                          timestamp=record.timestamp,
                          topic=record.topic,
                          partition=record.partition,
                          offset=record.offset
                      ) for word in record.value.split()
                  ])
                  .group_by_key()
                  .count())
    
    word_counts.to("word-counts-output")
    
    # Build and start the application
    topology = builder.build()
    streams = KafkaStreams(topology)
    
    print("Topology description:")
    print(topology.describe())
    
    try:
        streams.start()
        time.sleep(30)  # run for 30 seconds
    finally:
        streams.close()

4.1.2 State Stores
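
Stateful operations such as counts, aggregations, and joins need somewhere to keep their intermediate results. The listing below sketches the three store variants a Streams runtime typically provides: an in-memory store, a disk-backed store, and a time-windowed store.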

import os
import pickle
import threading
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, Iterator, Tuple

class StateStore(ABC):
    """State store interface."""
    
    @abstractmethod
    def put(self, key: str, value: Any) -> None:
        """Store a key-value pair."""
        pass
    
    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        """Get the value for a key."""
        pass
    
    @abstractmethod
    def delete(self, key: str) -> Optional[Any]:
        """Delete a key, returning its old value."""
        pass
    
    @abstractmethod
    def all(self) -> Iterator[Tuple[str, Any]]:
        """Iterate over all key-value pairs."""
        pass
    
    @abstractmethod
    def close(self) -> None:
        """Close the store."""
        pass
    
    @abstractmethod
    def flush(self) -> None:
        """Flush pending writes to durable storage."""
        pass

class InMemoryKeyValueStore(StateStore):
    """内存键值存储"""
    
    def __init__(self, name: str):
        self.name = name
        self.store = {}
        self.lock = threading.RLock()
    
    def put(self, key: str, value: Any) -> None:
        with self.lock:
            self.store[key] = value
    
    def get(self, key: str) -> Optional[Any]:
        with self.lock:
            return self.store.get(key)
    
    def delete(self, key: str) -> Optional[Any]:
        with self.lock:
            return self.store.pop(key, None)
    
    def all(self) -> Iterator[Tuple[str, Any]]:
        # Snapshot under the lock so iteration doesn't hold it between yields
        with self.lock:
            items = list(self.store.items())
        yield from items
    
    def close(self) -> None:
        with self.lock:
            self.store.clear()
    
    def flush(self) -> None:
        # Nothing to flush for an in-memory store
        pass
    
    def size(self) -> int:
        with self.lock:
            return len(self.store)

class PersistentKeyValueStore(StateStore):
    """Key-value store persisted to disk via pickle."""
    
    def __init__(self, name: str, state_dir: str):
        self.name = name
        self.state_dir = state_dir
        self.store_path = os.path.join(state_dir, f"{name}.db")
        self.memory_store = {}
        self.lock = threading.RLock()
        
        # Ensure the state directory exists
        os.makedirs(state_dir, exist_ok=True)
        
        # Load any existing data
        self._load_from_disk()
    
    def _load_from_disk(self):
        """Load persisted data from disk."""
        if os.path.exists(self.store_path):
            try:
                with open(self.store_path, 'rb') as f:
                    self.memory_store = pickle.load(f)
            except Exception as e:
                print(f"Failed to load state store: {e}")
                self.memory_store = {}
    
    def _save_to_disk(self):
        """Persist data to disk."""
        try:
            with open(self.store_path, 'wb') as f:
                pickle.dump(self.memory_store, f)
        except Exception as e:
            print(f"Failed to save state store: {e}")
    
    def put(self, key: str, value: Any) -> None:
        with self.lock:
            self.memory_store[key] = value
    
    def get(self, key: str) -> Optional[Any]:
        with self.lock:
            return self.memory_store.get(key)
    
    def delete(self, key: str) -> Optional[Any]:
        with self.lock:
            return self.memory_store.pop(key, None)
    
    def all(self) -> Iterator[Tuple[str, Any]]:
        # Snapshot under the lock so iteration doesn't hold it between yields
        with self.lock:
            items = list(self.memory_store.items())
        yield from items
    
    def close(self) -> None:
        self.flush()
        with self.lock:
            self.memory_store.clear()
    
    def flush(self) -> None:
        with self.lock:
            self._save_to_disk()
    
    def size(self) -> int:
        with self.lock:
            return len(self.memory_store)

class WindowStore(StateStore):
    """窗口存储"""
    
    def __init__(self, name: str, window_size_ms: int, retention_ms: int):
        self.name = name
        self.window_size_ms = window_size_ms
        self.retention_ms = retention_ms
        self.windows = {}  # {window_start: {key: value}}
        self.lock = threading.RLock()
    
    def _get_window_start(self, timestamp: int) -> int:
        """Align a timestamp to the start of its tumbling window,
        e.g. with 60s windows, 125_000 ms maps to 120_000 ms."""
        return (timestamp // self.window_size_ms) * self.window_size_ms
    
    def _cleanup_old_windows(self, current_time: int):
        """Drop windows older than the retention period."""
        cutoff_time = current_time - self.retention_ms
        expired_windows = []
        
        for window_start in self.windows:
            if window_start < cutoff_time:
                expired_windows.append(window_start)
        
        for window_start in expired_windows:
            del self.windows[window_start]
    
    def put(self, key: str, value: Any, timestamp: int = None) -> None:
        if timestamp is None:
            timestamp = int(time.time() * 1000)
        
        with self.lock:
            window_start = self._get_window_start(timestamp)
            
            if window_start not in self.windows:
                self.windows[window_start] = {}
            
            self.windows[window_start][key] = value
            
            # Evict expired windows
            self._cleanup_old_windows(timestamp)
    
    def get(self, key: str, timestamp: int = None) -> Optional[Any]:
        if timestamp is None:
            timestamp = int(time.time() * 1000)
        
        with self.lock:
            window_start = self._get_window_start(timestamp)
            window_data = self.windows.get(window_start, {})
            return window_data.get(key)
    
    def get_window(self, window_start: int) -> Dict[str, Any]:
        """获取整个窗口的数据"""
        with self.lock:
            return self.windows.get(window_start, {}).copy()
    
    def get_windows(self, key: str, from_time: int, to_time: int) -> Iterator[Tuple[int, Any]]:
        """获取指定时间范围内的窗口数据"""
        with self.lock:
            for window_start in sorted(self.windows.keys()):
                if from_time <= window_start < to_time:
                    window_data = self.windows[window_start]
                    if key in window_data:
                        yield window_start, window_data[key]
    
    def delete(self, key: str, timestamp: int = None) -> Optional[Any]:
        if timestamp is None:
            timestamp = int(time.time() * 1000)
        
        with self.lock:
            window_start = self._get_window_start(timestamp)
            window_data = self.windows.get(window_start, {})
            return window_data.pop(key, None)
    
    def all(self) -> Iterator[Tuple[str, Any]]:
        # Snapshot under the lock so iteration doesn't hold it between yields
        with self.lock:
            items = [(f"{key}@{window_start}", value)
                     for window_start, window_data in self.windows.items()
                     for key, value in window_data.items()]
        yield from items
    
    def close(self) -> None:
        with self.lock:
            self.windows.clear()
    
    def flush(self) -> None:
        # This window store is memory-only; nothing to flush
        pass

class StateStoreManager:
    """状态存储管理器"""
    
    def __init__(self, state_dir: str):
        self.state_dir = state_dir
        self.stores = {}
        self.lock = threading.RLock()
    
    def create_key_value_store(self, name: str, persistent: bool = True) -> StateStore:
        """创建键值存储"""
        with self.lock:
            if name in self.stores:
                return self.stores[name]
            
            if persistent:
                store = PersistentKeyValueStore(name, self.state_dir)
            else:
                store = InMemoryKeyValueStore(name)
            
            self.stores[name] = store
            return store
    
    def create_window_store(self, name: str, window_size_ms: int, 
                           retention_ms: int) -> WindowStore:
        """创建窗口存储"""
        with self.lock:
            if name in self.stores:
                return self.stores[name]
            
            store = WindowStore(name, window_size_ms, retention_ms)
            self.stores[name] = store
            return store
    
    def get_store(self, name: str) -> Optional[StateStore]:
        """获取存储"""
        with self.lock:
            return self.stores.get(name)
    
    def close_all(self):
        """关闭所有存储"""
        with self.lock:
            for store in self.stores.values():
                store.close()
            self.stores.clear()
    
    def flush_all(self):
        """刷新所有存储"""
        with self.lock:
            for store in self.stores.values():
                store.flush()

# Usage example
if __name__ == "__main__":
    # Create the state store manager
    store_manager = StateStoreManager("/tmp/kafka-streams-state")
    
    # Create a key-value store
    kv_store = store_manager.create_key_value_store("user-counts")
    
    # Write data
    kv_store.put("user1", 10)
    kv_store.put("user2", 20)
    
    # Read data
    print(f"user1 count: {kv_store.get('user1')}")
    print(f"user2 count: {kv_store.get('user2')}")
    
    # Create a window store
    window_store = store_manager.create_window_store(
        "hourly-counts", 
        window_size_ms=3600000,  # 1 hour
        retention_ms=86400000    # 24 hours
    )
    
    # Write windowed data
    current_time = int(time.time() * 1000)
    window_store.put("event1", 5, current_time)
    window_store.put("event2", 3, current_time)
    
    # Read windowed data
    print(f"event1 in current window: {window_store.get('event1', current_time)}")
    
    # Close all stores
    store_manager.close_all()
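
In the real Kafka Streams runtime, fault tolerance for these stores comes from compacted changelog topics: every write is also appended to a changelog, and a restarted instance rebuilds its local state by replaying it. A minimal sketch of that restoration step, using the stores above (the changelog here is just a list of pairs; a None value represents a tombstone):

def restore_from_changelog(store: StateStore, changelog):
    """Rebuild local state by replaying (key, value) changelog entries."""
    for key, value in changelog:
        if value is None:
            store.delete(key)   # tombstone: the key was deleted
        else:
            store.put(key, value)

restored = InMemoryKeyValueStore("restored-counts")
restore_from_changelog(restored, [("user1", 10), ("user2", 20), ("user1", None)])
print(restored.get("user1"), restored.get("user2"))  # None 20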

4.2 Kafka Connect

4.2.1 Connect Overview

Kafka Connect is a tool for scalably and reliably streaming data between Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka.
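
In a real deployment you do not instantiate connectors in application code; you submit a JSON configuration to a Connect worker's REST API (port 8083 by default) and the worker manages the connector and its tasks. A minimal sketch using the requests library, assuming a worker on localhost and the stock FileStreamSource connector that ships with Kafka:

import requests  # assumes: pip install requests

connector_config = {
    "name": "file-source",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "file": "/tmp/test-input.txt",
        "topic": "file-topic",
    },
}

# POST /connectors creates the connector; 201 on success, 409 if it already exists
resp = requests.post("http://localhost:8083/connectors", json=connector_config)
print(resp.status_code, resp.json())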

import json
import time
import threading
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum

class ConnectorType(Enum):
    """Connector type."""
    SOURCE = "source"
    SINK = "sink"

class TaskState(Enum):
    """Task state."""
    UNASSIGNED = "UNASSIGNED"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    FAILED = "FAILED"
    DESTROYED = "DESTROYED"

@dataclass
class ConnectorConfig:
    """连接器配置"""
    name: str
    connector_class: str
    tasks_max: int = 1
    topics: Optional[str] = None
    topics_regex: Optional[str] = None
    key_converter: str = "org.apache.kafka.connect.json.JsonConverter"
    value_converter: str = "org.apache.kafka.connect.json.JsonConverter"
    key_converter_schemas_enable: bool = False
    value_converter_schemas_enable: bool = False
    transforms: Optional[str] = None
    errors_tolerance: str = "none"  # none, all
    errors_deadletterqueue_topic_name: Optional[str] = None
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dict, dropping None values."""
        # Use __dict__ rather than asdict() so connector-specific settings
        # assigned after construction (e.g. config.file) are preserved
        config = dict(self.__dict__)
        return {k: v for k, v in config.items() if v is not None}

@dataclass
class SourceRecord:
    """源记录"""
    source_partition: Dict[str, Any]
    source_offset: Dict[str, Any]
    topic: str
    partition: Optional[int] = None
    key_schema: Optional[Dict] = None
    key: Any = None
    value_schema: Optional[Dict] = None
    value: Any = None
    timestamp: Optional[int] = None
    headers: Optional[Dict[str, str]] = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = int(time.time() * 1000)
        if self.headers is None:
            self.headers = {}

@dataclass
class SinkRecord:
    """接收记录"""
    topic: str
    partition: int
    offset: int
    key_schema: Optional[Dict] = None
    key: Any = None
    value_schema: Optional[Dict] = None
    value: Any = None
    timestamp: Optional[int] = None
    headers: Optional[Dict[str, str]] = None
    
    def __post_init__(self):
        if self.headers is None:
            self.headers = {}

class Connector(ABC):
    """Connector base class."""
    
    def __init__(self):
        self.config = {}
        self.context = None
    
    @abstractmethod
    def version(self) -> str:
        """Return the connector version."""
        pass
    
    @abstractmethod
    def start(self, config: Dict[str, str]) -> None:
        """Start the connector."""
        pass
    
    @abstractmethod
    def task_class(self) -> type:
        """Return the Task class for this connector."""
        pass
    
    @abstractmethod
    def task_configs(self, max_tasks: int) -> List[Dict[str, str]]:
        """Generate per-task configurations."""
        pass
    
    @abstractmethod
    def stop(self) -> None:
        """Stop the connector."""
        pass
    
    def config_def(self) -> Dict[str, Any]:
        """Describe the supported configuration options."""
        return {}
    
    def validate(self, config: Dict[str, str]) -> List[Dict[str, str]]:
        """Validate a configuration, returning any errors."""
        return []

class Task(ABC):
    """Task base class."""
    
    def __init__(self):
        self.config = {}
        self.context = None
    
    @abstractmethod
    def version(self) -> str:
        """Return the task version."""
        pass
    
    @abstractmethod
    def start(self, config: Dict[str, str]) -> None:
        """Start the task."""
        pass
    
    @abstractmethod
    def stop(self) -> None:
        """Stop the task."""
        pass

class SourceTask(Task):
    """Source task: pulls data from an external system."""
    
    @abstractmethod
    def poll(self) -> List[SourceRecord]:
        """Poll the external system for new records."""
        pass
    
    def commit(self) -> None:
        """Commit source offsets."""
        pass
    
    def commit_record(self, record: SourceRecord, metadata: Dict[str, Any]) -> None:
        """Acknowledge a single delivered record."""
        pass

class SinkTask(Task):
    """Sink task: pushes data to an external system."""
    
    @abstractmethod
    def put(self, records: List[SinkRecord]) -> None:
        """Deliver a batch of records."""
        pass
    
    def flush(self, offsets: Dict[str, Dict[int, int]]) -> None:
        """Flush buffered data up to the given offsets."""
        pass
    
    def preCommit(self, offsets: Dict[str, Dict[int, int]]) -> Dict[str, Dict[int, int]]:
        """Hook called before offsets are committed (mirrors the Java API name)."""
        return offsets

# Example: file source connector
class FileSourceConnector(Connector):
    """Reads lines from a file into a Kafka topic."""
    
    def version(self) -> str:
        return "1.0.0"
    
    def start(self, config: Dict[str, str]) -> None:
        self.config = config
        self.filename = config.get("file")
        self.topic = config.get("topic")
        
        if not self.filename:
            raise ValueError("'file' is a required configuration")
        if not self.topic:
            raise ValueError("'topic' is a required configuration")
    
    def task_class(self) -> type:
        return FileSourceTask
    
    def task_configs(self, max_tasks: int) -> List[Dict[str, str]]:
        # A file source normally needs only one task
        return [{
            "file": self.config["file"],
            "topic": self.config["topic"]
        }]
    
    def stop(self) -> None:
        pass
    
    def config_def(self) -> Dict[str, Any]:
        return {
            "file": {
                "type": "string",
                "required": True,
                "documentation": "Path of the file to read"
            },
            "topic": {
                "type": "string",
                "required": True,
                "documentation": "Target Kafka topic"
            }
        }

class FileSourceTask(SourceTask):
    """文件源任务"""
    
    def __init__(self):
        super().__init__()
        self.filename = None
        self.topic = None
        self.file_handle = None
        self.position = 0
    
    def version(self) -> str:
        return "1.0.0"
    
    def start(self, config: Dict[str, str]) -> None:
        self.config = config
        self.filename = config["file"]
        self.topic = config["topic"]
        
        try:
            self.file_handle = open(self.filename, 'r', encoding='utf-8')
            # Seek to the saved offset, if any
            if hasattr(self.context, 'offset_storage'):
                saved_offset = self.context.offset_storage.get({"filename": self.filename})
                if saved_offset and "position" in saved_offset:
                    self.position = saved_offset["position"]
                    self.file_handle.seek(self.position)
        except IOError as e:
            raise RuntimeError(f"Cannot open file {self.filename}: {e}")
    
    def poll(self) -> List[SourceRecord]:
        if not self.file_handle:
            return []
        
        records = []
        
        try:
            # Read one line
            line = self.file_handle.readline()
            if line:
                # Update the position
                self.position = self.file_handle.tell()
                
                # Build the source record
                record = SourceRecord(
                    source_partition={"filename": self.filename},
                    source_offset={"position": self.position},
                    topic=self.topic,
                    key=None,
                    value=line.strip(),
                    timestamp=int(time.time() * 1000)
                )
                records.append(record)
            else:
                # End of file; wait for new content
                time.sleep(1)
        
        except IOError as e:
            print(f"Error reading file: {e}")
        
        return records
    
    def stop(self) -> None:
        if self.file_handle:
            self.file_handle.close()
            self.file_handle = None

# Example: file sink connector
class FileSinkConnector(Connector):
    """Writes records from Kafka topics to a file."""
    
    def version(self) -> str:
        return "1.0.0"
    
    def start(self, config: Dict[str, str]) -> None:
        self.config = config
        self.filename = config.get("file")
        self.topics = config.get("topics", "").split(",")
        
        if not self.filename:
            raise ValueError("'file' is a required configuration")
        if not self.topics or not self.topics[0]:
            raise ValueError("'topics' is a required configuration")
    
    def task_class(self) -> type:
        return FileSinkTask
    
    def task_configs(self, max_tasks: int) -> List[Dict[str, str]]:
        return [{
            "file": self.config["file"],
            "topics": self.config["topics"]
        }]
    
    def stop(self) -> None:
        pass
    
    def config_def(self) -> Dict[str, Any]:
        return {
            "file": {
                "type": "string",
                "required": True,
                "documentation": "Path of the file to write"
            },
            "topics": {
                "type": "string",
                "required": True,
                "documentation": "Comma-separated list of Kafka topics to consume"
            }
        }

class FileSinkTask(SinkTask):
    """文件接收任务"""
    
    def __init__(self):
        super().__init__()
        self.filename = None
        self.file_handle = None
    
    def version(self) -> str:
        return "1.0.0"
    
    def start(self, config: Dict[str, str]) -> None:
        self.config = config
        self.filename = config["file"]
        
        try:
            self.file_handle = open(self.filename, 'a', encoding='utf-8')
        except IOError as e:
            raise RuntimeError(f"Cannot open file {self.filename}: {e}")
    
    def put(self, records: List[SinkRecord]) -> None:
        if not self.file_handle:
            return
        
        try:
            for record in records:
                # Write the record as a CSV line
                line = f"{record.timestamp},{record.topic},{record.partition},{record.offset},{record.key},{record.value}\n"
                self.file_handle.write(line)
            
            # Flush to disk
            self.file_handle.flush()
        
        except IOError as e:
            print(f"Error writing file: {e}")
            raise
    
    def flush(self, offsets: Dict[str, Dict[int, int]]) -> None:
        if self.file_handle:
            self.file_handle.flush()
    
    def stop(self) -> None:
        if self.file_handle:
            self.file_handle.close()
            self.file_handle = None

class ConnectWorker:
    """Connect工作节点"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.connectors = {}
        self.tasks = {}
        self.running = False
        self.worker_thread = None
    
    def start(self):
        """Start the worker."""
        self.running = True
        self.worker_thread = threading.Thread(target=self._run_worker)
        self.worker_thread.start()
        print("Kafka Connect worker started")
    
    def stop(self):
        """Stop the worker."""
        self.running = False
        
        # Stop all connectors and tasks
        for connector in self.connectors.values():
            connector.stop()
        
        for task in self.tasks.values():
            task.stop()
        
        if self.worker_thread:
            self.worker_thread.join()
        
        print("Kafka Connect worker stopped")
    
    def _run_worker(self):
        """Worker main loop."""
        while self.running:
            try:
                # Drive the source tasks (snapshot the dict so connectors can
                # be created or deleted while we iterate)
                for task_id, task in list(self.tasks.items()):
                    if isinstance(task, SourceTask):
                        records = task.poll()
                        if records:
                            # A real worker would produce these to Kafka
                            print(f"Task {task_id} produced {len(records)} records")
                
                time.sleep(1)
            
            except Exception as e:
                print(f"Worker error: {e}")
    
    def create_connector(self, config: ConnectorConfig) -> str:
        """Create and start a connector and its tasks."""
        connector_name = config.name
        
        # Instantiate the connector class
        if config.connector_class == "FileSourceConnector":
            connector = FileSourceConnector()
        elif config.connector_class == "FileSinkConnector":
            connector = FileSinkConnector()
        else:
            raise ValueError(f"Unknown connector class: {config.connector_class}")
        
        # Start the connector
        connector.start(config.to_dict())
        self.connectors[connector_name] = connector
        
        # Create the tasks
        task_configs = connector.task_configs(config.tasks_max)
        task_class = connector.task_class()
        
        for i, task_config in enumerate(task_configs):
            task_id = f"{connector_name}-{i}"
            task = task_class()
            task.start(task_config)
            self.tasks[task_id] = task
        
        print(f"Connector {connector_name} created with {len(task_configs)} task(s)")
        return connector_name
    
    def delete_connector(self, connector_name: str) -> bool:
        """Stop and remove a connector and its tasks."""
        if connector_name not in self.connectors:
            return False
        
        # Stop the connector
        connector = self.connectors[connector_name]
        connector.stop()
        del self.connectors[connector_name]
        
        # Stop its tasks
        tasks_to_remove = []
        for task_id, task in self.tasks.items():
            if task_id.startswith(f"{connector_name}-"):
                task.stop()
                tasks_to_remove.append(task_id)
        
        for task_id in tasks_to_remove:
            del self.tasks[task_id]
        
        print(f"Connector {connector_name} deleted")
        return True
    
    def get_connector_status(self, connector_name: str) -> Dict[str, Any]:
        """Report the status of a connector and its tasks."""
        if connector_name not in self.connectors:
            return {"error": "Connector does not exist"}
        
        # Collect task states
        task_states = []
        for task_id, task in self.tasks.items():
            if task_id.startswith(f"{connector_name}-"):
                task_states.append({
                    "id": task_id,
                    "state": "RUNNING",  # simplified state reporting
                    "worker_id": "worker-1"
                })
        
        return {
            "name": connector_name,
            "connector": {
                "state": "RUNNING",
                "worker_id": "worker-1"
            },
            "tasks": task_states
        }
    
    def list_connectors(self) -> List[str]:
        """列出所有连接器"""
        return list(self.connectors.keys())

# Usage example
if __name__ == "__main__":
    # Create the Connect worker
    worker_config = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "connect-cluster",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
    
    worker = ConnectWorker(worker_config)
    worker.start()
    
    try:
        # Create a file source connector
        source_config = ConnectorConfig(
            name="file-source",
            connector_class="FileSourceConnector",
            tasks_max=1
        )
        source_config.file = "/tmp/test-input.txt"
        source_config.topic = "file-topic"
        
        worker.create_connector(source_config)
        
        # Create a file sink connector
        sink_config = ConnectorConfig(
            name="file-sink",
            connector_class="FileSinkConnector",
            tasks_max=1
        )
        sink_config.file = "/tmp/test-output.txt"
        sink_config.topics = "file-topic"
        
        worker.create_connector(sink_config)
        
        # Inspect connector status
        print("Connectors:", worker.list_connectors())
        print("Source connector status:", worker.get_connector_status("file-source"))
        print("Sink connector status:", worker.get_connector_status("file-sink"))
        
        # Run for a while
        time.sleep(30)
    
    finally:
        worker.stop()
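
Day-2 operations go through the same REST API; the calls below use endpoints from the standard Connect REST API, again assuming a worker on localhost:8083:

import requests

base = "http://localhost:8083"

print(requests.get(f"{base}/connectors").json())                     # list connectors
print(requests.get(f"{base}/connectors/file-source/status").json())  # connector + task states
requests.put(f"{base}/connectors/file-source/pause")                  # pause
requests.put(f"{base}/connectors/file-source/resume")                 # resume
requests.delete(f"{base}/connectors/file-source")                     # delete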

4.3 Schema Registry

4.3.1 Schema Management
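
Schema Registry stores versioned schemas per subject and assigns each schema a globally unique ID. Producers typically embed that ID in every message using the Confluent wire format, a zero magic byte followed by the 4-byte big-endian schema ID, so consumers can fetch the matching schema to decode the payload. A small self-contained sketch of that framing:

import struct

def encode_confluent_wire_format(schema_id: int, payload: bytes) -> bytes:
    """Magic byte 0x00, 4-byte big-endian schema ID, then the encoded payload."""
    return b"\x00" + struct.pack(">I", schema_id) + payload

def decode_confluent_wire_format(message: bytes):
    if message[0:1] != b"\x00":
        raise ValueError("Unknown magic byte")
    schema_id = struct.unpack(">I", message[1:5])[0]
    return schema_id, message[5:]

framed = encode_confluent_wire_format(42, b'{"id": 1}')
print(decode_confluent_wire_format(framed))  # (42, b'{"id": 1}')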

import json
import hashlib
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass
from enum import Enum
import threading

class SchemaType(Enum):
    """Schema type."""
    AVRO = "AVRO"
    JSON = "JSON"
    PROTOBUF = "PROTOBUF"

class CompatibilityLevel(Enum):
    """Compatibility level."""
    BACKWARD = "BACKWARD"
    BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE"
    FORWARD = "FORWARD"
    FORWARD_TRANSITIVE = "FORWARD_TRANSITIVE"
    FULL = "FULL"
    FULL_TRANSITIVE = "FULL_TRANSITIVE"
    NONE = "NONE"

@dataclass
class Schema:
    """A registered schema."""
    id: int
    version: int
    schema_type: SchemaType
    schema_str: str
    subject: str
    references: List[Dict[str, Any]] = None
    deleted: bool = False  # set by soft deletion
    
    def __post_init__(self):
        if self.references is None:
            self.references = []
    
    def get_hash(self) -> str:
        """Content hash of the schema."""
        content = f"{self.schema_type.value}:{self.schema_str}"
        return hashlib.sha256(content.encode()).hexdigest()
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dict."""
        return {
            "id": self.id,
            "version": self.version,
            "schemaType": self.schema_type.value,
            "schema": self.schema_str,
            "subject": self.subject,
            "references": self.references
        }

class SchemaCompatibilityChecker:
    """Simplified schema compatibility checker."""
    
    @staticmethod
    def check_compatibility(reader_schema: Schema, writer_schema: Schema, 
                          compatibility_level: CompatibilityLevel) -> bool:
        """Check whether reader_schema can consume data produced
        with writer_schema under the given compatibility level."""
        if reader_schema.schema_type != writer_schema.schema_type:
            return False
        
        if compatibility_level == CompatibilityLevel.NONE:
            return True
        
        # Simplified compatibility logic
        if compatibility_level in [CompatibilityLevel.BACKWARD, CompatibilityLevel.BACKWARD_TRANSITIVE]:
            return SchemaCompatibilityChecker._check_backward_compatibility(reader_schema, writer_schema)
        
        elif compatibility_level in [CompatibilityLevel.FORWARD, CompatibilityLevel.FORWARD_TRANSITIVE]:
            return SchemaCompatibilityChecker._check_forward_compatibility(reader_schema, writer_schema)
        
        elif compatibility_level in [CompatibilityLevel.FULL, CompatibilityLevel.FULL_TRANSITIVE]:
            return (SchemaCompatibilityChecker._check_backward_compatibility(reader_schema, writer_schema) and
                   SchemaCompatibilityChecker._check_forward_compatibility(reader_schema, writer_schema))
        
        return False
    
    @staticmethod
    def _check_backward_compatibility(reader_schema: Schema, writer_schema: Schema) -> bool:
        """Backward compatibility: the reader can safely read the writer's data."""
        # Simplified: every field the reader requires must exist in the writer's schema
        if reader_schema.schema_type == SchemaType.JSON:
            try:
                reader_json = json.loads(reader_schema.schema_str)
                writer_json = json.loads(writer_schema.schema_str)
                
                # Check that required fields are present
                reader_required = set(reader_json.get("required", []))
                writer_properties = set(writer_json.get("properties", {}).keys())
                
                return reader_required.issubset(writer_properties)
            except json.JSONDecodeError:
                return False
        
        return True  # simplified handling for other schema types
    
    @staticmethod
    def _check_forward_compatibility(reader_schema: Schema, writer_schema: Schema) -> bool:
        """Forward compatibility: the writer's data does not break existing readers."""
        # Simplified: fields newly required by the writer must already be known to the reader
        if reader_schema.schema_type == SchemaType.JSON:
            try:
                reader_json = json.loads(reader_schema.schema_str)
                writer_json = json.loads(writer_schema.schema_str)
                
                reader_properties = set(reader_json.get("properties", {}).keys())
                writer_required = set(writer_json.get("required", []))
                
                new_required = writer_required - reader_properties
                return len(new_required) == 0
            except json.JSONDecodeError:
                return False
        
        return True  # simplified handling for other schema types
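
# Quick sanity check for the checker above: a reader schema that adds an
# optional "age" property can still read data written with the old schema,
# so the pair passes a BACKWARD check (new schema as reader, old as writer).
_old = Schema(1, 1, SchemaType.JSON,
              json.dumps({"type": "object",
                          "properties": {"id": {}, "name": {}},
                          "required": ["id", "name"]}), "user")
_new = Schema(2, 2, SchemaType.JSON,
              json.dumps({"type": "object",
                          "properties": {"id": {}, "name": {}, "age": {}},
                          "required": ["id", "name"]}), "user")
assert SchemaCompatibilityChecker.check_compatibility(
    _new, _old, CompatibilityLevel.BACKWARD)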

class SchemaRegistry:
    """Schema注册中心"""
    
    def __init__(self):
        self.schemas = {}  # {id: Schema}
        self.subjects = {}  # {subject: {version: schema_id}}
        self.next_id = 1
        self.compatibility_levels = {}  # {subject: CompatibilityLevel}
        self.global_compatibility = CompatibilityLevel.BACKWARD
        self.lock = threading.RLock()
    
    def register_schema(self, subject: str, schema_str: str, 
                       schema_type: SchemaType = SchemaType.AVRO,
                       references: List[Dict[str, Any]] = None) -> int:
        """Register a schema under a subject, returning its ID."""
        with self.lock:
            # Reuse the ID if an identical schema is already registered
            existing_id = self._find_existing_schema(subject, schema_str, schema_type)
            if existing_id:
                return existing_id
            
            # Next version number for this subject
            next_version = self._get_next_version(subject)
            
            # Create the new schema
            schema = Schema(
                id=self.next_id,
                version=next_version,
                schema_type=schema_type,
                schema_str=schema_str,
                subject=subject,
                references=references or []
            )
            
            # Compatibility check against the latest version
            if not self._check_compatibility_for_subject(subject, schema):
                raise ValueError(f"Schema failed the compatibility check for subject {subject}")
            
            # Store the schema
            self.schemas[self.next_id] = schema
            
            if subject not in self.subjects:
                self.subjects[subject] = {}
            self.subjects[subject][next_version] = self.next_id
            
            schema_id = self.next_id
            self.next_id += 1
            
            return schema_id
    
    def get_schema_by_id(self, schema_id: int) -> Optional[Schema]:
        """根据ID获取Schema"""
        with self.lock:
            return self.schemas.get(schema_id)
    
    def get_latest_schema(self, subject: str) -> Optional[Schema]:
        """获取主题的最新Schema"""
        with self.lock:
            if subject not in self.subjects:
                return None
            
            latest_version = max(self.subjects[subject].keys())
            schema_id = self.subjects[subject][latest_version]
            return self.schemas.get(schema_id)
    
    def get_schema_by_version(self, subject: str, version: int) -> Optional[Schema]:
        """根据版本获取Schema"""
        with self.lock:
            if subject not in self.subjects or version not in self.subjects[subject]:
                return None
            
            schema_id = self.subjects[subject][version]
            return self.schemas.get(schema_id)
    
    def get_all_versions(self, subject: str) -> List[int]:
        """获取主题的所有版本"""
        with self.lock:
            if subject not in self.subjects:
                return []
            return sorted(self.subjects[subject].keys())
    
    def delete_subject(self, subject: str, permanent: bool = False) -> List[int]:
        """Delete a subject, returning the versions removed."""
        with self.lock:
            if subject not in self.subjects:
                return []
            
            deleted_versions = list(self.subjects[subject].keys())
            
            if permanent:
                # Hard delete
                for version, schema_id in self.subjects[subject].items():
                    if schema_id in self.schemas:
                        del self.schemas[schema_id]
                del self.subjects[subject]
            else:
                # Soft delete (mark as deleted)
                for version, schema_id in self.subjects[subject].items():
                    if schema_id in self.schemas:
                        self.schemas[schema_id].deleted = True
            
            return deleted_versions
    
    def delete_schema_version(self, subject: str, version: int, 
                            permanent: bool = False) -> Optional[int]:
        """Delete one version of a subject's schema."""
        with self.lock:
            if subject not in self.subjects or version not in self.subjects[subject]:
                return None
            
            schema_id = self.subjects[subject][version]
            
            if permanent:
                # Hard delete
                if schema_id in self.schemas:
                    del self.schemas[schema_id]
                del self.subjects[subject][version]
            else:
                # Soft delete
                if schema_id in self.schemas:
                    self.schemas[schema_id].deleted = True
            
            return schema_id
    
    def set_compatibility_level(self, subject: str, 
                              compatibility: CompatibilityLevel) -> None:
        """设置主题兼容性级别"""
        with self.lock:
            self.compatibility_levels[subject] = compatibility
    
    def get_compatibility_level(self, subject: str) -> CompatibilityLevel:
        """获取主题兼容性级别"""
        with self.lock:
            return self.compatibility_levels.get(subject, self.global_compatibility)
    
    def test_compatibility(self, subject: str, schema_str: str, 
                         schema_type: SchemaType = SchemaType.AVRO,
                         version: Optional[int] = None) -> bool:
        """Test whether a candidate schema is compatible with the subject."""
        with self.lock:
            test_schema = Schema(
                id=-1,  # temporary ID
                version=-1,  # temporary version
                schema_type=schema_type,
                schema_str=schema_str,
                subject=subject
            )
            
            if version is None:
                # Compare against the latest version
                latest_schema = self.get_latest_schema(subject)
                if latest_schema is None:
                    return True  # no existing schema, trivially compatible
                return self._check_schema_compatibility(latest_schema, test_schema, subject)
            else:
                # Compare against the given version
                existing_schema = self.get_schema_by_version(subject, version)
                if existing_schema is None:
                    return False  # the given version does not exist
                return self._check_schema_compatibility(existing_schema, test_schema, subject)
    
    def list_subjects(self) -> List[str]:
        """列出所有主题"""
        with self.lock:
            return list(self.subjects.keys())
    
    def _find_existing_schema(self, subject: str, schema_str: str, 
                            schema_type: SchemaType) -> Optional[int]:
        """Find an identical schema already registered under the subject."""
        if subject not in self.subjects:
            return None
        
        for version, schema_id in self.subjects[subject].items():
            schema = self.schemas.get(schema_id)
            if (schema and schema.schema_str == schema_str and 
                schema.schema_type == schema_type):
                return schema_id
        
        return None
    
    def _get_next_version(self, subject: str) -> int:
        """Next version number for a subject."""
        if subject not in self.subjects or not self.subjects[subject]:
            return 1
        return max(self.subjects[subject].keys()) + 1
    
    def _check_compatibility_for_subject(self, subject: str, new_schema: Schema) -> bool:
        """Check a new schema against the subject's latest version."""
        latest_schema = self.get_latest_schema(subject)
        if latest_schema is None:
            return True  # first schema for the subject, always compatible
        
        return self._check_schema_compatibility(latest_schema, new_schema, subject)
    
    def _check_schema_compatibility(self, existing_schema: Schema, 
                                  new_schema: Schema, subject: str) -> bool:
        """Check two schemas under the subject's compatibility level.
        For BACKWARD-style checks the new schema acts as the reader and the
        existing schema as the writer (new readers must handle old data)."""
        compatibility_level = self.get_compatibility_level(subject)
        return SchemaCompatibilityChecker.check_compatibility(
            new_schema, existing_schema, compatibility_level
        )

class SchemaRegistryClient:
    """Schema Registry客户端"""
    
    def __init__(self, base_url: str = "http://localhost:8081"):
        self.base_url = base_url.rstrip('/')
        self.cache = {}  # {schema_id: Schema}
        self.subject_cache = {}  # {subject: {version: schema_id}}
        # Shared local stand-in for the remote registry, used by this demo
        self._local_registry = SchemaRegistry()
        self.lock = threading.RLock()
    
    def register_schema(self, subject: str, schema: Dict[str, Any]) -> int:
        """Register a schema. A real client would POST to the registry's
        REST API; this demo delegates to a shared local registry."""
        schema_type = SchemaType(schema.get("schemaType", "AVRO"))
        schema_str = schema["schema"]
        references = schema.get("references", [])
        
        return self._local_registry.register_schema(subject, schema_str, schema_type, references)
    
    def get_schema_by_id(self, schema_id: int) -> Optional[Dict[str, Any]]:
        """Get a schema by ID (served from the local cache in this demo)."""
        with self.lock:
            if schema_id in self.cache:
                return self.cache[schema_id].to_dict()
        
        # A real client would issue an HTTP GET here
        return None
    
    def get_latest_schema(self, subject: str) -> Optional[Dict[str, Any]]:
        """Get the latest schema (a real client would issue an HTTP GET)."""
        return None
    
    def get_schema_by_version(self, subject: str, version: int) -> Optional[Dict[str, Any]]:
        """Get a schema by version (a real client would issue an HTTP GET)."""
        return None
    
    def test_compatibility(self, subject: str, schema: Dict[str, Any], 
                         version: Optional[int] = None) -> bool:
        """Test compatibility (a real client would issue an HTTP POST)."""
        return True
    
    def set_compatibility_level(self, subject: str, level: str) -> Dict[str, str]:
        """Set the compatibility level (a real client would issue an HTTP PUT)."""
        return {"compatibility": level}
    
    def get_compatibility_level(self, subject: str) -> Dict[str, str]:
        """Get the compatibility level (a real client would issue an HTTP GET)."""
        return {"compatibilityLevel": "BACKWARD"}

# Usage example
if __name__ == "__main__":
    # Create the Schema Registry
    registry = SchemaRegistry()
    
    # Define JSON Schemas
    user_schema_v1 = {
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "name": {"type": "string"},
            "email": {"type": "string"}
        },
        "required": ["id", "name"]
    }
    
    user_schema_v2 = {
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "name": {"type": "string"},
            "email": {"type": "string"},
            "age": {"type": "integer"}
        },
        "required": ["id", "name"]
    }
    
    # Register the schema
    schema_id_v1 = registry.register_schema(
        "user", 
        json.dumps(user_schema_v1), 
        SchemaType.JSON
    )
    print(f"Registered schema v1, ID: {schema_id_v1}")
    
    # Test compatibility
    is_compatible = registry.test_compatibility(
        "user", 
        json.dumps(user_schema_v2), 
        SchemaType.JSON
    )
    print(f"Schema v2 compatible: {is_compatible}")
    
    # Register the new version
    if is_compatible:
        schema_id_v2 = registry.register_schema(
            "user", 
            json.dumps(user_schema_v2), 
            SchemaType.JSON
        )
        print(f"Registered schema v2, ID: {schema_id_v2}")
    
    # Inspect schema information
    latest_schema = registry.get_latest_schema("user")
    if latest_schema:
        print(f"Latest schema version: {latest_schema.version}")
        print(f"Schema content: {latest_schema.schema_str}")
    
    # List all versions
    versions = registry.get_all_versions("user")
    print(f"All versions: {versions}")

4.4 Transaction Support

4.4.1 Transaction Overview

Kafka transactions provide atomic writes across multiple topic partitions, ensuring that a group of messages either all succeed or all fail together.
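
With a real client library this entire flow is driven by a handful of calls. A minimal sketch using confluent-kafka (assumptions for illustration: a broker on localhost:9092 and the topic names shown; delivery callbacks and retries are omitted):

from confluent_kafka import Producer, KafkaException

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "order-processor",  # enables transactions and zombie fencing
})

producer.init_transactions()   # register with the transaction coordinator
producer.begin_transaction()
try:
    producer.produce("orders", key="order1", value='{"id": 1, "amount": 100}')
    producer.produce("payments", key="payment1", value='{"order_id": 1, "amount": 100}')
    producer.commit_transaction()  # both writes become visible atomically
except KafkaException:
    producer.abort_transaction()   # neither write is seen by read_committed consumers

The rest of this section builds a simplified in-process simulation of the coordinator, producer, and consumer to show what happens behind these calls.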

import time
import threading
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass
from enum import Enum
import uuid

class TransactionState(Enum):
    """事务状态"""
    EMPTY = "Empty"
    ONGOING = "Ongoing"
    PREPARE_COMMIT = "PrepareCommit"
    PREPARE_ABORT = "PrepareAbort"
    COMPLETE_COMMIT = "CompleteCommit"
    COMPLETE_ABORT = "CompleteAbort"
    DEAD = "Dead"

@dataclass
class TransactionMetadata:
    """事务元数据"""
    transactional_id: str
    producer_id: int
    producer_epoch: int
    state: TransactionState
    timeout_ms: int
    last_update_timestamp: int
    partitions: Set[str] = None  # topic-partition
    
    def __post_init__(self):
        if self.partitions is None:
            self.partitions = set()

class TransactionCoordinator:
    """事务协调器"""
    
    def __init__(self):
        self.transactions = {}  # {transactional_id: TransactionMetadata}
        self.producer_ids = {}  # {transactional_id: producer_id}
        self.next_producer_id = 1000
        self.lock = threading.RLock()
        self.cleanup_thread = None
        self.running = False
    
    def start(self):
        """启动协调器"""
        self.running = True
        self.cleanup_thread = threading.Thread(target=self._cleanup_expired_transactions)
        self.cleanup_thread.daemon = True
        self.cleanup_thread.start()
    
    def stop(self):
        """停止协调器"""
        self.running = False
        if self.cleanup_thread:
            self.cleanup_thread.join()
    
    def init_producer_id(self, transactional_id: Optional[str] = None, 
                        timeout_ms: int = 60000) -> Dict[str, int]:
        """Initialize a producer ID and epoch"""
        with self.lock:
            if transactional_id:
                # Transactional producer
                if transactional_id in self.producer_ids:
                    producer_id = self.producer_ids[transactional_id]
                    # Bump the epoch to fence zombie producers
                    if transactional_id in self.transactions:
                        self.transactions[transactional_id].producer_epoch += 1
                        epoch = self.transactions[transactional_id].producer_epoch
                    else:
                        epoch = 0
                        self.transactions[transactional_id] = TransactionMetadata(
                            transactional_id=transactional_id,
                            producer_id=producer_id,
                            producer_epoch=epoch,
                            state=TransactionState.EMPTY,
                            timeout_ms=timeout_ms,
                            last_update_timestamp=int(time.time() * 1000)
                        )
                else:
                    producer_id = self.next_producer_id
                    self.next_producer_id += 1
                    self.producer_ids[transactional_id] = producer_id
                    
                    self.transactions[transactional_id] = TransactionMetadata(
                        transactional_id=transactional_id,
                        producer_id=producer_id,
                        producer_epoch=0,
                        state=TransactionState.EMPTY,
                        timeout_ms=timeout_ms,
                        last_update_timestamp=int(time.time() * 1000)
                    )
                
                return {
                    "producer_id": producer_id,
                    "producer_epoch": self.transactions[transactional_id].producer_epoch
                }
            else:
                # Non-transactional producer
                producer_id = self.next_producer_id
                self.next_producer_id += 1
                return {
                    "producer_id": producer_id,
                    "producer_epoch": 0
                }
    
    def begin_transaction(self, transactional_id: str) -> bool:
        """开始事务"""
        with self.lock:
            if transactional_id not in self.transactions:
                return False
            
            transaction = self.transactions[transactional_id]
            
            if transaction.state != TransactionState.EMPTY:
                return False
            
            transaction.state = TransactionState.ONGOING
            transaction.last_update_timestamp = int(time.time() * 1000)
            transaction.partitions.clear()
            
            return True
    
    def add_partitions_to_transaction(self, transactional_id: str, 
                                    partitions: List[str]) -> bool:
        """添加分区到事务"""
        with self.lock:
            if transactional_id not in self.transactions:
                return False
            
            transaction = self.transactions[transactional_id]
            
            if transaction.state != TransactionState.ONGOING:
                return False
            
            transaction.partitions.update(partitions)
            transaction.last_update_timestamp = int(time.time() * 1000)
            
            return True
    
    def prepare_commit_transaction(self, transactional_id: str) -> bool:
        """准备提交事务"""
        with self.lock:
            if transactional_id not in self.transactions:
                return False
            
            transaction = self.transactions[transactional_id]
            
            if transaction.state != TransactionState.ONGOING:
                return False
            
            transaction.state = TransactionState.PREPARE_COMMIT
            transaction.last_update_timestamp = int(time.time() * 1000)
            
            return True
    
    def commit_transaction(self, transactional_id: str) -> bool:
        """提交事务"""
        with self.lock:
            if transactional_id not in self.transactions:
                return False
            
            transaction = self.transactions[transactional_id]
            
            if transaction.state != TransactionState.PREPARE_COMMIT:
                return False
            
            # Write a COMMIT marker to every participating partition
            self._send_transaction_markers(transaction, commit=True)
            
            transaction.state = TransactionState.COMPLETE_COMMIT
            transaction.last_update_timestamp = int(time.time() * 1000)
            
            # Simplification: reset to EMPTY immediately so a new transaction can begin
            transaction.state = TransactionState.EMPTY
            transaction.partitions.clear()
            
            return True
    
    def prepare_abort_transaction(self, transactional_id: str) -> bool:
        """准备中止事务"""
        with self.lock:
            if transactional_id not in self.transactions:
                return False
            
            transaction = self.transactions[transactional_id]
            
            if transaction.state not in [TransactionState.ONGOING, TransactionState.PREPARE_COMMIT]:
                return False
            
            transaction.state = TransactionState.PREPARE_ABORT
            transaction.last_update_timestamp = int(time.time() * 1000)
            
            return True
    
    def abort_transaction(self, transactional_id: str) -> bool:
        """中止事务"""
        with self.lock:
            if transactional_id not in self.transactions:
                return False
            
            transaction = self.transactions[transactional_id]
            
            if transaction.state != TransactionState.PREPARE_ABORT:
                return False
            
            # Write an ABORT marker to every participating partition
            self._send_transaction_markers(transaction, commit=False)
            
            transaction.state = TransactionState.COMPLETE_ABORT
            transaction.last_update_timestamp = int(time.time() * 1000)
            
            # Simplification: reset to EMPTY immediately so a new transaction can begin
            transaction.state = TransactionState.EMPTY
            transaction.partitions.clear()
            
            return True
    
    def get_transaction_state(self, transactional_id: str) -> Optional[TransactionState]:
        """获取事务状态"""
        with self.lock:
            if transactional_id in self.transactions:
                return self.transactions[transactional_id].state
            return None
    
    def _send_transaction_markers(self, transaction: TransactionMetadata, commit: bool):
        """Send transaction markers to all participating partitions"""
        marker_type = "COMMIT" if commit else "ABORT"
        
        for partition in transaction.partitions:
            # A real coordinator would write a control record to each partition
            print(f"Sending {marker_type} marker to partition {partition}, transaction ID: {transaction.transactional_id}")
    
    def _cleanup_expired_transactions(self):
        """Periodically abort transactions that have exceeded their timeout"""
        while self.running:
            try:
                current_time = int(time.time() * 1000)
                expired_transactions = []
                
                with self.lock:
                    for transactional_id, transaction in self.transactions.items():
                        if (current_time - transaction.last_update_timestamp > transaction.timeout_ms and
                            transaction.state in [TransactionState.ONGOING, TransactionState.PREPARE_COMMIT]):
                            expired_transactions.append(transactional_id)
                
                # Abort expired transactions
                for transactional_id in expired_transactions:
                    print(f"Aborting expired transaction: {transactional_id}")
                    self.prepare_abort_transaction(transactional_id)
                    self.abort_transaction(transactional_id)
            
            except Exception as e:
                print(f"Error while cleaning up expired transactions: {e}")
            
            time.sleep(10)  # check every 10 seconds, even after an error

class TransactionalProducer:
    """Transactional producer"""
    
    def __init__(self, transactional_id: str, coordinator: TransactionCoordinator):
        self.transactional_id = transactional_id
        self.coordinator = coordinator
        self.producer_id = None
        self.producer_epoch = None
        self.in_transaction = False
        self.transaction_partitions = set()
    
    def init_transactions(self, timeout_ms: int = 60000):
        """Initialize transactions (obtain a producer ID and epoch)"""
        result = self.coordinator.init_producer_id(self.transactional_id, timeout_ms)
        self.producer_id = result["producer_id"]
        self.producer_epoch = result["producer_epoch"]
        print(f"Transactional producer initialized, ID: {self.producer_id}, epoch: {self.producer_epoch}")
    
    def begin_transaction(self):
        """Begin a transaction"""
        if self.in_transaction:
            raise RuntimeError("Transaction already in progress")
        
        success = self.coordinator.begin_transaction(self.transactional_id)
        if not success:
            raise RuntimeError("Failed to begin transaction")
        
        self.in_transaction = True
        self.transaction_partitions.clear()
        print(f"Transaction started: {self.transactional_id}")
    
    def send(self, topic: str, partition: int, key: Any, value: Any) -> Dict[str, Any]:
        """Send a message within the current transaction"""
        if not self.in_transaction:
            raise RuntimeError("Messages must be sent inside a transaction")
        
        partition_key = f"{topic}-{partition}"
        
        # Register new partitions with the coordinator before writing to them
        if partition_key not in self.transaction_partitions:
            success = self.coordinator.add_partitions_to_transaction(
                self.transactional_id, [partition_key]
            )
            if not success:
                raise RuntimeError(f"Failed to add partition to transaction: {partition_key}")
            
            self.transaction_partitions.add(partition_key)
        
        # Simulate sending the message
        record_metadata = {
            "topic": topic,
            "partition": partition,
            "offset": int(time.time() * 1000),  # simulated offset
            "timestamp": int(time.time() * 1000),
            "producer_id": self.producer_id,
            "producer_epoch": self.producer_epoch
        }
        
        print(f"Sent transactional message: {topic}-{partition}, key={key}, value={value}")
        return record_metadata
    
    def commit_transaction(self):
        """Commit the transaction"""
        if not self.in_transaction:
            raise RuntimeError("No active transaction")
        
        # Phase one: prepare the commit
        success = self.coordinator.prepare_commit_transaction(self.transactional_id)
        if not success:
            raise RuntimeError("Failed to prepare transaction commit")
        
        # Phase two: commit
        success = self.coordinator.commit_transaction(self.transactional_id)
        if not success:
            raise RuntimeError("Failed to commit transaction")
        
        self.in_transaction = False
        self.transaction_partitions.clear()
        print(f"Transaction committed: {self.transactional_id}")
    
    def abort_transaction(self):
        """Abort the transaction"""
        if not self.in_transaction:
            raise RuntimeError("No active transaction")
        
        # Phase one: prepare the abort
        success = self.coordinator.prepare_abort_transaction(self.transactional_id)
        if not success:
            raise RuntimeError("Failed to prepare transaction abort")
        
        # Phase two: abort
        success = self.coordinator.abort_transaction(self.transactional_id)
        if not success:
            raise RuntimeError("Failed to abort transaction")
        
        self.in_transaction = False
        self.transaction_partitions.clear()
        print(f"Transaction aborted: {self.transactional_id}")
    
    def close(self):
        """Close the producer, aborting any open transaction"""
        if self.in_transaction:
            self.abort_transaction()

# Usage example
if __name__ == "__main__":
    # Create and start the transaction coordinator
    coordinator = TransactionCoordinator()
    coordinator.start()
    
    try:
        # Create a transactional producer
        producer = TransactionalProducer("order-processor", coordinator)
        producer.init_transactions()
        
        # Run a transaction
        producer.begin_transaction()
        
        try:
            # Send several messages atomically
            producer.send("orders", 0, "order1", {"id": 1, "amount": 100})
            producer.send("inventory", 0, "item1", {"id": 1, "quantity": -1})
            producer.send("payments", 0, "payment1", {"order_id": 1, "amount": 100})
            
            # Commit the transaction
            producer.commit_transaction()
            
        except Exception as e:
            print(f"Transaction failed: {e}")
            producer.abort_transaction()
        
        # A second transaction that fails and is aborted
        producer.begin_transaction()
        
        try:
            producer.send("orders", 0, "order2", {"id": 2, "amount": 200})
            # Simulate a business error
            raise ValueError("Simulated business error")
            
        except Exception as e:
            print(f"Transaction failed: {e}")
            producer.abort_transaction()
        
        producer.close()
    
    finally:
        coordinator.stop()

4.4.2 Transactional Consumers

A transactional consumer can be configured to read only messages from committed transactions, ensuring data consistency.
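
With the real client, the isolation level is a single configuration setting. A minimal sketch using confluent-kafka (assuming a broker on localhost:9092; the group and topic names are illustrative):

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "transaction-consumer-group",
    # read_committed hides messages from open or aborted transactions;
    # the default, read_uncommitted, returns everything
    "isolation.level": "read_committed",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["input-topic"])

The simulated consumer below applies the same visibility rule by hand so the filtering logic is explicit.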

class TransactionalConsumer:
    """Transactional consumer"""
    
    def __init__(self, group_id: str, isolation_level: str = "read_committed"):
        self.group_id = group_id
        self.isolation_level = isolation_level  # read_committed or read_uncommitted
        self.topics = []
        self.consumed_offsets = {}  # {topic-partition: offset}
        self.committed_offsets = {}  # {topic-partition: offset}
        self.transaction_markers = {}  # {topic-partition: [markers]}
        self.lock = threading.RLock()
    
    def subscribe(self, topics: List[str]):
        """Subscribe to topics"""
        self.topics = topics
        print(f"Subscribed to topics: {topics}, isolation level: {self.isolation_level}")
    
    def poll(self, timeout_ms: int = 1000) -> List[Dict[str, Any]]:
        """Poll for messages"""
        messages = []
        
        # Simulate fetching messages
        for topic in self.topics:
            for partition in range(3):  # assume three partitions per topic
                partition_key = f"{topic}-{partition}"
                
                # Mock messages
                mock_messages = [
                    {
                        "topic": topic,
                        "partition": partition,
                        "offset": 100,
                        "key": "key1",
                        "value": {"data": "message1"},
                        "timestamp": int(time.time() * 1000),
                        "producer_id": 1001,
                        "producer_epoch": 0,
                        "is_transaction": True,
                        "transaction_committed": True
                    },
                    {
                        "topic": topic,
                        "partition": partition,
                        "offset": 101,
                        "key": "key2",
                        "value": {"data": "message2"},
                        "timestamp": int(time.time() * 1000),
                        "producer_id": 1002,
                        "producer_epoch": 0,
                        "is_transaction": True,
                        "transaction_committed": False  # 未提交的事务
                    },
                    {
                        "topic": topic,
                        "partition": partition,
                        "offset": 102,
                        "key": "key3",
                        "value": {"data": "message3"},
                        "timestamp": int(time.time() * 1000),
                        "producer_id": 1003,
                        "producer_epoch": 0,
                        "is_transaction": False,
                        "transaction_committed": None  # 非事务消息
                    }
                ]
                
                # Filter messages according to the isolation level
                for msg in mock_messages:
                    if self._should_include_message(msg):
                        messages.append(msg)
                        self.consumed_offsets[partition_key] = msg["offset"]
        
        return messages
    
    def _should_include_message(self, message: Dict[str, Any]) -> bool:
        """Decide whether a message is visible under the configured isolation level"""
        if self.isolation_level == "read_uncommitted":
            return True
        
        # read_committed mode
        if not message["is_transaction"]:
            return True  # non-transactional messages are always visible
        
        return message["transaction_committed"]  # only committed transactional messages
    
    def commit_sync(self, offsets: Optional[Dict[str, int]] = None):
        """Commit offsets synchronously"""
        with self.lock:
            if offsets:
                self.committed_offsets.update(offsets)
            else:
                self.committed_offsets.update(self.consumed_offsets)
            
            print(f"Committed offsets synchronously: {self.committed_offsets}")
    
    def commit_async(self, offsets: Optional[Dict[str, int]] = None, 
                    callback=None):
        """Commit offsets asynchronously"""
        def commit_task():
            try:
                self.commit_sync(offsets)
                if callback:
                    callback(None, self.committed_offsets)
            except Exception as e:
                if callback:
                    callback(e, None)
        
        thread = threading.Thread(target=commit_task)
        thread.daemon = True
        thread.start()
    
    def seek_to_beginning(self, topic_partitions: List[str]):
        """Seek to the beginning of the given partitions"""
        for tp in topic_partitions:
            self.consumed_offsets[tp] = 0
        print(f"Seeked to beginning: {topic_partitions}")
    
    def seek_to_end(self, topic_partitions: List[str]):
        """Seek to the end of the given partitions"""
        for tp in topic_partitions:
            self.consumed_offsets[tp] = 999999  # simulated end offset
        print(f"Seeked to end: {topic_partitions}")
    
    def close(self):
        """Close the consumer"""
        print(f"Closing transactional consumer: {self.group_id}")

class ExactlyOnceProcessor:
    """Exactly-once processor (consume-transform-produce)"""
    
    def __init__(self, consumer_group: str, transactional_id: str):
        self.consumer_group = consumer_group
        self.transactional_id = transactional_id
        self.coordinator = TransactionCoordinator()
        self.consumer = None
        self.producer = None
        self.processed_offsets = {}  # {topic-partition: offset}
        self.running = False
    
    def start(self):
        """Start the processor"""
        self.coordinator.start()
        
        # Create a transactional consumer
        self.consumer = TransactionalConsumer(
            group_id=self.consumer_group,
            isolation_level="read_committed"
        )
        
        # Create a transactional producer
        self.producer = TransactionalProducer(
            transactional_id=self.transactional_id,
            coordinator=self.coordinator
        )
        self.producer.init_transactions()
        
        self.running = True
        print(f"Exactly-once processor started: {self.transactional_id}")
    
    def process_messages(self, input_topics: List[str], output_topic: str):
        """Consume, transform, and produce within a single transaction"""
        self.consumer.subscribe(input_topics)
        
        while self.running:
            try:
                # Poll for messages
                messages = self.consumer.poll(timeout_ms=1000)
                
                if not messages:
                    continue
                
                # Begin a transaction
                self.producer.begin_transaction()
                
                try:
                    # Process each message
                    for message in messages:
                        processed_data = self._process_message(message)
                        
                        # Send the processing result
                        if processed_data:
                            self.producer.send(
                                topic=output_topic,
                                partition=message["partition"],
                                key=message["key"],
                                value=processed_data
                            )
                    
                    # Commit consumer offsets as part of the transaction
                    self._send_offsets_to_transaction(messages)
                    
                    # Commit the transaction
                    self.producer.commit_transaction()
                    
                    print(f"Successfully processed {len(messages)} messages")
                
                except Exception as e:
                    print(f"Failed to process messages: {e}")
                    self.producer.abort_transaction()
            
            except Exception as e:
                print(f"Error in processing loop: {e}")
                time.sleep(1)
    
    def _process_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process a single message"""
        try:
            # Simulated processing logic
            original_data = message["value"]
            
            # Simple data transformation
            processed_data = {
                "original": original_data,
                "processed_at": int(time.time() * 1000),
                "processor_id": self.transactional_id,
                "source_topic": message["topic"],
                "source_partition": message["partition"],
                "source_offset": message["offset"]
            }
            
            return processed_data
        
        except Exception as e:
            print(f"Failed to process message: {e}")
            return None
    
    def _send_offsets_to_transaction(self, messages: List[Dict[str, Any]]):
        """Attach consumer offsets to the transaction"""
        # Compute the latest offset per partition
        partition_offsets = {}
        for message in messages:
            partition_key = f"{message['topic']}-{message['partition']}"
            partition_offsets[partition_key] = max(
                partition_offsets.get(partition_key, -1),
                message["offset"]
            )
        
        # In a real implementation the offsets would be committed as part of
        # the transaction, making processing and offset commits atomic
        # (see the sketch after this class)
        self.processed_offsets.update(partition_offsets)
        print(f"Added offsets to transaction: {partition_offsets}")
    
    def stop(self):
        """Stop the processor"""
        self.running = False
        
        if self.producer:
            self.producer.close()
        
        if self.consumer:
            self.consumer.close()
        
        if self.coordinator:
            self.coordinator.stop()
        
        print(f"Exactly-once processor stopped: {self.transactional_id}")

# Usage example
if __name__ == "__main__":
    # Transactional consumer example
    print("=== Transactional consumer example ===")
    consumer = TransactionalConsumer(
        group_id="transaction-consumer-group",
        isolation_level="read_committed"
    )
    
    consumer.subscribe(["input-topic"])
    
    # Poll for messages
    messages = consumer.poll(timeout_ms=1000)
    print(f"Polled {len(messages)} messages")
    
    for msg in messages:
        print(f"Message: {msg['topic']}-{msg['partition']}-{msg['offset']}, value: {msg['value']}")
    
    # Commit offsets
    consumer.commit_sync()
    consumer.close()
    
    print("\n=== 精确一次处理器示例 ===")
    # 精确一次处理器示例
    processor = ExactlyOnceProcessor(
        consumer_group="exactly-once-group",
        transactional_id="message-processor-1"
    )
    
    processor.start()
    
    try:
        # 模拟处理一批消息
        input_topics = ["raw-events"]
        output_topic = "processed-events"
        
        # 在实际应用中,这会是一个长期运行的循环
        processor.process_messages(input_topics, output_topic)
        
    except KeyboardInterrupt:
        print("收到停止信号")
    
    finally:
        processor.stop()

Chapter Summary

Key Concepts

  1. Kafka Streams

    • A stream processing library supporting both stateful and stateless operations
    • Provides a high-level DSL and a low-level Processor API
    • Supports time windows, joins, and aggregations
    • Built-in fault tolerance and state management
  2. Kafka Connect

    • A data integration framework that connects Kafka to external systems
    • Supports Source and Sink connectors
    • Provides distributed, scalable data pipelines
    • Built-in error handling and retry mechanisms
  3. Schema Registry

    • A centralized schema management service
    • Supports schema evolution and compatibility checking
    • Offers multiple compatibility levels
    • Supports Avro, JSON Schema, and Protobuf
  4. Transaction Support

    • Provides atomic writes across partitions
    • Enables exactly-once semantics (EOS)
    • A transaction coordinator manages transaction state
    • Consumers can choose their isolation level

Best Practices

  1. Stream processing design

    • Keep topologies reasonably simple; avoid unnecessary complexity
    • Choose appropriate time window sizes
    • Handle out-of-order and late-arriving data correctly
    • Monitor the performance metrics of stream processing applications
  2. Connector configuration

    • Size the task count to match the data volume
    • Configure an appropriate error-handling policy
    • Use dead letter queues for records that cannot be processed
    • Monitor connector status regularly
  3. Schema management

    • Establish a schema evolution strategy
    • Choose an appropriate compatibility level
    • Periodically clean up schema versions that are no longer used
    • Enable schema validation in production
  4. Transaction usage

    • Use transactions only where strong consistency is required
    • Set transaction timeouts sensibly (see the configuration sketch after this list)
    • Monitor transaction coordinator performance
    • Handle transaction failures and retries correctly
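
As a reference for the transaction settings above, a hedged configuration sketch (the property names are standard Kafka client settings; the values are illustrative, not recommendations):

# Transactional producer: the timeout bounds how long an open transaction
# may stall before the coordinator proactively aborts it
producer_config = {
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "order-processor",
    "transaction.timeout.ms": 60000,  # must not exceed the broker's
                                      # transaction.max.timeout.ms (15 min by default)
}

# Downstream consumer: hide messages from open or aborted transactions
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-consumers",
    "isolation.level": "read_committed",
}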

Exercises

Basic Exercises

  1. Implement a simple Kafka Streams application that counts messages per minute
  2. Create a file Source connector that imports CSV data into Kafka
  3. Design a JSON Schema and implement a compatibility check for it
  4. Write a transactional producer that sends multiple messages atomically

Advanced Exercises

  1. Implement a complex stream processing application with multiple operations and state stores
  2. Develop a custom Sink connector that writes data to a database
  3. Deploy Schema Registry in a highly available configuration
  4. Design a message processing system with exactly-once semantics

Project Exercises

  1. Build a real-time data pipeline covering ingestion, processing, and storage
  2. Implement a multi-tenant schema management system
  3. Develop a transactional order processing system
  4. Design a microservice architecture that supports schema evolution