4.1 Overview of Unsupervised Learning
4.1.1 What Is Unsupervised Learning
Unsupervised learning is a major branch of machine learning: it discovers hidden patterns and structure in data that carries no labels. Unlike supervised learning, it does not rely on known correct answers; the algorithm has to uncover the regularities in the data on its own.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_blobs, make_circles, make_moons
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, MiniBatchKMeans, AgglomerativeClustering, DBSCAN
from sklearn.decomposition import PCA, FastICA
from sklearn.manifold import TSNE, MDS
from sklearn.mixture import GaussianMixture
from sklearn.metrics import silhouette_score, adjusted_rand_score, normalized_mutual_info_score
from sklearn.neighbors import NearestNeighbors
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.spatial.distance import pdist, squareform
import warnings
warnings.filterwarnings('ignore')
# Plot style and font settings (SimHei is a CJK font; replace or drop it if unavailable)
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")
class UnsupervisedLearningDemo:
    """Demonstrations for unsupervised learning."""

    def __init__(self):
        self.datasets = {}
        self.results = {}

    def create_sample_datasets(self):
        """Create example datasets."""
        np.random.seed(42)
        # 1. Spherical blob clusters
        X_blobs, y_blobs = make_blobs(n_samples=300, centers=4, n_features=2,
                                      random_state=42, cluster_std=1.5)
        # 2. Concentric circles
        X_circles, y_circles = make_circles(n_samples=300, noise=0.1, factor=0.3,
                                            random_state=42)
        # 3. Two moons
        X_moons, y_moons = make_moons(n_samples=300, noise=0.1, random_state=42)
        # 4. Gaussian mixture
        X_gaussian = np.vstack([
            np.random.multivariate_normal([2, 2], [[1, 0.5], [0.5, 1]], 100),
            np.random.multivariate_normal([6, 6], [[1, -0.5], [-0.5, 1]], 100),
            np.random.multivariate_normal([2, 6], [[1, 0], [0, 1]], 100)
        ])
        y_gaussian = np.array([0] * 100 + [1] * 100 + [2] * 100)
        self.datasets = {
            'blobs': (X_blobs, y_blobs),
            'circles': (X_circles, y_circles),
            'moons': (X_moons, y_moons),
            'gaussian': (X_gaussian, y_gaussian)
        }
        return self.datasets

    def visualize_datasets(self):
        """Visualize the example datasets."""
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        titles = ['Blob clusters', 'Concentric circles', 'Two moons', 'Gaussian mixture']
        dataset_names = ['blobs', 'circles', 'moons', 'gaussian']
        for i, (name, title) in enumerate(zip(dataset_names, titles)):
            X, y = self.datasets[name]
            scatter = axes[i].scatter(X[:, 0], X[:, 1], c=y, cmap='viridis', alpha=0.7)
            axes[i].set_title(title)
            axes[i].set_xlabel('Feature 1')
            axes[i].set_ylabel('Feature 2')
            plt.colorbar(scatter, ax=axes[i])
        plt.tight_layout()
        plt.show()
        print("Main tasks of unsupervised learning:")
        print("1. Clustering - discover group structure in the data")
        print("2. Dimensionality reduction - reduce dimensions while keeping important information")
        print("3. Density estimation - estimate the probability distribution of the data")
        print("4. Anomaly detection - identify abnormal or outlying points")
        print("5. Association rule mining - discover relationships between variables")

# Create a demo instance
unsupervised_demo = UnsupervisedLearningDemo()
datasets = unsupervised_demo.create_sample_datasets()
unsupervised_demo.visualize_datasets()
4.1.2 Types of Unsupervised Learning
Unsupervised learning covers the following main task types:
- Clustering: group similar data points together
- Dimensionality reduction: reduce the number of features
- Density estimation: estimate the probability distribution of the data (see the sketch after this list)
- Anomaly detection: identify abnormal data points
- Association rule mining: discover relationships between items
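Density estimation is the one task above that this chapter's imports already cover but the demos below do not revisit: the GaussianMixture class imported earlier fits a mixture-of-Gaussians density model. A minimal sketch (the two-component setup is illustrative, not prescribed by the chapter):

from sklearn.datasets import make_blobs
from sklearn.mixture import GaussianMixture

# Fit a two-component Gaussian mixture as a density model
X_demo, _ = make_blobs(n_samples=300, centers=2, random_state=42)
gmm = GaussianMixture(n_components=2, random_state=42).fit(X_demo)
# score_samples returns the per-sample log-density under the fitted mixture
print(gmm.score_samples(X_demo[:5]))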
4.2 Clustering Algorithms
4.2.1 K-means Clustering
K-means is one of the most widely used clustering algorithms. It partitions the data into k clusters by iteratively alternating between assigning each point to its nearest centroid and recomputing each centroid as the mean of its assigned points.
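The objective it minimizes is the within-cluster sum of squares,

$$J = \sum_{j=1}^{k} \sum_{x_i \in C_j} \lVert x_i - \mu_j \rVert^2$$

where $\mu_j$ is the centroid of cluster $C_j$. Before the full analyzer class below, here is a minimal from-scratch sketch of the two alternating steps (this also addresses the first basic exercise at the end of the chapter; simple_kmeans is an illustrative helper, not a scikit-learn API):

import numpy as np

def simple_kmeans(X, k, n_iter=100, seed=42):
    """Bare-bones K-means: alternate assignment and update steps."""
    rng = np.random.default_rng(seed)
    # Initialize centroids by sampling k distinct data points
    centroids = X[rng.choice(len(X), size=k, replace=False)]
    for _ in range(n_iter):
        # Assignment step: nearest centroid by squared Euclidean distance
        d2 = ((X[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
        labels = d2.argmin(axis=1)
        # Update step: each centroid becomes the mean of its cluster
        # (assumes no cluster empties, which holds for well-separated demo data)
        new_centroids = np.array([X[labels == j].mean(axis=0) for j in range(k)])
        if np.allclose(new_centroids, centroids):
            break  # converged
        centroids = new_centroids
    return labels, centroids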
class KMeansAnalyzer:
    """K-means clustering analyzer."""

    def __init__(self):
        self.models = {}
        self.results = {}

    def demonstrate_kmeans_process(self):
        """Walk through the K-means iterations step by step."""
        # Simple 2D data
        X, _ = make_blobs(n_samples=100, centers=3, n_features=2,
                          random_state=42, cluster_std=1.5)
        k = 3
        # Randomly initialize centroids
        np.random.seed(42)
        centroids = X[np.random.choice(X.shape[0], k, replace=False)]
        colors = ['red', 'blue', 'green']
        labels = None
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        for iteration in range(6):
            ax = axes[iteration // 3, iteration % 3]
            if iteration % 2 == 0:  # assignment step
                # Distance from every point to every centroid
                distances = np.sqrt(((X - centroids[:, np.newaxis]) ** 2).sum(axis=2))
                labels = np.argmin(distances, axis=0)
                # Plot points and centroids
                for i in range(k):
                    mask = labels == i
                    ax.scatter(X[mask, 0], X[mask, 1], c=colors[i], alpha=0.7, s=50)
                ax.scatter(centroids[:, 0], centroids[:, 1], c='black',
                           marker='x', s=200, linewidth=3, label='Centroids')
                ax.set_title(f'Iteration {iteration // 2 + 1}: assignment step')
            else:  # update step
                # Recompute centroids as cluster means
                new_centroids = np.array([X[labels == i].mean(axis=0) for i in range(k)])
                # Plot old and new centroids
                for i in range(k):
                    mask = labels == i
                    ax.scatter(X[mask, 0], X[mask, 1], c=colors[i], alpha=0.7, s=50)
                ax.scatter(centroids[:, 0], centroids[:, 1], c='black',
                           marker='x', s=200, linewidth=3, label='Old centroids')
                ax.scatter(new_centroids[:, 0], new_centroids[:, 1], c='red',
                           marker='o', s=200, linewidth=3, label='New centroids')
                # Draw the centroid displacement
                for i in range(k):
                    ax.arrow(centroids[i, 0], centroids[i, 1],
                             new_centroids[i, 0] - centroids[i, 0],
                             new_centroids[i, 1] - centroids[i, 1],
                             head_width=0.1, head_length=0.1, fc='red', ec='red')
                centroids = new_centroids
                ax.set_title(f'Iteration {iteration // 2 + 1}: update step')
            ax.set_xlabel('Feature 1')
            ax.set_ylabel('Feature 2')
            ax.legend()
            ax.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def find_optimal_k(self, X, max_k=10):
        """Choose k with the elbow method and the silhouette coefficient."""
        k_range = range(2, max_k + 1)
        inertias = []
        silhouette_scores = []
        for k in k_range:
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = kmeans.fit_predict(X)
            inertias.append(kmeans.inertia_)
            silhouette_scores.append(silhouette_score(X, labels))
        # Visualize both criteria
        fig, axes = plt.subplots(1, 2, figsize=(12, 5))
        # Elbow method
        axes[0].plot(k_range, inertias, 'o-', linewidth=2, markersize=6)
        axes[0].set_xlabel('Number of clusters (k)')
        axes[0].set_ylabel('Within-cluster sum of squares (WCSS)')
        axes[0].set_title('Elbow method')
        axes[0].grid(True, alpha=0.3)
        # Silhouette coefficient
        axes[1].plot(k_range, silhouette_scores, 'o-', linewidth=2, markersize=6, color='orange')
        best_k = k_range[np.argmax(silhouette_scores)]
        axes[1].axvline(x=best_k, color='red', linestyle='--', alpha=0.7,
                        label=f'best k={best_k}')
        axes[1].set_xlabel('Number of clusters (k)')
        axes[1].set_ylabel('Silhouette coefficient')
        axes[1].set_title('Silhouette method')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        print(f"Best k (by silhouette): {best_k}")
        print(f"Best silhouette score: {max(silhouette_scores):.4f}")
        return best_k, inertias, silhouette_scores

    def compare_kmeans_variants(self, X, k=3):
        """Compare K-means initializations and the mini-batch variant."""
        # Note: sklearn's KMeans defaults to k-means++ initialization, so the
        # plain variant here explicitly uses random initialization.
        models = {
            'K-means (random init)': KMeans(n_clusters=k, init='random', random_state=42, n_init=10),
            'K-means++': KMeans(n_clusters=k, init='k-means++', random_state=42, n_init=10),
            'Mini-batch K-means': MiniBatchKMeans(n_clusters=k, random_state=42, n_init=10)
        }
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        for i, (name, model) in enumerate(models.items()):
            labels = model.fit_predict(X)
            centroids = model.cluster_centers_
            # Plot clustering result
            colors = ['red', 'blue', 'green', 'purple', 'orange']
            for j in range(k):
                mask = labels == j
                axes[i].scatter(X[mask, 0], X[mask, 1], c=colors[j], alpha=0.7, s=50)
            axes[i].scatter(centroids[:, 0], centroids[:, 1], c='black',
                            marker='x', s=200, linewidth=3)
            # Evaluation metrics
            inertia = model.inertia_
            silhouette = silhouette_score(X, labels)
            axes[i].set_title(f'{name}\nInertia: {inertia:.2f}, Silhouette: {silhouette:.3f}')
            axes[i].set_xlabel('Feature 1')
            axes[i].set_ylabel('Feature 2')
            axes[i].grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        return models

    def analyze_different_datasets(self, datasets):
        """Evaluate K-means on datasets with different shapes."""
        fig, axes = plt.subplots(2, 4, figsize=(16, 8))
        dataset_names = ['blobs', 'circles', 'moons', 'gaussian']
        for i, name in enumerate(dataset_names):
            X, y_true = datasets[name]
            # Ground truth
            axes[0, i].scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis', alpha=0.7)
            axes[0, i].set_title(f'True labels - {name}')
            axes[0, i].set_xlabel('Feature 1')
            axes[0, i].set_ylabel('Feature 2')
            # K-means result
            k = len(np.unique(y_true))
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            y_pred = kmeans.fit_predict(X)
            axes[1, i].scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis', alpha=0.7)
            axes[1, i].scatter(kmeans.cluster_centers_[:, 0], kmeans.cluster_centers_[:, 1],
                               c='red', marker='x', s=200, linewidth=3)
            # Evaluation metrics
            ari = adjusted_rand_score(y_true, y_pred)
            nmi = normalized_mutual_info_score(y_true, y_pred)
            silhouette = silhouette_score(X, y_pred)
            axes[1, i].set_title(f'K-means - {name}\nARI: {ari:.3f}, NMI: {nmi:.3f}, Sil: {silhouette:.3f}')
            axes[1, i].set_xlabel('Feature 1')
            axes[1, i].set_ylabel('Feature 2')
        plt.tight_layout()
        plt.show()

# K-means demo
kmeans_analyzer = KMeansAnalyzer()
print("K-means demonstration:")
print("=" * 20)
# Walk through the algorithm
kmeans_analyzer.demonstrate_kmeans_process()
# Find the best k
X_blobs, _ = datasets['blobs']
best_k, inertias, silhouette_scores = kmeans_analyzer.find_optimal_k(X_blobs, max_k=8)
# Compare variants
kmeans_models = kmeans_analyzer.compare_kmeans_variants(X_blobs, k=4)
# Evaluate on different datasets
kmeans_analyzer.analyze_different_datasets(datasets)
4.2.2 Hierarchical Clustering
Hierarchical clustering builds a hierarchy of clusters rather than a single flat partition. It comes in two flavors: agglomerative (bottom-up, repeatedly merging the closest pair of clusters) and divisive (top-down, repeatedly splitting clusters).
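The analyzer below visualizes hierarchies as dendrograms; to turn a hierarchy into a flat labeling you cut the tree, e.g. with SciPy's fcluster. A minimal sketch (the cluster count of 3 is illustrative):

from scipy.cluster.hierarchy import linkage, fcluster
from sklearn.datasets import make_blobs

X_h, _ = make_blobs(n_samples=150, centers=3, random_state=42)
Z_h = linkage(X_h, method='ward')                       # linkage also accepts raw observations
flat_labels = fcluster(Z_h, t=3, criterion='maxclust')  # cut the tree into 3 flat clusters
print(flat_labels[:10])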
class HierarchicalClusteringAnalyzer:
    """Hierarchical clustering analyzer."""

    def __init__(self):
        self.models = {}
        self.linkage_matrices = {}

    def demonstrate_hierarchical_process(self):
        """Walk through agglomerative clustering on a toy dataset."""
        # Small toy dataset
        np.random.seed(42)
        X = np.array([[1, 2], [1.5, 1.8], [5, 8], [8, 8], [1, 0.6], [9, 11]])
        labels = ['A', 'B', 'C', 'D', 'E', 'F']
        # Pairwise distances (condensed form)
        distance_matrix = pdist(X, metric='euclidean')
        # Linkage criteria to compare
        linkage_methods = ['single', 'complete', 'average', 'ward']
        fig, axes = plt.subplots(2, 4, figsize=(16, 8))
        for i, method in enumerate(linkage_methods):
            # Linkage matrix for this criterion
            Z = linkage(distance_matrix, method=method)
            # Plot the raw points
            axes[0, i].scatter(X[:, 0], X[:, 1], s=100, alpha=0.7)
            for j, label in enumerate(labels):
                axes[0, i].annotate(label, (X[j, 0], X[j, 1]),
                                    xytext=(5, 5), textcoords='offset points')
            axes[0, i].set_title(f'Data points - {method}')
            axes[0, i].set_xlabel('Feature 1')
            axes[0, i].set_ylabel('Feature 2')
            axes[0, i].grid(True, alpha=0.3)
            # Plot the dendrogram
            dendrogram(Z, labels=labels, ax=axes[1, i])
            axes[1, i].set_title(f'Dendrogram - {method}')
            axes[1, i].set_xlabel('Sample')
            axes[1, i].set_ylabel('Distance')
        plt.tight_layout()
        plt.show()
        return Z

    def compare_linkage_methods(self, X, n_clusters=3):
        """Compare different linkage criteria on the same data."""
        linkage_methods = ['single', 'complete', 'average', 'ward']
        fig, axes = plt.subplots(2, 4, figsize=(16, 8))
        for i, method in enumerate(linkage_methods):
            # Agglomerative clustering with the given linkage
            hierarchical = AgglomerativeClustering(n_clusters=n_clusters, linkage=method)
            labels = hierarchical.fit_predict(X)
            # Plot clustering result
            colors = ['red', 'blue', 'green', 'purple', 'orange']
            for j in range(n_clusters):
                mask = labels == j
                axes[0, i].scatter(X[mask, 0], X[mask, 1], c=colors[j], alpha=0.7, s=50)
            axes[0, i].set_title(f'Clusters - {method}')
            axes[0, i].set_xlabel('Feature 1')
            axes[0, i].set_ylabel('Feature 2')
            axes[0, i].grid(True, alpha=0.3)
            # Compute and plot the dendrogram
            distance_matrix = pdist(X, metric='euclidean')
            Z = linkage(distance_matrix, method=method)
            dendrogram(Z, ax=axes[1, i], truncate_mode='level', p=5)
            axes[1, i].set_title(f'Dendrogram - {method}')
            axes[1, i].set_xlabel('Sample index')
            axes[1, i].set_ylabel('Distance')
            # Silhouette coefficient for this linkage
            silhouette = silhouette_score(X, labels)
            axes[0, i].text(0.02, 0.98, f'Silhouette: {silhouette:.3f}',
                            transform=axes[0, i].transAxes, verticalalignment='top',
                            bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
        plt.tight_layout()
        plt.show()

    def determine_optimal_clusters(self, X, max_clusters=10):
        """Choose the number of clusters for ward linkage."""
        # Distance and linkage matrices
        distance_matrix = pdist(X, metric='euclidean')
        Z = linkage(distance_matrix, method='ward')
        # Silhouette score for each candidate cluster count
        cluster_range = range(2, max_clusters + 1)
        silhouette_scores = []
        for n_clusters in cluster_range:
            hierarchical = AgglomerativeClustering(n_clusters=n_clusters, linkage='ward')
            labels = hierarchical.fit_predict(X)
            silhouette_scores.append(silhouette_score(X, labels))
        # Visualize
        fig, axes = plt.subplots(1, 2, figsize=(12, 5))
        # Full dendrogram
        dendrogram(Z, ax=axes[0])
        axes[0].set_title('Full dendrogram')
        axes[0].set_xlabel('Sample index')
        axes[0].set_ylabel('Distance')
        # Silhouette curve
        axes[1].plot(cluster_range, silhouette_scores, 'o-', linewidth=2, markersize=6)
        best_n = cluster_range[np.argmax(silhouette_scores)]
        axes[1].axvline(x=best_n, color='red', linestyle='--', alpha=0.7,
                        label=f'best n={best_n}')
        axes[1].set_xlabel('Number of clusters')
        axes[1].set_ylabel('Silhouette coefficient')
        axes[1].set_title('Choosing the cluster count')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        print(f"Best number of clusters: {best_n}")
        print(f"Best silhouette score: {max(silhouette_scores):.4f}")
        return best_n, silhouette_scores

# Hierarchical clustering demo
hierarchical_analyzer = HierarchicalClusteringAnalyzer()
print("\nHierarchical clustering demonstration:")
print("=" * 20)
# Walk through the process
Z = hierarchical_analyzer.demonstrate_hierarchical_process()
# Compare linkage criteria
hierarchical_analyzer.compare_linkage_methods(X_blobs, n_clusters=4)
# Choose the number of clusters
best_n_clusters, silhouette_scores = hierarchical_analyzer.determine_optimal_clusters(X_blobs, max_clusters=8)
4.2.3 DBSCAN Clustering
DBSCAN (Density-Based Spatial Clustering of Applications with Noise) is a density-based algorithm that can discover clusters of arbitrary shape and flags low-density points as noise. A point is a core point if at least min_samples points lie within distance eps of it; clusters grow by connecting core points, points reachable from a core point but not dense themselves become border points, and everything else is labeled noise.
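scikit-learn exposes this core/border/noise distinction directly: a fitted DBSCAN object has a core_sample_indices_ attribute, and noise points receive label -1. A minimal sketch (the eps and min_samples values are illustrative):

import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.datasets import make_moons

X_m, _ = make_moons(n_samples=200, noise=0.05, random_state=42)
db = DBSCAN(eps=0.2, min_samples=5).fit(X_m)
core_mask = np.zeros(len(X_m), dtype=bool)
core_mask[db.core_sample_indices_] = True   # core points
noise_mask = db.labels_ == -1               # noise points
border_mask = ~core_mask & ~noise_mask      # border points
print(f"core: {core_mask.sum()}, border: {border_mask.sum()}, noise: {noise_mask.sum()}")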
class DBSCANAnalyzer:
    """DBSCAN clustering analyzer."""

    def __init__(self):
        self.models = {}

    def demonstrate_dbscan_concepts(self):
        """Show how eps and min_samples shape the result."""
        # Data with deliberate noise
        np.random.seed(42)
        # Two dense clusters
        cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0], [0, 0.5]], 50)
        cluster2 = np.random.multivariate_normal([6, 6], [[0.5, 0], [0, 0.5]], 50)
        # Uniform noise points
        noise = np.random.uniform(0, 8, (20, 2))
        X = np.vstack([cluster1, cluster2, noise])
        # Parameter grids
        eps_values = [0.5, 1.0, 1.5, 2.0]
        min_samples_values = [3, 5, 10, 15]
        fig, axes = plt.subplots(2, 4, figsize=(16, 8))
        # Vary eps (min_samples fixed at 5)
        for i, eps in enumerate(eps_values):
            dbscan = DBSCAN(eps=eps, min_samples=5)
            labels = dbscan.fit_predict(X)
            # Plot clustering result
            unique_labels = set(labels)
            colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
            for k, col in zip(unique_labels, colors):
                if k == -1:
                    # Noise points in black
                    col = 'black'
                    marker = 'x'
                    alpha = 0.5
                    label = 'noise'
                else:
                    marker = 'o'
                    alpha = 0.7
                    label = f'cluster {k}'
                class_member_mask = (labels == k)
                xy = X[class_member_mask]
                axes[0, i].scatter(xy[:, 0], xy[:, 1], c=[col], marker=marker,
                                   alpha=alpha, s=50, label=label if k <= 1 or k == -1 else "")
            n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
            n_noise = list(labels).count(-1)
            axes[0, i].set_title(f'eps={eps}\nclusters: {n_clusters}, noise: {n_noise}')
            axes[0, i].set_xlabel('Feature 1')
            axes[0, i].set_ylabel('Feature 2')
            axes[0, i].legend()
            axes[0, i].grid(True, alpha=0.3)
        # Vary min_samples (eps fixed at 1.0)
        for i, min_samples in enumerate(min_samples_values):
            dbscan = DBSCAN(eps=1.0, min_samples=min_samples)
            labels = dbscan.fit_predict(X)
            # Plot clustering result
            unique_labels = set(labels)
            colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
            for k, col in zip(unique_labels, colors):
                if k == -1:
                    col = 'black'
                    marker = 'x'
                    alpha = 0.5
                    label = 'noise'
                else:
                    marker = 'o'
                    alpha = 0.7
                    label = f'cluster {k}'
                class_member_mask = (labels == k)
                xy = X[class_member_mask]
                axes[1, i].scatter(xy[:, 0], xy[:, 1], c=[col], marker=marker,
                                   alpha=alpha, s=50, label=label if k <= 1 or k == -1 else "")
            n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
            n_noise = list(labels).count(-1)
            axes[1, i].set_title(f'min_samples={min_samples}\nclusters: {n_clusters}, noise: {n_noise}')
            axes[1, i].set_xlabel('Feature 1')
            axes[1, i].set_ylabel('Feature 2')
            axes[1, i].legend()
            axes[1, i].grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        return X

    def find_optimal_parameters(self, X):
        """Search for good DBSCAN parameters."""
        # k-distance plot to suggest eps (heuristic for min_samples = 5;
        # note kneighbors counts the query point itself as its own neighbor)
        k = 4
        neighbors = NearestNeighbors(n_neighbors=k)
        neighbors_fit = neighbors.fit(X)
        distances, indices = neighbors_fit.kneighbors(X)
        # Sort the k-th neighbor distances
        distances = np.sort(distances[:, k - 1], axis=0)
        # Plot the k-distance curve
        plt.figure(figsize=(10, 6))
        plt.plot(distances)
        plt.xlabel('Points (sorted by distance)')
        plt.ylabel(f'{k}-distance')
        plt.title('k-distance plot (for choosing eps)')
        plt.grid(True, alpha=0.3)
        # Find the knee (crude heuristic: largest second difference)
        if len(distances) > 10:
            second_derivative = np.diff(distances, 2)
            knee_point = np.argmax(second_derivative) + 2
            optimal_eps = distances[knee_point]
            plt.axhline(y=optimal_eps, color='red', linestyle='--',
                        label=f'suggested eps={optimal_eps:.3f}')
            plt.legend()
        plt.show()
        # Grid search over both parameters
        eps_range = np.arange(0.1, 2.0, 0.1)
        min_samples_range = range(3, 15)
        best_score = -1
        best_params = {}
        results = []
        for eps in eps_range:
            for min_samples in min_samples_range:
                dbscan = DBSCAN(eps=eps, min_samples=min_samples)
                labels = dbscan.fit_predict(X)
                # Skip degenerate results (all noise or a single cluster)
                n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
                if n_clusters < 2:
                    continue
                # Silhouette score (excluding noise points if present)
                if len(set(labels)) > 1 and -1 not in labels:
                    score = silhouette_score(X, labels)
                elif len(set(labels)) > 2:  # noise plus at least two clusters
                    mask = labels != -1
                    if np.sum(mask) > 1 and len(set(labels[mask])) > 1:
                        score = silhouette_score(X[mask], labels[mask])
                    else:
                        continue
                else:
                    continue
                results.append({
                    'eps': eps,
                    'min_samples': min_samples,
                    'n_clusters': n_clusters,
                    'n_noise': list(labels).count(-1),
                    'silhouette_score': score
                })
                if score > best_score:
                    best_score = score
                    best_params = {'eps': eps, 'min_samples': min_samples}
        # Visualize the search results
        if results:
            results_df = pd.DataFrame(results)
            fig, axes = plt.subplots(1, 2, figsize=(12, 5))
            # Heat map of silhouette scores
            pivot_table = results_df.pivot_table(values='silhouette_score',
                                                 index='min_samples',
                                                 columns='eps',
                                                 aggfunc='mean')
            sns.heatmap(pivot_table, annot=True, fmt='.3f', cmap='viridis', ax=axes[0])
            axes[0].set_title('Silhouette score heat map')
            axes[0].set_xlabel('eps')
            axes[0].set_ylabel('min_samples')
            # Scatter: cluster count vs silhouette
            scatter = axes[1].scatter(results_df['n_clusters'], results_df['silhouette_score'],
                                      c=results_df['eps'], cmap='viridis', alpha=0.7)
            axes[1].set_xlabel('Number of clusters')
            axes[1].set_ylabel('Silhouette coefficient')
            axes[1].set_title('Cluster count vs silhouette')
            plt.colorbar(scatter, ax=axes[1], label='eps')
            plt.tight_layout()
            plt.show()
            print(f"Best parameters: eps={best_params['eps']:.1f}, min_samples={best_params['min_samples']}")
            print(f"Best silhouette score: {best_score:.4f}")
        return best_params if results else {'eps': 0.5, 'min_samples': 5}

    def compare_clustering_algorithms(self, datasets):
        """Compare K-means, hierarchical clustering and DBSCAN."""
        algorithms = {
            'K-means': lambda X, n: KMeans(n_clusters=n, random_state=42, n_init=10).fit_predict(X),
            'Hierarchical': lambda X, n: AgglomerativeClustering(n_clusters=n).fit_predict(X),
            'DBSCAN': lambda X, n: DBSCAN(eps=0.5, min_samples=5).fit_predict(X)
        }
        fig, axes = plt.subplots(4, 4, figsize=(16, 16))
        dataset_names = ['blobs', 'circles', 'moons', 'gaussian']
        for i, dataset_name in enumerate(dataset_names):
            X, y_true = datasets[dataset_name]
            n_clusters = len(np.unique(y_true))
            # Ground truth
            axes[i, 0].scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis', alpha=0.7)
            axes[i, 0].set_title(f'True labels - {dataset_name}')
            axes[i, 0].set_ylabel('Feature 2')
            for j, (alg_name, algorithm) in enumerate(algorithms.items()):
                y_pred = algorithm(X, n_clusters)
                # DBSCAN needs special handling for noise points
                if alg_name == 'DBSCAN':
                    unique_labels = set(y_pred)
                    colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
                    for k, col in zip(unique_labels, colors):
                        if k == -1:
                            col = 'black'
                            marker = 'x'
                            alpha = 0.5
                        else:
                            marker = 'o'
                            alpha = 0.7
                        class_member_mask = (y_pred == k)
                        xy = X[class_member_mask]
                        axes[i, j + 1].scatter(xy[:, 0], xy[:, 1], c=[col],
                                               marker=marker, alpha=alpha, s=30)
                else:
                    axes[i, j + 1].scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis', alpha=0.7)
                # Evaluation metric
                if alg_name == 'DBSCAN' and -1 in y_pred:
                    # Exclude noise points when computing ARI for DBSCAN
                    mask = y_pred != -1
                    if np.sum(mask) > 0 and len(set(y_pred[mask])) > 1:
                        ari = adjusted_rand_score(y_true[mask], y_pred[mask])
                    else:
                        ari = 0
                else:
                    ari = adjusted_rand_score(y_true, y_pred)
                axes[i, j + 1].set_title(f'{alg_name}\nARI: {ari:.3f}')
                if i == 3:  # bottom row
                    axes[i, j + 1].set_xlabel('Feature 1')
                if j == 0:  # first algorithm column
                    axes[i, j + 1].set_ylabel('Feature 2')
        plt.tight_layout()
        plt.show()

# DBSCAN demo
dbscan_analyzer = DBSCANAnalyzer()
print("\nDBSCAN demonstration:")
print("=" * 20)
# Core concepts
X_dbscan = dbscan_analyzer.demonstrate_dbscan_concepts()
# Parameter search
best_dbscan_params = dbscan_analyzer.find_optimal_parameters(X_blobs)
# Compare clustering algorithms
dbscan_analyzer.compare_clustering_algorithms(datasets)
4.3 Dimensionality Reduction
4.3.1 Principal Component Analysis (PCA)
PCA is the most widely used linear dimensionality reduction technique. It finds the orthogonal directions along which the data varies most (the principal components, i.e. the top eigenvectors of the covariance matrix) and projects the data onto the leading ones.
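Equivalently, the components can be read off a singular value decomposition of the centered data matrix. A minimal from-scratch sketch, which also covers the "compute PCA by hand" exercise at the end of the chapter (pca_svd is an illustrative helper; its component signs may differ from scikit-learn's):

import numpy as np

def pca_svd(X, n_components):
    """PCA via SVD of the centered data matrix."""
    Xc = X - X.mean(axis=0)                    # center each feature
    U, S, Vt = np.linalg.svd(Xc, full_matrices=False)
    components = Vt[:n_components]             # principal directions (rows)
    explained_var = (S ** 2) / (len(X) - 1)    # eigenvalues of the covariance matrix
    ratio = explained_var[:n_components] / explained_var.sum()
    return Xc @ components.T, components, ratio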
class PCAAnalyzer:
    """PCA analyzer."""

    def __init__(self):
        self.models = {}
        self.results = {}

    def demonstrate_pca_concepts(self):
        """Illustrate the core ideas of PCA on correlated 2D data."""
        # Correlated 2D data
        np.random.seed(42)
        mean = [0, 0]
        cov = [[3, 1.5], [1.5, 1]]
        X = np.random.multivariate_normal(mean, cov, 200)
        # Run PCA
        pca = PCA()
        X_pca = pca.fit_transform(X)
        # Visualize
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        # Original data
        axes[0].scatter(X[:, 0], X[:, 1], alpha=0.7)
        axes[0].set_title('Original data')
        axes[0].set_xlabel('Feature 1')
        axes[0].set_ylabel('Feature 2')
        axes[0].grid(True, alpha=0.3)
        axes[0].axis('equal')
        # Original data with principal components
        axes[1].scatter(X[:, 0], X[:, 1], alpha=0.7)
        # Draw the principal directions, scaled by their standard deviations
        mean_point = np.mean(X, axis=0)
        for i, (component, var) in enumerate(zip(pca.components_, pca.explained_variance_)):
            axes[1].arrow(mean_point[0], mean_point[1],
                          component[0] * np.sqrt(var) * 3,
                          component[1] * np.sqrt(var) * 3,
                          head_width=0.2, head_length=0.3,
                          fc=f'C{i}', ec=f'C{i}', linewidth=3,
                          label=f'PC{i+1} ({pca.explained_variance_ratio_[i]:.1%})')
        axes[1].set_title('Original data + principal components')
        axes[1].set_xlabel('Feature 1')
        axes[1].set_ylabel('Feature 2')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        axes[1].axis('equal')
        # Transformed data
        axes[2].scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.7)
        axes[2].set_title('Data after the PCA transform')
        axes[2].set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%})')
        axes[2].set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%})')
        axes[2].grid(True, alpha=0.3)
        axes[2].axis('equal')
        plt.tight_layout()
        plt.show()
        print("PCA results:")
        print(f"Explained variance ratio: {pca.explained_variance_ratio_}")
        print(f"Cumulative explained variance ratio: {np.cumsum(pca.explained_variance_ratio_)}")
        print("Principal components:")
        for i, component in enumerate(pca.components_):
            print(f"  PC{i+1}: {component}")
        return pca, X, X_pca

    def analyze_high_dimensional_data(self):
        """Apply PCA to high-dimensional data (the digits dataset)."""
        # Load 64-dimensional digit images
        from sklearn.datasets import load_digits
        digits = load_digits()
        X, y = digits.data, digits.target
        print(f"Original data shape: {X.shape}")
        print(f"Number of classes: {len(np.unique(y))}")
        # Run PCA
        pca = PCA()
        X_pca = pca.fit_transform(X)
        # Explained variance analysis
        cumsum_var_ratio = np.cumsum(pca.explained_variance_ratio_)
        # Visualize
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        # Per-component explained variance
        axes[0, 0].plot(pca.explained_variance_ratio_[:20], 'o-')
        axes[0, 0].set_title('Explained variance of the first 20 components')
        axes[0, 0].set_xlabel('Principal component')
        axes[0, 0].set_ylabel('Explained variance ratio')
        axes[0, 0].grid(True, alpha=0.3)
        # Cumulative explained variance
        axes[0, 1].plot(cumsum_var_ratio, 'o-')
        axes[0, 1].axhline(y=0.95, color='red', linestyle='--', label='95%')
        axes[0, 1].axhline(y=0.99, color='orange', linestyle='--', label='99%')
        axes[0, 1].set_title('Cumulative explained variance')
        axes[0, 1].set_xlabel('Number of components')
        axes[0, 1].set_ylabel('Cumulative explained variance ratio')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        # Components needed to explain 95% of the variance
        n_components_95 = np.argmax(cumsum_var_ratio >= 0.95) + 1
        print(f"Components needed for 95% variance: {n_components_95}")
        # 2D visualization of the first two components
        scatter = axes[0, 2].scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='tab10', alpha=0.7)
        axes[0, 2].set_title('First two principal components')
        axes[0, 2].set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%})')
        axes[0, 2].set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%})')
        plt.colorbar(scatter, ax=axes[0, 2])
        # Reconstruction error at different dimensionalities
        dimensions = [2, 5, 10, 20, 30, 50]
        reconstruction_errors = []
        for n_comp in dimensions:
            pca_temp = PCA(n_components=n_comp)
            X_reduced = pca_temp.fit_transform(X)
            X_reconstructed = pca_temp.inverse_transform(X_reduced)
            error = np.mean((X - X_reconstructed) ** 2)
            reconstruction_errors.append(error)
        axes[1, 0].plot(dimensions, reconstruction_errors, 'o-')
        axes[1, 0].set_title('Reconstruction error vs number of components')
        axes[1, 0].set_xlabel('Number of components')
        axes[1, 0].set_ylabel('Reconstruction error (MSE)')
        axes[1, 0].grid(True, alpha=0.3)
        # Original image vs reconstructions
        pca_2d = PCA(n_components=2)
        pca_10d = PCA(n_components=10)
        pca_30d = PCA(n_components=30)
        X_2d = pca_2d.fit_transform(X)
        X_10d = pca_10d.fit_transform(X)
        X_30d = pca_30d.fit_transform(X)
        X_reconstructed_2d = pca_2d.inverse_transform(X_2d)
        X_reconstructed_10d = pca_10d.inverse_transform(X_10d)
        X_reconstructed_30d = pca_30d.inverse_transform(X_30d)
        # Pick one sample to visualize
        sample_idx = 0
        images = [
            X[sample_idx].reshape(8, 8),
            X_reconstructed_2d[sample_idx].reshape(8, 8),
            X_reconstructed_10d[sample_idx].reshape(8, 8),
            X_reconstructed_30d[sample_idx].reshape(8, 8)
        ]
        titles = ['Original', '2D reconstruction', '10D reconstruction', '30D reconstruction']
        for i, (img, title) in enumerate(zip(images, titles)):
            if i == 0:
                ax = axes[1, 1]
            elif i == 1:
                ax = axes[1, 2]
            elif i == 2:
                # Extra subplots squeezed into the bottom-right of the grid
                ax = fig.add_subplot(2, 5, 9)
            else:
                ax = fig.add_subplot(2, 5, 10)
            ax.imshow(img, cmap='gray')
            ax.set_title(title)
            ax.axis('off')
        plt.tight_layout()
        plt.show()
        return pca, X, X_pca, n_components_95

    def compare_dimensionality_reduction_methods(self, X, y):
        """Compare several dimensionality reduction methods."""
        # Standardize the data
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        # Methods to compare
        methods = {
            'PCA': PCA(n_components=2),
            't-SNE': TSNE(n_components=2, random_state=42, perplexity=30),
            'MDS': MDS(n_components=2, random_state=42),
            'ICA': FastICA(n_components=2, random_state=42)
        }
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        for i, (name, method) in enumerate(methods.items()):
            print(f"Running {name}...")
            if name == 't-SNE':
                # t-SNE is slow on large datasets; use a subset
                if len(X_scaled) > 1000:
                    indices = np.random.choice(len(X_scaled), 1000, replace=False)
                    X_subset = X_scaled[indices]
                    y_subset = y[indices]
                else:
                    X_subset = X_scaled
                    y_subset = y
                X_reduced = method.fit_transform(X_subset)
                y_plot = y_subset
            else:
                X_reduced = method.fit_transform(X_scaled)
                y_plot = y
            scatter = axes[i].scatter(X_reduced[:, 0], X_reduced[:, 1],
                                      c=y_plot, cmap='tab10', alpha=0.7)
            axes[i].set_title(f'{name}')
            axes[i].set_xlabel('Component 1')
            axes[i].set_ylabel('Component 2')
            # Add explained variance where available
            if hasattr(method, 'explained_variance_ratio_'):
                var_ratio = method.explained_variance_ratio_
                axes[i].set_xlabel(f'Component 1 ({var_ratio[0]:.1%})')
                axes[i].set_ylabel(f'Component 2 ({var_ratio[1]:.1%})')
        plt.tight_layout()
        plt.show()

# PCA demo
pca_analyzer = PCAAnalyzer()
print("\nPCA demonstration:")
print("=" * 20)
# Core concepts
pca_model, X_original, X_pca_result = pca_analyzer.demonstrate_pca_concepts()
# High-dimensional data
pca_digits, X_digits, X_digits_pca, n_comp_95 = pca_analyzer.analyze_high_dimensional_data()
# Compare reduction methods (reload the digits targets here, since the method
# above loads the dataset only locally)
from sklearn.datasets import load_digits
y_digits = load_digits().target
pca_analyzer.compare_dimensionality_reduction_methods(X_digits[:500], y_digits[:500])
4.3.2 t-SNE and Other Nonlinear Dimensionality Reduction
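t-SNE (t-distributed Stochastic Neighbor Embedding) converts pairwise distances into neighbor probabilities and places points in the low-dimensional space so that local neighborhoods are preserved. This makes it well suited to visualization, but the result is sensitive to the perplexity parameter, which roughly sets the effective neighborhood size; the demo below varies it.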
class NonlinearDimensionalityReduction:
    """Nonlinear dimensionality reduction techniques."""

    def __init__(self):
        self.methods = {}

    def demonstrate_tsne_parameters(self, X, y):
        """Show the effect of the t-SNE perplexity parameter."""
        # Use a subset for speed
        if len(X) > 500:
            indices = np.random.choice(len(X), 500, replace=False)
            X_subset = X[indices]
            y_subset = y[indices]
        else:
            X_subset = X
            y_subset = y
        # Standardize
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X_subset)
        # Perplexity values to compare
        perplexity_values = [5, 15, 30, 50]
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        for i, perplexity in enumerate(perplexity_values):
            print(f"Running t-SNE (perplexity={perplexity})...")
            tsne = TSNE(n_components=2, perplexity=perplexity,
                        random_state=42, n_iter=1000)  # n_iter is named max_iter in newer scikit-learn
            X_tsne = tsne.fit_transform(X_scaled)
            scatter = axes[i].scatter(X_tsne[:, 0], X_tsne[:, 1],
                                      c=y_subset, cmap='tab10', alpha=0.7)
            axes[i].set_title(f't-SNE (perplexity={perplexity})')
            axes[i].set_xlabel('t-SNE 1')
            axes[i].set_ylabel('t-SNE 2')
        plt.tight_layout()
        plt.show()

    def compare_manifold_learning_methods(self, X, y):
        """Compare several manifold learning methods."""
        from sklearn.manifold import Isomap, LocallyLinearEmbedding, SpectralEmbedding
        # Use a subset for speed
        if len(X) > 300:
            indices = np.random.choice(len(X), 300, replace=False)
            X_subset = X[indices]
            y_subset = y[indices]
        else:
            X_subset = X
            y_subset = y
        # Standardize
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X_subset)
        # Manifold learning methods to compare
        methods = {
            'Isomap': Isomap(n_components=2, n_neighbors=10),
            'LLE': LocallyLinearEmbedding(n_components=2, n_neighbors=10, random_state=42),
            'Spectral Embedding': SpectralEmbedding(n_components=2, random_state=42),
            't-SNE': TSNE(n_components=2, random_state=42, perplexity=30)
        }
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        for i, (name, method) in enumerate(methods.items()):
            print(f"Running {name}...")
            try:
                X_reduced = method.fit_transform(X_scaled)
                scatter = axes[i].scatter(X_reduced[:, 0], X_reduced[:, 1],
                                          c=y_subset, cmap='tab10', alpha=0.7)
                axes[i].set_title(f'{name}')
                axes[i].set_xlabel('Component 1')
                axes[i].set_ylabel('Component 2')
            except Exception as e:
                axes[i].text(0.5, 0.5, f'{name}\nfailed',
                             transform=axes[i].transAxes, ha='center', va='center')
                axes[i].set_title(f'{name} (failed)')
                print(f"{name} failed: {e}")
        plt.tight_layout()
        plt.show()

# Nonlinear dimensionality reduction demo
nonlinear_dr = NonlinearDimensionalityReduction()
print("\nNonlinear dimensionality reduction demonstration:")
print("=" * 20)
# Effect of the perplexity parameter
nonlinear_dr.demonstrate_tsne_parameters(X_digits, y_digits)
# Compare manifold learning methods
nonlinear_dr.compare_manifold_learning_methods(X_digits, y_digits)
4.4 Chapter Summary
4.4.1 Review of Key Content
This chapter covered the main algorithms of unsupervised learning:
Clustering algorithms
- K-means: centroid-based clustering
- Hierarchical clustering: builds a hierarchy of clusters
- DBSCAN: density-based clustering
Dimensionality reduction techniques
- PCA: the classic linear method
- t-SNE: nonlinear, well suited to visualization
- Other manifold learning methods
4.4.2 Algorithm Selection Guide
| Type | Algorithm | Strengths | Weaknesses | Typical use cases |
| --- | --- | --- | --- | --- |
| Clustering | K-means | Simple, fast, scalable | Requires a preset k, assumes spherical clusters | Roughly spherical clusters |
| Clustering | Hierarchical | No preset cluster count, highly interpretable | High computational cost | Small datasets, hierarchy needed |
| Clustering | DBSCAN | Finds arbitrary cluster shapes, flags noise | Parameter-sensitive, struggles with varying density | Non-spherical clusters, noisy data |
| Dimensionality reduction | PCA | Linear, fast, interpretable | Captures only linear structure | Linearly correlated high-dimensional data |
| Dimensionality reduction | t-SNE | Preserves local structure, good visualizations | Slow, parameter-sensitive | Visualization, nonlinear structure |
4.4.3 Practical Advice
Clustering
- Use multiple evaluation metrics (silhouette coefficient, ARI, NMI); the silhouette definition is recalled after this list
- Try several different clustering algorithms
- Pay attention to data preprocessing and standardization
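For reference, the silhouette coefficient used throughout this chapter scores each sample as

$$s(i) = \frac{b(i) - a(i)}{\max\{a(i),\ b(i)\}}$$

where $a(i)$ is the mean distance from sample $i$ to the other members of its own cluster and $b(i)$ is the mean distance from $i$ to the members of the nearest other cluster; silhouette_score reports the average over all samples, with values near 1 indicating compact, well-separated clusters.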
Dimensionality reduction
- Choose a linear or nonlinear method based on the data's characteristics
- Weigh computational cost against dataset size
- Assess how much information the reduced representation retains
Parameter tuning
- Use grid search or Bayesian optimization
- Constrain parameter ranges with domain knowledge
- Use cross-validation to assess stability
4.5 Next Chapter Preview
The next chapter covers model evaluation and selection, including:
- Evaluation metrics in detail
- Cross-validation techniques
- The bias-variance trade-off
- Model selection strategies
- Hyperparameter optimization
4.6 Exercises
Basic exercises
- Implement a simple K-means algorithm (without sklearn)
- Compute the PCA principal components by hand
- Compare different clustering algorithms on the same dataset
- Interpret the meaning of the principal components in PCA
Advanced exercises
- Implement an adaptive parameter-selection procedure for DBSCAN
- Compare PCA and t-SNE for visualizing high-dimensional data
- Study how different distance metrics affect clustering results
- Implement an incremental PCA algorithm
Project exercises
- Customer segmentation: use clustering algorithms to segment customers
- Image compression: use PCA to compress images
- Document clustering: cluster text documents by topic
- Anomaly detection: combine clustering and dimensionality reduction to detect anomalies
Thought questions
- Why is K-means sensitive to initialization? How can this be mitigated?
- When is nonlinear dimensionality reduction more effective than linear methods?
- How can the quality of a clustering result be evaluated?
- What information does dimensionality reduction lose? How do you balance the trade-off?