1.1 Apache Spark概述

1.1.1 什么是Apache Spark

Apache Spark是一个开源的分布式计算框架,专为大数据处理而设计。它提供了一个统一的分析引擎,支持大规模数据处理,具有以下特点:

  • 速度快: 在官方基准测试中,基于内存的计算最高可比Hadoop MapReduce快约100倍,基于磁盘的计算也可快约10倍
  • 易用性: 提供Java、Scala、Python、R等多种语言API
  • 通用性: 支持批处理、流处理、机器学习、图计算等多种工作负载
  • 容错性: 通过RDD的血缘关系实现自动容错恢复
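
下面用一段极简的PySpark代码直观感受这些特点(示意性代码,假设已通过 pip install pyspark 安装并以本地模式运行;其中 toDebugString 用于查看RDD的血缘关系):

# 体验RDD血缘关系与惰性求值的极简示例(本地模式,示意)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("QuickTaste").master("local[*]").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(10)).map(lambda x: x * 2).filter(lambda x: x > 5)

# 查看血缘关系(lineage),容错恢复正是依赖它重算丢失的分区
lineage = rdd.toDebugString()
print(lineage.decode("utf-8") if isinstance(lineage, bytes) else lineage)

print(rdd.collect())  # 行动操作触发真正的计算

spark.stop()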

1.1.2 Spark生态系统

import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.patches import FancyBboxPatch
import numpy as np
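
# 可选: 若图表中的中文显示为方块,可设置matplotlib中文字体
# (字体名称因操作系统而异,下面列出的只是常见候选,属于环境相关的假设)
plt.rcParams['font.sans-serif'] = ['SimHei', 'Noto Sans CJK SC', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False  # 避免坐标轴负号显示异常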

class SparkEcosystemVisualizer:
    """
    Spark生态系统可视化
    """
    
    def __init__(self):
        self.components = {}
        self.setup_components()
    
    def setup_components(self):
        """
        设置Spark生态系统组件
        """
        self.components = {
            'Spark Core': {
                'description': 'Spark的核心引擎,提供RDD、任务调度、内存管理等基础功能',
                'features': ['RDD抽象', '任务调度', '内存管理', '容错机制'],
                'position': (0.5, 0.2),
                'color': '#FF6B6B'
            },
            'Spark SQL': {
                'description': '结构化数据处理模块,支持SQL查询和DataFrame API',
                'features': ['SQL查询', 'DataFrame API', '数据源连接', 'Catalyst优化器'],
                'position': (0.2, 0.6),
                'color': '#4ECDC4'
            },
            'Spark Streaming': {
                'description': '实时流处理模块,基于DStream的微批处理;新项目更推荐构建在Spark SQL之上的Structured Streaming',
                'features': ['微批处理', 'DStream API', '窗口操作', '状态管理'],
                'position': (0.8, 0.6),
                'color': '#45B7D1'
            },
            'MLlib': {
                'description': '机器学习库,提供分布式机器学习算法',
                'features': ['分类算法', '回归算法', '聚类算法', '推荐系统'],
                'position': (0.2, 0.8),
                'color': '#96CEB4'
            },
            'GraphX': {
                'description': '图计算库,支持图算法和图分析',
                'features': ['图构建', '图算法', 'Pregel API', '图分析'],
                'position': (0.8, 0.8),
                'color': '#FFEAA7'
            }
        }
    
    def visualize_ecosystem(self):
        """
        可视化Spark生态系统
        """
        fig, ax = plt.subplots(1, 1, figsize=(14, 10))
        
        # 绘制组件
        for name, info in self.components.items():
            x, y = info['position']
            
            # 绘制组件框
            if name == 'Spark Core':
                # 核心组件使用更大的框
                box = FancyBboxPatch(
                    (x-0.15, y-0.08), 0.3, 0.16,
                    boxstyle="round,pad=0.02",
                    facecolor=info['color'],
                    edgecolor='black',
                    linewidth=2,
                    alpha=0.8
                )
            else:
                box = FancyBboxPatch(
                    (x-0.1, y-0.06), 0.2, 0.12,
                    boxstyle="round,pad=0.02",
                    facecolor=info['color'],
                    edgecolor='black',
                    linewidth=1.5,
                    alpha=0.8
                )
            
            ax.add_patch(box)
            
            # 添加组件名称
            ax.text(x, y, name, ha='center', va='center', 
                   fontsize=12, fontweight='bold', color='white')
            
            # 绘制连接线到核心
            if name != 'Spark Core':
                core_x, core_y = self.components['Spark Core']['position']
                ax.plot([x, core_x], [y, core_y], 'k--', alpha=0.5, linewidth=1)
        
        # 添加标题和说明
        ax.set_title('Apache Spark生态系统架构', fontsize=16, fontweight='bold', pad=20)
        
        # 添加组件说明
        description_text = ""
        for name, info in self.components.items():
            description_text += f"• {name}: {info['description']}\n"
        
        ax.text(0.02, 0.02, description_text, transform=ax.transAxes, 
               fontsize=9, verticalalignment='bottom',
               bbox=dict(boxstyle='round', facecolor='lightgray', alpha=0.8))
        
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.set_aspect('equal')
        ax.axis('off')
        
        plt.tight_layout()
        plt.show()
    
    def compare_with_hadoop(self):
        """
        Spark与Hadoop对比
        """
        print("\nSpark vs Hadoop MapReduce 对比:")
        print("=" * 50)
        
        comparison_data = {
            '特性': ['处理速度', '易用性', '内存使用', '容错机制', '实时处理', '学习曲线'],
            'Spark': ['快(内存计算)', '高(多语言API)', '高效(内存缓存)', '自动(RDD血缘)', '支持', '较平缓'],
            'Hadoop MapReduce': ['慢(磁盘I/O)', '低(以Java为主)', '低(依赖磁盘)', '自动(任务级重试)', '不支持', '较陡峭']
        }
        
        # 创建对比表格
        fig, ax = plt.subplots(figsize=(12, 8))
        
        # 隐藏坐标轴
        ax.axis('tight')
        ax.axis('off')
        
        # 创建表格
        table_data = []
        for i in range(len(comparison_data['特性'])):
            table_data.append([
                comparison_data['特性'][i],
                comparison_data['Spark'][i],
                comparison_data['Hadoop MapReduce'][i]
            ])
        
        table = ax.table(cellText=table_data,
                        colLabels=['特性', 'Apache Spark', 'Hadoop MapReduce'],
                        cellLoc='center',
                        loc='center',
                        colColours=['lightblue', 'lightgreen', 'lightcoral'])
        
        table.auto_set_font_size(False)
        table.set_fontsize(10)
        table.scale(1.2, 2)
        
        # 设置表格样式
        for i in range(len(table_data) + 1):
            for j in range(3):
                cell = table[(i, j)]
                if i == 0:  # 标题行
                    cell.set_text_props(weight='bold')
                cell.set_edgecolor('black')
                cell.set_linewidth(1)
        
        plt.title('Apache Spark vs Hadoop MapReduce 特性对比', 
                 fontsize=14, fontweight='bold', pad=20)
        plt.show()
        
        return comparison_data
    
    def show_use_cases(self):
        """
        展示Spark应用场景
        """
        print("\nApache Spark主要应用场景:")
        print("=" * 40)
        
        use_cases = {
            '批处理': {
                'description': '大规模数据的批量处理和分析',
                'examples': ['ETL处理', '数据仓库', '报表生成', '数据清洗'],
                'advantages': ['高吞吐量', '容错性强', '易于扩展']
            },
            '实时流处理': {
                'description': '实时数据流的处理和分析',
                'examples': ['实时监控', '欺诈检测', '推荐系统', '日志分析'],
                'advantages': ['低延迟', '高吞吐', '状态管理']
            },
            '机器学习': {
                'description': '大规模机器学习模型训练和预测',
                'examples': ['分类预测', '聚类分析', '推荐算法', '特征工程'],
                'advantages': ['分布式计算', '内置算法', 'Pipeline支持']
            },
            '图计算': {
                'description': '大规模图数据的分析和处理',
                'examples': ['社交网络分析', '网页排名', '路径查找', '社区发现'],
                'advantages': ['图抽象', '内置算法', '可视化支持']
            },
            '交互式分析': {
                'description': '交互式数据探索和分析',
                'examples': ['数据探索', 'Ad-hoc查询', '可视化分析', '原型开发'],
                'advantages': ['快速响应', '易于使用', '多语言支持']
            }
        }
        
        for use_case, details in use_cases.items():
            print(f"\n{use_case}:")
            print(f"  描述: {details['description']}")
            print(f"  示例: {', '.join(details['examples'])}")
            print(f"  优势: {', '.join(details['advantages'])}")
        
        # 可视化应用场景
        fig, ax = plt.subplots(figsize=(12, 8))
        
        # 创建饼图显示不同应用场景的重要性
        labels = list(use_cases.keys())
        sizes = [20, 25, 30, 15, 10]  # 假设的重要性权重
        colors = ['#FF9999', '#66B2FF', '#99FF99', '#FFCC99', '#FF99CC']
        explode = (0.05, 0.05, 0.1, 0.05, 0.05)  # 突出机器学习
        
        wedges, texts, autotexts = ax.pie(sizes, explode=explode, labels=labels, 
                                          colors=colors, autopct='%1.1f%%',
                                          shadow=True, startangle=90)
        
        # 美化文本
        for autotext in autotexts:
            autotext.set_color('white')
            autotext.set_fontweight('bold')
        
        ax.set_title('Apache Spark应用场景分布', fontsize=14, fontweight='bold')
        
        plt.tight_layout()
        plt.show()
        
        return use_cases

# Spark生态系统演示
eco_visualizer = SparkEcosystemVisualizer()

print("Apache Spark生态系统概览:")
print("=" * 40)

# 可视化生态系统
eco_visualizer.visualize_ecosystem()

# 对比分析
comparison = eco_visualizer.compare_with_hadoop()

# 应用场景
use_cases = eco_visualizer.show_use_cases()
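
上述各组件并不是彼此孤立的库,它们共享同一个SparkSession入口。下面是一段示意性代码(假设已安装pyspark,本地模式运行),在同一个会话中串联Spark SQL查询与MLlib的分词特征处理:

# 统一入口示意: 同一个SparkSession同时驱动Spark SQL与MLlib(本地模式,示意)
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer  # 基于DataFrame的MLlib API

spark = SparkSession.builder.appName("EcosystemDemo").master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [(1, "spark sql and mllib share one session")], ["id", "text"]
)
df.createOrReplaceTempView("docs")
spark.sql("SELECT id, length(text) AS len FROM docs").show()          # Spark SQL查询

tokens = Tokenizer(inputCol="text", outputCol="words").transform(df)  # MLlib分词
tokens.select("words").show(truncate=False)

spark.stop()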

1.1.3 Spark核心概念

class SparkCoreConceptsDemo:
    """
    Spark核心概念演示
    """
    
    def __init__(self):
        self.concepts = {}
        self.setup_concepts()
    
    def setup_concepts(self):
        """
        设置核心概念
        """
        self.concepts = {
            'RDD': {
                'name': 'Resilient Distributed Dataset',
                'description': '弹性分布式数据集,Spark的基本数据抽象',
                'characteristics': ['不可变', '分布式', '容错', '惰性求值'],
                'operations': ['Transformation', 'Action']
            },
            'DataFrame': {
                'name': 'DataFrame',
                'description': '结构化数据抽象,类似于关系数据库中的表',
                'characteristics': ['Schema', 'Catalyst优化', '运行时类型检查', 'SQL支持'],
                'operations': ['select', 'filter', 'groupBy', 'join']
            },
            'Dataset': {
                'name': 'Dataset',
                'description': '类型安全的数据抽象,结合了RDD和DataFrame的优点(仅在Scala/Java API中提供)',
                'characteristics': ['类型安全', '编译时检查', '优化', '面向对象'],
                'operations': ['map', 'filter', 'reduce', 'collect']
            },
            'SparkContext': {
                'name': 'SparkContext',
                'description': 'Spark应用程序的入口点,协调集群资源',
                'characteristics': ['资源管理', '任务调度', '配置管理', '监控'],
                'operations': ['parallelize', 'textFile', 'broadcast', 'accumulator']
            },
            'SparkSession': {
                'name': 'SparkSession',
                'description': 'Spark 2.0+的统一入口点,整合了多个上下文',
                'characteristics': ['统一接口', '配置管理', '资源管理', '向后兼容'],
                'operations': ['createDataFrame', 'sql', 'read', 'write']
            }
        }
    
    def explain_rdd_concept(self):
        """
        详细解释RDD概念
        """
        print("\nRDD (Resilient Distributed Dataset) 详解:")
        print("=" * 50)
        
        # RDD特性说明
        rdd_features = {
            '弹性 (Resilient)': {
                'description': '自动容错,通过血缘关系重新计算丢失的分区',
                'example': '节点故障时自动恢复数据'
            },
            '分布式 (Distributed)': {
                'description': '数据分布在集群的多个节点上',
                'example': '大文件自动分割到多个机器'
            },
            '数据集 (Dataset)': {
                'description': '数据的集合,可以是任何类型的对象',
                'example': '文本行、数字、自定义对象等'
            },
            '不可变 (Immutable)': {
                'description': 'RDD创建后不能修改,只能通过转换创建新RDD',
                'example': 'filter操作创建新的RDD而不修改原RDD'
            },
            '惰性求值 (Lazy Evaluation)': {
                'description': '转换操作不会立即执行,直到遇到行动操作',
                'example': 'map和filter不执行,collect时才执行'
            }
        }
        
        for feature, details in rdd_features.items():
            print(f"\n{feature}:")
            print(f"  描述: {details['description']}")
            print(f"  示例: {details['example']}")
        
        # RDD操作类型
        print("\n\nRDD操作类型:")
        print("-" * 30)
        
        transformations = [
            ('map', '对每个元素应用函数'),
            ('filter', '过滤满足条件的元素'),
            ('flatMap', '扁平化映射'),
            ('union', '合并两个RDD'),
            ('join', '连接两个RDD'),
            ('groupByKey', '按键分组'),
            ('reduceByKey', '按键归约')
        ]
        
        actions = [
            ('collect', '收集所有元素到驱动程序'),
            ('count', '计算元素数量'),
            ('first', '返回第一个元素'),
            ('take', '返回前n个元素'),
            ('reduce', '归约所有元素'),
            ('foreach', '对每个元素执行操作'),
            ('saveAsTextFile', '保存到文件')
        ]
        
        print("转换操作 (Transformations):")
        for op, desc in transformations:
            print(f"  {op}: {desc}")
        
        print("\n行动操作 (Actions):")
        for op, desc in actions:
            print(f"  {op}: {desc}")
    
    def demonstrate_lazy_evaluation(self):
        """
        演示惰性求值
        """
        print("\n\n惰性求值演示:")
        print("=" * 30)
        
        # 模拟RDD操作序列
        operations = [
            ('创建RDD', 'rdd = sc.parallelize([1, 2, 3, 4, 5])', '不执行'),
            ('map转换', 'rdd2 = rdd.map(lambda x: x * 2)', '不执行'),
            ('filter转换', 'rdd3 = rdd2.filter(lambda x: x > 5)', '不执行'),
            ('collect行动', 'result = rdd3.collect()', '执行所有操作')
        ]
        
        print("操作序列:")
        for i, (name, code, status) in enumerate(operations, 1):
            print(f"{i}. {name}: {code} -> {status}")
        
        # 可视化执行流程
        fig, ax = plt.subplots(figsize=(12, 6))
        
        # 绘制操作流程
        x_positions = np.arange(len(operations))
        colors = ['lightblue', 'lightblue', 'lightblue', 'lightcoral']
        
        for i, (name, code, status) in enumerate(operations):
            # 绘制操作框
            rect = patches.Rectangle((i-0.4, 0.3), 0.8, 0.4, 
                                   linewidth=2, edgecolor='black', 
                                   facecolor=colors[i], alpha=0.7)
            ax.add_patch(rect)
            
            # 添加操作名称
            ax.text(i, 0.5, name, ha='center', va='center', 
                   fontweight='bold', fontsize=10)
            
            # 添加执行状态
            ax.text(i, 0.1, status, ha='center', va='center', 
                   fontsize=9, style='italic')
            
            # 绘制箭头
            if i < len(operations) - 1:
                ax.arrow(i+0.4, 0.5, 0.2, 0, head_width=0.05, 
                        head_length=0.05, fc='black', ec='black')
        
        ax.set_xlim(-0.5, len(operations)-0.5)
        ax.set_ylim(0, 1)
        ax.set_title('Spark惰性求值执行流程', fontsize=14, fontweight='bold')
        ax.axis('off')
        
        # 添加说明
        ax.text(0.02, 0.95, '蓝色: 转换操作(惰性)\n红色: 行动操作(立即执行)', 
               transform=ax.transAxes, fontsize=10,
               bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
        
        plt.tight_layout()
        plt.show()
    
    def compare_abstractions(self):
        """
        比较不同数据抽象
        """
        print("\n\nSpark数据抽象比较:")
        print("=" * 40)
        
        abstractions = {
            'RDD': {
                'level': '低级API',
                'type_safety': '运行时',
                'optimization': '无',
                'ease_of_use': '复杂',
                'performance': '基准',
                'use_case': '底层操作、非结构化数据'
            },
            'DataFrame': {
                'level': '高级API',
                'type_safety': '运行时',
                'optimization': 'Catalyst优化器',
                'ease_of_use': '简单',
                'performance': '优化后',
                'use_case': '结构化数据、SQL查询'
            },
            'Dataset': {
                'level': '高级API',
                'type_safety': '编译时',
                'optimization': 'Catalyst优化器',
                'ease_of_use': '中等',
                'performance': '优化后',
                'use_case': '类型安全、面向对象'
            }
        }
        
        # 创建比较表格
        fig, ax = plt.subplots(figsize=(14, 8))
        ax.axis('tight')
        ax.axis('off')
        
        # 准备表格数据
        headers = ['特性', 'RDD', 'DataFrame', 'Dataset']
        table_data = []
        
        features = ['API级别', '类型安全', '性能优化', '易用性', '性能表现', '适用场景']
        feature_keys = ['level', 'type_safety', 'optimization', 'ease_of_use', 'performance', 'use_case']
        
        for i, feature in enumerate(features):
            row = [feature]
            for abs_name in ['RDD', 'DataFrame', 'Dataset']:
                row.append(abstractions[abs_name][feature_keys[i]])
            table_data.append(row)
        
        # 创建表格
        table = ax.table(cellText=table_data,
                        colLabels=headers,
                        cellLoc='center',
                        loc='center',
                        colColours=['lightgray', 'lightblue', 'lightgreen', 'lightyellow'])
        
        table.auto_set_font_size(False)
        table.set_fontsize(10)
        table.scale(1.2, 2)
        
        # 设置表格样式
        for i in range(len(table_data) + 1):
            for j in range(4):
                cell = table[(i, j)]
                if i == 0:  # 标题行
                    cell.set_text_props(weight='bold')
                cell.set_edgecolor('black')
                cell.set_linewidth(1)
        
        plt.title('Spark数据抽象特性比较', fontsize=14, fontweight='bold', pad=20)
        plt.show()
        
        return abstractions

# 核心概念演示
concepts_demo = SparkCoreConceptsDemo()

# RDD概念详解
concepts_demo.explain_rdd_concept()

# 惰性求值演示
concepts_demo.demonstrate_lazy_evaluation()

# 数据抽象比较
abstractions_comparison = concepts_demo.compare_abstractions()
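
上面的惰性求值演示只是用图形模拟执行流程;如果本机装有pyspark,可以用下面这段示意代码实际验证"转换不触发计算、行动才触发计算":

# 惰性求值的实际验证(本地模式,示意代码)
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.appName("LazyEvalDemo").master("local[*]").getOrCreate()
sc = spark.sparkContext

start = time.time()
rdd = sc.parallelize(range(1_000_000))
mapped = rdd.map(lambda x: x * 2)           # 转换: 立即返回,不计算
filtered = mapped.filter(lambda x: x > 10)  # 转换: 同样不计算
print(f"定义转换耗时: {time.time() - start:.4f}秒 (几乎为0)")

start = time.time()
print(f"结果数量: {filtered.count()}")       # 行动: 此时才真正执行整条血缘
print(f"触发计算耗时: {time.time() - start:.4f}秒")

spark.stop()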

1.2 Spark环境搭建

1.2.1 本地开发环境搭建

class SparkEnvironmentSetup:
    """
    Spark环境搭建指南
    """
    
    def __init__(self):
        self.setup_steps = {}
        self.requirements = {}
    
    def show_requirements(self):
        """
        显示环境要求
        """
        print("Spark环境搭建要求:")
        print("=" * 30)
        
        requirements = {
            'Java': {
                'version': 'Java 8、11 或 17',
                'reason': 'Spark运行在JVM上',
                'check_command': 'java -version'
            },
            'Python': {
                'version': 'Python 3.8+ (以Spark 3.4为例)',
                'reason': 'PySpark需要Python环境',
                'check_command': 'python --version'
            },
            'Scala': {
                'version': 'Scala 2.12 (可选)',
                'reason': 'Spark原生语言,性能最佳',
                'check_command': 'scala -version'
            },
            'Memory': {
                'version': '最少4GB RAM',
                'reason': '内存计算需要足够内存',
                'check_command': '系统信息查看'
            }
        }
        
        for component, details in requirements.items():
            print(f"\n{component}:")
            print(f"  版本要求: {details['version']}")
            print(f"  原因: {details['reason']}")
            print(f"  检查命令: {details['check_command']}")
        
        return requirements
    
    def installation_guide(self):
        """
        安装指南
        """
        print("\n\nSpark安装步骤:")
        print("=" * 30)
        
        steps = {
            '1. 下载Spark': {
                'description': '从Apache官网下载Spark',
                'commands': [
                    'wget https://downloads.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz',
                    'tar -xzf spark-3.4.0-bin-hadoop3.tgz',
                    'mv spark-3.4.0-bin-hadoop3 /opt/spark'
                ],
                'notes': '选择与Hadoop版本兼容的Spark版本'
            },
            '2. 配置环境变量': {
                'description': '设置SPARK_HOME和PATH',
                'commands': [
                    'export SPARK_HOME=/opt/spark',
                    'export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin',
                    'export PYSPARK_PYTHON=python3'
                ],
                'notes': '添加到~/.bashrc或~/.zshrc文件中'
            },
            '3. 验证安装': {
                'description': '测试Spark是否正确安装',
                'commands': [
                    'spark-shell --version',
                    'pyspark --version',
                    'spark-submit --version'
                ],
                'notes': '所有命令都应该显示版本信息'
            },
            '4. 启动Spark': {
                'description': '启动Spark服务',
                'commands': [
                    'start-master.sh',
                    'start-worker.sh spark://localhost:7077',
                    'pyspark'
                ],
                'notes': 'Web UI可通过http://localhost:8080访问'
            }
        }
        
        for step, details in steps.items():
            print(f"\n{step}: {details['description']}")
            print("命令:")
            for cmd in details['commands']:
                print(f"  $ {cmd}")
            print(f"注意: {details['notes']}")
        
        return steps
    
    def pyspark_setup(self):
        """
        PySpark环境配置
        """
        print("\n\nPySpark环境配置:")
        print("=" * 30)
        
        # pip安装方式
        print("方式1: 使用pip安装 (推荐用于开发)")
        pip_commands = [
            'pip install pyspark',
            'pip install pyspark[sql]',
            'pip install jupyter',
            'pip install matplotlib seaborn'
        ]
        
        for cmd in pip_commands:
            print(f"  $ {cmd}")
        
        # conda安装方式
        print("\n方式2: 使用conda安装")
        conda_commands = [
            'conda install -c conda-forge pyspark',
            'conda install jupyter',
            'conda install matplotlib seaborn'
        ]
        
        for cmd in conda_commands:
            print(f"  $ {cmd}")
        
        # 验证安装
        print("\n验证PySpark安装:")
        verification_code = r'''  # 原始字符串,避免示例代码中的行尾反斜杠被转义
# Python代码验证
from pyspark.sql import SparkSession

# 创建SparkSession
spark = SparkSession.builder \
    .appName("TestApp") \
    .master("local[*]") \
    .getOrCreate()

# 创建简单DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)

# 显示数据
df.show()

# 停止SparkSession
spark.stop()
'''
        
        print(verification_code)
    
    def jupyter_integration(self):
        """
        Jupyter集成配置
        """
        print("\n\nJupyter Notebook集成:")
        print("=" * 30)
        
        # 环境变量配置
        print("1. 环境变量配置:")
        env_vars = [
            'export PYSPARK_DRIVER_PYTHON=jupyter',
            'export PYSPARK_DRIVER_PYTHON_OPTS="notebook"',
            'export PYSPARK_PYTHON=python3'
        ]
        
        for var in env_vars:
            print(f"  {var}")
        
        # 启动方式
        print("\n2. 启动Jupyter with PySpark:")
        startup_commands = [
            'pyspark',  # 直接启动
            'jupyter notebook',  # 或者先启动jupyter再导入pyspark
        ]
        
        for cmd in startup_commands:
            print(f"  $ {cmd}")
        
        # Jupyter配置示例
        print("\n3. Jupyter Notebook中的PySpark配置:")
        jupyter_config = r'''
# 在Notebook第一个cell中运行
import os
import sys

# 设置Spark环境
os.environ['SPARK_HOME'] = '/opt/spark'  # 根据实际路径调整
sys.path.append('/opt/spark/python')
sys.path.append('/opt/spark/python/lib/py4j-0.10.9-src.zip')

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# 创建Spark配置
conf = SparkConf() \
    .setAppName("JupyterSparkApp") \
    .setMaster("local[*]") \
    .set("spark.sql.adaptive.enabled", "true") \
    .set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# 创建SparkSession
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

print(f"Spark版本: {spark.version}")
print(f"Spark UI: {sc.uiWebUrl}")
'''
        
        print(jupyter_config)
    
    def docker_setup(self):
        """
        Docker环境搭建
        """
        print("\n\nDocker环境搭建:")
        print("=" * 30)
        
        # Dockerfile示例
        print("1. Dockerfile示例:")
        dockerfile_content = r'''
FROM openjdk:11-jre-slim

# 安装Python和必要工具
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    wget \
    && rm -rf /var/lib/apt/lists/*

# 下载和安装Spark
WORKDIR /opt
RUN wget https://downloads.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz \
    && tar -xzf spark-3.4.0-bin-hadoop3.tgz \
    && mv spark-3.4.0-bin-hadoop3 spark \
    && rm spark-3.4.0-bin-hadoop3.tgz

# 设置环境变量
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
ENV PYSPARK_PYTHON=python3

# 安装Python依赖
RUN pip3 install pyspark jupyter matplotlib seaborn pandas numpy

# 工作目录
WORKDIR /workspace

# 暴露端口
EXPOSE 8080 8888 4040

# 启动命令
CMD ["jupyter", "notebook", "--ip=0.0.0.0", "--port=8888", "--no-browser", "--allow-root"]
'''
        
        print(dockerfile_content)
        
        # Docker命令
        print("\n2. Docker构建和运行命令:")
        docker_commands = [
            '# 构建镜像',
            'docker build -t spark-jupyter .',
            '',
            '# 运行容器',
            'docker run -p 8888:8888 -p 8080:8080 -p 4040:4040 -v $(pwd):/workspace spark-jupyter',
            '',
            '# 或使用docker-compose'
        ]
        
        for cmd in docker_commands:
            print(cmd)
        
        # docker-compose.yml
        print("\n3. docker-compose.yml示例:")
        compose_content = '''
version: '3.8'
services:
  spark-jupyter:
    build: .
    ports:
      - "8888:8888"  # Jupyter
      - "8080:8080"  # Spark Master UI
      - "4040:4040"  # Spark Application UI
    volumes:
      - ./workspace:/workspace
    environment:
      - JUPYTER_ENABLE_LAB=yes
'''
        
        print(compose_content)

# 环境搭建演示
setup_guide = SparkEnvironmentSetup()

print("Apache Spark环境搭建指南:")
print("=" * 40)

# 显示要求
requirements = setup_guide.show_requirements()

# 安装指南
installation_steps = setup_guide.installation_guide()

# PySpark配置
setup_guide.pyspark_setup()

# Jupyter集成
setup_guide.jupyter_integration()

# Docker环境
setup_guide.docker_setup()
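
作为上述手工检查的补充,可以用一个简单脚本自动检查关键命令是否可用(示意代码,假设java、python3、spark-submit均已加入PATH):

# 环境自检: 检查关键命令是否可用并打印版本信息(示意)
import shutil
import subprocess

checks = {
    "java": ["-version"],           # java把版本信息写到stderr
    "python3": ["--version"],
    "spark-submit": ["--version"],  # 版本信息同样主要输出到stderr
}

for cmd, args in checks.items():
    if shutil.which(cmd) is None:
        print(f"{cmd}: 未找到,请检查安装与PATH配置")
        continue
    result = subprocess.run([cmd, *args], capture_output=True, text=True)
    output = (result.stdout or result.stderr).strip()
    print(f"{cmd}: {output.splitlines()[0] if output else '(无输出)'}")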

1.2.2 集群环境配置

class SparkClusterSetup:
    """
    Spark集群环境配置
    """
    
    def __init__(self):
        self.cluster_modes = {}
        self.setup_cluster_modes()
    
    def setup_cluster_modes(self):
        """
        设置集群模式
        """
        self.cluster_modes = {
            'Standalone': {
                'description': 'Spark自带的简单集群管理器',
                'advantages': ['简单易用', '快速部署', '无外部依赖'],
                'disadvantages': ['功能有限', '资源管理简单'],
                'use_case': '小型集群、开发测试'
            },
            'YARN': {
                'description': 'Hadoop的资源管理器',
                'advantages': ['成熟稳定', '资源共享', '安全性好'],
                'disadvantages': ['配置复杂', '依赖Hadoop'],
                'use_case': '大型企业、Hadoop生态'
            },
            'Mesos': {
                'description': 'Apache Mesos资源管理器',
                'advantages': ['细粒度资源分配', '多框架支持'],
                'disadvantages': ['配置复杂', '学习成本高', 'Spark 3.2起已标记为弃用'],
                'use_case': '多框架环境、云原生'
            },
            'Kubernetes': {
                'description': '容器编排平台',
                'advantages': ['云原生', '弹性扩展', '容器化'],
                'disadvantages': ['相对较新', '配置复杂'],
                'use_case': '云环境、微服务架构'
            }
        }
    
    def compare_cluster_modes(self):
        """
        比较不同集群模式
        """
        print("Spark集群模式比较:")
        print("=" * 30)
        
        for mode, details in self.cluster_modes.items():
            print(f"\n{mode}:")
            print(f"  描述: {details['description']}")
            print(f"  优点: {', '.join(details['advantages'])}")
            print(f"  缺点: {', '.join(details['disadvantages'])}")
            print(f"  适用场景: {details['use_case']}")
        
        # 可视化比较
        self.visualize_cluster_comparison()
    
    def visualize_cluster_comparison(self):
        """
        可视化集群模式比较
        """
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

        # 雷达图需要极坐标投影,将第三个子图替换为polar坐标系
        ax3.remove()
        ax3 = fig.add_subplot(2, 2, 3, projection='polar')
        
        # 1. 复杂度比较
        modes = list(self.cluster_modes.keys())
        complexity_scores = [2, 4, 4, 3]  # 1-5分,5最复杂
        
        bars1 = ax1.bar(modes, complexity_scores, color=['lightgreen', 'orange', 'red', 'lightblue'])
        ax1.set_title('配置复杂度比较', fontweight='bold')
        ax1.set_ylabel('复杂度 (1-5分)')
        ax1.set_ylim(0, 5)
        
        # 添加数值标签
        for bar, score in zip(bars1, complexity_scores):
            ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
                    str(score), ha='center', va='bottom')
        
        # 2. 成熟度比较
        maturity_scores = [4, 5, 3, 3]  # 1-5分,5最成熟
        
        bars2 = ax2.bar(modes, maturity_scores, color=['lightgreen', 'orange', 'red', 'lightblue'])
        ax2.set_title('技术成熟度比较', fontweight='bold')
        ax2.set_ylabel('成熟度 (1-5分)')
        ax2.set_ylim(0, 5)
        
        for bar, score in zip(bars2, maturity_scores):
            ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
                    str(score), ha='center', va='bottom')
        
        # 3. 适用场景雷达图
        categories = ['易用性', '扩展性', '稳定性', '性能', '生态系统']
        
        # 各模式在不同维度的评分(雷达图仅对比Standalone与YARN)
        standalone_scores = [5, 2, 3, 4, 2]
        yarn_scores = [2, 5, 5, 4, 5]
        
        # 计算角度
        angles = np.linspace(0, 2 * np.pi, len(categories), endpoint=False).tolist()
        angles += angles[:1]  # 闭合图形
        
        # 添加闭合点
        standalone_scores += standalone_scores[:1]
        yarn_scores += yarn_scores[:1]
        
        ax3.plot(angles, standalone_scores, 'o-', linewidth=2, label='Standalone')
        ax3.plot(angles, yarn_scores, 's-', linewidth=2, label='YARN')
        ax3.fill(angles, standalone_scores, alpha=0.25)
        ax3.fill(angles, yarn_scores, alpha=0.25)
        
        ax3.set_xticks(angles[:-1])
        ax3.set_xticklabels(categories)
        ax3.set_ylim(0, 5)
        ax3.set_title('Standalone vs YARN 特性对比', fontweight='bold')
        ax3.legend()
        ax3.grid(True)
        
        # 4. 使用场景分布
        use_cases = ['开发测试', '小型生产', '大型企业', '云原生']
        mode_suitability = {
            'Standalone': [5, 4, 2, 1],
            'YARN': [3, 4, 5, 2],
            'Mesos': [2, 3, 4, 4],
            'Kubernetes': [3, 3, 3, 5]
        }
        
        x = np.arange(len(use_cases))
        width = 0.2
        
        for i, (mode, scores) in enumerate(mode_suitability.items()):
            ax4.bar(x + i * width, scores, width, label=mode, alpha=0.8)
        
        ax4.set_xlabel('使用场景')
        ax4.set_ylabel('适用性评分 (1-5分)')
        ax4.set_title('不同场景下的适用性', fontweight='bold')
        ax4.set_xticks(x + width * 1.5)
        ax4.set_xticklabels(use_cases)
        ax4.legend()
        ax4.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
    
    def standalone_cluster_setup(self):
        """
        Standalone集群搭建
        """
        print("\n\nStandalone集群搭建:")
        print("=" * 30)
        
        setup_steps = {
            '1. 准备节点': {
                'description': '准备Master和Worker节点',
                'details': [
                    '确保所有节点安装了Java和Spark',
                    '配置SSH免密登录',
                    '同步时间和主机名解析'
                ]
            },
            '2. 配置Master节点': {
                'description': '配置Spark Master',
                'details': [
                    '编辑 $SPARK_HOME/conf/spark-env.sh',
                    '设置 SPARK_MASTER_HOST',
                    '启动Master: start-master.sh'
                ]
            },
            '3. 配置Worker节点': {
                'description': '配置Spark Worker',
                'details': [
                    '编辑 $SPARK_HOME/conf/spark-env.sh (或在conf/workers中列出Worker主机)',
                    '启动Worker时指定Master地址',
                    '启动Worker: start-worker.sh spark://master-node:7077'
                ]
            },
            '4. 验证集群': {
                'description': '验证集群状态',
                'details': [
                    '访问Master Web UI: http://master:8080',
                    '检查Worker节点状态',
                    '提交测试作业'
                ]
            }
        }
        
        for step, info in setup_steps.items():
            print(f"\n{step}: {info['description']}")
            for detail in info['details']:
                print(f"  - {detail}")
        
        # 配置文件示例
        print("\n配置文件示例:")
        print("-" * 20)
        
        spark_env_content = '''
# $SPARK_HOME/conf/spark-env.sh

# Java路径
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk

# Master节点配置
export SPARK_MASTER_HOST=master-node
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080

# Worker节点配置
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=4g
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=8081

# 日志配置
export SPARK_LOG_DIR=/var/log/spark
'''
        
        print(spark_env_content)
        
        # 启动脚本
        print("\n集群启动脚本:")
        startup_script = '''
#!/bin/bash
# start-cluster.sh

# 启动Master
echo "启动Spark Master..."
start-master.sh

# 启动所有Worker
echo "启动Spark Workers..."
start-workers.sh

echo "集群启动完成!"
echo "Master Web UI: http://$(hostname):8080"
'''
        
        print(startup_script)
    
    def yarn_integration(self):
        """
        YARN集成配置
        """
        print("\n\nYARN集成配置:")
        print("=" * 30)
        
        yarn_config = {
            '前提条件': [
                'Hadoop集群已正确安装和配置',
                'YARN ResourceManager正常运行',
                'Spark与Hadoop版本兼容'
            ],
            '配置步骤': [
                '设置HADOOP_CONF_DIR环境变量',
                '配置spark-defaults.conf',
                '配置yarn-site.xml',
                '测试YARN集成'
            ],
            '运行模式': {
                'client模式 (--deploy-mode client)': '驱动程序在提交作业的客户端运行',
                'cluster模式 (--deploy-mode cluster)': '驱动程序在YARN集群的ApplicationMaster中运行'
            }
        }
        
        for category, items in yarn_config.items():
            print(f"\n{category}:")
            if isinstance(items, list):
                for item in items:
                    print(f"  - {item}")
            elif isinstance(items, dict):
                for key, value in items.items():
                    print(f"  {key}: {value}")
        
        # 配置文件示例
        print("\n配置文件示例:")
        print("-" * 20)
        
        spark_defaults_content = '''
# $SPARK_HOME/conf/spark-defaults.conf

# YARN配置
spark.master                     yarn
spark.submit.deployMode          cluster
spark.yarn.am.memory             1g
spark.yarn.am.cores              1

# 执行器配置
spark.executor.memory            2g
spark.executor.cores             2
spark.executor.instances         4

# 驱动程序配置
spark.driver.memory              1g
spark.driver.cores               1

# 动态分配(需要外部shuffle服务或开启shuffle跟踪)
spark.dynamicAllocation.enabled  true
spark.dynamicAllocation.shuffleTracking.enabled true
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors 10
'''
        
        print(spark_defaults_content)
        
        # 提交命令示例
        print("\nYARN作业提交示例:")
        submit_commands = [
            '# Client模式',
            'spark-submit --master yarn --deploy-mode client --class MyApp app.jar',
            '',
            '# Cluster模式',
            'spark-submit --master yarn --deploy-mode cluster --class MyApp app.jar',
            '',
            '# PySpark作业',
            'spark-submit --master yarn --deploy-mode cluster my_script.py'
        ]
        
        for cmd in submit_commands:
            print(cmd)

# 集群配置演示
cluster_setup = SparkClusterSetup()

print("\nSpark集群环境配置:")
print("=" * 40)

# 集群模式比较
cluster_setup.compare_cluster_modes()

# Standalone集群搭建
cluster_setup.standalone_cluster_setup()

# YARN集成
cluster_setup.yarn_integration()
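
集群搭好之后,可以先做一次最小化的连通性测试(示意代码;其中 spark://master-node:7077 是假设的Master地址,请按实际环境修改):

# Standalone集群连通性测试(示意)
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("ClusterSmokeTest")
         .master("spark://master-node:7077")   # 假设的Master地址
         .config("spark.executor.memory", "2g")
         .config("spark.executor.cores", "2")
         .getOrCreate())

# 简单的分布式求和,确认作业能在集群上执行
total = spark.sparkContext.parallelize(range(1000), numSlices=8).sum()
print(f"sum(0..999) = {total}")  # 期望输出 499500

spark.stop()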

1.3 第一个Spark应用

1.3.1 Hello World示例

class SparkHelloWorld:
    """
    Spark Hello World示例
    """
    
    def __init__(self):
        self.examples = {}
    
    def basic_rdd_example(self):
        """
        基础RDD示例
        """
        print("基础RDD示例:")
        print("=" * 20)
        
        # 模拟代码(实际需要Spark环境)
        example_code = '''
# 导入必要的模块
from pyspark import SparkContext, SparkConf

# 创建Spark配置
conf = SparkConf().setAppName("HelloWorld").setMaster("local[*]")
sc = SparkContext(conf=conf)

# 创建RDD
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)

print(f"原始数据: {rdd.collect()}")

# 转换操作
squared_rdd = rdd.map(lambda x: x ** 2)
filtered_rdd = squared_rdd.filter(lambda x: x > 10)

# 行动操作
result = filtered_rdd.collect()
print(f"平方后大于10的数: {result}")

# 统计操作
count = filtered_rdd.count()
sum_value = filtered_rdd.sum()
average = sum_value / count

print(f"数量: {count}")
print(f"总和: {sum_value}")
print(f"平均值: {average}")

# 停止SparkContext
sc.stop()
'''
        
        print(example_code)
        
        # 模拟执行结果
        print("\n执行结果:")
        print("原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]")
        print("平方后大于10的数: [16, 25, 36, 49, 64, 81, 100]")
        print("数量: 7")
        print("总和: 371")
        print("平均值: 53.0")
    
    def dataframe_example(self):
        """
        DataFrame示例
        """
        print("\n\nDataFrame示例:")
        print("=" * 20)
        
        example_code = r'''
# 导入必要的模块
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, max as spark_max

# 创建SparkSession
spark = SparkSession.builder \
    .appName("DataFrameExample") \
    .master("local[*]") \
    .getOrCreate()

# 创建示例数据
data = [
    ("Alice", 25, "Engineer", 75000),
    ("Bob", 30, "Manager", 85000),
    ("Charlie", 35, "Engineer", 80000),
    ("Diana", 28, "Analyst", 65000),
    ("Eve", 32, "Manager", 90000)
]

columns = ["name", "age", "job", "salary"]
df = spark.createDataFrame(data, columns)

# 显示数据
print("原始数据:")
df.show()

# 基本操作
print("\n数据概览:")
df.printSchema()
print(f"总行数: {df.count()}")

# 过滤和选择
print("\n年龄大于30的员工:")
df.filter(col("age") > 30).select("name", "age", "job").show()

# 分组统计
print("\n按职位统计:")
df.groupBy("job") \
  .agg(count("*").alias("count"), 
       avg("salary").alias("avg_salary"), 
       spark_max("age").alias("max_age")) \
  .show()

# SQL查询
df.createOrReplaceTempView("employees")
result = spark.sql("""
    SELECT job, 
           COUNT(*) as count,
           AVG(salary) as avg_salary
    FROM employees 
    WHERE age >= 30
    GROUP BY job
    ORDER BY avg_salary DESC
""")

print("\nSQL查询结果:")
result.show()

# 停止SparkSession
spark.stop()
'''
        
        print(example_code)
        
        # 模拟执行结果
        print("\n执行结果:")
        print("原始数据:")
        print("+-------+---+--------+------+")
        print("|   name|age|     job|salary|")
        print("+-------+---+--------+------+")
        print("|  Alice| 25|Engineer| 75000|")
        print("|    Bob| 30| Manager| 85000|")
        print("|Charlie| 35|Engineer| 80000|")
        print("|  Diana| 28| Analyst| 65000|")
        print("|    Eve| 32| Manager| 90000|")
        print("+-------+---+--------+------+")
    
    def word_count_example(self):
        """
        经典WordCount示例
        """
        print("\n\n经典WordCount示例:")
        print("=" * 25)
        
        example_code = r'''
# WordCount - Spark经典示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, regexp_replace

# 创建SparkSession
spark = SparkSession.builder \
    .appName("WordCount") \
    .master("local[*]") \
    .getOrCreate()

# 示例文本数据
text_data = [
    "Apache Spark is a unified analytics engine",
    "Spark provides high-level APIs in Java, Scala, Python and R",
    "Spark also supports a rich set of higher-level tools",
    "including Spark SQL for SQL and DataFrames"
]

# 创建DataFrame
df = spark.createDataFrame([(line,) for line in text_data], ["text"])

# 方法1: 使用DataFrame API
print("方法1: DataFrame API")
words_df = df.select(
    explode(split(lower(regexp_replace("text", "[^a-zA-Z\\s]", "")), " ")).alias("word")
).filter("word != ''")

word_count_df = words_df.groupBy("word").count().orderBy("count", ascending=False)
word_count_df.show()

# 方法2: 使用RDD API
print("\n方法2: RDD API")
rdd = spark.sparkContext.parallelize(text_data)
word_count_rdd = rdd.flatMap(lambda line: line.lower().split()) \
                   .map(lambda word: (word, 1)) \
                   .reduceByKey(lambda a, b: a + b) \
                   .sortBy(lambda x: x[1], ascending=False)

print("词频统计结果:")
for word, count in word_count_rdd.collect():
    print(f"{word}: {count}")

# 停止SparkSession
spark.stop()
'''
        
        print(example_code)
        
        # 模拟执行结果
        print("\n执行结果:")
        print("方法1: DataFrame API")
        print("+--------+-----+")
        print("|    word|count|")
        print("+--------+-----+")
        print("|   spark|    4|")
        print("|     and|    2|")
        print("|  apache|    1|")
        print("|      is|    1|")
        print("|provides|    1|")
        print("+--------+-----+")
    
    def performance_monitoring_example(self):
        """
        性能监控示例
        """
        print("\n\n性能监控示例:")
        print("=" * 20)
        
        monitoring_code = r'''
# Spark性能监控和调优
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import time

# 创建SparkSession with 监控配置
spark = SparkSession.builder \
    .appName("PerformanceMonitoring") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# 设置日志级别
spark.sparkContext.setLogLevel("WARN")

print(f"Spark版本: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
print(f"应用程序ID: {spark.sparkContext.applicationId}")

# 创建大数据集进行性能测试
print("\n创建测试数据...")
start_time = time.time()

# 生成100万条记录
large_data = spark.range(1000000).select(
    col("id"),
    (col("id") % 100).alias("category"),
    (rand() * 1000).alias("value"),
    when(col("id") % 2 == 0, "even").otherwise("odd").alias("type")
)

# 缓存数据以提高性能
large_data.cache()
count = large_data.count()  # 触发缓存

create_time = time.time() - start_time
print(f"数据创建完成: {count:,} 条记录,耗时: {create_time:.2f}秒")

# 执行复杂查询
print("\n执行聚合查询...")
start_time = time.time()

result = large_data.groupBy("category", "type") \
    .agg(
        count("*").alias("count"),
        avg("value").alias("avg_value"),
        max("value").alias("max_value"),
        min("value").alias("min_value")
    ) \
    .orderBy("category", "type")

result_count = result.count()
query_time = time.time() - start_time

print(f"查询完成: {result_count} 个分组,耗时: {query_time:.2f}秒")
print("\n查询结果示例:")
result.show(10)

# 显示执行计划
print("\n查询执行计划:")
result.explain()

# 清理缓存
large_data.unpersist()
spark.stop()
'''
        
        print(monitoring_code)
        
        print("\n监控要点:")
        monitoring_points = [
            "Spark UI (http://localhost:4040) - 查看作业执行情况",
            "执行计划分析 - 使用explain()方法",
            "缓存策略 - 合理使用cache()和persist()",
            "分区优化 - 控制数据分区数量",
            "资源配置 - 调整executor和driver内存"
        ]
        
        for point in monitoring_points:
            print(f"  • {point}")

# Hello World演示
hello_world = SparkHelloWorld()

print("\nSpark Hello World示例:")
print("=" * 40)

# 基础RDD示例
hello_world.basic_rdd_example()

# DataFrame示例
hello_world.dataframe_example()

# WordCount示例
hello_world.word_count_example()

# 性能监控示例
hello_world.performance_monitoring_example()
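
前面的示例代码都是以字符串形式展示的;下面给出一个可以直接保存为 word_count.py 并用 spark-submit 提交的精简版本(示意,文件名与示例数据均为假设):

# word_count.py —— 可用 spark-submit word_count.py 提交的精简WordCount(示意)
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, col

if __name__ == "__main__":
    spark = SparkSession.builder.appName("WordCount").getOrCreate()

    lines = spark.createDataFrame(
        [("Apache Spark is a unified analytics engine",),
         ("Spark provides high-level APIs in Java Scala Python and R",)],
        ["text"],
    )

    counts = (lines
              .select(explode(split(lower(col("text")), " ")).alias("word"))
              .where(col("word") != "")
              .groupBy("word")
              .count()
              .orderBy(col("count").desc()))

    counts.show()
    spark.stop()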

1.4 Spark配置与优化

1.4.1 基础配置

class SparkConfigurationGuide:
    """
    Spark配置指南
    """
    
    def __init__(self):
        self.config_categories = {}
        self.setup_config_categories()
    
    def setup_config_categories(self):
        """
        设置配置类别
        """
        self.config_categories = {
            '应用程序配置': {
                'spark.app.name': '应用程序名称',
                'spark.master': '集群管理器URL',
                'spark.submit.deployMode': '部署模式(client/cluster)',
                'spark.driver.host': '驱动程序主机名'
            },
            '执行器配置': {
                'spark.executor.memory': '执行器内存大小',
                'spark.executor.cores': '执行器CPU核数',
                'spark.executor.instances': '执行器实例数',
                'spark.executor.memoryOverhead': '执行器堆外内存开销'
            },
            '驱动程序配置': {
                'spark.driver.memory': '驱动程序内存大小',
                'spark.driver.cores': '驱动程序CPU核数',
                'spark.driver.maxResultSize': '结果集最大大小',
                'spark.driver.extraJavaOptions': '额外JVM选项'
            },
            '动态分配': {
                'spark.dynamicAllocation.enabled': '启用动态分配',
                'spark.dynamicAllocation.minExecutors': '最小执行器数',
                'spark.dynamicAllocation.maxExecutors': '最大执行器数',
                'spark.dynamicAllocation.initialExecutors': '初始执行器数'
            }
        }
    
    def show_configuration_guide(self):
        """
        显示配置指南
        """
        print("Spark配置指南:")
        print("=" * 20)
        
        for category, configs in self.config_categories.items():
            print(f"\n{category}:")
            for config, description in configs.items():
                print(f"  {config}: {description}")
    
    def configuration_examples(self):
        """
        配置示例
        """
        print("\n\n配置示例:")
        print("=" * 15)
        
        # 开发环境配置
        print("1. 开发环境配置:")
        dev_config = '''
# spark-defaults.conf (开发环境)
spark.master                     local[*]
spark.driver.memory              2g
spark.executor.memory            2g
spark.sql.adaptive.enabled       true
spark.sql.adaptive.coalescePartitions.enabled true
spark.serializer                 org.apache.spark.serializer.KryoSerializer
'''
        print(dev_config)
        
        # 生产环境配置
        print("\n2. 生产环境配置:")
        prod_config = '''
# spark-defaults.conf (生产环境)
spark.master                     yarn
spark.submit.deployMode          cluster
spark.driver.memory              4g
spark.driver.cores               2
spark.executor.memory            8g
spark.executor.cores             4
spark.executor.instances         10
spark.dynamicAllocation.enabled  true
spark.dynamicAllocation.shuffleTracking.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 20
spark.sql.adaptive.enabled       true
spark.sql.adaptive.coalescePartitions.enabled true
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.pyspark.enabled true
'''
        print(prod_config)
        
        # 程序中配置
        print("\n3. 程序中动态配置:")
        code_config = r'''
# Python代码中配置
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# 创建配置对象
conf = SparkConf() \
    .setAppName("MySparkApp") \
    .setMaster("local[4]") \
    .set("spark.executor.memory", "4g") \
    .set("spark.executor.cores", "2") \
    .set("spark.sql.adaptive.enabled", "true") \
    .set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# 创建SparkSession
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# 运行时修改配置
spark.conf.set("spark.sql.shuffle.partitions", "200")

# 查看当前配置
print("当前配置:")
for item in spark.sparkContext.getConf().getAll():
    print(f"{item[0]}: {item[1]}")
'''
        print(code_config)

# 配置指南演示
config_guide = SparkConfigurationGuide()

print("\nSpark配置与优化:")
print("=" * 30)

# 显示配置指南
config_guide.show_configuration_guide()

# 配置示例
config_guide.configuration_examples()
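
配置是否真正生效,可以在运行时直接查询确认;下面是一个小片段(示意,本地模式),用 spark.conf.get 读取几个关键配置项,未显式设置的项用默认值占位:

# 运行时检查关键配置是否生效(示意)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ConfInspect").master("local[2]").getOrCreate()

keys = [
    "spark.app.name",
    "spark.master",
    "spark.sql.shuffle.partitions",
    "spark.sql.adaptive.enabled",
]
for k in keys:
    print(f"{k} = {spark.conf.get(k, '<未显式设置,使用默认值>')}")

spark.stop()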

1.5 本章小结

本章介绍了Apache Spark的基础知识和环境搭建:

核心内容回顾

  1. Spark概述

    • Spark是统一的大数据处理引擎
    • 支持批处理、流处理、机器学习、图计算
    • 官方基准中比Hadoop MapReduce快约10~100倍
  2. 生态系统组件

    • Spark Core: 核心引擎和RDD抽象
    • Spark SQL: 结构化数据处理
    • Spark Streaming: 实时流处理
    • MLlib: 机器学习库
    • GraphX: 图计算库
  3. 核心概念

    • RDD: 弹性分布式数据集
    • DataFrame: 结构化数据抽象
    • Dataset: 类型安全的数据抽象
    • 惰性求值和血缘关系
  4. 环境搭建

    • 本地开发环境配置
    • 集群环境部署
    • Docker容器化部署
    • Jupyter集成开发

实践技能

  • 掌握Spark环境的安装和配置
  • 理解不同集群模式的特点和适用场景
  • 能够编写基础的Spark应用程序
  • 了解性能监控和基础优化方法

下一章预告

下一章将深入学习Spark Core和RDD编程,包括:

  • RDD的创建和操作
  • 转换操作和行动操作详解
  • RDD的持久化和分区
  • 共享变量:广播变量和累加器

练习题

  1. 环境搭建练习

    • 在本地搭建Spark开发环境
    • 配置Jupyter Notebook集成
    • 运行第一个Spark程序
  2. 基础编程练习

    • 实现WordCount程序的三种不同方式
    • 创建包含学生信息的DataFrame并进行查询(可参考本章末尾的起步骨架)
    • 比较RDD和DataFrame的性能差异
  3. 配置优化练习

    • 针对不同数据量调整Spark配置
    • 监控Spark应用的执行情况
    • 分析执行计划并优化查询性能
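
针对练习2中的"学生信息DataFrame",下面给出一个起步骨架(示意代码,字段与示例数据均为假设),可在此基础上补充更多查询:

# 练习2起步骨架: 学生信息DataFrame(字段与数据均为示例假设)
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col

spark = SparkSession.builder.appName("Exercise2").master("local[*]").getOrCreate()

students = spark.createDataFrame(
    [("张三", "计算机", 88), ("李四", "数学", 92), ("王五", "计算机", 75)],
    ["name", "major", "score"],
)

# 示例查询: 各专业平均分,以及分数高于80的学生
students.groupBy("major").agg(avg("score").alias("avg_score")).show()
students.filter(col("score") > 80).show()

spark.stop()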