4.1 流处理基础概念

流处理概述

class StreamProcessingOverview:
    """
    流处理基础概念演示
    """
    
    def __init__(self):
        print("流处理基础概念演示")
        print("=" * 20)
    
    def explain_stream_processing(self):
        """
        解释流处理基本概念
        """
        print("\n1. 流处理定义:")
        print("流处理是对连续数据流进行实时或近实时处理的计算模式")
        
        concepts = {
            "数据流": "连续不断产生的数据序列",
            "实时处理": "数据到达后立即处理,延迟通常在毫秒到秒级",
            "近实时处理": "数据到达后在可接受的时间窗口内处理,延迟通常在秒到分钟级",
            "窗口操作": "对时间窗口内的数据进行聚合计算",
            "状态管理": "维护跨批次的计算状态",
            "容错机制": "确保数据处理的可靠性和一致性"
        }
        
        print("\n核心概念:")
        for concept, description in concepts.items():
            print(f"  - {concept}: {description}")
    
    def compare_batch_vs_stream(self):
        """
        批处理与流处理对比
        """
        print("\n\n2. 批处理 vs 流处理:")
        
        comparison = {
            "数据特性": {
                "批处理": "有界数据集,数据量固定",
                "流处理": "无界数据流,数据持续产生"
            },
            "处理延迟": {
                "批处理": "高延迟,通常小时或天级别",
                "流处理": "低延迟,毫秒到秒级别"
            },
            "数据完整性": {
                "批处理": "处理完整数据集,结果准确",
                "流处理": "处理部分数据,可能需要近似结果"
            },
            "资源使用": {
                "批处理": "周期性使用大量资源",
                "流处理": "持续使用稳定资源"
            },
            "应用场景": {
                "批处理": "历史数据分析、报表生成、ETL",
                "流处理": "实时监控、欺诈检测、推荐系统"
            }
        }
        
        for aspect, details in comparison.items():
            print(f"\n{aspect}:")
            print(f"  批处理: {details['批处理']}")
            print(f"  流处理: {details['流处理']}")
    
    def explain_spark_streaming_architecture(self):
        """
        Spark Streaming架构说明
        """
        print("\n\n3. Spark Streaming架构:")
        
        architecture_components = {
            "数据源": "Kafka、Flume、TCP Socket、文件系统等",
            "接收器(Receiver)": "负责从数据源接收数据",
            "DStream": "离散化流,将连续数据流分割成小批次",
            "批处理引擎": "使用Spark Core处理每个微批次",
            "输出操作": "将处理结果输出到外部系统"
        }
        
        print("架构组件:")
        for component, description in architecture_components.items():
            print(f"  - {component}: {description}")
        
        print("\n处理流程:")
        process_flow = [
            "1. 数据源产生连续数据流",
            "2. 接收器接收数据并缓存",
            "3. 将数据流分割成微批次(DStream)",
            "4. 使用Spark Core处理每个批次",
            "5. 输出处理结果到目标系统",
            "6. 重复步骤2-5形成连续处理"
        ]
        
        for step in process_flow:
            print(f"  {step}")
    
    def demonstrate_streaming_concepts(self):
        """
        演示流处理概念
        """
        print("\n\n4. 流处理概念演示:")
        
        # 模拟数据流
        import time
        import random
        from datetime import datetime
        
        print("\n模拟实时数据流:")
        
        # 模拟传感器数据
        sensor_data = []
        for i in range(10):
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            temperature = round(random.uniform(20, 35), 2)
            humidity = round(random.uniform(40, 80), 2)
            
            data_point = {
                "timestamp": timestamp,
                "sensor_id": f"sensor_{random.randint(1, 5)}",
                "temperature": temperature,
                "humidity": humidity
            }
            
            sensor_data.append(data_point)
            print(f"  时间: {timestamp}, 传感器: {data_point['sensor_id']}, "
                  f"温度: {temperature}°C, 湿度: {humidity}%")
            
            time.sleep(0.5)  # 模拟数据到达间隔
        
        print(f"\n共接收到 {len(sensor_data)} 条数据")
        
        # 模拟窗口聚合
        print("\n窗口聚合示例:")
        
        # 按传感器分组计算平均值
        sensor_groups = {}
        for data in sensor_data:
            sensor_id = data['sensor_id']
            if sensor_id not in sensor_groups:
                sensor_groups[sensor_id] = []
            sensor_groups[sensor_id].append(data)
        
        for sensor_id, readings in sensor_groups.items():
            avg_temp = sum(r['temperature'] for r in readings) / len(readings)
            avg_humidity = sum(r['humidity'] for r in readings) / len(readings)
            
            print(f"  {sensor_id}: 平均温度 {avg_temp:.2f}°C, 平均湿度 {avg_humidity:.2f}%")

# 流处理概述演示
stream_overview = StreamProcessingOverview()

# 基础概念解释
stream_overview.explain_stream_processing()

# 批处理与流处理对比
stream_overview.compare_batch_vs_stream()

# Spark Streaming架构
stream_overview.explain_spark_streaming_architecture()

# 概念演示
stream_overview.demonstrate_streaming_concepts()
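
为了更直观地理解"把连续数据流切分成微批次、再在窗口内聚合"这一核心思想,下面给出一个不依赖Spark的纯Python模拟示意(批大小、窗口批次数等参数均为演示用的假设值):

# 纯Python模拟微批处理与滑动窗口(仅为概念示意,不依赖Spark)
from collections import deque

def micro_batch_simulation(events, batch_size=5, window_batches=3):
    """把连续事件切成微批次,并对最近window_batches个批次做窗口求和"""
    window = deque(maxlen=window_batches)  # 只保留窗口内各批次的局部聚合结果
    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]   # 一个微批次
        window.append(sum(batch))          # 批内先做局部聚合
        print(f"批次 {i // batch_size + 1}: {batch}, "
              f"窗口和(最近{len(window)}个批次) = {sum(window)}")

# 模拟连续到达的数值流
micro_batch_simulation(list(range(1, 21)), batch_size=5, window_batches=3)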

流处理应用场景

class StreamProcessingUseCases:
    """
    流处理应用场景演示
    """
    
    def __init__(self):
        print("\n\n流处理应用场景")
        print("=" * 18)
    
    def real_time_monitoring(self):
        """
        实时监控场景
        """
        print("\n1. 实时监控场景:")
        
        monitoring_scenarios = {
            "系统监控": {
                "描述": "监控服务器性能、网络状态、应用健康度",
                "数据源": "系统日志、性能指标、网络流量",
                "处理逻辑": "异常检测、阈值告警、趋势分析",
                "输出": "实时仪表板、告警通知、自动化响应"
            },
            "业务监控": {
                "描述": "监控业务指标、用户行为、交易状态",
                "数据源": "用户点击流、交易记录、业务事件",
                "处理逻辑": "指标计算、异常识别、模式发现",
                "输出": "业务仪表板、KPI报告、预警信息"
            },
            "IoT监控": {
                "描述": "监控物联网设备状态和环境参数",
                "数据源": "传感器数据、设备状态、环境信息",
                "处理逻辑": "数据聚合、状态分析、预测维护",
                "输出": "设备仪表板、维护建议、自动控制"
            }
        }
        
        for scenario, details in monitoring_scenarios.items():
            print(f"\n{scenario}:")
            for key, value in details.items():
                print(f"  {key}: {value}")
    
    def fraud_detection(self):
        """
        欺诈检测场景
        """
        print("\n\n2. 欺诈检测场景:")
        
        fraud_detection_aspects = {
            "金融欺诈": {
                "特征": "异常交易金额、频率、地理位置",
                "算法": "规则引擎、机器学习模型、异常检测",
                "响应": "实时阻断、风险评分、人工审核"
            },
            "网络安全": {
                "特征": "异常网络流量、访问模式、攻击特征",
                "算法": "模式匹配、行为分析、威胁情报",
                "响应": "自动防护、告警通知、流量阻断"
            },
            "电商反作弊": {
                "特征": "刷单行为、虚假评价、恶意爬虫",
                "算法": "图算法、时序分析、集群检测",
                "响应": "账号限制、内容过滤、风险标记"
            }
        }
        
        for fraud_type, details in fraud_detection_aspects.items():
            print(f"\n{fraud_type}:")
            for aspect, description in details.items():
                print(f"  {aspect}: {description}")
    
    def recommendation_systems(self):
        """
        推荐系统场景
        """
        print("\n\n3. 实时推荐系统:")
        
        recommendation_types = {
            "内容推荐": {
                "数据": "用户浏览、点击、搜索行为",
                "算法": "协同过滤、内容过滤、深度学习",
                "实时性": "毫秒级响应,个性化内容"
            },
            "商品推荐": {
                "数据": "购买历史、浏览记录、用户画像",
                "算法": "矩阵分解、关联规则、序列模型",
                "实时性": "实时更新推荐列表"
            },
            "广告推荐": {
                "数据": "用户特征、上下文信息、广告特征",
                "算法": "CTR预估、竞价算法、多目标优化",
                "实时性": "实时竞价、动态调价"
            }
        }
        
        for rec_type, details in recommendation_types.items():
            print(f"\n{rec_type}:")
            for aspect, description in details.items():
                print(f"  {aspect}: {description}")
    
    def real_time_analytics(self):
        """
        实时分析场景
        """
        print("\n\n4. 实时分析场景:")
        
        analytics_scenarios = {
            "用户行为分析": {
                "目标": "理解用户实时行为模式",
                "指标": "页面停留时间、转化率、跳出率",
                "应用": "A/B测试、用户体验优化"
            },
            "运营分析": {
                "目标": "监控业务运营状况",
                "指标": "销售额、订单量、库存状态",
                "应用": "动态定价、库存管理、促销优化"
            },
            "市场分析": {
                "目标": "捕捉市场变化趋势",
                "指标": "搜索热度、社交媒体情感、新闻事件",
                "应用": "趋势预测、舆情监控、投资决策"
            }
        }
        
        for scenario, details in analytics_scenarios.items():
            print(f"\n{scenario}:")
            for aspect, description in details.items():
                print(f"  {aspect}: {description}")
    
    def demonstrate_use_case_selection(self):
        """
        演示用例选择决策
        """
        print("\n\n5. 流处理用例选择决策:")
        
        decision_factors = {
            "延迟要求": {
                "毫秒级": "高频交易、实时竞价",
                "秒级": "欺诈检测、实时推荐",
                "分钟级": "运营监控、趋势分析"
            },
            "数据量": {
                "小数据量": "简单规则处理、阈值检测",
                "中等数据量": "复杂聚合、模式识别",
                "大数据量": "机器学习、深度分析"
            },
            "准确性要求": {
                "高准确性": "金融交易、安全检测",
                "中等准确性": "推荐系统、内容过滤",
                "可接受近似": "趋势分析、统计监控"
            }
        }
        
        print("选择因素:")
        for factor, levels in decision_factors.items():
            print(f"\n{factor}:")
            for level, examples in levels.items():
                print(f"  {level}: {examples}")
        
        print("\n决策建议:")
        recommendations = [
            "1. 明确业务需求和技术约束",
            "2. 评估数据特征和处理复杂度",
            "3. 考虑系统可扩展性和维护成本",
            "4. 选择合适的技术栈和架构",
            "5. 设计容错和监控机制"
        ]
        
        for rec in recommendations:
            print(f"  {rec}")

# 应用场景演示
use_cases = StreamProcessingUseCases()

# 实时监控
use_cases.real_time_monitoring()

# 欺诈检测
use_cases.fraud_detection()

# 推荐系统
use_cases.recommendation_systems()

# 实时分析
use_cases.real_time_analytics()

# 用例选择
use_cases.demonstrate_use_case_selection()
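
上面的选择因素可以粗略地落成一个选型辅助函数。下面是一个极简的示意(阈值与返回的建议文字均为本文假设,实际选型还需结合团队技术栈与成本):

def suggest_processing_mode(latency_requirement_s, needs_exact_result):
    """根据延迟与准确性要求给出粗略的处理模式建议(仅为示意)"""
    if latency_requirement_s >= 3600:
        mode = "批处理(定时作业,适合报表/ETL)"
    elif latency_requirement_s >= 1:
        mode = "微批流处理(如Spark Streaming / Structured Streaming)"
    else:
        mode = "低延迟流处理(事件级处理引擎)"
    if needs_exact_result:
        mode += ",并启用检查点与精确一次语义"
    return mode

print(suggest_processing_mode(600, needs_exact_result=False))   # 分钟级运营监控
print(suggest_processing_mode(0.05, needs_exact_result=True))   # 毫秒级风控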

4.2 DStream编程

DStream基础操作

class DStreamBasics:
    """
    DStream基础操作演示
    """
    
    def __init__(self):
        from pyspark import SparkContext, SparkConf
        from pyspark.streaming import StreamingContext
        
        # 配置Spark
        conf = SparkConf().setAppName("DStreamBasics").setMaster("local[2]")
        self.sc = SparkContext(conf=conf)
        self.sc.setLogLevel("WARN")
        
        # 创建StreamingContext,批处理间隔为2秒
        self.ssc = StreamingContext(self.sc, 2)
        
        print("\nDStream基础操作演示")
        print("=" * 20)
    
    def create_dstreams(self):
        """
        创建DStream的不同方式
        """
        print("\n1. DStream创建方式:")
        
        # 1. 从TCP Socket创建DStream
        print("\n从TCP Socket创建DStream:")
        print("# 需要先启动netcat服务: nc -lk 9999")
        
        socket_stream_code = '''
        # TCP Socket DStream
        lines = ssc.socketTextStream("localhost", 9999)
        
        # 处理每行数据
        words = lines.flatMap(lambda line: line.split(" "))
        word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
        
        # 输出结果
        word_counts.pprint()
        '''
        
        print(socket_stream_code)
        
        # 2. 从文件系统创建DStream
        print("\n从文件系统创建DStream:")
        
        file_stream_code = '''
        # 文件系统DStream
        file_stream = ssc.textFileStream("/path/to/directory")
        
        # 处理新文件
        processed_stream = file_stream.map(lambda line: line.upper())
        processed_stream.pprint()
        '''
        
        print(file_stream_code)
        
        # 3. 从队列创建DStream(用于测试)
        print("\n从队列创建DStream(测试用):")
        
        # 创建测试数据队列
        test_data = [
            self.sc.parallelize(["hello world", "spark streaming"]),
            self.sc.parallelize(["real time", "data processing"]),
            self.sc.parallelize(["apache spark", "big data"])
        ]
        
        # 从队列创建DStream
        queue_stream = self.ssc.queueStream(test_data, oneAtATime=True)
        
        print("测试数据队列创建完成")
        return queue_stream
    
    def basic_transformations(self):
        """
        DStream基础转换操作
        """
        print("\n\n2. DStream基础转换操作:")
        
        # 创建测试DStream
        test_data = [
            self.sc.parallelize(["hello world spark", "streaming data processing"]),
            self.sc.parallelize(["real time analytics", "big data platform"]),
            self.sc.parallelize(["apache spark streaming", "distributed computing"])
        ]
        
        input_stream = self.ssc.queueStream(test_data, oneAtATime=True)
        
        # 1. map转换
        print("\nmap转换 - 转换为大写:")
        upper_stream = input_stream.map(lambda line: line.upper())
        upper_stream.pprint()  # 注意: PySpark的pprint()只接受num参数,不支持prefix
        
        # 2. flatMap转换
        print("\nflatMap转换 - 分割单词:")
        words_stream = input_stream.flatMap(lambda line: line.split(" "))
        words_stream.pprint()  # Words
        
        # 3. filter转换
        print("\nfilter转换 - 过滤包含'spark'的行:")
        spark_lines = input_stream.filter(lambda line: "spark" in line.lower())
        spark_lines.pprint()  # Spark lines
        
        # 4. union转换
        print("\nunion转换 - 合并流:")
        
        # 创建另一个测试流
        test_data2 = [
            self.sc.parallelize(["machine learning", "artificial intelligence"]),
            self.sc.parallelize(["deep learning", "neural networks"]),
            self.sc.parallelize(["data science", "predictive analytics"])
        ]
        
        input_stream2 = self.ssc.queueStream(test_data2, oneAtATime=True)
        union_stream = input_stream.union(input_stream2)
        union_stream.pprint()  # Union
        
        return words_stream
    
    def window_operations(self):
        """
        DStream窗口操作
        """
        print("\n\n3. DStream窗口操作:")
        
        # 创建数值测试数据
        numeric_data = [
            self.sc.parallelize([1, 2, 3, 4, 5]),
            self.sc.parallelize([6, 7, 8, 9, 10]),
            self.sc.parallelize([11, 12, 13, 14, 15]),
            self.sc.parallelize([16, 17, 18, 19, 20])
        ]
        
        numeric_stream = self.ssc.queueStream(numeric_data, oneAtATime=True)
        
        # 1. 窗口聚合
        print("\n窗口聚合 - 6秒窗口,2秒滑动:")
        
        # 窗口长度6秒,滑动间隔2秒
        windowed_stream = numeric_stream.window(6, 2)
        windowed_sums = windowed_stream.reduce(lambda a, b: a + b)
        windowed_sums.pprint()  # Window Sum
        
        # 2. reduceByWindow
        print("\nreduceByWindow - 直接窗口聚合:")
        window_sum = numeric_stream.reduceByWindow(
            lambda a, b: a + b,  # 聚合函数
            lambda a, b: a - b,  # 逆聚合函数(可选,用于优化)
            6, 2  # 窗口长度,滑动间隔
        )
        window_sum.pprint()  # ReduceByWindow
        
        # 3. countByWindow
        print("\ncountByWindow - 窗口计数:")
        window_count = numeric_stream.countByWindow(6, 2)
        window_count.pprint()  # Window Count
        
        return windowed_stream
    
    def stateful_operations(self):
        """
        DStream状态操作
        """
        print("\n\n4. DStream状态操作:")
        
        # 设置检查点目录(状态操作需要)
        self.ssc.checkpoint("/tmp/spark_streaming_checkpoint")
        
        # 创建键值对测试数据
        kv_data = [
            self.sc.parallelize([("apple", 1), ("banana", 2), ("apple", 3)]),
            self.sc.parallelize([("banana", 1), ("cherry", 4), ("apple", 2)]),
            self.sc.parallelize([("cherry", 2), ("apple", 1), ("banana", 3)])
        ]
        
        kv_stream = self.ssc.queueStream(kv_data, oneAtATime=True)
        
        # 1. updateStateByKey
        print("\nupdateStateByKey - 维护累计状态:")
        
        def update_function(new_values, running_count):
            """
            状态更新函数
            new_values: 当前批次的新值列表
            running_count: 之前的累计值
            """
            if running_count is None:
                running_count = 0
            return sum(new_values) + running_count
        
        running_counts = kv_stream.updateStateByKey(update_function)
        running_counts.pprint()  # Running Counts
        
        # 2. mapWithState(更高效的状态操作)
        print("\nmapWithState - 高效状态管理:")
        print("注意: mapWithState/StateSpec仅在Scala/Java的DStream API中提供,")
        print("PySpark没有对应接口,以下代码为概念示意。")
        
        def mapping_function(key, value, state):
            """
            状态映射函数(概念示意)
            """
            if state.exists():
                current_count = state.get() + value
            else:
                current_count = value
            
            state.update(current_count)
            return (key, current_count)
        
        # Scala/Java API中的用法(Python风格示意,不能在PySpark中直接运行):
        # state_spec = StateSpec.function(mapping_function)
        # stateful_stream = kv_stream.mapWithState(state_spec)
        # stateful_stream.pprint()
        
        return running_counts
    
    def output_operations(self):
        """
        DStream输出操作
        """
        print("\n\n5. DStream输出操作:")
        
        # 创建测试数据
        output_data = [
            self.sc.parallelize(["result1", "result2", "result3"]),
            self.sc.parallelize(["result4", "result5", "result6"]),
            self.sc.parallelize(["result7", "result8", "result9"])
        ]
        
        output_stream = self.ssc.queueStream(output_data, oneAtATime=True)
        
        # 1. print输出
        print("\nprint输出:")
        output_stream.pprint()  # Print
        
        # 2. saveAsTextFiles输出
        print("\nsaveAsTextFiles输出:")
        output_stream.saveAsTextFiles("/tmp/streaming_output")
        
        # 3. foreachRDD自定义输出
        print("\nforeachRDD自定义输出:")
        
        def process_rdd(time, rdd):
            """
            自定义RDD处理函数
            """
            if not rdd.isEmpty():
                print(f"Time: {time}")
                print(f"Count: {rdd.count()}")
                print(f"Data: {rdd.collect()}")
                print("-" * 30)
        
        output_stream.foreachRDD(process_rdd)
        
        return output_stream
    
    def demonstrate_complete_example(self):
        """
        完整的DStream示例
        """
        print("\n\n6. 完整DStream示例 - 实时词频统计:")
        
        # 创建文本数据
        text_data = [
            self.sc.parallelize([
                "apache spark is a unified analytics engine",
                "spark streaming provides scalable fault-tolerant stream processing"
            ]),
            self.sc.parallelize([
                "spark sql provides a programming abstraction called dataframes",
                "spark mllib is a machine learning library"
            ]),
            self.sc.parallelize([
                "spark graphx is a graph processing framework",
                "spark runs on hadoop yarn kubernetes and standalone"
            ])
        ]
        
        text_stream = self.ssc.queueStream(text_data, oneAtATime=True)
        
        # 设置检查点
        self.ssc.checkpoint("/tmp/wordcount_checkpoint")
        
        # 处理流程
        words = text_stream.flatMap(lambda line: line.lower().split(" "))
        word_pairs = words.map(lambda word: (word, 1))
        word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
        
        # 维护全局词频统计
        def update_word_count(new_values, running_count):
            if running_count is None:
                running_count = 0
            return sum(new_values) + running_count
        
        running_word_counts = word_counts.updateStateByKey(update_word_count)
        
        # 获取热门词汇(前5个)
        def get_top_words(time, rdd):
            if not rdd.isEmpty():
                top_words = rdd.takeOrdered(5, key=lambda x: -x[1])
                print(f"\nTime: {time}")
                print("Top 5 words:")
                for word, count in top_words:
                    print(f"  {word}: {count}")
                print("-" * 40)
        
        running_word_counts.foreachRDD(get_top_words)
        
        print("实时词频统计系统已配置")
        return running_word_counts
    
    def cleanup(self):
        """
        清理资源
        """
        print("\n\nDStream演示完成,清理资源...")
        # 注意:在实际应用中,需要调用ssc.start()和ssc.awaitTermination()
        # 这里为了演示目的,不启动流处理
        
        if hasattr(self, 'ssc'):
            self.ssc.stop(stopSparkContext=True, stopGraceFully=True)
        
        print("资源清理完成")

# DStream基础操作演示
dstream_demo = DStreamBasics()

print("\nDStream编程演示")
print("=" * 18)

# 创建DStream
queue_stream = dstream_demo.create_dstreams()

# 基础转换操作
words_stream = dstream_demo.basic_transformations()

# 窗口操作
windowed_stream = dstream_demo.window_operations()

# 状态操作
stateful_stream = dstream_demo.stateful_operations()

# 输出操作
output_stream = dstream_demo.output_operations()

# 完整示例
complete_example = dstream_demo.demonstrate_complete_example()

print("\nDStream操作总结:")
print("=" * 18)
print("✓ DStream创建方式演示完成")
print("✓ 基础转换操作演示完成")
print("✓ 窗口操作演示完成")
print("✓ 状态操作演示完成")
print("✓ 输出操作演示完成")
print("✓ 完整示例演示完成")

print("\n关键概念:")
print("- DStream是Spark Streaming的核心抽象")
print("- 支持丰富的转换和输出操作")
print("- 窗口操作用于时间窗口聚合")
print("- 状态操作维护跨批次状态")
print("- 检查点机制确保容错性")

# 清理资源
dstream_demo.cleanup()
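
上面的DStream演示为了便于讲解,并没有真正调用ssc.start()启动流计算。作为补充,下面给出一个可以独立运行的最小Socket词频统计脚本示意(假设本机已用 nc -lk 9999 提供文本输入,检查点目录为示意路径):

# minimal_socket_wordcount.py —— 最小可运行示意
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("SocketWordCount").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")

    ssc = StreamingContext(sc, 2)                       # 2秒批处理间隔
    ssc.checkpoint("/tmp/socket_wordcount_checkpoint")  # 示意路径

    lines = ssc.socketTextStream("localhost", 9999)
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()              # 启动流计算
    ssc.awaitTermination()   # 阻塞直到手动停止(Ctrl+C)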

DStream高级转换操作

class DStreamAdvancedOperations:
    """
    DStream高级转换操作演示
    """
    
    def __init__(self):
        from pyspark import SparkContext, SparkConf
        from pyspark.streaming import StreamingContext
        
        conf = SparkConf().setAppName("DStreamAdvanced").setMaster("local[2]")
        self.sc = SparkContext(conf=conf)
        self.sc.setLogLevel("WARN")
        self.ssc = StreamingContext(self.sc, 2)
        
        print("\nDStream高级转换操作演示")
        print("=" * 24)
    
    def join_operations(self):
        """
        DStream连接操作
        """
        print("\n1. DStream连接操作:")
        
        # 创建两个测试流
        user_data = [
            self.sc.parallelize([("user1", "Alice"), ("user2", "Bob")]),
            self.sc.parallelize([("user3", "Charlie"), ("user1", "Alice_Updated")]),
            self.sc.parallelize([("user4", "David"), ("user2", "Bob_Updated")])
        ]
        
        activity_data = [
            self.sc.parallelize([("user1", "login"), ("user2", "purchase")]),
            self.sc.parallelize([("user1", "view"), ("user3", "search")]),
            self.sc.parallelize([("user2", "logout"), ("user4", "register")])
        ]
        
        user_stream = self.ssc.queueStream(user_data, oneAtATime=True)
        activity_stream = self.ssc.queueStream(activity_data, oneAtATime=True)
        
        # 1. 内连接
        print("\n内连接 - 匹配用户和活动:")
        joined_stream = user_stream.join(activity_stream)
        joined_stream.pprint()  # Joined
        
        # 2. 左外连接
        print("\n左外连接 - 保留所有用户:")
        left_joined = user_stream.leftOuterJoin(activity_stream)
        left_joined.pprint()  # Left Join
        
        # 3. 右外连接
        print("\n右外连接 - 保留所有活动:")
        right_joined = user_stream.rightOuterJoin(activity_stream)
        right_joined.pprint()  # Right Join
        
        # 4. 全外连接
        print("\n全外连接 - 保留所有数据:")
        full_joined = user_stream.fullOuterJoin(activity_stream)
        full_joined.pprint()  # Full Join
        
        return joined_stream
    
    def advanced_window_operations(self):
        """
        高级窗口操作
        """
        print("\n\n2. 高级窗口操作:")
        
        # 创建时间序列数据
        import time
        from datetime import datetime
        
        time_series_data = [
            self.sc.parallelize([
                ("sensor1", 25.5, datetime.now().timestamp()),
                ("sensor2", 30.2, datetime.now().timestamp())
            ]),
            self.sc.parallelize([
                ("sensor1", 26.1, datetime.now().timestamp()),
                ("sensor3", 28.7, datetime.now().timestamp())
            ]),
            self.sc.parallelize([
                ("sensor2", 31.5, datetime.now().timestamp()),
                ("sensor1", 24.8, datetime.now().timestamp())
            ])
        ]
        
        sensor_stream = self.ssc.queueStream(time_series_data, oneAtATime=True)
        
        # 1. 滑动窗口聚合
        print("\n滑动窗口聚合 - 传感器平均值:")
        
        # 提取传感器ID和值
        sensor_values = sensor_stream.map(lambda x: (x[0], x[1]))
        
        # 窗口内按传感器分组聚合
        windowed_sensors = sensor_values.window(6, 2)
        sensor_averages = windowed_sensors.groupByKey().mapValues(
            lambda values: sum(values) / len(list(values))
        )
        sensor_averages.pprint()  # Sensor Avg
        
        # 2. 窗口内TopK
        print("\n窗口内TopK - 最高温度传感器:")
        
        def get_top_sensors(time, rdd):
            if not rdd.isEmpty():
                # 获取温度最高的前2个传感器
                top_sensors = rdd.takeOrdered(2, key=lambda x: -x[1])
                print(f"\nTime: {time}")
                print("Top 2 sensors by temperature:")
                for sensor, temp in top_sensors:
                    print(f"  {sensor}: {temp:.1f}°C")
                print("-" * 30)
        
        windowed_sensors.foreachRDD(get_top_sensors)
        
        # 3. 窗口内趋势分析
        print("\n窗口内趋势分析:")
        
        def analyze_trends(time, rdd):
            if not rdd.isEmpty():
                sensor_data = rdd.groupByKey().collectAsMap()
                print(f"\nTime: {time}")
                print("Sensor trends:")
                
                for sensor, values in sensor_data.items():
                    value_list = list(values)
                    if len(value_list) > 1:
                        trend = "上升" if value_list[-1] > value_list[0] else "下降"
                        change = abs(value_list[-1] - value_list[0])
                        print(f"  {sensor}: {trend} {change:.1f}°C")
                print("-" * 30)
        
        windowed_sensors.foreachRDD(analyze_trends)
        
        return sensor_averages
    
    def transform_operations(self):
        """
        transform操作
        """
        print("\n\n3. transform操作:")
        
        # 创建测试数据
        raw_data = [
            self.sc.parallelize(["INFO: System started", "ERROR: Connection failed", "WARN: Low memory"]),
            self.sc.parallelize(["INFO: User login", "ERROR: Database error", "DEBUG: Query executed"]),
            self.sc.parallelize(["WARN: High CPU usage", "INFO: Backup completed", "ERROR: Disk full"])
        ]
        
        log_stream = self.ssc.queueStream(raw_data, oneAtATime=True)
        
        # 1. 基础transform
        print("\n基础transform - 日志解析:")
        
        def parse_logs(time, rdd):
            """
            解析日志RDD
            """
            if rdd.isEmpty():
                return rdd
            
            # 解析日志级别和消息
            parsed = rdd.map(lambda log: {
                'level': log.split(':')[0],
                'message': log.split(':', 1)[1].strip(),
                'timestamp': time
            })
            
            return parsed
        
        parsed_logs = log_stream.transform(parse_logs)
        parsed_logs.pprint()  # Parsed
        
        # 2. 复杂transform - 异常检测
        print("\ntransform异常检测:")
        
        def detect_anomalies(time, rdd):
            """
            检测异常日志
            """
            if rdd.isEmpty():
                return self.sc.emptyRDD()
            
            # 统计各级别日志数量
            level_counts = rdd.map(lambda log: (log.split(':')[0], 1)).reduceByKey(lambda a, b: a + b)
            level_dict = level_counts.collectAsMap()
            
            # 检测异常(ERROR日志过多)
            error_count = level_dict.get('ERROR', 0)
            total_count = sum(level_dict.values())
            
            if total_count > 0 and error_count / total_count > 0.3:  # 错误率超过30%
                alert = self.sc.parallelize([f"ALERT: High error rate detected at {time}: {error_count}/{total_count}"])
                return alert
            else:
                return self.sc.emptyRDD()
        
        anomaly_alerts = log_stream.transform(detect_anomalies)
        anomaly_alerts.pprint()  # Alert
        
        # 3. transformWith - 与其他流结合
        print("\ntransformWith - 多流处理:")
        
        # 创建配置流
        config_data = [
            self.sc.parallelize([("error_threshold", 0.2), ("warn_threshold", 0.5)]),
            self.sc.parallelize([("error_threshold", 0.3), ("warn_threshold", 0.4)]),
            self.sc.parallelize([("error_threshold", 0.25), ("warn_threshold", 0.45)])
        ]
        
        config_stream = self.ssc.queueStream(config_data, oneAtATime=True)
        
        def adaptive_detection(log_rdd, config_rdd):
            """
            自适应异常检测
            """
            if log_rdd.isEmpty() or config_rdd.isEmpty():
                return log_rdd.context.emptyRDD()  # 通过RDD的context属性获取SparkContext
            
            # 获取配置
            config_dict = config_rdd.collectAsMap()
            error_threshold = config_dict.get('error_threshold', 0.3)
            
            # 统计错误率
            level_counts = log_rdd.map(lambda log: (log.split(':')[0], 1)).reduceByKey(lambda a, b: a + b)
            level_dict = level_counts.collectAsMap()
            
            error_count = level_dict.get('ERROR', 0)
            total_count = sum(level_dict.values())
            
            if total_count > 0 and error_count / total_count > error_threshold:
                return log_rdd.context.parallelize([
                    f"ADAPTIVE_ALERT: Error rate {error_count/total_count:.2f} exceeds threshold {error_threshold}"
                ])
            else:
                return log_rdd.context.emptyRDD()
        
        adaptive_alerts = log_stream.transformWith(adaptive_detection, config_stream)
        adaptive_alerts.pprint()  # Adaptive
        
        return parsed_logs
    
    def custom_receivers(self):
        """
        自定义接收器
        """
        print("\n\n4. 自定义接收器:")
        
        from pyspark.streaming import Receiver
        from pyspark.storagelevel import StorageLevel
        import threading
        import time
        import random
        
        class CustomDataReceiver(Receiver):
            """
            自定义数据接收器
            """
            
            def __init__(self, host, port):
                super(CustomDataReceiver, self).__init__(StorageLevel.MEMORY_AND_DISK_2)
                self.host = host
                self.port = port
                self.daemon = True
            
            def onStart(self):
                """
                启动接收器
                """
                self.thread = threading.Thread(target=self.receive)
                self.thread.daemon = True
                self.thread.start()
            
            def onStop(self):
                """
                停止接收器
                """
                pass
            
            def receive(self):
                """
                接收数据
                """
                try:
                    while not self.isStopped():
                        # 模拟接收数据
                        data = f"sensor_{random.randint(1, 5)},{random.uniform(20, 40):.2f},{int(time.time())}"
                        self.store(data)
                        time.sleep(1)
                except Exception as e:
                    self.restart(f"Error receiving data: {e}")
        
        print("自定义接收器类定义完成")
        
        # 使用自定义接收器的示例代码
        custom_receiver_code = '''
        # 创建自定义接收器流(注意: receiverStream仅在Scala/Java API中提供)
        custom_stream = ssc.receiverStream(CustomDataReceiver("localhost", 9999))
        
        # 解析接收到的数据
        parsed_data = custom_stream.map(lambda line: {
            'sensor_id': line.split(',')[0],
            'temperature': float(line.split(',')[1]),
            'timestamp': int(line.split(',')[2])
        })
        
        # 处理数据
        parsed_data.pprint()
        '''
        
        print("\n自定义接收器使用示例:")
        print(custom_receiver_code)
        
        return CustomDataReceiver
    
    def performance_optimization(self):
        """
        性能优化技巧
        """
        print("\n\n5. DStream性能优化:")
        
        optimization_tips = {
            "批处理间隔优化": {
                "原则": "平衡延迟和吞吐量",
                "建议": "通常设置为500ms-2s,根据数据量调整",
                "代码": "ssc = StreamingContext(sc, 1)  # 1秒批处理间隔"
            },
            "并行度优化": {
                "原则": "充分利用集群资源",
                "建议": "接收器数量 = 输入分区数,处理并行度 = CPU核数",
                "代码": "stream.repartition(8)  # 重新分区提高并行度"
            },
            "内存优化": {
                "原则": "减少内存使用和GC压力",
                "建议": "使用序列化存储级别,及时清理缓存",
                "代码": "stream.persist(StorageLevel.MEMORY_ONLY_SER)"
            },
            "检查点优化": {
                "原则": "平衡容错性和性能",
                "建议": "检查点间隔 = 批处理间隔 * 5-10",
                "代码": "ssc.checkpoint('/path/to/checkpoint')"
            },
            "背压控制": {
                "原则": "防止数据积压",
                "建议": "启用背压机制,动态调整接收速率",
                "代码": "spark.streaming.backpressure.enabled=true"
            }
        }
        
        for optimization, details in optimization_tips.items():
            print(f"\n{optimization}:")
            for key, value in details.items():
                print(f"  {key}: {value}")
        
        # 性能监控示例
        print("\n\n性能监控示例:")
        
        monitoring_code = '''
        # 监控批处理时间(batch_interval为批处理间隔秒数,需在外部定义)
        import time as time_module
        
        def monitor_batch_processing(batch_time, rdd):
            start_time = time_module.time()
            
            # 处理RDD
            result = rdd.count()
            
            end_time = time_module.time()
            processing_time = end_time - start_time
            
            print(f"Batch at {batch_time}: {result} records, {processing_time:.2f}s")
            
            # 检查处理时间是否超过批处理间隔
            if processing_time > batch_interval:
                print("WARNING: Processing time exceeds batch interval!")
        
        stream.foreachRDD(monitor_batch_processing)
        '''
        
        print(monitoring_code)
        
        return optimization_tips
    
    def cleanup(self):
        """
        清理资源
        """
        if hasattr(self, 'ssc'):
            self.ssc.stop(stopSparkContext=True, stopGraceFully=True)

# DStream高级操作演示
advanced_demo = DStreamAdvancedOperations()

# 连接操作
joined_stream = advanced_demo.join_operations()

# 高级窗口操作
sensor_stream = advanced_demo.advanced_window_operations()

# transform操作
transformed_stream = advanced_demo.transform_operations()

# 自定义接收器
custom_receiver_class = advanced_demo.custom_receivers()

# 性能优化
optimization_tips = advanced_demo.performance_optimization()

print("\n\nDStream高级操作总结:")
print("=" * 24)
print("✓ 连接操作演示完成")
print("✓ 高级窗口操作演示完成")
print("✓ transform操作演示完成")
print("✓ 自定义接收器演示完成")
print("✓ 性能优化技巧演示完成")

print("\n关键要点:")
print("- 连接操作支持多种连接类型")
print("- 窗口操作支持复杂的时间窗口分析")
print("- transform操作提供灵活的RDD级别处理")
print("- 自定义接收器支持特殊数据源")
print("- 性能优化需要综合考虑多个因素")

# 清理资源
advanced_demo.cleanup()
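
上面列出的多数性能优化项是通过Spark配置参数生效的。下面把几个常见的流处理调优参数集中写到一个SparkConf里作为示意(具体取值仅供参考,需要根据集群规模和数据量实测调整):

from pyspark import SparkConf

# 常见Spark Streaming调优参数示意(取值仅为示例)
tuned_conf = (SparkConf()
    .setAppName("TunedStreamingApp")
    .setMaster("local[4]")
    # 背压:根据处理能力动态调整接收速率
    .set("spark.streaming.backpressure.enabled", "true")
    # 基于Receiver的数据源:每个接收器每秒最多接收的记录数
    .set("spark.streaming.receiver.maxRate", "10000")
    # Direct Kafka:每个分区每秒最多拉取的记录数
    .set("spark.streaming.kafka.maxRatePerPartition", "5000")
    # 接收数据的切块间隔,影响任务并行度(默认200ms)
    .set("spark.streaming.blockInterval", "200ms")
    # 停止前先处理完已接收的数据
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    # 使用Kryo降低序列化开销
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))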

4.3 窗口操作详解

窗口操作基础

class WindowOperationsDetailed:
    """
    窗口操作详细演示
    """
    
    def __init__(self):
        from pyspark import SparkContext, SparkConf
        from pyspark.streaming import StreamingContext
        
        conf = SparkConf().setAppName("WindowOperations").setMaster("local[2]")
        self.sc = SparkContext(conf=conf)
        self.sc.setLogLevel("WARN")
        self.ssc = StreamingContext(self.sc, 1)  # 1秒批处理间隔
        
        # countByWindow、带逆函数的窗口聚合等操作需要检查点
        self.ssc.checkpoint("/tmp/window_checkpoint")
        
        print("\n窗口操作详细演示")
        print("=" * 18)
    
    def window_concepts(self):
        """
        窗口操作概念解释
        """
        print("\n1. 窗口操作概念:")
        
        concepts = {
            "窗口长度(Window Length)": "窗口包含的时间范围,必须是批处理间隔的倍数",
            "滑动间隔(Slide Interval)": "窗口滑动的时间间隔,必须是批处理间隔的倍数",
            "窗口重叠": "当滑动间隔小于窗口长度时,窗口会重叠",
            "窗口跳跃": "当滑动间隔大于窗口长度时,某些数据可能被跳过",
            "窗口对齐": "窗口边界与批处理时间对齐"
        }
        
        for concept, description in concepts.items():
            print(f"  {concept}: {description}")
        
        print("\n窗口操作示例:")
        print("  批处理间隔: 1秒")
        print("  窗口长度: 6秒 (包含6个批次)")
        print("  滑动间隔: 2秒 (每2秒计算一次)")
        
        # 可视化窗口
        print("\n窗口可视化:")
        print("  时间轴: |--1--|--2--|--3--|--4--|--5--|--6--|--7--|--8--|")
        print("  窗口1:  [------6秒窗口------]")
        print("  窗口2:          [------6秒窗口------]")
        print("  窗口3:                  [------6秒窗口------]")
    
    def basic_window_operations(self):
        """
        基础窗口操作
        """
        print("\n\n2. 基础窗口操作:")
        
        # 创建数值流
        numeric_data = []
        for i in range(10):
            batch_data = list(range(i*5 + 1, (i+1)*5 + 1))  # [1,2,3,4,5], [6,7,8,9,10], ...
            numeric_data.append(self.sc.parallelize(batch_data))
        
        numeric_stream = self.ssc.queueStream(numeric_data, oneAtATime=True)
        
        # 1. window() - 基础窗口
        print("\nwindow() - 基础窗口操作:")
        windowed_stream = numeric_stream.window(6, 2)  # 6秒窗口,2秒滑动
        
        def print_window_data(time, rdd):
            if not rdd.isEmpty():
                data = rdd.collect()
                print(f"Window at {time}: {sorted(data)}")
        
        windowed_stream.foreachRDD(print_window_data)
        
        # 2. countByWindow() - 窗口计数
        print("\ncountByWindow() - 窗口内元素计数:")
        window_counts = numeric_stream.countByWindow(6, 2)
        window_counts.pprint()  # Count
        
        # 3. reduceByWindow() - 窗口聚合
        print("\nreduceByWindow() - 窗口内求和:")
        window_sums = numeric_stream.reduceByWindow(
            lambda a, b: a + b,  # 聚合函数
            lambda a, b: a - b,  # 逆聚合函数(优化用)
            6, 2
        )
        window_sums.pprint()  # Sum
        
        return windowed_stream
    
    def advanced_window_aggregations(self):
        """
        高级窗口聚合
        """
        print("\n\n3. 高级窗口聚合:")
        
        # 创建键值对数据
        kv_data = []
        categories = ['A', 'B', 'C']
        
        for i in range(8):
            batch_data = []
            for j in range(3):
                category = categories[j]
                value = (i * 3 + j + 1) * 10
                batch_data.append((category, value))
            kv_data.append(self.sc.parallelize(batch_data))
        
        kv_stream = self.ssc.queueStream(kv_data, oneAtATime=True)
        
        # 1. reduceByKeyAndWindow() - 按键窗口聚合
        print("\nreduceByKeyAndWindow() - 按类别窗口求和:")
        category_sums = kv_stream.reduceByKeyAndWindow(
            lambda a, b: a + b,  # 聚合函数
            lambda a, b: a - b,  # 逆聚合函数
            6, 2  # 窗口长度,滑动间隔
        )
        category_sums.pprint()  # Category Sums
        
        # 2. groupByKeyAndWindow() - 按键窗口分组
        print("\ngroupByKeyAndWindow() - 按类别窗口分组:")
        category_groups = kv_stream.groupByKeyAndWindow(6, 2)
        
        def analyze_groups(time, rdd):
            if not rdd.isEmpty():
                groups = rdd.collectAsMap()
                print(f"\nWindow at {time}:")
                for category, values in groups.items():
                    value_list = list(values)
                    avg = sum(value_list) / len(value_list)
                    print(f"  {category}: values={value_list}, avg={avg:.1f}")
                print("-" * 40)
        
        category_groups.foreachRDD(analyze_groups)
        
        # 3. 按键窗口计数(PySpark没有countByKeyAndWindow,可用reduceByKeyAndWindow实现)
        print("\n按类别窗口计数:")
        category_counts = kv_stream.map(lambda kv: (kv[0], 1)).reduceByKeyAndWindow(
            lambda a, b: a + b, None, 6, 2
        )
        category_counts.pprint()  # Category Counts
        
        return category_sums
    
    def sliding_window_analytics(self):
        """
        滑动窗口分析
        """
        print("\n\n4. 滑动窗口分析:")
        
        # 创建时间序列数据(模拟股票价格)
        import random
        
        stock_data = []
        base_price = 100.0
        
        for i in range(12):
            # 模拟价格波动
            price_change = random.uniform(-2, 2)
            base_price += price_change
            
            batch_data = [
                ('AAPL', round(base_price, 2)),
                ('GOOGL', round(base_price * 1.2, 2)),
                ('MSFT', round(base_price * 0.9, 2))
            ]
            stock_data.append(self.sc.parallelize(batch_data))
        
        stock_stream = self.ssc.queueStream(stock_data, oneAtATime=True)
        
        # 1. 移动平均线
        print("\n移动平均线计算:")
        
        def calculate_moving_average(time, rdd):
            if not rdd.isEmpty():
                stock_groups = rdd.groupByKey().collectAsMap()
                print(f"\nMoving Average at {time}:")
                
                for symbol, prices in stock_groups.items():
                    price_list = list(prices)
                    if price_list:
                        avg_price = sum(price_list) / len(price_list)
                        min_price = min(price_list)
                        max_price = max(price_list)
                        print(f"  {symbol}: Avg=${avg_price:.2f}, Min=${min_price:.2f}, Max=${max_price:.2f}")
                print("-" * 50)
        
        # 5秒窗口,1秒滑动(5个数据点的移动平均)
        ma_window = stock_stream.window(5, 1)
        ma_window.foreachRDD(calculate_moving_average)
        
        # 2. 价格趋势分析
        print("\n价格趋势分析:")
        
        def analyze_trends(time, rdd):
            if not rdd.isEmpty():
                stock_groups = rdd.groupByKey().collectAsMap()
                print(f"\nTrend Analysis at {time}:")
                
                for symbol, prices in stock_groups.items():
                    price_list = list(prices)
                    if len(price_list) >= 2:
                        # 计算趋势
                        first_price = price_list[0]
                        last_price = price_list[-1]
                        change = last_price - first_price
                        change_pct = (change / first_price) * 100
                        
                        trend = "上涨" if change > 0 else "下跌" if change < 0 else "持平"
                        print(f"  {symbol}: {trend} ${change:+.2f} ({change_pct:+.1f}%)")
                print("-" * 50)
        
        # 3秒窗口,1秒滑动(短期趋势)
        trend_window = stock_stream.window(3, 1)
        trend_window.foreachRDD(analyze_trends)
        
        # 3. 波动率计算
        print("\n波动率计算:")
        
        def calculate_volatility(time, rdd):
            if not rdd.isEmpty():
                stock_groups = rdd.groupByKey().collectAsMap()
                print(f"\nVolatility at {time}:")
                
                for symbol, prices in stock_groups.items():
                    price_list = list(prices)
                    if len(price_list) >= 2:
                        # 计算标准差作为波动率指标
                        avg_price = sum(price_list) / len(price_list)
                        variance = sum((p - avg_price) ** 2 for p in price_list) / len(price_list)
                        volatility = variance ** 0.5
                        
                        print(f"  {symbol}: 波动率 ${volatility:.2f}")
                print("-" * 50)
        
        # 6秒窗口,2秒滑动(波动率分析)
        volatility_window = stock_stream.window(6, 2)
        volatility_window.foreachRDD(calculate_volatility)
        
        return stock_stream
    
    def window_performance_optimization(self):
        """
        窗口操作性能优化
        """
        print("\n\n5. 窗口操作性能优化:")
        
        optimization_strategies = {
            "使用逆函数优化": {
                "说明": "为reduceByWindow提供逆函数,避免重复计算",
                "示例": "reduceByWindow(add_func, subtract_func, window_length, slide_interval)",
                "效果": "从O(W)降低到O(S)复杂度,W=窗口长度,S=滑动间隔"
            },
            "合理设置窗口参数": {
                "说明": "窗口长度和滑动间隔应为批处理间隔的倍数",
                "示例": "批处理间隔1s,窗口长度6s,滑动间隔2s",
                "效果": "避免不必要的数据重组和计算"
            },
            "使用检查点": {
                "说明": "窗口操作需要维护历史数据,必须启用检查点",
                "示例": "ssc.checkpoint('/path/to/checkpoint')",
                "效果": "确保容错性,避免数据丢失"
            },
            "内存管理": {
                "说明": "窗口操作会缓存多个批次数据,注意内存使用",
                "示例": "调整spark.streaming.receiver.maxRate",
                "效果": "防止内存溢出,保持系统稳定"
            }
        }
        
        for strategy, details in optimization_strategies.items():
            print(f"\n{strategy}:")
            for key, value in details.items():
                print(f"  {key}: {value}")
        
        # 性能对比示例
        print("\n\n性能对比示例:")
        
        # 创建大数据量测试
        large_data = []
        for i in range(20):
            batch_data = list(range(i*1000, (i+1)*1000))  # 每批1000个数字
            large_data.append(self.sc.parallelize(batch_data))
        
        large_stream = self.ssc.queueStream(large_data, oneAtATime=True)
        
        # 1. 无优化版本
        print("\n无优化版本(仅聚合函数):")
        unoptimized_sum = large_stream.reduceByWindow(
            lambda a, b: a + b,  # 只有聚合函数
            None,                # 不提供逆函数
            10, 2  # 10秒窗口,2秒滑动
        )
        
        def monitor_performance(prefix):
            def monitor(time, rdd):
                import time as time_module
                start_time = time_module.time()
                
                # RDD为空时result取0,避免引用未定义变量
                result = rdd.collect()[0] if not rdd.isEmpty() else 0
                
                end_time = time_module.time()
                processing_time = end_time - start_time
                
                print(f"{prefix} - Time: {time}, Result: {result}, Processing: {processing_time:.3f}s")
            
            return monitor
        
        unoptimized_sum.foreachRDD(monitor_performance("Unoptimized"))
        
        # 2. 优化版本
        print("\n优化版本(聚合+逆函数):")
        optimized_sum = large_stream.reduceByWindow(
            lambda a, b: a + b,  # 聚合函数
            lambda a, b: a - b,  # 逆函数
            10, 2  # 10秒窗口,2秒滑动
        )
        
        optimized_sum.foreachRDD(monitor_performance("Optimized"))
        
        return large_stream
    
    def window_use_cases(self):
        """
        窗口操作应用场景
        """
        print("\n\n6. 窗口操作应用场景:")
        
        use_cases = {
            "实时监控": {
                "场景": "系统性能监控、网络流量分析",
                "窗口设置": "短窗口(1-5分钟),频繁更新",
                "关键指标": "平均值、最大值、异常检测"
            },
            "业务分析": {
                "场景": "销售趋势、用户活跃度分析",
                "窗口设置": "中等窗口(5-60分钟),定期更新",
                "关键指标": "总和、计数、增长率"
            },
            "金融分析": {
                "场景": "股票价格分析、风险监控",
                "窗口设置": "多时间窗口(1分钟-1小时)",
                "关键指标": "移动平均、波动率、趋势"
            },
            "IoT数据处理": {
                "场景": "传感器数据聚合、设备状态监控",
                "窗口设置": "灵活窗口(秒级-小时级)",
                "关键指标": "平均值、异常值、状态变化"
            }
        }
        
        for use_case, details in use_cases.items():
            print(f"\n{use_case}:")
            for key, value in details.items():
                print(f"  {key}: {value}")
        
        print("\n窗口选择建议:")
        recommendations = [
            "1. 根据业务需求确定窗口长度",
            "2. 平衡实时性和计算成本",
            "3. 考虑数据到达的延迟和乱序",
            "4. 使用逆函数优化性能",
            "5. 监控内存使用和处理延迟"
        ]
        
        for rec in recommendations:
            print(f"  {rec}")
    
    def cleanup(self):
        """
        清理资源
        """
        if hasattr(self, 'ssc'):
            self.ssc.stop(stopSparkContext=True, stopGraceFully=True)

# 窗口操作详细演示
window_demo = WindowOperationsDetailed()

# 窗口概念
window_demo.window_concepts()

# 基础窗口操作
basic_window = window_demo.basic_window_operations()

# 高级窗口聚合
advanced_aggregations = window_demo.advanced_window_aggregations()

# 滑动窗口分析
sliding_analytics = window_demo.sliding_window_analytics()

# 性能优化
performance_optimization = window_demo.window_performance_optimization()

# 应用场景
window_demo.window_use_cases()

print("\n\n窗口操作总结:")
print("=" * 18)
print("✓ 窗口操作概念解释完成")
print("✓ 基础窗口操作演示完成")
print("✓ 高级窗口聚合演示完成")
print("✓ 滑动窗口分析演示完成")
print("✓ 性能优化技巧演示完成")
print("✓ 应用场景分析完成")

print("\n核心要点:")
print("- 窗口操作是流处理的核心功能")
print("- 合理设置窗口参数至关重要")
print("- 使用逆函数可显著提升性能")
print("- 不同场景需要不同的窗口策略")
print("- 必须启用检查点确保容错性")

# 清理资源
window_demo.cleanup()
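
把上面的要点串起来,下面给出一个带逆函数和filterFunc的滑动窗口词频统计示意(窗口30秒、滑动10秒为假设参数;使用逆函数时必须设置检查点,filterFunc用于清理计数降为0的key,防止窗口状态无限增长):

# 带逆函数的滑动窗口词频统计(示意,假设本机已用 nc -lk 9999 提供输入)
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("WindowedWordCount").setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 2)                    # 2秒批处理间隔
ssc.checkpoint("/tmp/windowed_wc_checkpoint")    # 逆函数窗口聚合需要检查点

pairs = (ssc.socketTextStream("localhost", 9999)
            .flatMap(lambda line: line.split(" "))
            .map(lambda word: (word, 1)))

windowed_counts = pairs.reduceByKeyAndWindow(
    lambda a, b: a + b,               # 聚合函数:累加新进入窗口的批次
    lambda a, b: a - b,               # 逆函数:减去滑出窗口的批次,避免重算整个窗口
    30, 10,                           # 窗口30秒,滑动10秒(均为批处理间隔的整数倍)
    filterFunc=lambda kv: kv[1] > 0   # 过滤计数为0的key,控制窗口状态大小
)
windowed_counts.pprint()

# ssc.start(); ssc.awaitTermination()  # 实际运行时取消注释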

4.4 状态管理

状态管理基础

class StateManagement:
    """
    状态管理详细演示
    """
    
    def __init__(self):
        from pyspark import SparkContext, SparkConf
        from pyspark.streaming import StreamingContext
        
        conf = SparkConf().setAppName("StateManagement").setMaster("local[2]")
        self.sc = SparkContext(conf=conf)
        self.sc.setLogLevel("WARN")
        self.ssc = StreamingContext(self.sc, 2)
        
        # 状态管理必须设置检查点
        self.ssc.checkpoint("/tmp/state_checkpoint")
        
        print("\n状态管理详细演示")
        print("=" * 18)
    
    def state_concepts(self):
        """
        状态管理概念
        """
        print("\n1. 状态管理概念:")
        
        concepts = {
            "状态(State)": "跨批次维护的数据,如累计计数、运行总和等",
            "状态更新": "基于新数据更新现有状态的过程",
            "状态存储": "状态数据存储在内存和检查点中",
            "状态恢复": "从检查点恢复状态,确保容错性",
            "状态清理": "清理过期或不需要的状态数据"
        }
        
        for concept, description in concepts.items():
            print(f"  {concept}: {description}")
        
        print("\n状态管理的重要性:")
        importance = [
            "1. 支持累计计算(如总计数、运行平均)",
            "2. 实现复杂的业务逻辑(如用户会话跟踪)",
            "3. 提供容错能力(通过检查点机制)",
            "4. 支持长时间运行的流处理应用"
        ]
        
        for point in importance:
            print(f"  {point}")
    
    def updateStateByKey_operations(self):
        """
        updateStateByKey操作详解
        """
        print("\n\n2. updateStateByKey操作:")
        
        # 创建用户活动数据
        user_activity_data = [
            self.sc.parallelize([
                ("user1", "login"), ("user2", "view"), ("user1", "click")
            ]),
            self.sc.parallelize([
                ("user2", "purchase"), ("user3", "login"), ("user1", "logout")
            ]),
            self.sc.parallelize([
                ("user3", "view"), ("user1", "login"), ("user2", "click")
            ]),
            self.sc.parallelize([
                ("user2", "logout"), ("user3", "purchase"), ("user4", "login")
            ])
        ]
        
        activity_stream = self.ssc.queueStream(user_activity_data, oneAtATime=True)
        
        # 1. 简单计数状态
        print("\n简单计数状态:")
        
        # 转换为(user, 1)格式
        user_counts = activity_stream.map(lambda x: (x[0], 1))
        
        def update_count(new_values, running_count):
            """
            更新用户活动计数
            """
            if running_count is None:
                running_count = 0
            return sum(new_values) + running_count
        
        running_user_counts = user_counts.updateStateByKey(update_count)
        running_user_counts.pprint()  # User Counts
        
        # 2. 复杂状态管理
        print("\n复杂状态管理 - 用户会话跟踪:")
        
        def update_user_session(new_activities, current_session):
            """
            更新用户会话状态
            """
            import time
            current_time = time.time()
            
            # 初始化会话状态
            if current_session is None:
                current_session = {
                    'total_activities': 0,
                    'last_activity_time': current_time,
                    'session_start': current_time,
                    'activities': []
                }
            
            # 更新会话信息
            if new_activities:
                current_session['total_activities'] += len(new_activities)
                current_session['last_activity_time'] = current_time
                current_session['activities'].extend(new_activities)
                
                # 保持最近10个活动
                if len(current_session['activities']) > 10:
                    current_session['activities'] = current_session['activities'][-10:]
            
            # 检查会话超时(假设5分钟超时)
            session_timeout = 300  # 5分钟
            if current_time - current_session['last_activity_time'] > session_timeout:
                # 会话超时,重新开始
                if new_activities:
                    current_session = {
                        'total_activities': len(new_activities),
                        'last_activity_time': current_time,
                        'session_start': current_time,
                        'activities': list(new_activities)
                    }
                else:
                    return None  # 删除过期会话
            
            return current_session
        
        user_sessions = activity_stream.updateStateByKey(update_user_session)
        
        def print_sessions(time, rdd):
            if not rdd.isEmpty():
                sessions = rdd.collectAsMap()
                print(f"\nUser Sessions at {time}:")
                for user, session in sessions.items():
                    if session:
                        duration = session['last_activity_time'] - session['session_start']
                        print(f"  {user}: {session['total_activities']} activities, "
                              f"duration: {duration:.0f}s, recent: {session['activities'][-3:]}")
                print("-" * 60)
        
        user_sessions.foreachRDD(print_sessions)
        
        # 3. 状态过滤和清理
        print("\n状态过滤和清理:")
        
        def update_with_cleanup(new_values, running_count):
            """
            带清理功能的状态更新
            """
            if running_count is None:
                running_count = 0
            
            new_count = sum(new_values) + running_count
            
            # 清理策略:如果计数超过100,重置为0
            if new_count > 100:
                print(f"Resetting count for key (was {new_count})")
                return 0
            
            # 如果没有新值且当前计数小于5,删除状态
            if not new_values and new_count < 5:
                return None  # 删除状态
            
            return new_count
        
        cleaned_counts = user_counts.updateStateByKey(update_with_cleanup)
        cleaned_counts.pprint()  # Cleaned Counts
        
        return running_user_counts
    
    def mapWithState_operations(self):
        """
        mapWithState操作详解
        """
        print("\n\n3. mapWithState操作:")
        
        from pyspark.streaming import StateSpec
        
        # 创建订单数据
        order_data = [
            self.sc.parallelize([
                ("customer1", 100.0), ("customer2", 250.0), ("customer1", 75.0)
            ]),
            self.sc.parallelize([
                ("customer2", 150.0), ("customer3", 300.0), ("customer1", 200.0)
            ]),
            self.sc.parallelize([
                ("customer3", 80.0), ("customer1", 120.0), ("customer4", 400.0)
            ])
        ]
        
        order_stream = self.ssc.queueStream(order_data, oneAtATime=True)
        
        # 1. 基础mapWithState
        print("\n基础mapWithState - 客户总消费:")
        
        def track_customer_spending(customer, amount, state):
            """
            跟踪客户消费状态
            """
            # 获取当前状态
            if state.exists():
                current_total = state.get()
            else:
                current_total = 0.0
            
            # 更新状态
            new_total = current_total + amount
            state.update(new_total)
            
            # 返回结果
            return (customer, new_total)
        
        # 创建StateSpec并应用(Scala/Java API用法的示意)
        state_spec = StateSpec.function(track_customer_spending)
        customer_totals = order_stream.mapWithState(state_spec)
        customer_totals.pprint()  # Customer Totals
        
        # 2. 带超时的mapWithState
        print("\n带超时的mapWithState:")
        
        def track_with_timeout(customer, amount, state):
            """
            带超时的客户状态跟踪
            """
            import time
            current_time = time.time()
            
            # 获取状态
            if state.exists():
                customer_data = state.get()
            else:
                customer_data = {
                    'total_spent': 0.0,
                    'order_count': 0,
                    'last_order_time': current_time,
                    'first_order_time': current_time
                }
            
            # 更新状态
            customer_data['total_spent'] += amount
            customer_data['order_count'] += 1
            customer_data['last_order_time'] = current_time
            
            # 计算客户价值等级
            if customer_data['total_spent'] >= 1000:
                tier = "VIP"
            elif customer_data['total_spent'] >= 500:
                tier = "Gold"
            elif customer_data['total_spent'] >= 200:
                tier = "Silver"
            else:
                tier = "Bronze"
            
            customer_data['tier'] = tier
            
            # 更新状态
            state.update(customer_data)
            
            # 返回客户信息
            return (customer, {
                'current_order': amount,
                'total_spent': customer_data['total_spent'],
                'order_count': customer_data['order_count'],
                'tier': tier
            })
        
        # 创建带超时的StateSpec(30秒超时;Scala API中为StateSpec.function(...).timeout(Durations.seconds(30)))
        timeout_state_spec = StateSpec.function(track_with_timeout).timeout(30)
        customer_profiles = order_stream.mapWithState(timeout_state_spec)
        
        def print_customer_profiles(time, rdd):
            if not rdd.isEmpty():
                profiles = rdd.collectAsMap()
                print(f"\nCustomer Profiles at {time}:")
                for customer, profile in profiles.items():
                    print(f"  {customer}: ${profile['current_order']:.2f} "
                          f"(Total: ${profile['total_spent']:.2f}, "
                          f"Orders: {profile['order_count']}, Tier: {profile['tier']})")
                print("-" * 70)
        
        customer_profiles.foreachRDD(print_customer_profiles)
        
        # 3. 状态快照
        print("\n状态快照:")
        
        # 获取状态快照
        state_snapshots = customer_profiles.stateSnapshots()
        
        def print_state_snapshots(time, rdd):
            if not rdd.isEmpty():
                snapshots = rdd.collectAsMap()
                print(f"\nState Snapshots at {time}:")
                for customer, state_data in snapshots.items():
                    print(f"  {customer}: {state_data}")
                print("-" * 70)
        
        state_snapshots.foreachRDD(print_state_snapshots)
        
        return customer_totals
    
    def state_performance_optimization(self):
        """
        状态管理性能优化
        """
        print("\n\n4. 状态管理性能优化:")
        
        optimization_strategies = {
            "选择合适的状态操作": {
                "updateStateByKey": "简单状态更新,但性能较低",
                "mapWithState": "高性能状态管理,支持超时和快照",
                "建议": "新应用优先使用mapWithState"
            },
            "状态数据结构优化": {
                "避免大对象": "状态对象应尽量小,避免存储大量数据",
                "使用原始类型": "优先使用基本数据类型,减少序列化开销",
                "数据压缩": "对大状态对象考虑压缩存储"
            },
            "检查点优化": {
                "检查点间隔": "设置合理的检查点间隔,平衡性能和容错",
                "存储位置": "使用高性能存储系统(如HDFS、S3)",
                "清理策略": "定期清理旧的检查点文件"
            },
            "内存管理": {
                "状态大小监控": "监控状态数据大小,防止内存溢出",
                "垃圾回收": "调优JVM垃圾回收参数",
                "分区策略": "合理设置分区数,平衡负载"
            }
        }
        
        for category, strategies in optimization_strategies.items():
            print(f"\n{category}:")
            for strategy, description in strategies.items():
                print(f"  {strategy}: {description}")
        
        # 性能监控示例
        print("\n\n性能监控示例:")
        
        monitoring_code = '''
        # 监控状态大小
        def monitor_state_size(time, rdd):
            if not rdd.isEmpty():
                state_count = rdd.count()
                sample_states = rdd.take(5)
                
                print(f"Time: {time}")
                print(f"State count: {state_count}")
                print(f"Sample states: {sample_states}")
                
                # 检查状态大小
                if state_count > 10000:
                    print("WARNING: Large number of states detected!")
        
        stateful_stream.foreachRDD(monitor_state_size)
        
        # 监控处理延迟
        def monitor_processing_delay(time, rdd):
            import time as time_module
            current_time = time_module.time()
            batch_time = time.timestamp()  # foreachRDD 回调中的 time 参数是 datetime 对象
            delay = current_time - batch_time
            
            print(f"Processing delay: {delay:.2f} seconds")
            
            if delay > 10:  # 延迟超过10秒
                print("WARNING: High processing delay detected!")
        
        stateful_stream.foreachRDD(monitor_processing_delay)
        '''
        
        print(monitoring_code)
        
        return optimization_strategies
    
    def state_use_cases(self):
        """
        状态管理应用场景
        """
        print("\n\n5. 状态管理应用场景:")
        
        use_cases = {
            "用户会话跟踪": {
                "场景": "跟踪用户在网站或应用中的行为会话",
                "状态内容": "会话开始时间、活动列表、会话状态",
                "更新逻辑": "基于用户活动更新会话,处理超时"
            },
            "实时计数器": {
                "场景": "网站访问量、API调用次数、错误计数",
                "状态内容": "各种维度的累计计数",
                "更新逻辑": "增量更新计数,支持重置和清理"
            },
            "欺诈检测": {
                "场景": "检测异常交易模式、账户风险评估",
                "状态内容": "用户历史行为、风险评分、规则状态",
                "更新逻辑": "基于新交易更新风险模型"
            },
            "推荐系统": {
                "场景": "实时更新用户偏好、物品热度",
                "状态内容": "用户画像、物品特征、交互历史",
                "更新逻辑": "基于用户行为实时调整推荐模型"
            },
            "IoT设备监控": {
                "场景": "设备状态跟踪、异常检测、预测维护",
                "状态内容": "设备历史数据、健康状态、预测模型",
                "更新逻辑": "基于传感器数据更新设备状态"
            }
        }
        
        for use_case, details in use_cases.items():
            print(f"\n{use_case}:")
            for key, value in details.items():
                print(f"  {key}: {value}")
        
        print("\n状态管理最佳实践:")
        best_practices = [
            "1. 合理设计状态数据结构,避免过大对象",
            "2. 实现状态清理逻辑,防止内存泄漏",
            "3. 使用mapWithState替代updateStateByKey",
            "4. 设置合适的超时时间和检查点间隔",
            "5. 监控状态大小和处理延迟",
            "6. 考虑状态分区策略,平衡负载"
        ]
        
        for practice in best_practices:
            print(f"  {practice}")
    
    def cleanup(self):
        """
        清理资源
        """
        if hasattr(self, 'ssc'):
            self.ssc.stop(stopSparkContext=True, stopGraceFully=True)

# 状态管理演示
state_demo = StateManagement()

# 状态概念
state_demo.state_concepts()

# updateStateByKey操作
update_state_demo = state_demo.updateStateByKey_operations()

# mapWithState操作
map_state_demo = state_demo.mapWithState_operations()

# 性能优化
state_optimization = state_demo.state_performance_optimization()

# 应用场景
state_demo.state_use_cases()

print("\n\n状态管理总结:")
print("=" * 18)
print("✓ 状态管理概念解释完成")
print("✓ updateStateByKey操作演示完成")
print("✓ mapWithState操作演示完成")
print("✓ 性能优化技巧演示完成")
print("✓ 应用场景分析完成")

print("\n核心要点:")
print("- 状态管理是流处理的重要功能")
print("- mapWithState比updateStateByKey性能更好")
print("- 必须设置检查点确保容错性")
print("- 需要合理设计状态清理策略")
print("- 监控状态大小和处理性能")

# 清理资源
state_demo.cleanup()
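
状态清理示意:针对上面"实现状态清理逻辑,防止内存泄漏"这条最佳实践,下面给出一个最小示意(补充示例,假设 ssc 已创建并设置了检查点,user_events 是 (user_id, event_time) 形式的 DStream,event_time 为秒级时间戳):

import time

SESSION_TIMEOUT = 300  # 会话超时时间(秒),超过该时间无活动则清理状态

def update_session(new_times, session):
    """累计用户事件数,超时后返回None以删除该键的状态"""
    now = time.time()
    if session is None:
        session = {'event_count': 0, 'last_seen': now}
    if new_times:
        session['event_count'] += len(new_times)
        session['last_seen'] = max(new_times)
    # 长时间未活跃:返回None,updateStateByKey会删除该键的状态
    if now - session['last_seen'] > SESSION_TIMEOUT:
        return None
    return session

user_sessions = user_events.updateStateByKey(update_session)
user_sessions.pprint()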

4.5 数据源集成

多种数据源支持

class DataSourceIntegration:
    """
    数据源集成详细演示
    """
    
    def __init__(self):
        from pyspark import SparkContext, SparkConf
        from pyspark.streaming import StreamingContext
        
        conf = SparkConf().setAppName("DataSourceIntegration").setMaster("local[2]")
        self.sc = SparkContext(conf=conf)
        self.sc.setLogLevel("WARN")
        self.ssc = StreamingContext(self.sc, 2)
        
        print("\n数据源集成详细演示")
        print("=" * 20)
    
    def socket_data_source(self):
        """
        Socket数据源
        """
        print("\n1. Socket数据源:")
        
        print("\nSocket数据源特点:")
        features = {
            "实时性": "低延迟,适合实时数据传输",
            "简单性": "配置简单,适合开发和测试",
            "可靠性": "无容错机制,生产环境需要额外保障",
            "扩展性": "单点连接,扩展性有限"
        }
        
        for feature, description in features.items():
            print(f"  {feature}: {description}")
        
        print("\nSocket数据源使用示例:")
        socket_example = '''
        # 创建Socket DStream
        socket_stream = ssc.socketTextStream("localhost", 9999)
        
        # 处理数据
        words = socket_stream.flatMap(lambda line: line.split(" "))
        word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
        
        # 输出结果
        word_counts.pprint()
        
        # 启动流处理
        ssc.start()
        ssc.awaitTermination()
        '''
        print(socket_example)
        
        print("\n使用场景:")
        use_cases = [
            "1. 开发和测试环境的数据模拟",
            "2. 简单的实时数据传输",
            "3. 原型系统的快速验证",
            "4. 教学和演示用途"
        ]
        
        for case in use_cases:
            print(f"  {case}")
    
    def file_data_source(self):
        """
        文件数据源
        """
        print("\n\n2. 文件数据源:")
        
        print("\n文件数据源类型:")
        file_types = {
            "textFileStream": "监控文本文件目录,处理新增文件",
            "binaryRecordsStream": "处理二进制记录文件",
            "fileStream": "自定义文件格式处理"
        }
        
        for file_type, description in file_types.items():
            print(f"  {file_type}: {description}")
        
        print("\n文件数据源示例:")
        file_example = '''
        # 1. 文本文件流
        text_stream = ssc.textFileStream("/path/to/directory")
        
        # 处理日志文件
        log_pattern = r'(\S+) - - \[(.*?)\] "(\S+) (\S+) (\S+)" (\d+) (\d+)'
        
        def parse_log_line(line):
            import re
            match = re.match(log_pattern, line)
            if match:
                return {
                    'ip': match.group(1),
                    'timestamp': match.group(2),
                    'method': match.group(3),
                    'url': match.group(4),
                    'status': int(match.group(6)),
                    'size': int(match.group(7))
                }
            return None
        
        parsed_logs = text_stream.map(parse_log_line).filter(lambda x: x is not None)
        
        # 2. 二进制文件流
        binary_stream = ssc.binaryRecordsStream("/path/to/binary/files", recordLength=1024)
        
        # 3. 自定义格式处理
        # 注意: fileStream 需要指定 InputFormat,只在 Scala/Java API 中提供;
        # PySpark 中通常先用 textFileStream / binaryRecordsStream 读入,
        # 再通过 map/transform 应用自定义解析逻辑(process_custom_format 需自行实现)
        def custom_file_processor(rdd):
            # 自定义文件处理逻辑
            return rdd.map(lambda x: process_custom_format(x))
        
        custom_stream = binary_stream.transform(custom_file_processor)
        '''
        print(file_example)
        
        print("\n文件数据源配置:")
        configurations = {
            "监控间隔": "默认与批处理间隔相同,可通过参数调整",
            "文件过滤": "支持文件名模式匹配和过滤",
            "处理策略": "只处理新增文件,已处理文件不会重复处理",
            "容错机制": "支持检查点恢复,确保文件处理的一致性"
        }
        
        for config, description in configurations.items():
            print(f"  {config}: {description}")
    
    def kafka_integration(self):
        """
        Kafka集成
        """
        print("\n\n3. Kafka集成:")
        
        print("\nKafka集成优势:")
        advantages = {
            "高吞吐量": "支持大规模数据流处理",
            "容错性": "内置副本机制,确保数据可靠性",
            "扩展性": "支持水平扩展,处理能力可线性增长",
            "持久化": "数据持久化存储,支持重放和恢复",
            "分区支持": "支持数据分区,提高并行处理能力"
        }
        
        for advantage, description in advantages.items():
            print(f"  {advantage}: {description}")
        
        print("\nKafka集成示例:")
        kafka_example = '''
        # 需要添加Kafka依赖(版本号需与所用Spark版本一致)
        # spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.x.x
        # 注意: 下面按 Direct Stream 的风格示意;PySpark 旧的 DStream Kafka 接口
        # (基于Kafka 0.8,返回 (key, value) 元组)已在 Spark 3.x 中移除,
        # 新项目建议改用 Structured Streaming 读取 Kafka(见本节末尾示意)
        
        from pyspark.streaming.kafka import KafkaUtils  # 示意导入,Spark 3.x 的 PySpark 中已不存在
        import json
        
        # 1. 创建Kafka DStream
        kafka_params = {
            "bootstrap.servers": "localhost:9092",
            "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
            "group.id": "spark-streaming-group",
            "auto.offset.reset": "latest",
            "enable.auto.commit": False
        }
        
        topics = ["user-events", "system-logs"]
        
        kafka_stream = KafkaUtils.createDirectStream(
            ssc,
            topics,
            kafka_params
        )
        
        # 2. 处理Kafka消息
        def process_kafka_message(message):
            try:
                # 解析JSON消息
                data = json.loads(message.value)
                return {
                    'topic': message.topic,
                    'partition': message.partition,
                    'offset': message.offset,
                    'timestamp': message.timestamp,
                    'data': data
                }
            except Exception as e:
                print(f"Error processing message: {e}")
                return None
        
        processed_stream = kafka_stream.map(process_kafka_message).filter(lambda x: x is not None)
        
        # 3. 按主题分流处理
        user_events = processed_stream.filter(lambda x: x['topic'] == 'user-events')
        system_logs = processed_stream.filter(lambda x: x['topic'] == 'system-logs')
        
        # 4. 用户事件处理
        def process_user_events(rdd):
            if not rdd.isEmpty():
                events = rdd.map(lambda x: x['data']).collect()
                for event in events:
                    print(f"User Event: {event}")
        
        user_events.foreachRDD(process_user_events)
        
        # 5. 系统日志处理
        def process_system_logs(rdd):
            if not rdd.isEmpty():
                logs = rdd.map(lambda x: x['data']).collect()
                error_logs = [log for log in logs if log.get('level') == 'ERROR']
                if error_logs:
                    print(f"Error logs detected: {len(error_logs)}")
        
        system_logs.foreachRDD(process_system_logs)
        '''
        print(kafka_example)
        
        print("\nKafka配置最佳实践:")
        best_practices = [
            "1. 合理设置消费者组ID,避免重复消费",
            "2. 配置合适的批处理间隔,平衡延迟和吞吐量",
            "3. 启用检查点,确保偏移量管理的一致性",
            "4. 监控消费延迟,及时发现性能问题",
            "5. 使用Direct Stream API,获得更好的性能"
        ]
        
        for practice in best_practices:
            print(f"  {practice}")
    
    def custom_receiver(self):
        """
        自定义接收器
        """
        print("\n\n4. 自定义接收器:")
        
        print("\n自定义接收器应用场景:")
        scenarios = {
            "特殊协议": "处理非标准协议的数据源",
            "第三方API": "集成第三方服务的实时数据",
            "硬件设备": "直接从硬件设备读取数据",
            "自定义格式": "处理特殊格式的数据流"
        }
        
        for scenario, description in scenarios.items():
            print(f"  {scenario}: {description}")
        
        print("\n自定义接收器实现:")
        custom_receiver_example = '''
        # 注意: Receiver API 目前只能用 Scala/Java 实现(继承 org.apache.spark.streaming.receiver.Receiver),
        # PySpark 没有对应的基类,下面的Python代码按该接口的结构示意自定义接收器的写法
        from pyspark.streaming.receiver import Receiver  # 示意导入,PySpark中并不存在
        import threading
        import time
        import random
        
        class CustomDataReceiver(Receiver):
            """
            自定义数据接收器示例
            """
            
            def __init__(self, host, port):
                super(CustomDataReceiver, self).__init__()
                self.host = host
                self.port = port
                self.daemon = True
            
            def onStart(self):
                """
                启动接收器
                """
                # 启动数据接收线程
                self.thread = threading.Thread(target=self.receive_data)
                self.thread.daemon = True
                self.thread.start()
            
            def onStop(self):
                """
                停止接收器
                """
                self.stopped = True
                if hasattr(self, 'thread'):
                    self.thread.join()
            
            def receive_data(self):
                """
                数据接收逻辑
                """
                self.stopped = False
                
                try:
                    # 模拟连接到数据源
                    print(f"Connecting to {self.host}:{self.port}")
                    
                    while not self.stopped:
                        # 模拟接收数据
                        data = self.generate_sample_data()
                        
                        # 存储数据到Spark Streaming
                        self.store(data)
                        
                        # 控制数据生成频率
                        time.sleep(0.1)
                        
                except Exception as e:
                    print(f"Error in receiver: {e}")
                    self.restart(f"Error in receiver: {e}")
            
            def generate_sample_data(self):
                """
                生成示例数据
                """
                return {
                    'timestamp': time.time(),
                    'sensor_id': f"sensor_{random.randint(1, 10)}",
                    'temperature': round(random.uniform(20, 35), 2),
                    'humidity': round(random.uniform(40, 80), 2),
                    'pressure': round(random.uniform(1000, 1020), 2)
                }
        
        # 使用自定义接收器
        def create_custom_stream(ssc, host, port):
            return ssc.receiverStream(CustomDataReceiver(host, port))
        
        # 创建自定义数据流
        custom_stream = create_custom_stream(ssc, "localhost", 8888)
        
        # 处理自定义数据
        def process_sensor_data(rdd):
            if not rdd.isEmpty():
                data = rdd.collect()
                for record in data:
                    print(f"Sensor Data: {record}")
                    
                    # 异常检测
                    if record['temperature'] > 30:
                        print(f"High temperature alert: {record['temperature']}°C")
        
        custom_stream.foreachRDD(process_sensor_data)
        '''
        print(custom_receiver_example)
        
        print("\n自定义接收器最佳实践:")
        best_practices = [
            "1. 实现适当的错误处理和重启机制",
            "2. 使用线程安全的数据结构",
            "3. 控制数据接收频率,避免内存溢出",
            "4. 实现优雅的启动和停止逻辑",
            "5. 添加监控和日志记录功能"
        ]
        
        for practice in best_practices:
            print(f"  {practice}")
    
    def data_source_comparison(self):
        """
        数据源对比分析
        """
        print("\n\n5. 数据源对比分析:")
        
        comparison_table = {
            "数据源": ["Socket", "File", "Kafka", "自定义接收器"],
            "实时性": ["高", "中", "高", "高"],
            "可靠性": ["低", "中", "高", "取决于实现"],
            "扩展性": ["低", "中", "高", "取决于实现"],
            "复杂度": ["低", "低", "中", "高"],
            "适用场景": [
                "开发测试",
                "批量文件处理",
                "生产环境",
                "特殊需求"
            ]
        }
        
        # 打印对比表格
        print("\n数据源对比表:")
        headers = list(comparison_table.keys())
        print(f"{'':12} | {'':8} | {'':8} | {'':8} | {'':8} | {'':15}")
        print(f"{headers[0]:12} | {headers[1]:8} | {headers[2]:8} | {headers[3]:8} | {headers[4]:8} | {headers[5]:15}")
        print("-" * 80)
        
        for i in range(len(comparison_table["数据源"])):
            row = [comparison_table[key][i] for key in headers]
            print(f"{row[0]:12} | {row[1]:8} | {row[2]:8} | {row[3]:8} | {row[4]:8} | {row[5]:15}")
        
        print("\n选择建议:")
        recommendations = {
            "开发和测试": "使用Socket数据源,简单快速",
            "文件批处理": "使用File数据源,适合日志分析",
            "生产环境": "使用Kafka,提供最佳的可靠性和扩展性",
            "特殊需求": "开发自定义接收器,满足特定要求"
        }
        
        for scenario, recommendation in recommendations.items():
            print(f"  {scenario}: {recommendation}")
    
    def cleanup(self):
        """
        清理资源
        """
        if hasattr(self, 'ssc'):
            self.ssc.stop(stopSparkContext=True, stopGraceFully=True)

# 数据源集成演示
data_source_demo = DataSourceIntegration()

# Socket数据源
data_source_demo.socket_data_source()

# 文件数据源
data_source_demo.file_data_source()

# Kafka集成
data_source_demo.kafka_integration()

# 自定义接收器
data_source_demo.custom_receiver()

# 数据源对比
data_source_demo.data_source_comparison()

print("\n\n数据源集成总结:")
print("=" * 20)
print("✓ Socket数据源演示完成")
print("✓ 文件数据源演示完成")
print("✓ Kafka集成演示完成")
print("✓ 自定义接收器演示完成")
print("✓ 数据源对比分析完成")

print("\n核心要点:")
print("- 选择合适的数据源是流处理成功的关键")
print("- Kafka是生产环境的首选数据源")
print("- 自定义接收器可以满足特殊需求")
print("- 需要考虑实时性、可靠性和扩展性")
print("- 不同数据源有不同的配置和优化策略")

# 清理资源
data_source_demo.cleanup()

4.6 实际案例:实时监控系统

系统架构设计

class RealTimeMonitoringSystem:
    """
    实时监控系统完整案例
    """
    
    def __init__(self):
        from pyspark import SparkContext, SparkConf
        from pyspark.streaming import StreamingContext
        import json
        import time
        import random
        
        # Spark配置
        conf = SparkConf().setAppName("RealTimeMonitoring").setMaster("local[4]")
        conf.set("spark.streaming.backpressure.enabled", "true")
        conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
        
        self.sc = SparkContext(conf=conf)
        self.sc.setLogLevel("WARN")
        self.ssc = StreamingContext(self.sc, 5)  # 5秒批处理间隔
        
        # 设置检查点
        self.ssc.checkpoint("/tmp/monitoring_checkpoint")
        
        # 系统状态
        self.alert_thresholds = {
            'cpu_usage': 80.0,
            'memory_usage': 85.0,
            'disk_usage': 90.0,
            'network_latency': 100.0,
            'error_rate': 5.0
        }
        
        print("\n实时监控系统")
        print("=" * 16)
        print("系统功能:")
        print("- 服务器性能监控")
        print("- 应用程序监控")
        print("- 网络监控")
        print("- 异常检测和告警")
        print("- 实时仪表板")
    
    def generate_monitoring_data(self):
        """
        生成监控数据(模拟真实环境)
        """
        print("\n1. 监控数据生成:")
        
        # 服务器列表
        servers = ["web-01", "web-02", "db-01", "cache-01", "app-01"]
        services = ["nginx", "mysql", "redis", "tomcat", "elasticsearch"]
        
        monitoring_data = []
        
        for i in range(20):  # 生成20个批次的数据
            batch_data = []
            
            for j in range(50):  # 每批50条记录
                server = random.choice(servers)
                service = random.choice(services)
                
                # 模拟不同的监控指标
                base_cpu = 30 + random.uniform(-10, 40)
                base_memory = 40 + random.uniform(-15, 45)
                
                # 偶尔生成异常数据
                if random.random() < 0.1:  # 10%概率生成异常
                    base_cpu += random.uniform(30, 50)
                    base_memory += random.uniform(20, 40)
                
                record = {
                    'timestamp': time.time() + i * 5 + j * 0.1,
                    'server': server,
                    'service': service,
                    'metrics': {
                        'cpu_usage': min(100, max(0, base_cpu)),
                        'memory_usage': min(100, max(0, base_memory)),
                        'disk_usage': random.uniform(20, 95),
                        'network_latency': random.uniform(1, 200),
                        'requests_per_second': random.randint(10, 1000),
                        'error_count': random.randint(0, 20),
                        'response_time': random.uniform(50, 2000)
                    },
                    'status': 'normal' if base_cpu < 80 and base_memory < 85 else 'warning'
                }
                
                batch_data.append(json.dumps(record))
            
            monitoring_data.append(self.sc.parallelize(batch_data))
        
        return self.ssc.queueStream(monitoring_data, oneAtATime=True)
    
    def real_time_metrics_processing(self, data_stream):
        """
        实时指标处理
        """
        print("\n2. 实时指标处理:")
        
        # 解析JSON数据
        def parse_monitoring_data(json_str):
            import json
            try:
                return json.loads(json_str)
            except (ValueError, TypeError):
                return None
        
        parsed_stream = data_stream.map(parse_monitoring_data).filter(lambda x: x is not None)
        
        # 1. 服务器性能聚合
        print("\n服务器性能聚合:")
        
        def extract_server_metrics(record):
            return (record['server'], {
                'cpu': record['metrics']['cpu_usage'],
                'memory': record['metrics']['memory_usage'],
                'disk': record['metrics']['disk_usage'],
                'count': 1
            })
        
        server_metrics = parsed_stream.map(extract_server_metrics)
        
        def aggregate_server_metrics(metrics1, metrics2):
            return {
                'cpu': metrics1['cpu'] + metrics2['cpu'],
                'memory': metrics1['memory'] + metrics2['memory'],
                'disk': metrics1['disk'] + metrics2['disk'],
                'count': metrics1['count'] + metrics2['count']
            }
        
        server_aggregates = server_metrics.reduceByKey(aggregate_server_metrics)
        
        def calculate_averages(server_data):
            server, metrics = server_data
            count = metrics['count']
            return (server, {
                'avg_cpu': metrics['cpu'] / count,
                'avg_memory': metrics['memory'] / count,
                'avg_disk': metrics['disk'] / count,
                'sample_count': count
            })
        
        server_averages = server_aggregates.map(calculate_averages)
        
        def print_server_metrics(time, rdd):
            if not rdd.isEmpty():
                metrics = rdd.collectAsMap()
                print(f"\n服务器性能指标 - {time}:")
                for server, data in metrics.items():
                    print(f"  {server}: CPU={data['avg_cpu']:.1f}%, "
                          f"Memory={data['avg_memory']:.1f}%, "
                          f"Disk={data['avg_disk']:.1f}% "
                          f"(samples: {data['sample_count']})")
                print("-" * 60)
        
        server_averages.foreachRDD(print_server_metrics)
        
        # 2. 服务状态监控
        print("\n服务状态监控:")
        
        def extract_service_metrics(record):
            return (record['service'], {
                'requests': record['metrics']['requests_per_second'],
                'errors': record['metrics']['error_count'],
                'response_time': record['metrics']['response_time'],
                'count': 1
            })
        
        service_metrics = parsed_stream.map(extract_service_metrics)
        service_aggregates = service_metrics.reduceByKey(
            lambda a, b: {
                'requests': a['requests'] + b['requests'],
                'errors': a['errors'] + b['errors'],
                'response_time': a['response_time'] + b['response_time'],
                'count': a['count'] + b['count']
            }
        )
        
        def calculate_service_kpis(service_data):
            service, metrics = service_data
            count = metrics['count']
            total_requests = metrics['requests']
            total_errors = metrics['errors']
            
            error_rate = (total_errors / total_requests * 100) if total_requests > 0 else 0
            avg_response_time = metrics['response_time'] / count
            
            return (service, {
                'total_requests': total_requests,
                'total_errors': total_errors,
                'error_rate': error_rate,
                'avg_response_time': avg_response_time
            })
        
        service_kpis = service_aggregates.map(calculate_service_kpis)
        
        def print_service_kpis(time, rdd):
            if not rdd.isEmpty():
                kpis = rdd.collectAsMap()
                print(f"\n服务KPI指标 - {time}:")
                for service, data in kpis.items():
                    print(f"  {service}: Requests={data['total_requests']:.0f}, "
                          f"Errors={data['total_errors']:.0f}, "
                          f"Error Rate={data['error_rate']:.2f}%, "
                          f"Avg Response={data['avg_response_time']:.1f}ms")
                print("-" * 70)
        
        service_kpis.foreachRDD(print_service_kpis)
        
        return server_averages, service_kpis
    
    def anomaly_detection(self, data_stream):
        """
        异常检测和告警
        """
        print("\n3. 异常检测和告警:")
        
        # 解析数据(解析失败的记录直接过滤掉)
        def safe_parse(json_str):
            import json
            try:
                return json.loads(json_str)
            except (ValueError, TypeError):
                return None
        
        parsed_stream = data_stream.map(safe_parse).filter(lambda x: x is not None)
        
        # 1. 阈值检测
        def detect_threshold_anomalies(record):
            anomalies = []
            metrics = record['metrics']
            server = record['server']
            service = record['service']
            
            # 检查各项指标
            for metric, threshold in self.alert_thresholds.items():
                if metric in metrics and metrics[metric] > threshold:
                    anomalies.append({
                        'type': 'threshold',
                        'server': server,
                        'service': service,
                        'metric': metric,
                        'value': metrics[metric],
                        'threshold': threshold,
                        'severity': 'high' if metrics[metric] > threshold * 1.2 else 'medium',
                        'timestamp': record['timestamp']
                    })
            
            return anomalies
        
        threshold_anomalies = parsed_stream.flatMap(detect_threshold_anomalies)
        
        def print_threshold_alerts(time, rdd):
            if not rdd.isEmpty():
                alerts = rdd.collect()
                if alerts:
                    print(f"\n🚨 阈值告警 - {time}:")
                    for alert in alerts:
                        print(f"  [{alert['severity'].upper()}] {alert['server']}/{alert['service']}: "
                              f"{alert['metric']} = {alert['value']:.1f} "
                              f"(threshold: {alert['threshold']})")
                    print("-" * 60)
        
        threshold_anomalies.foreachRDD(print_threshold_alerts)
        
        # 2. 趋势异常检测
        print("\n趋势异常检测:")
        
        # 使用窗口操作检测趋势
        windowed_stream = parsed_stream.window(20, 10)  # 20秒窗口,10秒滑动
        
        def detect_trend_anomalies(time, rdd):
            if not rdd.isEmpty():
                records = rdd.collect()
                
                # 按服务器分组
                server_data = {}
                for record in records:
                    server = record['server']
                    if server not in server_data:
                        server_data[server] = []
                    server_data[server].append(record)
                
                print(f"\n📈 趋势分析 - {time}:")
                
                for server, data in server_data.items():
                    if len(data) >= 3:  # 至少3个数据点
                        # 计算CPU使用率趋势
                        cpu_values = [r['metrics']['cpu_usage'] for r in data]
                        cpu_trend = (cpu_values[-1] - cpu_values[0]) / len(cpu_values)
                        
                        # 检测急剧上升趋势
                        if cpu_trend > 5:  # CPU每个时间点上升超过5%
                            print(f"  ⚠️  {server}: CPU使用率急剧上升 (趋势: +{cpu_trend:.1f}%/点)")
                        
                        # 检测内存泄漏模式
                        memory_values = [r['metrics']['memory_usage'] for r in data]
                        memory_trend = (memory_values[-1] - memory_values[0]) / len(memory_values)
                        
                        if memory_trend > 3:  # 内存持续上升
                            print(f"  🔍 {server}: 疑似内存泄漏 (趋势: +{memory_trend:.1f}%/点)")
                
                print("-" * 60)
        
        windowed_stream.foreachRDD(detect_trend_anomalies)
        
        # 3. 关联异常检测
        print("\n关联异常检测:")
        
        def detect_correlation_anomalies(time, rdd):
            if not rdd.isEmpty():
                records = rdd.collect()
                
                # 检测服务间的异常关联
                service_errors = {}
                for record in records:
                    service = record['service']
                    error_count = record['metrics']['error_count']
                    
                    if service not in service_errors:
                        service_errors[service] = []
                    service_errors[service].append(error_count)
                
                print(f"\n🔗 关联异常检测 - {time}:")
                
                # 检测多服务同时出现错误
                high_error_services = []
                for service, errors in service_errors.items():
                    avg_errors = sum(errors) / len(errors)
                    if avg_errors > 10:  # 平均错误数超过10
                        high_error_services.append(service)
                
                if len(high_error_services) >= 2:
                    print(f"  🚨 多服务异常: {', '.join(high_error_services)} 同时出现高错误率")
                    print(f"     可能原因: 网络问题、共享资源故障、级联失败")
                
                print("-" * 60)
        
        windowed_stream.foreachRDD(detect_correlation_anomalies)
        
        return threshold_anomalies
    
    def real_time_dashboard(self, server_metrics, service_kpis):
        """
        实时仪表板
        """
        print("\n4. 实时仪表板:")
        
        # 系统总览
        def generate_system_overview(time, rdd):
            if not rdd.isEmpty():
                metrics = rdd.collectAsMap()
                
                print(f"\n📊 系统总览 - {time}")
                print("=" * 50)
                
                total_servers = len(metrics)
                high_cpu_servers = sum(1 for data in metrics.values() if data['avg_cpu'] > 70)
                high_memory_servers = sum(1 for data in metrics.values() if data['avg_memory'] > 80)
                
                print(f"总服务器数: {total_servers}")
                print(f"高CPU使用率服务器: {high_cpu_servers}")
                print(f"高内存使用率服务器: {high_memory_servers}")
                
                # 健康状态
                if high_cpu_servers == 0 and high_memory_servers == 0:
                    status = "🟢 健康"
                elif high_cpu_servers <= 1 and high_memory_servers <= 1:
                    status = "🟡 警告"
                else:
                    status = "🔴 严重"
                
                print(f"系统状态: {status}")
                print("=" * 50)
        
        server_metrics.foreachRDD(generate_system_overview)
        
        # 服务性能排行
        def generate_service_ranking(time, rdd):
            if not rdd.isEmpty():
                kpis = rdd.collectAsMap()
                
                print(f"\n🏆 服务性能排行 - {time}")
                print("-" * 40)
                
                # 按错误率排序
                sorted_services = sorted(kpis.items(), key=lambda x: x[1]['error_rate'], reverse=True)
                
                print("错误率排行 (高到低):")
                for i, (service, data) in enumerate(sorted_services[:3]):
                    rank_emoji = ["🥇", "🥈", "🥉"][i] if i < 3 else "📍"
                    print(f"  {rank_emoji} {service}: {data['error_rate']:.2f}%")
                
                # 按响应时间排序
                sorted_by_response = sorted(kpis.items(), key=lambda x: x[1]['avg_response_time'], reverse=True)
                
                print("\n响应时间排行 (慢到快):")
                for i, (service, data) in enumerate(sorted_by_response[:3]):
                    rank_emoji = ["🐌", "🚶", "🏃"][i] if i < 3 else "📍"
                    print(f"  {rank_emoji} {service}: {data['avg_response_time']:.1f}ms")
                
                print("-" * 40)
        
        service_kpis.foreachRDD(generate_service_ranking)
    
    def performance_optimization(self):
        """
        性能优化建议
        """
        print("\n5. 性能优化建议:")
        
        optimization_tips = {
            "批处理间隔优化": {
                "建议": "根据数据量和延迟要求调整批处理间隔",
                "配置": "StreamingContext(sc, batch_interval)",
                "注意事项": "间隔太短会增加开销,太长会增加延迟"
            },
            "背压控制": {
                "建议": "启用背压机制,防止数据积压",
                "配置": "spark.streaming.backpressure.enabled=true",
                "注意事项": "监控处理速度,及时调整资源"
            },
            "内存管理": {
                "建议": "合理配置内存,避免频繁GC",
                "配置": "spark.executor.memory, spark.streaming.receiver.maxRate",
                "注意事项": "监控内存使用,预防OOM"
            },
            "检查点优化": {
                "建议": "设置合适的检查点间隔和存储",
                "配置": "ssc.checkpoint(), 检查点间隔=5-10个批次",
                "注意事项": "使用可靠的存储系统,定期清理"
            }
        }
        
        for category, details in optimization_tips.items():
            print(f"\n{category}:")
            for key, value in details.items():
                print(f"  {key}: {value}")
    
    def run_monitoring_system(self):
        """
        运行完整的监控系统
        """
        print("\n🚀 启动实时监控系统")
        print("=" * 30)
        
        # 生成监控数据
        data_stream = self.generate_monitoring_data()
        
        # 实时指标处理
        server_metrics, service_kpis = self.real_time_metrics_processing(data_stream)
        
        # 异常检测
        anomalies = self.anomaly_detection(data_stream)
        
        # 实时仪表板
        self.real_time_dashboard(server_metrics, service_kpis)
        
        # 性能优化建议
        self.performance_optimization()
        
        print("\n✅ 监控系统配置完成")
        print("系统将开始处理实时数据...")
        
        return data_stream, server_metrics, service_kpis, anomalies
    
    def cleanup(self):
        """
        清理资源
        """
        if hasattr(self, 'ssc'):
            self.ssc.stop(stopSparkContext=True, stopGraceFully=True)

# 实时监控系统演示
monitoring_system = RealTimeMonitoringSystem()

# 运行监控系统
streams = monitoring_system.run_monitoring_system()

print("\n\n实时监控系统总结:")
print("=" * 24)
print("✓ 系统架构设计完成")
print("✓ 监控数据生成完成")
print("✓ 实时指标处理完成")
print("✓ 异常检测告警完成")
print("✓ 实时仪表板完成")
print("✓ 性能优化建议完成")

print("\n系统特性:")
print("- 多维度监控指标")
print("- 智能异常检测")
print("- 实时告警机制")
print("- 可视化仪表板")
print("- 性能优化建议")

print("\n应用价值:")
print("- 提高系统可观测性")
print("- 快速发现和定位问题")
print("- 预防系统故障")
print("- 优化资源使用")
print("- 提升运维效率")

# 清理资源
monitoring_system.cleanup()

4.7 本章小结

核心概念回顾

本章深入学习了 Spark Streaming 流处理技术,掌握了以下核心概念:

1. 流处理基础

  • 流处理概念:理解了流处理与批处理的区别和联系
  • Spark Streaming架构:掌握了微批处理架构和核心组件
  • 应用场景:了解了实时监控、欺诈检测、推荐系统等典型应用

2. DStream编程

  • DStream基础:掌握了DStream的创建、转换和输出操作
  • 高级转换:学会了连接、窗口、transform等高级操作
  • 性能优化:了解了批处理间隔、并行度、内存管理等优化技巧

3. 窗口操作

  • 窗口概念:理解了窗口长度、滑动间隔等核心参数
  • 窗口函数:掌握了reduceByWindow、countByWindow等窗口操作
  • 性能优化:学会了增量计算和窗口优化策略

4. 状态管理

  • 状态概念:理解了有状态计算的重要性和应用场景
  • updateStateByKey:掌握了基础状态更新操作
  • mapWithState:学会了高性能状态管理API

5. 数据源集成

  • 多种数据源:掌握了Socket、File、Kafka等数据源的使用
  • 自定义接收器:学会了开发自定义数据接收器
  • 数据源选择:了解了不同数据源的特点和适用场景

实践技能总结

通过本章学习,你应该掌握以下实践技能:

1. 流处理应用开发

# 基础流处理应用结构
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# 1. 创建Spark Streaming上下文
conf = SparkConf().setAppName("StreamingApp").setMaster("local[2]")
sc = SparkContext(conf=conf)
batch_interval = 5  # 批处理间隔(秒),按延迟和吞吐需求调整
ssc = StreamingContext(sc, batch_interval)

# 2. 创建DStream
data_stream = ssc.socketTextStream("localhost", 9999)

# 3. 数据处理(process_function 为业务处理函数,需自行实现)
processed_stream = data_stream.map(process_function)

# 4. 输出结果
processed_stream.pprint()

# 5. 启动和等待
ssc.start()
ssc.awaitTermination()

2. 窗口操作实现

# 滑动窗口聚合(使用逆函数时需要先通过 ssc.checkpoint() 开启检查点)
windowed_counts = word_counts.reduceByKeyAndWindow(
    lambda a, b: a + b,
    lambda a, b: a - b,
    windowDuration=30,
    slideDuration=10
)

# 窗口内TopK
def get_top_words(time, rdd):
    if not rdd.isEmpty():
        top_words = rdd.takeOrdered(10, key=lambda x: -x[1])
        print(f"Top words at {time}: {top_words}")

windowed_counts.foreachRDD(get_top_words)

3. 状态管理实现

# updateStateByKey状态更新
def update_function(new_values, running_count):
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)

running_counts = word_counts.updateStateByKey(update_function)

# mapWithState高性能状态管理
# 注意: mapWithState/StateSpec 仅在 Scala/Java API 中提供,以下为接口示意
from pyspark.streaming import StateSpec  # 示意导入,PySpark中并不存在

def mapping_function(key, value, state):
    if state.exists():
        state.update(state.get() + value)
    else:
        state.update(value)
    return (key, state.get())

state_spec = StateSpec.function(mapping_function)
stateful_stream = word_counts.mapWithState(state_spec)

4. 性能优化技巧

# 1. 批处理间隔优化
ssc = StreamingContext(sc, 5)  # 5秒间隔

# 2. 背压控制
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")

# 3. 检查点设置
ssc.checkpoint("/path/to/checkpoint")

# 4. 内存优化
conf.set("spark.executor.memory", "2g")
conf.set("spark.streaming.receiver.maxRate", "10000")

最佳实践总结

1. 架构设计最佳实践

  • 合理选择批处理间隔:平衡延迟和吞吐量
  • 设计容错机制:使用检查点和可靠的数据源
  • 监控系统性能:实时监控处理延迟和资源使用
  • 优雅处理异常:实现错误处理和恢复机制

2. 性能优化最佳实践

  • 启用背压控制:防止数据积压和内存溢出
  • 优化并行度:根据数据量和资源调整并行度
  • 使用增量计算:在窗口操作中使用增量函数
  • 合理使用缓存:缓存重复使用的DStream

3. 运维管理最佳实践

  • 设置监控告警:监控关键指标和异常情况
  • 定期清理检查点:避免检查点目录过大
  • 版本兼容性管理:注意Spark版本和依赖兼容性
  • 资源动态调整:根据负载动态调整资源配置

常见问题和解决方案

1. 性能问题

问题:处理延迟过高

解决方案:
  • 减少批处理间隔
  • 增加并行度(示意见下方代码)
  • 优化数据处理逻辑
  • 启用背压控制
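
针对"增加并行度"这一条,给出一个最小示意(假设 raw_stream 为已创建的 DStream,分区数需按集群核数调整):

# 对接收到的数据重新分区,提高下游算子的并行度
parallel_stream = raw_stream.repartition(8)

# 也可以在提交作业时调整默认并行度
# spark-submit --conf spark.default.parallelism=8 ...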

2. 内存问题

问题:内存溢出(OOM)

解决方案:
  • 增加executor内存
  • 减少批处理大小
  • 优化数据结构
  • 及时清理不需要的状态

3. 数据丢失问题

问题:数据处理不一致

解决方案:
  • 使用可靠的数据源(如Kafka)
  • 启用检查点机制
  • 实现幂等性处理(示意见下方代码)
  • 监控数据处理状态
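
针对"实现幂等性处理"这一条,给出一个最小示意(save_to_store 为假设的、支持按主键覆盖写入(upsert)的函数,result_stream 为已有的结果 DStream):

def save_partition(records):
    # 每条记录携带唯一业务主键,重复写入时按主键覆盖,
    # 这样即使批次因故障被重放,结果也不会重复
    for key, value in records:
        save_to_store(key, value)  # 假设的 upsert 写入函数,需自行实现

def output_batch(time, rdd):
    rdd.foreachPartition(save_partition)

result_stream.foreachRDD(output_batch)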

技术发展趋势

1. Structured Streaming

  • 更高级的API:基于DataFrame和Dataset的流处理(示例见下文)
  • 端到端一致性:提供exactly-once语义保证
  • 更好的性能:优化的查询执行引擎
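
下面给出一个 Structured Streaming 的最小示意,便于与前面的 DStream 写法对比(监听本地9999端口做词频统计,端口与检查点路径均为假设值):

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()

# 从Socket读取文本行,得到只有value一列的流式DataFrame
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

# 拆分单词并计数
words = lines.select(explode(split(lines.value, " ")).alias("word"))
word_counts = words.groupBy("word").count()

# complete模式每次输出完整计数结果,检查点用于故障恢复
query = (word_counts.writeStream
         .outputMode("complete")
         .format("console")
         .option("checkpointLocation", "/tmp/structured_wc_checkpoint")
         .start())
query.awaitTermination()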

2. 实时机器学习

  • 在线学习:支持模型的实时更新和预测
  • 特征工程:实时特征计算和存储
  • 模型服务:低延迟的模型推理服务

3. 云原生流处理

  • 容器化部署:支持Kubernetes等容器平台
  • 自动扩缩容:根据负载自动调整资源
  • 多云支持:支持多云环境的流处理

下一章预告:Spark MLlib机器学习

在下一章中,我们将学习:

5.1 机器学习基础

  • 机器学习概述和Spark MLlib介绍
  • 数据预处理和特征工程
  • 模型训练和评估流程

5.2 分类算法

  • 逻辑回归、决策树、随机森林
  • 支持向量机、朴素贝叶斯
  • 模型选择和超参数调优

5.3 回归算法

  • 线性回归、岭回归、Lasso回归
  • 回归树、梯度提升树
  • 回归模型评估指标

5.4 聚类算法

  • K-means聚类、高斯混合模型
  • 层次聚类、DBSCAN
  • 聚类效果评估

5.5 推荐系统

  • 协同过滤算法
  • 矩阵分解技术
  • 推荐系统评估

5.6 实际案例

  • 用户行为预测系统
  • 商品推荐引擎
  • 异常检测系统

练习题

基础练习

练习1:实时词频统计

要求:
  1. 创建一个Socket DStream,监听本地9999端口
  2. 实现实时词频统计功能
  3. 每10秒输出当前词频统计结果
  4. 添加词频过滤,只显示出现次数大于5的单词

提示

# 使用nc命令发送测试数据
# nc -lk 9999
# 然后输入文本数据

练习2:滑动窗口分析

要求:
  1. 基于练习1,添加30秒滑动窗口(每10秒滑动一次)
  2. 计算窗口内的词频统计
  3. 实现窗口内Top10热词排行
  4. 比较当前窗口与上一个窗口的词频变化

练习3:状态管理应用

要求:
  1. 实现用户会话跟踪系统
  2. 跟踪每个用户的累计访问次数
  3. 计算用户的平均会话时长
  4. 检测长时间未活跃的用户

进阶练习

练习4:多数据源集成

要求:
  1. 同时处理文件数据源和Socket数据源
  2. 实现数据源的联合处理和分析
  3. 对不同数据源的数据进行关联分析
  4. 实现数据源的故障切换机制

练习5:实时异常检测

要求:
  1. 模拟系统监控数据流
  2. 实现基于阈值的异常检测
  3. 实现基于统计的异常检测(如3σ原则)
  4. 添加异常告警和通知机制

练习6:性能优化实践

要求:
  1. 创建一个高负载的流处理应用
  2. 测试不同批处理间隔的性能影响
  3. 比较不同并行度设置的效果
  4. 实现背压控制和监控

综合项目

项目:实时日志分析系统

项目描述: 构建一个完整的实时日志分析系统,包括:

功能要求:
  1. 数据接入:支持多种日志格式和数据源
  2. 实时解析:解析日志字段,提取关键信息
  3. 统计分析:实时计算各种统计指标
  4. 异常检测:检测异常访问模式和错误
  5. 可视化展示:实时展示分析结果

技术要求:
  1. 使用Kafka作为数据源
  2. 实现多级数据处理流水线
  3. 使用状态管理跟踪用户会话
  4. 实现窗口操作进行趋势分析
  5. 添加性能监控和优化

评估标准:
  • 代码质量和架构设计(30%)
  • 功能完整性和正确性(40%)
  • 性能优化和可扩展性(20%)
  • 文档和测试覆盖率(10%)

思考题

  1. 架构设计:如何设计一个支持百万级QPS的实时流处理系统?需要考虑哪些关键因素?

  2. 容错机制:在流处理系统中,如何保证数据的一致性和可靠性?有哪些容错策略?

  3. 性能优化:影响Spark Streaming性能的主要因素有哪些?如何进行系统性的性能调优?

  4. 技术选型:在什么情况下选择Spark Streaming,什么情况下选择其他流处理框架(如Flink、Storm)?

  5. 未来发展:Structured Streaming相比DStream有哪些优势?在实际项目中如何选择?


本章学习目标检查清单

  • [ ] 理解流处理的基本概念和应用场景
  • [ ] 掌握DStream的创建、转换和输出操作
  • [ ] 熟练使用窗口操作进行时间序列分析
  • [ ] 掌握状态管理的原理和实现方法
  • [ ] 了解多种数据源的集成方式
  • [ ] 能够开发完整的实时流处理应用
  • [ ] 掌握流处理系统的性能优化技巧
  • [ ] 了解流处理系统的运维和监控

完成本章学习后,你应该能够独立设计和开发实时流处理系统,解决实际业务中的流数据处理需求。