4.1 Overview of Unsupervised Learning

4.1.1 What Is Unsupervised Learning

Unsupervised learning is an important branch of machine learning that discovers hidden patterns and structure in unlabeled data. Unlike supervised learning, it does not require knowing the correct answers in advance; instead, the algorithm uncovers regularities in the data on its own.
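
The distinction shows up directly in the scikit-learn API. Below is a minimal sketch (the data is random and purely illustrative): a supervised estimator must be fit on features and labels, while an unsupervised one receives only the features.

# Sketch: the supervised vs. unsupervised fitting interface (illustrative random data)
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.cluster import KMeans

X = np.random.rand(100, 2)               # features only
y = np.random.randint(0, 2, 100)         # labels, required only by supervised models

LogisticRegression().fit(X, y)           # supervised: needs the answers y
KMeans(n_clusters=2, n_init=10).fit(X)   # unsupervised: finds structure from X alone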

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_blobs, make_circles, make_moons
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, MiniBatchKMeans, AgglomerativeClustering, DBSCAN
from sklearn.decomposition import PCA, FastICA
from sklearn.manifold import TSNE, MDS
from sklearn.mixture import GaussianMixture
from sklearn.metrics import silhouette_score, adjusted_rand_score, normalized_mutual_info_score
from sklearn.neighbors import NearestNeighbors
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.spatial.distance import pdist, squareform
import warnings
warnings.filterwarnings('ignore')

# Configure plot fonts (SimHei covers CJK glyphs) and style
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")

class UnsupervisedLearningDemo:
    """
    Demo class for unsupervised learning
    """
    def __init__(self):
        self.datasets = {}
        self.results = {}
    
    def create_sample_datasets(self):
        """
        Create sample datasets
        """
        np.random.seed(42)
        
        # 1. Spherical cluster data
        X_blobs, y_blobs = make_blobs(n_samples=300, centers=4, n_features=2, 
                                     random_state=42, cluster_std=1.5)
        
        # 2. Concentric-circle data
        X_circles, y_circles = make_circles(n_samples=300, noise=0.1, factor=0.3, 
                                           random_state=42)
        
        # 3. Half-moon data
        X_moons, y_moons = make_moons(n_samples=300, noise=0.1, random_state=42)
        
        # 4. Gaussian mixture data
        X_gaussian = np.vstack([
            np.random.multivariate_normal([2, 2], [[1, 0.5], [0.5, 1]], 100),
            np.random.multivariate_normal([6, 6], [[1, -0.5], [-0.5, 1]], 100),
            np.random.multivariate_normal([2, 6], [[1, 0], [0, 1]], 100)
        ])
        y_gaussian = np.array([0]*100 + [1]*100 + [2]*100)
        
        self.datasets = {
            'blobs': (X_blobs, y_blobs),
            'circles': (X_circles, y_circles),
            'moons': (X_moons, y_moons),
            'gaussian': (X_gaussian, y_gaussian)
        }
        
        return self.datasets
    
    def visualize_datasets(self):
        """
        Visualize the sample datasets
        """
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        
        titles = ['Spherical clusters', 'Concentric circles', 'Half moons', 'Gaussian mixture']
        dataset_names = ['blobs', 'circles', 'moons', 'gaussian']
        
        for i, (name, title) in enumerate(zip(dataset_names, titles)):
            X, y = self.datasets[name]
            
            scatter = axes[i].scatter(X[:, 0], X[:, 1], c=y, cmap='viridis', alpha=0.7)
            axes[i].set_title(title)
            axes[i].set_xlabel('Feature 1')
            axes[i].set_ylabel('Feature 2')
            plt.colorbar(scatter, ax=axes[i])
        
        plt.tight_layout()
        plt.show()
        
        print("Main tasks of unsupervised learning:")
        print("1. Clustering - discover group structure in the data")
        print("2. Dimensionality reduction - reduce dimensions while keeping key information")
        print("3. Density estimation - estimate the data's probability distribution")
        print("4. Anomaly detection - identify unusual or outlying points")
        print("5. Association rule mining - discover relationships between variables")

# Create the demo instance
unsupervised_demo = UnsupervisedLearningDemo()
datasets = unsupervised_demo.create_sample_datasets()
unsupervised_demo.visualize_datasets()

4.1.2 Types of Unsupervised Learning

Unsupervised learning mainly includes the following types; this chapter focuses on the first two, and the last three are illustrated briefly in the sketch after this list:

  1. Clustering: group similar data points together
  2. Dimensionality reduction: reduce the number of data dimensions
  3. Density estimation: estimate the data's probability distribution
  4. Anomaly detection: identify unusual data points
  5. Association rule mining: discover associations between data items
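
As a brief, hedged sketch of density estimation and anomaly detection (which this chapter does not cover in depth), a Gaussian mixture model can serve both purposes: score_samples returns per-point log-densities, and unusually low-density points can be flagged as anomalies. The 5% threshold below is an illustrative choice, not a fixed rule.

# Sketch: density estimation + anomaly detection with a Gaussian mixture
# (the 5% quantile threshold is an illustrative assumption)
import numpy as np
from sklearn.mixture import GaussianMixture

rng = np.random.default_rng(42)
X = np.vstack([rng.normal(0, 1, (200, 2)), rng.normal(5, 1, (200, 2))])

gmm = GaussianMixture(n_components=2, random_state=42).fit(X)
log_density = gmm.score_samples(X)            # density estimation: log p(x) per point
threshold = np.quantile(log_density, 0.05)    # flag the lowest-density 5%
anomalies = X[log_density < threshold]        # anomaly detection
print(f"Flagged {len(anomalies)} candidate anomalies")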

4.2 Clustering Algorithms

4.2.1 K-means Clustering

K-means is one of the most widely used clustering algorithms. It partitions the data into k clusters by alternating between assigning each point to its nearest centroid and recomputing each centroid as the mean of its assigned points.
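
Formally, K-means minimizes the within-cluster sum of squares (the quantity scikit-learn reports as inertia_):

$$J = \sum_{i=1}^{k} \sum_{x \in C_i} \lVert x - \mu_i \rVert^2$$

where $C_i$ is the set of points assigned to cluster $i$ and $\mu_i$ is that cluster's centroid. Both the assignment step and the update step can only decrease $J$, so the algorithm always converges, though possibly to a local rather than global minimum, which is why initialization matters.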

class KMeansAnalyzer:
    """
    K-means clustering analyzer
    """
    def __init__(self):
        self.models = {}
        self.results = {}
    
    def demonstrate_kmeans_process(self):
        """
        Demonstrate the K-means algorithm step by step
        """
        # Simple 2D data
        X, _ = make_blobs(n_samples=100, centers=3, n_features=2, 
                         random_state=42, cluster_std=1.5)
        
        # Manually run three assignment/update iterations of K-means
        k = 3
        
        # Randomly initialize the centroids
        np.random.seed(42)
        centroids = X[np.random.choice(X.shape[0], k, replace=False)]
        
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        colors = ['red', 'blue', 'green']
        
        for iteration in range(6):
            ax = axes[iteration // 3, iteration % 3]
            
            if iteration % 2 == 0:  # assignment step
                # Distance from every point to each centroid
                distances = np.sqrt(((X - centroids[:, np.newaxis])**2).sum(axis=2))
                labels = np.argmin(distances, axis=0)
                
                # Plot points and centroids
                for i in range(k):
                    mask = labels == i
                    ax.scatter(X[mask, 0], X[mask, 1], c=colors[i], alpha=0.7, s=50)
                
                ax.scatter(centroids[:, 0], centroids[:, 1], c='black', 
                          marker='x', s=200, linewidth=3, label='Centroids')
                ax.set_title(f'Iteration {iteration//2 + 1}: assignment step')
                
            else:  # update step (reuses labels from the preceding assignment step)
                # Recompute centroids as cluster means
                new_centroids = np.array([X[labels == i].mean(axis=0) for i in range(k)])
                
                # Plot old and new centroids
                for i in range(k):
                    mask = labels == i
                    ax.scatter(X[mask, 0], X[mask, 1], c=colors[i], alpha=0.7, s=50)
                
                ax.scatter(centroids[:, 0], centroids[:, 1], c='black', 
                          marker='x', s=200, linewidth=3, label='Old centroids')
                ax.scatter(new_centroids[:, 0], new_centroids[:, 1], c='red', 
                          marker='o', s=200, linewidth=3, label='New centroids')
                
                # Draw the centroid movement
                for i in range(k):
                    ax.arrow(centroids[i, 0], centroids[i, 1], 
                            new_centroids[i, 0] - centroids[i, 0],
                            new_centroids[i, 1] - centroids[i, 1],
                            head_width=0.1, head_length=0.1, fc='red', ec='red')
                
                centroids = new_centroids
                ax.set_title(f'Iteration {iteration//2 + 1}: update step')
            
            ax.set_xlabel('Feature 1')
            ax.set_ylabel('Feature 2')
            ax.legend()
            ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
    
    def find_optimal_k(self, X, max_k=10):
        """
        Find the optimal k using the elbow method and silhouette scores
        """
        k_range = range(2, max_k + 1)
        inertias = []
        silhouette_scores = []
        
        for k in k_range:
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = kmeans.fit_predict(X)
            
            inertias.append(kmeans.inertia_)
            silhouette_scores.append(silhouette_score(X, labels))
        
        # Visualization
        fig, axes = plt.subplots(1, 2, figsize=(12, 5))
        
        # Elbow method
        axes[0].plot(k_range, inertias, 'o-', linewidth=2, markersize=6)
        axes[0].set_xlabel('Number of clusters (k)')
        axes[0].set_ylabel('Within-cluster sum of squares (WCSS)')
        axes[0].set_title('Elbow method')
        axes[0].grid(True, alpha=0.3)
        
        # Silhouette scores
        axes[1].plot(k_range, silhouette_scores, 'o-', linewidth=2, markersize=6, color='orange')
        best_k = k_range[np.argmax(silhouette_scores)]
        axes[1].axvline(x=best_k, color='red', linestyle='--', alpha=0.7, 
                       label=f'Best k={best_k}')
        axes[1].set_xlabel('Number of clusters (k)')
        axes[1].set_ylabel('Silhouette score')
        axes[1].set_title('Silhouette method')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        print(f"Best k (by silhouette score): {best_k}")
        print(f"Best silhouette score: {max(silhouette_scores):.4f}")
        
        return best_k, inertias, silhouette_scores
    
    def compare_kmeans_variants(self, X, k=3):
        """
        Compare K-means variants. Note that sklearn's KMeans already defaults
        to k-means++ initialization, so plain K-means is shown with
        init='random', and mini-batch K-means uses the dedicated
        MiniBatchKMeans estimator.
        """
        models = {
            'K-means (random init)': KMeans(n_clusters=k, init='random', random_state=42, n_init=10),
            'K-means++': KMeans(n_clusters=k, init='k-means++', random_state=42, n_init=10),
            'Mini-batch K-means': MiniBatchKMeans(n_clusters=k, random_state=42, n_init=10)
        }
        
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        
        for i, (name, model) in enumerate(models.items()):
            labels = model.fit_predict(X)
            centroids = model.cluster_centers_
            
        # Plot the clustering result
            colors = ['red', 'blue', 'green', 'purple', 'orange']
            for j in range(k):
                mask = labels == j
                axes[i].scatter(X[mask, 0], X[mask, 1], c=colors[j], alpha=0.7, s=50)
            
            axes[i].scatter(centroids[:, 0], centroids[:, 1], c='black', 
                           marker='x', s=200, linewidth=3)
            
            # Evaluation metrics
            inertia = model.inertia_
            silhouette = silhouette_score(X, labels)
            
            axes[i].set_title(f'{name}\nInertia: {inertia:.2f}, Silhouette: {silhouette:.3f}')
            axes[i].set_xlabel('Feature 1')
            axes[i].set_ylabel('Feature 2')
            axes[i].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return models
    
    def analyze_different_datasets(self, datasets):
        """
        Analyze K-means performance on different datasets
        """
        fig, axes = plt.subplots(2, 4, figsize=(16, 8))
        
        dataset_names = ['blobs', 'circles', 'moons', 'gaussian']
        
        for i, name in enumerate(dataset_names):
            X, y_true = datasets[name]
            
            # Ground truth
            axes[0, i].scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis', alpha=0.7)
            axes[0, i].set_title(f'True labels - {name}')
            axes[0, i].set_xlabel('Feature 1')
            axes[0, i].set_ylabel('Feature 2')
            
            # K-means result
            k = len(np.unique(y_true))
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            y_pred = kmeans.fit_predict(X)
            
            axes[1, i].scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis', alpha=0.7)
            axes[1, i].scatter(kmeans.cluster_centers_[:, 0], kmeans.cluster_centers_[:, 1], 
                              c='red', marker='x', s=200, linewidth=3)
            
            # Evaluation metrics
            ari = adjusted_rand_score(y_true, y_pred)
            nmi = normalized_mutual_info_score(y_true, y_pred)
            silhouette = silhouette_score(X, y_pred)
            
            axes[1, i].set_title(f'K-means - {name}\nARI: {ari:.3f}, NMI: {nmi:.3f}, Sil: {silhouette:.3f}')
            axes[1, i].set_xlabel('Feature 1')
            axes[1, i].set_ylabel('Feature 2')
        
        plt.tight_layout()
        plt.show()

# K-means demo
kmeans_analyzer = KMeansAnalyzer()

print("K-means demonstration:")
print("=" * 20)

# Walk through the K-means iterations
kmeans_analyzer.demonstrate_kmeans_process()

# Find the optimal k
X_blobs, _ = datasets['blobs']
best_k, inertias, silhouette_scores = kmeans_analyzer.find_optimal_k(X_blobs, max_k=8)

# Compare K-means variants
kmeans_models = kmeans_analyzer.compare_kmeans_variants(X_blobs, k=4)

# Analyze performance across datasets
kmeans_analyzer.analyze_different_datasets(datasets)

4.2.2 Hierarchical Clustering

Hierarchical clustering builds a hierarchy of clusters. It comes in two flavors: agglomerative (bottom-up, repeatedly merging the closest clusters) and divisive (top-down, repeatedly splitting clusters); the examples below, like scikit-learn itself, use the agglomerative variant. The analyzer class cuts the hierarchy by requesting a fixed number of clusters, while the sketch after this paragraph shows the alternative of cutting at a distance threshold.
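
As a complementary sketch (the threshold value is an illustrative assumption), SciPy's fcluster can cut a dendrogram wherever the merge distance exceeds a chosen value, instead of specifying the cluster count up front:

# Sketch: cutting a dendrogram at a distance threshold instead of a cluster count
# (the threshold t=5.0 is an illustrative assumption)
import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster

rng = np.random.default_rng(0)
X = np.vstack([rng.normal(0, 0.5, (20, 2)), rng.normal(4, 0.5, (20, 2))])

Z = linkage(X, method='ward')                      # linkage also accepts raw observations
labels = fcluster(Z, t=5.0, criterion='distance')  # all merges above distance 5.0 are cut
print(f"Clusters found: {len(np.unique(labels))}")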

class HierarchicalClusteringAnalyzer:
    """
    Hierarchical clustering analyzer
    """
    def __init__(self):
        self.models = {}
        self.linkage_matrices = {}
    
    def demonstrate_hierarchical_process(self):
        """
        Demonstrate the hierarchical clustering process
        """
        # Simple toy data
        np.random.seed(42)
        X = np.array([[1, 2], [1.5, 1.8], [5, 8], [8, 8], [1, 0.6], [9, 11]])
        labels = ['A', 'B', 'C', 'D', 'E', 'F']
        
        # Pairwise distance matrix (condensed form)
        distance_matrix = pdist(X, metric='euclidean')
        
        # Different linkage methods
        linkage_methods = ['single', 'complete', 'average', 'ward']
        
        fig, axes = plt.subplots(2, 4, figsize=(16, 8))
        
        for i, method in enumerate(linkage_methods):
            # Linkage matrix
            Z = linkage(distance_matrix, method=method)
            
            # Plot the data points
            axes[0, i].scatter(X[:, 0], X[:, 1], s=100, alpha=0.7)
            for j, label in enumerate(labels):
                axes[0, i].annotate(label, (X[j, 0], X[j, 1]), 
                                   xytext=(5, 5), textcoords='offset points')
            axes[0, i].set_title(f'Data points - {method}')
            axes[0, i].set_xlabel('Feature 1')
            axes[0, i].set_ylabel('Feature 2')
            axes[0, i].grid(True, alpha=0.3)
            
            # Plot the dendrogram
            dendrogram(Z, labels=labels, ax=axes[1, i])
            axes[1, i].set_title(f'Dendrogram - {method}')
            axes[1, i].set_xlabel('Sample')
            axes[1, i].set_ylabel('Distance')
        
        plt.tight_layout()
        plt.show()
        
        # Returns the linkage matrix of the last method (ward)
        return Z
    
    def compare_linkage_methods(self, X, n_clusters=3):
        """
        Compare different linkage methods
        """
        linkage_methods = ['single', 'complete', 'average', 'ward']
        
        fig, axes = plt.subplots(2, 4, figsize=(16, 8))
        
        for i, method in enumerate(linkage_methods):
            # Agglomerative clustering
            hierarchical = AgglomerativeClustering(n_clusters=n_clusters, linkage=method)
            labels = hierarchical.fit_predict(X)
            
            # Plot the clustering result
            colors = ['red', 'blue', 'green', 'purple', 'orange']
            for j in range(n_clusters):
                mask = labels == j
                axes[0, i].scatter(X[mask, 0], X[mask, 1], c=colors[j], alpha=0.7, s=50)
            
            axes[0, i].set_title(f'Clusters - {method}')
            axes[0, i].set_xlabel('Feature 1')
            axes[0, i].set_ylabel('Feature 2')
            axes[0, i].grid(True, alpha=0.3)
            
            # Compute and plot the dendrogram
            distance_matrix = pdist(X, metric='euclidean')
            Z = linkage(distance_matrix, method=method)
            
            dendrogram(Z, ax=axes[1, i], truncate_mode='level', p=5)
            axes[1, i].set_title(f'Dendrogram - {method}')
            axes[1, i].set_xlabel('Sample index')
            axes[1, i].set_ylabel('Distance')
            
            # Silhouette score
            silhouette = silhouette_score(X, labels)
            axes[0, i].text(0.02, 0.98, f'Silhouette: {silhouette:.3f}', 
                           transform=axes[0, i].transAxes, verticalalignment='top',
                           bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
        
        plt.tight_layout()
        plt.show()
    
    def determine_optimal_clusters(self, X, max_clusters=10):
        """
        Determine the optimal number of clusters
        """
        # Distance and linkage matrices
        distance_matrix = pdist(X, metric='euclidean')
        Z = linkage(distance_matrix, method='ward')
        
        # Silhouette scores for different cluster counts
        cluster_range = range(2, max_clusters + 1)
        silhouette_scores = []
        
        for n_clusters in cluster_range:
            hierarchical = AgglomerativeClustering(n_clusters=n_clusters, linkage='ward')
            labels = hierarchical.fit_predict(X)
            silhouette_scores.append(silhouette_score(X, labels))
        
        # Visualization
        fig, axes = plt.subplots(1, 2, figsize=(12, 5))
        
        # Full dendrogram
        dendrogram(Z, ax=axes[0])
        axes[0].set_title('Full dendrogram')
        axes[0].set_xlabel('Sample index')
        axes[0].set_ylabel('Distance')
        
        # Silhouette scores
        axes[1].plot(cluster_range, silhouette_scores, 'o-', linewidth=2, markersize=6)
        best_n = cluster_range[np.argmax(silhouette_scores)]
        axes[1].axvline(x=best_n, color='red', linestyle='--', alpha=0.7, 
                       label=f'Best n_clusters={best_n}')
        axes[1].set_xlabel('Number of clusters')
        axes[1].set_ylabel('Silhouette score')
        axes[1].set_title('Choosing the number of clusters')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        print(f"Optimal number of clusters: {best_n}")
        print(f"Best silhouette score: {max(silhouette_scores):.4f}")
        
        return best_n, silhouette_scores

# Hierarchical clustering demo
hierarchical_analyzer = HierarchicalClusteringAnalyzer()

print("\nHierarchical clustering demonstration:")
print("=" * 20)

# Walk through the hierarchical clustering process
Z = hierarchical_analyzer.demonstrate_hierarchical_process()

# Compare linkage methods
hierarchical_analyzer.compare_linkage_methods(X_blobs, n_clusters=4)

# Determine the optimal number of clusters
best_n_clusters, silhouette_scores = hierarchical_analyzer.determine_optimal_clusters(X_blobs, max_clusters=8)

4.2.3 DBSCAN Clustering

DBSCAN (Density-Based Spatial Clustering of Applications with Noise) is a density-based clustering algorithm that can discover clusters of arbitrary shape while flagging noise. It classifies every point as a core point (at least min_samples points within radius eps), a border point (within eps of a core point but not itself core), or noise (labeled -1).
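
A short sketch of how to recover this three-way classification from a fitted scikit-learn DBSCAN model (the eps and min_samples values are illustrative):

# Sketch: recovering core / border / noise points from a fitted DBSCAN model
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.datasets import make_moons

X, _ = make_moons(n_samples=200, noise=0.05, random_state=42)
db = DBSCAN(eps=0.2, min_samples=5).fit(X)

core_mask = np.zeros(len(X), dtype=bool)
core_mask[db.core_sample_indices_] = True   # core points
noise_mask = db.labels_ == -1               # noise points
border_mask = ~core_mask & ~noise_mask      # clustered but not core
print(f"core: {core_mask.sum()}, border: {border_mask.sum()}, noise: {noise_mask.sum()}")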

class DBSCANAnalyzer:
    """
    DBSCAN clustering analyzer
    """
    def __init__(self):
        self.models = {}
    
    def demonstrate_dbscan_concepts(self):
        """
        Demonstrate the core concepts of DBSCAN
        """
        # Data containing noise
        np.random.seed(42)
        
        # Main clusters
        cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0], [0, 0.5]], 50)
        cluster2 = np.random.multivariate_normal([6, 6], [[0.5, 0], [0, 0.5]], 50)
        
        # Noise points
        noise = np.random.uniform(0, 8, (20, 2))
        
        X = np.vstack([cluster1, cluster2, noise])
        
        # DBSCAN with different parameters
        eps_values = [0.5, 1.0, 1.5, 2.0]
        min_samples_values = [3, 5, 10, 15]
        
        fig, axes = plt.subplots(2, 4, figsize=(16, 8))
        
        # Varying eps
        for i, eps in enumerate(eps_values):
            dbscan = DBSCAN(eps=eps, min_samples=5)
            labels = dbscan.fit_predict(X)
            
            # Plot the clustering result
            unique_labels = set(labels)
            colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
            
            for k, col in zip(unique_labels, colors):
                if k == -1:
                    # Noise points in black
                    col = 'black'
                    marker = 'x'
                    alpha = 0.5
                    label = 'Noise'
                else:
                    marker = 'o'
                    alpha = 0.7
                    label = f'Cluster {k}'
                
                class_member_mask = (labels == k)
                xy = X[class_member_mask]
                axes[0, i].scatter(xy[:, 0], xy[:, 1], c=[col], marker=marker, 
                                  alpha=alpha, s=50, label=label if k <= 1 or k == -1 else "")
            
            n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
            n_noise = list(labels).count(-1)
            
            axes[0, i].set_title(f'eps={eps}\nClusters: {n_clusters}, Noise: {n_noise}')
            axes[0, i].set_xlabel('Feature 1')
            axes[0, i].set_ylabel('Feature 2')
            axes[0, i].legend()
            axes[0, i].grid(True, alpha=0.3)
        
        # Varying min_samples
        for i, min_samples in enumerate(min_samples_values):
            dbscan = DBSCAN(eps=1.0, min_samples=min_samples)
            labels = dbscan.fit_predict(X)
            
            # Plot the clustering result
            unique_labels = set(labels)
            colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
            
            for k, col in zip(unique_labels, colors):
                if k == -1:
                    col = 'black'
                    marker = 'x'
                    alpha = 0.5
                    label = 'Noise'
                else:
                    marker = 'o'
                    alpha = 0.7
                    label = f'Cluster {k}'
                
                class_member_mask = (labels == k)
                xy = X[class_member_mask]
                axes[1, i].scatter(xy[:, 0], xy[:, 1], c=[col], marker=marker, 
                                  alpha=alpha, s=50, label=label if k <= 1 or k == -1 else "")
            
            n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
            n_noise = list(labels).count(-1)
            
            axes[1, i].set_title(f'min_samples={min_samples}\nClusters: {n_clusters}, Noise: {n_noise}')
            axes[1, i].set_xlabel('Feature 1')
            axes[1, i].set_ylabel('Feature 2')
            axes[1, i].legend()
            axes[1, i].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return X
    
    def find_optimal_parameters(self, X):
        """
        Find good DBSCAN parameters
        """
        # Use a k-distance plot to pick eps
        k = 4  # for min_samples=5; kneighbors counts the query point itself as its first neighbor
        neighbors = NearestNeighbors(n_neighbors=k)
        neighbors_fit = neighbors.fit(X)
        distances, indices = neighbors_fit.kneighbors(X)
        
        # Sort the k-th neighbor distances
        distances = np.sort(distances[:, k-1], axis=0)
        
        # Plot the k-distance graph
        plt.figure(figsize=(10, 6))
        plt.plot(distances)
        plt.xlabel('Points (sorted by distance)')
        plt.ylabel(f'{k}-distance')
        plt.title('k-distance plot (for choosing eps)')
        plt.grid(True, alpha=0.3)
        
        # Locate the knee (crude heuristic: largest second difference)
        if len(distances) > 10:
            second_derivative = np.diff(distances, 2)
            knee_point = np.argmax(second_derivative) + 2
            optimal_eps = distances[knee_point]
            
            plt.axhline(y=optimal_eps, color='red', linestyle='--', 
                       label=f'Suggested eps={optimal_eps:.3f}')
            plt.legend()
        
        plt.show()
        
        # Grid search over parameters
        eps_range = np.arange(0.1, 2.0, 0.1)
        min_samples_range = range(3, 15)
        
        best_score = -1
        best_params = {}
        results = []
        
        for eps in eps_range:
            for min_samples in min_samples_range:
                dbscan = DBSCAN(eps=eps, min_samples=min_samples)
                labels = dbscan.fit_predict(X)
                
                # Skip results that are all noise or a single cluster
                n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
                if n_clusters < 2:
                    continue
                
                # Silhouette score (excluding noise points)
                if len(set(labels)) > 1 and -1 not in labels:
                    score = silhouette_score(X, labels)
                elif len(set(labels)) > 2:  # noise present but multiple clusters remain
                    mask = labels != -1
                    if np.sum(mask) > 1 and len(set(labels[mask])) > 1:
                        score = silhouette_score(X[mask], labels[mask])
                    else:
                        continue
                else:
                    continue
                
                results.append({
                    'eps': eps,
                    'min_samples': min_samples,
                    'n_clusters': n_clusters,
                    'n_noise': list(labels).count(-1),
                    'silhouette_score': score
                })
                
                if score > best_score:
                    best_score = score
                    best_params = {'eps': eps, 'min_samples': min_samples}
        
        # Visualize the parameter search
        if results:
            results_df = pd.DataFrame(results)
            
            fig, axes = plt.subplots(1, 2, figsize=(12, 5))
            
            # Heatmap of silhouette scores
            pivot_table = results_df.pivot_table(values='silhouette_score', 
                                                index='min_samples', 
                                                columns='eps', 
                                                aggfunc='mean')
            
            sns.heatmap(pivot_table, annot=True, fmt='.3f', cmap='viridis', ax=axes[0])
            axes[0].set_title('Silhouette score heatmap')
            axes[0].set_xlabel('eps')
            axes[0].set_ylabel('min_samples')
            
            # Scatter: number of clusters vs silhouette score
            scatter = axes[1].scatter(results_df['n_clusters'], results_df['silhouette_score'], 
                                    c=results_df['eps'], cmap='viridis', alpha=0.7)
            axes[1].set_xlabel('Number of clusters')
            axes[1].set_ylabel('Silhouette score')
            axes[1].set_title('Number of clusters vs silhouette score')
            plt.colorbar(scatter, ax=axes[1], label='eps')
            
            plt.tight_layout()
            plt.show()
            
            print(f"Best parameters: eps={best_params['eps']}, min_samples={best_params['min_samples']}")
            print(f"Best silhouette score: {best_score:.4f}")
        
        return best_params if results else {'eps': 0.5, 'min_samples': 5}
    
    def compare_clustering_algorithms(self, datasets):
        """
        Compare different clustering algorithms
        """
        algorithms = {
            'K-means': lambda X, n: KMeans(n_clusters=n, random_state=42, n_init=10).fit_predict(X),
            'Hierarchical': lambda X, n: AgglomerativeClustering(n_clusters=n).fit_predict(X),
            'DBSCAN': lambda X, n: DBSCAN(eps=0.5, min_samples=5).fit_predict(X)
        }
        
        fig, axes = plt.subplots(4, 4, figsize=(16, 16))
        
        dataset_names = ['blobs', 'circles', 'moons', 'gaussian']
        
        for i, dataset_name in enumerate(dataset_names):
            X, y_true = datasets[dataset_name]
            n_clusters = len(np.unique(y_true))
            
            # Ground truth
            axes[i, 0].scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis', alpha=0.7)
            axes[i, 0].set_title(f'True labels - {dataset_name}')
            axes[i, 0].set_ylabel('Feature 2')
            
            for j, (alg_name, algorithm) in enumerate(algorithms.items()):
                y_pred = algorithm(X, n_clusters)
                
                # Handle DBSCAN noise points
                if alg_name == 'DBSCAN':
                    unique_labels = set(y_pred)
                    colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
                    
                    for k, col in zip(unique_labels, colors):
                        if k == -1:
                            col = 'black'
                            marker = 'x'
                            alpha = 0.5
                        else:
                            marker = 'o'
                            alpha = 0.7
                        
                        class_member_mask = (y_pred == k)
                        xy = X[class_member_mask]
                        axes[i, j+1].scatter(xy[:, 0], xy[:, 1], c=[col], 
                                            marker=marker, alpha=alpha, s=30)
                else:
                    axes[i, j+1].scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis', alpha=0.7)
                
                # Evaluation metrics
                if alg_name == 'DBSCAN' and -1 in y_pred:
                    # For DBSCAN, compute ARI with noise points excluded
                    mask = y_pred != -1
                    if np.sum(mask) > 0 and len(set(y_pred[mask])) > 1:
                        ari = adjusted_rand_score(y_true[mask], y_pred[mask])
                    else:
                        ari = 0
                else:
                    ari = adjusted_rand_score(y_true, y_pred)
                
                axes[i, j+1].set_title(f'{alg_name}\nARI: {ari:.3f}')
                
                if i == 3:  # last row
                    axes[i, j+1].set_xlabel('Feature 1')
                if j == 0:  # first column
                    axes[i, j+1].set_ylabel('Feature 2')
        
        plt.tight_layout()
        plt.show()

# DBSCAN demo
dbscan_analyzer = DBSCANAnalyzer()

print("\nDBSCAN clustering demonstration:")
print("=" * 20)

# Demonstrate DBSCAN concepts
X_dbscan = dbscan_analyzer.demonstrate_dbscan_concepts()

# Find good parameters
best_dbscan_params = dbscan_analyzer.find_optimal_parameters(X_blobs)

# Compare clustering algorithms
dbscan_analyzer.compare_clustering_algorithms(datasets)

4.3 Dimensionality Reduction

4.3.1 Principal Component Analysis (PCA)

PCA is the most widely used linear dimensionality reduction technique. It reduces dimensionality by finding the orthogonal directions along which the data varies most and projecting the data onto the leading ones.
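
To make the idea concrete, here is a hedged sketch of PCA computed by hand from the eigendecomposition of the covariance matrix; up to sign flips, the directions should agree with sklearn's components_ (this is also one route to basic exercise 2 at the end of the chapter):

# Sketch: PCA "by hand" via eigendecomposition of the covariance matrix
# (components may differ from sklearn's by sign, which is expected)
import numpy as np

rng = np.random.default_rng(42)
X = rng.multivariate_normal([0, 0], [[3, 1.5], [1.5, 1]], 200)

X_centered = X - X.mean(axis=0)              # PCA assumes centered data
cov = np.cov(X_centered, rowvar=False)       # 2x2 covariance matrix
eigvals, eigvecs = np.linalg.eigh(cov)       # eigh: ascending order for symmetric matrices
order = np.argsort(eigvals)[::-1]            # largest variance first
components = eigvecs[:, order].T             # rows = principal components
explained_ratio = eigvals[order] / eigvals.sum()
X_projected = X_centered @ components.T      # coordinates in the principal basis
print("explained variance ratio:", explained_ratio)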

class PCAAnalyzer:
    """
    PCA analyzer
    """
    def __init__(self):
        self.models = {}
        self.results = {}
    
    def demonstrate_pca_concepts(self):
        """
        Demonstrate the core concepts of PCA
        """
        # Correlated 2D data
        np.random.seed(42)
        mean = [0, 0]
        cov = [[3, 1.5], [1.5, 1]]
        X = np.random.multivariate_normal(mean, cov, 200)
        
        # Run PCA
        pca = PCA()
        X_pca = pca.fit_transform(X)
        
        # Visualization
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        
        # Original data
        axes[0].scatter(X[:, 0], X[:, 1], alpha=0.7)
        axes[0].set_title('Original data')
        axes[0].set_xlabel('Feature 1')
        axes[0].set_ylabel('Feature 2')
        axes[0].grid(True, alpha=0.3)
        axes[0].axis('equal')
        
        # Original data + principal components
        axes[1].scatter(X[:, 0], X[:, 1], alpha=0.7)
        
        # Draw the principal component directions (scaled by standard deviation)
        mean_point = np.mean(X, axis=0)
        for i, (component, var) in enumerate(zip(pca.components_, pca.explained_variance_)):
            axes[1].arrow(mean_point[0], mean_point[1], 
                         component[0] * np.sqrt(var) * 3, 
                         component[1] * np.sqrt(var) * 3,
                         head_width=0.2, head_length=0.3, 
                         fc=f'C{i}', ec=f'C{i}', linewidth=3,
                         label=f'PC{i+1} ({pca.explained_variance_ratio_[i]:.1%})')
        
        axes[1].set_title('Original data + principal components')
        axes[1].set_xlabel('Feature 1')
        axes[1].set_ylabel('Feature 2')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        axes[1].axis('equal')
        
        # Transformed data
        axes[2].scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.7)
        axes[2].set_title('Data after PCA transformation')
        axes[2].set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%})')
        axes[2].set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%})')
        axes[2].grid(True, alpha=0.3)
        axes[2].axis('equal')
        
        plt.tight_layout()
        plt.show()
        
        print("PCA results:")
        print(f"Explained variance ratio: {pca.explained_variance_ratio_}")
        print(f"Cumulative explained variance ratio: {np.cumsum(pca.explained_variance_ratio_)}")
        print("Principal components:")
        for i, component in enumerate(pca.components_):
            print(f"  PC{i+1}: {component}")
        
        return pca, X, X_pca
    
    def analyze_high_dimensional_data(self):
        """
        PCA analysis of high-dimensional data. Returns y along with the
        PCA results so that callers can reuse the labels.
        """
        # Load a high-dimensional dataset (8x8 digit images, 64 features)
        from sklearn.datasets import load_digits
        digits = load_digits()
        X, y = digits.data, digits.target
        
        print(f"Original data shape: {X.shape}")
        print(f"Number of classes: {len(np.unique(y))}")
        
        # Run PCA
        pca = PCA()
        X_pca = pca.fit_transform(X)
        
        # Explained variance analysis
        cumsum_var_ratio = np.cumsum(pca.explained_variance_ratio_)
        
        # Visualization
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        
        # Explained variance ratio
        axes[0, 0].plot(pca.explained_variance_ratio_[:20], 'o-')
        axes[0, 0].set_title('Explained variance ratio of the first 20 PCs')
        axes[0, 0].set_xlabel('Principal component')
        axes[0, 0].set_ylabel('Explained variance ratio')
        axes[0, 0].grid(True, alpha=0.3)
        
        # Cumulative explained variance ratio
        axes[0, 1].plot(cumsum_var_ratio, 'o-')
        axes[0, 1].axhline(y=0.95, color='red', linestyle='--', label='95%')
        axes[0, 1].axhline(y=0.99, color='orange', linestyle='--', label='99%')
        axes[0, 1].set_title('Cumulative explained variance ratio')
        axes[0, 1].set_xlabel('Number of components')
        axes[0, 1].set_ylabel('Cumulative explained variance ratio')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # Number of components needed for 95% variance
        n_components_95 = np.argmax(cumsum_var_ratio >= 0.95) + 1
        print(f"Components needed to explain 95% of the variance: {n_components_95}")
        
        # 2D visualization (first two principal components)
        scatter = axes[1, 0].scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='tab10', alpha=0.7)
        axes[1, 0].set_title('2D visualization with the first two PCs')
        axes[1, 0].set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%})')
        axes[1, 0].set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%})')
        plt.colorbar(scatter, ax=axes[1, 0])
        
        # Reconstruction error at different dimensionalities
        dimensions = [2, 5, 10, 20, 30, 50]
        reconstruction_errors = []
        
        for n_comp in dimensions:
            pca_temp = PCA(n_components=n_comp)
            X_reduced = pca_temp.fit_transform(X)
            X_reconstructed = pca_temp.inverse_transform(X_reduced)
            error = np.mean((X - X_reconstructed) ** 2)
            reconstruction_errors.append(error)
        
        axes[1, 1].plot(dimensions, reconstruction_errors, 'o-')
        axes[1, 1].set_title('Reconstruction error vs number of components')
        axes[1, 1].set_xlabel('Number of components')
        axes[1, 1].set_ylabel('Reconstruction error (MSE)')
        axes[1, 1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        # Original vs reconstructed images in a separate figure
        sample_idx = 0
        images = [X[sample_idx].reshape(8, 8)]
        titles = ['Original']
        for n_comp in [2, 10, 30]:
            pca_temp = PCA(n_components=n_comp)
            X_reduced = pca_temp.fit_transform(X)
            images.append(pca_temp.inverse_transform(X_reduced)[sample_idx].reshape(8, 8))
            titles.append(f'{n_comp}D reconstruction')
        
        fig, img_axes = plt.subplots(1, 4, figsize=(12, 3))
        for ax, img, title in zip(img_axes, images, titles):
            ax.imshow(img, cmap='gray')
            ax.set_title(title)
            ax.axis('off')
        
        plt.tight_layout()
        plt.show()
        
        return pca, X, y, X_pca, n_components_95
    
    def compare_dimensionality_reduction_methods(self, X, y):
        """
        Compare different dimensionality reduction methods
        """
        # Standardize the data
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # Dimensionality reduction methods
        methods = {
            'PCA': PCA(n_components=2),
            't-SNE': TSNE(n_components=2, random_state=42, perplexity=30),
            'MDS': MDS(n_components=2, random_state=42),
            'ICA': FastICA(n_components=2, random_state=42)
        }
        
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        
        for i, (name, method) in enumerate(methods.items()):
            print(f"Running {name}...")
            
            if name == 't-SNE':
                # t-SNE is slow on large datasets, so use a subset
                if len(X_scaled) > 1000:
                    indices = np.random.choice(len(X_scaled), 1000, replace=False)
                    X_subset = X_scaled[indices]
                    y_subset = y[indices]
                else:
                    X_subset = X_scaled
                    y_subset = y
                
                X_reduced = method.fit_transform(X_subset)
                y_plot = y_subset
            else:
                X_reduced = method.fit_transform(X_scaled)
                y_plot = y
            
            scatter = axes[i].scatter(X_reduced[:, 0], X_reduced[:, 1], 
                                    c=y_plot, cmap='tab10', alpha=0.7)
            axes[i].set_title(f'{name}')
            axes[i].set_xlabel('Component 1')
            axes[i].set_ylabel('Component 2')
            
            # Add explained variance info where available
            if hasattr(method, 'explained_variance_ratio_'):
                var_ratio = method.explained_variance_ratio_
                axes[i].set_xlabel(f'Component 1 ({var_ratio[0]:.1%})')
                axes[i].set_ylabel(f'Component 2 ({var_ratio[1]:.1%})')
        
        plt.tight_layout()
        plt.show()

# PCA demo
pca_analyzer = PCAAnalyzer()

print("\nPCA demonstration:")
print("=" * 20)

# Demonstrate PCA concepts
pca_model, X_original, X_pca_result = pca_analyzer.demonstrate_pca_concepts()

# Analyze high-dimensional data (returns the labels as y_digits for later use)
pca_digits, X_digits, y_digits, X_digits_pca, n_comp_95 = pca_analyzer.analyze_high_dimensional_data()

# Compare dimensionality reduction methods
pca_analyzer.compare_dimensionality_reduction_methods(X_digits[:500], y_digits[:500])
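
One convenience worth knowing: instead of reading the cumulative-variance curve manually, PCA accepts a float n_components between 0 and 1 and keeps the smallest number of components that reaches that variance fraction. A quick sketch, reusing X_digits from above:

# Sketch: letting PCA choose the number of components for a variance target
pca_95 = PCA(n_components=0.95)              # keep enough components for >= 95% variance
X_digits_95 = pca_95.fit_transform(X_digits)
print(f"Components kept for 95% variance: {pca_95.n_components_}")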

4.3.2 t-SNE and Other Nonlinear Dimensionality Reduction

Linear methods such as PCA can only capture linear structure. t-SNE and the manifold learning methods below instead preserve local neighborhood relationships, which lets them unfold nonlinear structure and makes them especially useful for visualization.

class NonlinearDimensionalityReduction:
    """
    Nonlinear dimensionality reduction analysis
    """
    def __init__(self):
        self.methods = {}
    
    def demonstrate_tsne_parameters(self, X, y):
        """
        Demonstrate the effect of t-SNE parameters
        """
        # Use a subset of the data for speed
        if len(X) > 500:
            indices = np.random.choice(len(X), 500, replace=False)
            X_subset = X[indices]
            y_subset = y[indices]
        else:
            X_subset = X
            y_subset = y
        
        # Standardize
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X_subset)
        
        # Different perplexity values
        perplexity_values = [5, 15, 30, 50]
        
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        
        for i, perplexity in enumerate(perplexity_values):
            print(f"Running t-SNE (perplexity={perplexity})...")
            
            tsne = TSNE(n_components=2, perplexity=perplexity, 
                       random_state=42, n_iter=1000)
            X_tsne = tsne.fit_transform(X_scaled)
            
            scatter = axes[i].scatter(X_tsne[:, 0], X_tsne[:, 1], 
                                    c=y_subset, cmap='tab10', alpha=0.7)
            axes[i].set_title(f't-SNE (perplexity={perplexity})')
            axes[i].set_xlabel('t-SNE 1')
            axes[i].set_ylabel('t-SNE 2')
        
        plt.tight_layout()
        plt.show()
    
    def compare_manifold_learning_methods(self, X, y):
        """
        Compare different manifold learning methods
        """
        from sklearn.manifold import Isomap, LocallyLinearEmbedding, SpectralEmbedding
        
        # Use a subset of the data
        if len(X) > 300:
            indices = np.random.choice(len(X), 300, replace=False)
            X_subset = X[indices]
            y_subset = y[indices]
        else:
            X_subset = X
            y_subset = y
        
        # Standardize
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X_subset)
        
        # Manifold learning methods
        methods = {
            'Isomap': Isomap(n_components=2, n_neighbors=10),
            'LLE': LocallyLinearEmbedding(n_components=2, n_neighbors=10, random_state=42),
            'Spectral Embedding': SpectralEmbedding(n_components=2, random_state=42),
            't-SNE': TSNE(n_components=2, random_state=42, perplexity=30)
        }
        
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        
        for i, (name, method) in enumerate(methods.items()):
            print(f"Running {name}...")
            
            try:
                X_reduced = method.fit_transform(X_scaled)
                
                scatter = axes[i].scatter(X_reduced[:, 0], X_reduced[:, 1], 
                                        c=y_subset, cmap='tab10', alpha=0.7)
                axes[i].set_title(f'{name}')
                axes[i].set_xlabel('Component 1')
                axes[i].set_ylabel('Component 2')
                
            except Exception as e:
                axes[i].text(0.5, 0.5, f'{name}\nfailed to run', 
                           transform=axes[i].transAxes, ha='center', va='center')
                axes[i].set_title(f'{name} (failed)')
                print(f"{name} failed: {e}")
        
        plt.tight_layout()
        plt.show()

# Nonlinear dimensionality reduction demo
nonlinear_dr = NonlinearDimensionalityReduction()

print("\nNonlinear dimensionality reduction demonstration:")
print("=" * 20)

# Demonstrate the effect of t-SNE parameters
nonlinear_dr.demonstrate_tsne_parameters(X_digits, y_digits)

# Compare manifold learning methods
nonlinear_dr.compare_manifold_learning_methods(X_digits, y_digits)

4.4 Chapter Summary

4.4.1 Review of Core Content

This chapter covered the main algorithms of unsupervised learning:

  1. Clustering algorithms

    • K-means: centroid-based clustering
    • Hierarchical clustering: builds a hierarchy of clusters
    • DBSCAN: density-based clustering
  2. Dimensionality reduction techniques

    • PCA: the classic linear method
    • t-SNE: nonlinear reduction, well suited to visualization
    • Other manifold learning methods

4.4.2 Algorithm Selection Guide

| Type | Algorithm | Strengths | Weaknesses | Typical use cases |
| --- | --- | --- | --- | --- |
| Clustering | K-means | Simple, fast, scalable | Needs k preset; assumes spherical clusters | Spherically distributed data |
| Clustering | Hierarchical | No preset cluster count; highly interpretable | High computational complexity | Small datasets; when a hierarchy is needed |
| Clustering | DBSCAN | Finds arbitrarily shaped clusters; flags noise | Parameter-sensitive; struggles when density varies widely | Non-spherical clusters; noisy data |
| Reduction | PCA | Linear, fast, interpretable | Captures only linear relationships | Linearly correlated high-dimensional data |
| Reduction | t-SNE | Preserves local structure; good visualizations | Slow; parameter-sensitive | Data visualization; nonlinear structure |

4.4.3 Practical Advice

  1. Clustering

    • Use multiple evaluation metrics (silhouette score, ARI, NMI)
    • Try several clustering algorithms
    • Pay attention to preprocessing and standardization (see the pipeline sketch after this list)
  2. Dimensionality reduction

    • Choose linear or nonlinear methods based on the data's characteristics
    • Consider computational cost and data size
    • Assess how much information the reduction preserves
  3. Parameter tuning

    • Use grid search or Bayesian optimization
    • Use domain knowledge to set parameter ranges
    • Use cross-validation to assess stability
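
To illustrate the preprocessing point from item 1, here is a minimal sketch (the deliberately mismatched feature scales are an illustrative assumption) that chains standardization and clustering in a single Pipeline, so the scaling is applied consistently every time the model is fit:

# Sketch: scaling + clustering combined in one Pipeline
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

rng = np.random.default_rng(42)
X = np.column_stack([rng.normal(0, 1, 300), rng.normal(0, 100, 300)])  # very different scales

pipe = Pipeline([
    ('scale', StandardScaler()),   # without this, the large-scale feature dominates distances
    ('cluster', KMeans(n_clusters=3, random_state=42, n_init=10)),
])
labels = pipe.fit_predict(X)
print(np.bincount(labels))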

4.5 Next Chapter Preview

In the next chapter we will turn to model evaluation and selection, including:

  • Evaluation metrics in detail
  • Cross-validation techniques
  • The bias-variance trade-off
  • Model selection strategies
  • Hyperparameter optimization

4.6 Exercises

Basic exercises

  1. Implement a simple K-means algorithm (without sklearn)
  2. Compute PCA's principal components by hand
  3. Compare different clustering algorithms on the same dataset
  4. Analyze what the principal components in PCA mean

Advanced exercises

  1. Implement an adaptive parameter-selection algorithm for DBSCAN
  2. Compare PCA and t-SNE for visualizing high-dimensional data
  3. Study how different distance metrics affect clustering results
  4. Implement an incremental PCA algorithm

Project exercises

  1. Customer segmentation: use clustering algorithms for customer segmentation analysis
  2. Image compression: use PCA to compress images
  3. Document clustering: cluster text documents by topic
  4. Anomaly detection: combine clustering and dimensionality reduction to detect anomalies

Thinking questions

  1. Why is K-means sensitive to initialization? How can this be mitigated?
  2. In what situations is nonlinear dimensionality reduction more effective than linear?
  3. How can the quality of a clustering result be evaluated?
  4. What information does dimensionality reduction lose, and how do you weigh the trade-off?