5.1 无监督学习概述
5.1.1 什么是无监督学习
无监督学习是机器学习的一个重要分支,它从没有标签的数据中发现隐藏的模式和结构。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_blobs, make_circles, make_moons, load_iris, load_digits
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering, SpectralClustering
from sklearn.decomposition import PCA, TruncatedSVD, FastICA, NMF
from sklearn.manifold import TSNE, MDS, Isomap
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import silhouette_score, adjusted_rand_score, normalized_mutual_info_score
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
class UnsupervisedLearningIntro:
def __init__(self):
self.data_types = {}
def theory_explanation(self):
"""无监督学习理论解释"""
print("=== 无监督学习概述 ===")
print("1. 定义:从无标签数据中发现隐藏模式")
print("2. 主要任务:")
print(" - 聚类:将相似的数据点分组")
print(" - 降维:减少数据的维度")
print(" - 异常检测:识别异常或离群点")
print(" - 关联规则:发现数据间的关联关系")
print("3. 特点:")
print(" - 无需标签数据")
print(" - 探索性数据分析")
print(" - 数据预处理")
print(" - 特征学习")
def create_sample_datasets(self):
"""创建示例数据集"""
# 聚类数据
X_blobs, y_blobs = make_blobs(n_samples=300, centers=4, n_features=2,
random_state=42, cluster_std=1.0)
X_circles, y_circles = make_circles(n_samples=300, noise=0.1,
factor=0.3, random_state=42)
X_moons, y_moons = make_moons(n_samples=300, noise=0.1, random_state=42)
# 高维数据(用于降维)
iris = load_iris()
X_iris = iris.data
y_iris = iris.target
# 存储数据
self.data_types = {
'blobs': (X_blobs, y_blobs),
'circles': (X_circles, y_circles),
'moons': (X_moons, y_moons),
'iris': (X_iris, y_iris)
}
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('无监督学习示例数据集', fontsize=16)
datasets = [('blobs', 'Blobs数据'), ('circles', 'Circles数据'),
('moons', 'Moons数据'), ('iris', 'Iris数据')]
for i, (name, title) in enumerate(datasets):
ax = axes[i//2, i%2]
X, y = self.data_types[name]
if name == 'iris':
# 对于高维数据,使用PCA降维可视化
pca = PCA(n_components=2)
X_2d = pca.fit_transform(X)
scatter = ax.scatter(X_2d[:, 0], X_2d[:, 1], c=y, cmap='viridis', alpha=0.7)
else:
scatter = ax.scatter(X[:, 0], X[:, 1], c=y, cmap='viridis', alpha=0.7)
ax.set_title(title)
ax.set_xlabel('特征1')
ax.set_ylabel('特征2')
plt.colorbar(scatter, ax=ax)
plt.tight_layout()
plt.show()
return self.data_types
# 演示无监督学习基础
print("=== 无监督学习入门 ===")
intro = UnsupervisedLearningIntro()
intro.theory_explanation()
datasets = intro.create_sample_datasets()
5.2 聚类算法
5.2.1 K-Means聚类
K-Means是最经典的聚类算法之一,通过迭代优化簇中心来实现聚类。
class KMeansDemo:
def __init__(self):
self.models = {}
def theory_explanation(self):
"""K-Means理论解释"""
print("=== K-Means聚类理论 ===")
print("1. 算法原理:")
print(" - 随机初始化k个簇中心")
print(" - 将每个点分配到最近的簇中心")
print(" - 更新簇中心为簇内点的均值")
print(" - 重复直到收敛")
print("2. 优点:简单高效、适合球形簇")
print("3. 缺点:需要预设k值、对初始化敏感、假设簇为球形")
print("4. 适用场景:数据分布相对均匀、簇大小相近")
def visualize_kmeans_process(self):
"""可视化K-Means聚类过程"""
# 生成数据
X, _ = make_blobs(n_samples=300, centers=4, n_features=2,
random_state=42, cluster_std=1.0)
# 手动实现K-Means可视化过程
k = 4
max_iters = 10
# 随机初始化簇中心
np.random.seed(42)
centroids = X[np.random.choice(X.shape[0], k, replace=False)]
fig, axes = plt.subplots(2, 5, figsize=(20, 8))
fig.suptitle('K-Means聚类过程可视化', fontsize=16)
for iteration in range(max_iters):
if iteration >= 10:
break
ax = axes[iteration//5, iteration%5]
# 计算每个点到簇中心的距离
distances = np.sqrt(((X - centroids[:, np.newaxis])**2).sum(axis=2))
labels = np.argmin(distances, axis=0)
# 绘制数据点和簇中心
colors = ['red', 'blue', 'green', 'orange']
for i in range(k):
mask = labels == i
ax.scatter(X[mask, 0], X[mask, 1], c=colors[i], alpha=0.6, s=30)
ax.scatter(centroids[i, 0], centroids[i, 1], c='black',
marker='x', s=200, linewidths=3)
ax.set_title(f'迭代 {iteration + 1}')
ax.set_xlabel('特征1')
ax.set_ylabel('特征2')
# 更新簇中心
new_centroids = np.array([X[labels == i].mean(axis=0) for i in range(k)])
# 检查收敛
if np.allclose(centroids, new_centroids):
print(f"K-Means在第{iteration + 1}次迭代后收敛")
break
centroids = new_centroids
plt.tight_layout()
plt.show()
def basic_kmeans(self):
"""基础K-Means演示"""
# 生成数据
X, y_true = make_blobs(n_samples=300, centers=4, n_features=2,
random_state=42, cluster_std=1.0)
# K-Means聚类
kmeans = KMeans(n_clusters=4, random_state=42, n_init=10)
y_pred = kmeans.fit_predict(X)
# 评估
silhouette_avg = silhouette_score(X, y_pred)
ari = adjusted_rand_score(y_true, y_pred)
nmi = normalized_mutual_info_score(y_true, y_pred)
print(f"K-Means聚类结果:")
print(f"轮廓系数: {silhouette_avg:.4f}")
print(f"调整兰德指数: {ari:.4f}")
print(f"标准化互信息: {nmi:.4f}")
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 真实标签
axes[0].scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis', alpha=0.7)
axes[0].set_title('真实聚类')
axes[0].set_xlabel('特征1')
axes[0].set_ylabel('特征2')
# 预测标签
axes[1].scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis', alpha=0.7)
axes[1].scatter(kmeans.cluster_centers_[:, 0], kmeans.cluster_centers_[:, 1],
c='red', marker='x', s=200, linewidths=3, label='簇中心')
axes[1].set_title('K-Means聚类结果')
axes[1].set_xlabel('特征1')
axes[1].set_ylabel('特征2')
axes[1].legend()
plt.tight_layout()
plt.show()
return kmeans
def optimal_k_selection(self):
"""选择最优的k值"""
# 生成数据
X, _ = make_blobs(n_samples=300, centers=4, n_features=2,
random_state=42, cluster_std=1.0)
# 测试不同的k值
k_range = range(2, 11)
inertias = []
silhouette_scores = []
for k in k_range:
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
y_pred = kmeans.fit_predict(X)
inertias.append(kmeans.inertia_)
silhouette_scores.append(silhouette_score(X, y_pred))
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 肘部法则
axes[0].plot(k_range, inertias, 'bo-')
axes[0].set_xlabel('簇数量 (k)')
axes[0].set_ylabel('簇内平方和 (Inertia)')
axes[0].set_title('肘部法则')
axes[0].grid(True, alpha=0.3)
# 轮廓系数
axes[1].plot(k_range, silhouette_scores, 'ro-')
axes[1].set_xlabel('簇数量 (k)')
axes[1].set_ylabel('轮廓系数')
axes[1].set_title('轮廓系数法')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 找到最优k
optimal_k_silhouette = k_range[np.argmax(silhouette_scores)]
print(f"基于轮廓系数的最优k值: {optimal_k_silhouette}")
print(f"对应的轮廓系数: {max(silhouette_scores):.4f}")
return optimal_k_silhouette
# 演示K-Means聚类
print("=== K-Means聚类演示 ===")
kmeans_demo = KMeansDemo()
kmeans_demo.theory_explanation()
kmeans_demo.visualize_kmeans_process()
kmeans_demo.basic_kmeans()
kmeans_demo.optimal_k_selection()
5.2.2 DBSCAN聚类
DBSCAN是一种基于密度的聚类算法,能够发现任意形状的簇并识别噪声点。
class DBSCANDemo:
def __init__(self):
self.models = {}
def theory_explanation(self):
"""DBSCAN理论解释"""
print("=== DBSCAN聚类理论 ===")
print("1. 算法原理:")
print(" - 核心点:邻域内点数≥MinPts的点")
print(" - 边界点:不是核心点但在核心点邻域内")
print(" - 噪声点:既不是核心点也不是边界点")
print("2. 参数:")
print(" - eps:邻域半径")
print(" - min_samples:最小样本数")
print("3. 优点:能发现任意形状的簇、自动确定簇数、识别噪声")
print("4. 缺点:对参数敏感、密度差异大时效果差")
def visualize_dbscan_concepts(self):
"""可视化DBSCAN概念"""
# 生成数据
X, _ = make_moons(n_samples=200, noise=0.1, random_state=42)
# DBSCAN聚类
dbscan = DBSCAN(eps=0.3, min_samples=5)
y_pred = dbscan.fit_predict(X)
# 识别核心点
core_samples_mask = np.zeros_like(dbscan.labels_, dtype=bool)
core_samples_mask[dbscan.core_sample_indices_] = True
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 聚类结果
unique_labels = set(y_pred)
colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
for k, col in zip(unique_labels, colors):
if k == -1:
# 噪声点用黑色表示
col = 'black'
class_member_mask = (y_pred == k)
# 核心点
xy = X[class_member_mask & core_samples_mask]
axes[0].scatter(xy[:, 0], xy[:, 1], c=[col], marker='o', s=50, alpha=0.8)
# 边界点
xy = X[class_member_mask & ~core_samples_mask]
axes[0].scatter(xy[:, 0], xy[:, 1], c=[col], marker='^', s=30, alpha=0.5)
axes[0].set_title('DBSCAN聚类结果\n(圆形=核心点, 三角形=边界点, 黑色=噪声点)')
axes[0].set_xlabel('特征1')
axes[0].set_ylabel('特征2')
# 参数影响
eps_values = [0.1, 0.2, 0.3, 0.5]
n_clusters_list = []
n_noise_list = []
for eps in eps_values:
dbscan_temp = DBSCAN(eps=eps, min_samples=5)
y_temp = dbscan_temp.fit_predict(X)
n_clusters = len(set(y_temp)) - (1 if -1 in y_temp else 0)
n_noise = list(y_temp).count(-1)
n_clusters_list.append(n_clusters)
n_noise_list.append(n_noise)
axes[1].plot(eps_values, n_clusters_list, 'bo-', label='簇数量')
axes[1].plot(eps_values, n_noise_list, 'ro-', label='噪声点数量')
axes[1].set_xlabel('eps参数')
axes[1].set_ylabel('数量')
axes[1].set_title('DBSCAN参数影响')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def compare_clustering_algorithms(self):
"""比较不同聚类算法"""
# 生成不同类型的数据
datasets = [
make_blobs(n_samples=200, centers=3, n_features=2, random_state=42),
make_circles(n_samples=200, noise=0.1, factor=0.3, random_state=42),
make_moons(n_samples=200, noise=0.1, random_state=42)
]
dataset_names = ['Blobs', 'Circles', 'Moons']
# 聚类算法
algorithms = [
('K-Means', KMeans(n_clusters=3, random_state=42)),
('DBSCAN', DBSCAN(eps=0.3, min_samples=5)),
('层次聚类', AgglomerativeClustering(n_clusters=3)),
('谱聚类', SpectralClustering(n_clusters=3, random_state=42))
]
fig, axes = plt.subplots(len(datasets), len(algorithms), figsize=(20, 15))
fig.suptitle('不同聚类算法比较', fontsize=16)
for i, ((X, y_true), dataset_name) in enumerate(zip(datasets, dataset_names)):
for j, (alg_name, algorithm) in enumerate(algorithms):
ax = axes[i, j]
# 标准化数据
X_scaled = StandardScaler().fit_transform(X)
# 聚类
if hasattr(algorithm, 'fit_predict'):
y_pred = algorithm.fit_predict(X_scaled)
else:
y_pred = algorithm.fit(X_scaled).labels_
# 可视化
scatter = ax.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis', alpha=0.7)
ax.set_title(f'{dataset_name} - {alg_name}')
if i == len(datasets) - 1:
ax.set_xlabel('特征1')
if j == 0:
ax.set_ylabel('特征2')
plt.tight_layout()
plt.show()
# 演示DBSCAN聚类
print("=== DBSCAN聚类演示 ===")
dbscan_demo = DBSCANDemo()
dbscan_demo.theory_explanation()
dbscan_demo.visualize_dbscan_concepts()
dbscan_demo.compare_clustering_algorithms()
5.3 降维技术
5.3.1 主成分分析(PCA)
PCA是最经典的降维技术,通过线性变换将数据投影到低维空间。
class PCADemo:
def __init__(self):
self.models = {}
def theory_explanation(self):
"""PCA理论解释"""
print("=== PCA降维理论 ===")
print("1. 算法原理:")
print(" - 找到数据方差最大的方向(主成分)")
print(" - 将数据投影到主成分构成的子空间")
print(" - 保留最重要的k个主成分")
print("2. 数学基础:")
print(" - 协方差矩阵的特征值分解")
print(" - 特征值表示方差大小")
print(" - 特征向量表示主成分方向")
print("3. 优点:降维效果好、计算简单、可解释性强")
print("4. 缺点:线性变换、假设数据呈高斯分布")
def visualize_pca_process(self):
"""可视化PCA过程"""
# 生成2D数据
np.random.seed(42)
mean = [0, 0]
cov = [[3, 1.5], [1.5, 1]]
X = np.random.multivariate_normal(mean, cov, 200)
# 执行PCA
pca = PCA()
X_pca = pca.fit_transform(X)
# 获取主成分
components = pca.components_
explained_variance = pca.explained_variance_
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('PCA降维过程可视化', fontsize=16)
# 原始数据和主成分
axes[0, 0].scatter(X[:, 0], X[:, 1], alpha=0.6)
# 绘制主成分方向
mean_point = np.mean(X, axis=0)
for i, (component, variance) in enumerate(zip(components, explained_variance)):
direction = component * np.sqrt(variance) * 3
axes[0, 0].arrow(mean_point[0], mean_point[1], direction[0], direction[1],
head_width=0.1, head_length=0.1, fc=f'C{i}', ec=f'C{i}',
label=f'PC{i+1}')
axes[0, 0].set_title('原始数据和主成分方向')
axes[0, 0].set_xlabel('特征1')
axes[0, 0].set_ylabel('特征2')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 解释方差比例
explained_variance_ratio = pca.explained_variance_ratio_
axes[0, 1].bar(range(1, len(explained_variance_ratio) + 1), explained_variance_ratio)
axes[0, 1].set_title('主成分解释方差比例')
axes[0, 1].set_xlabel('主成分')
axes[0, 1].set_ylabel('解释方差比例')
axes[0, 1].grid(True, alpha=0.3)
# 变换后的数据
axes[1, 0].scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.6)
axes[1, 0].set_title('PCA变换后的数据')
axes[1, 0].set_xlabel('第一主成分')
axes[1, 0].set_ylabel('第二主成分')
axes[1, 0].grid(True, alpha=0.3)
# 累积解释方差
cumsum_variance = np.cumsum(explained_variance_ratio)
axes[1, 1].plot(range(1, len(cumsum_variance) + 1), cumsum_variance, 'bo-')
axes[1, 1].axhline(y=0.95, color='r', linestyle='--', label='95%阈值')
axes[1, 1].set_title('累积解释方差')
axes[1, 1].set_xlabel('主成分数量')
axes[1, 1].set_ylabel('累积解释方差比例')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return pca
def iris_pca_analysis(self):
"""Iris数据集PCA分析"""
# 加载数据
iris = load_iris()
X = iris.data
y = iris.target
target_names = iris.target_names
# 标准化
X_scaled = StandardScaler().fit_transform(X)
# PCA降维
pca = PCA()
X_pca = pca.fit_transform(X_scaled)
# 分析结果
print("=== Iris数据集PCA分析 ===")
print(f"原始维度: {X.shape[1]}")
print(f"主成分解释方差比例: {pca.explained_variance_ratio_}")
print(f"累积解释方差比例: {np.cumsum(pca.explained_variance_ratio_)}")
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Iris数据集PCA分析', fontsize=16)
# 2D投影
colors = ['red', 'green', 'blue']
for i, (color, target_name) in enumerate(zip(colors, target_names)):
mask = y == i
axes[0, 0].scatter(X_pca[mask, 0], X_pca[mask, 1],
c=color, label=target_name, alpha=0.7)
axes[0, 0].set_title('前两个主成分')
axes[0, 0].set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
axes[0, 0].set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 主成分载荷
feature_names = iris.feature_names
loadings = pca.components_[:2].T * np.sqrt(pca.explained_variance_[:2])
for i, feature in enumerate(feature_names):
axes[0, 1].arrow(0, 0, loadings[i, 0], loadings[i, 1],
head_width=0.05, head_length=0.05, fc='red', ec='red')
axes[0, 1].text(loadings[i, 0]*1.1, loadings[i, 1]*1.1, feature,
fontsize=10, ha='center', va='center')
axes[0, 1].set_title('主成分载荷图')
axes[0, 1].set_xlabel('PC1载荷')
axes[0, 1].set_ylabel('PC2载荷')
axes[0, 1].grid(True, alpha=0.3)
# 解释方差
axes[1, 0].bar(range(1, len(pca.explained_variance_ratio_) + 1),
pca.explained_variance_ratio_)
axes[1, 0].set_title('各主成分解释方差比例')
axes[1, 0].set_xlabel('主成分')
axes[1, 0].set_ylabel('解释方差比例')
axes[1, 0].grid(True, alpha=0.3)
# 累积解释方差
cumsum_variance = np.cumsum(pca.explained_variance_ratio_)
axes[1, 1].plot(range(1, len(cumsum_variance) + 1), cumsum_variance, 'bo-')
axes[1, 1].axhline(y=0.95, color='r', linestyle='--', label='95%阈值')
axes[1, 1].set_title('累积解释方差')
axes[1, 1].set_xlabel('主成分数量')
axes[1, 1].set_ylabel('累积解释方差比例')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return pca, X_pca
# 演示PCA降维
print("=== PCA降维演示 ===")
pca_demo = PCADemo()
pca_demo.theory_explanation()
pca_demo.visualize_pca_process()
pca_demo.iris_pca_analysis()
5.3.2 t-SNE降维
t-SNE是一种非线性降维技术,特别适合数据可视化。
class TSNEDemo:
def __init__(self):
self.models = {}
def theory_explanation(self):
"""t-SNE理论解释"""
print("=== t-SNE降维理论 ===")
print("1. 算法原理:")
print(" - 在高维空间计算点对的相似性概率")
print(" - 在低维空间寻找相似的概率分布")
print(" - 最小化两个分布的KL散度")
print("2. 特点:")
print(" - 非线性降维")
print(" - 保持局部结构")
print(" - 适合可视化")
print("3. 参数:")
print(" - perplexity:困惑度,控制局部邻域大小")
print(" - learning_rate:学习率")
print(" - n_iter:迭代次数")
def compare_dimensionality_reduction(self):
"""比较不同降维方法"""
# 加载手写数字数据
digits = load_digits()
X = digits.data
y = digits.target
# 选择部分数据以加快计算
n_samples = 1000
indices = np.random.choice(X.shape[0], n_samples, replace=False)
X_subset = X[indices]
y_subset = y[indices]
# 标准化
X_scaled = StandardScaler().fit_transform(X_subset)
# 不同降维方法
methods = {
'PCA': PCA(n_components=2, random_state=42),
't-SNE': TSNE(n_components=2, random_state=42, perplexity=30),
'MDS': MDS(n_components=2, random_state=42),
'Isomap': Isomap(n_components=2)
}
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('不同降维方法比较(手写数字数据)', fontsize=16)
for i, (name, method) in enumerate(methods.items()):
ax = axes[i//2, i%2]
print(f"正在计算 {name}...")
X_reduced = method.fit_transform(X_scaled)
# 可视化
scatter = ax.scatter(X_reduced[:, 0], X_reduced[:, 1],
c=y_subset, cmap='tab10', alpha=0.7, s=20)
ax.set_title(f'{name}降维结果')
ax.set_xlabel('维度1')
ax.set_ylabel('维度2')
# 添加颜色条
if i == 1: # 只在一个子图上添加颜色条
plt.colorbar(scatter, ax=ax, label='数字类别')
plt.tight_layout()
plt.show()
def tsne_parameter_analysis(self):
"""t-SNE参数影响分析"""
# 生成数据
iris = load_iris()
X = iris.data
y = iris.target
target_names = iris.target_names
# 标准化
X_scaled = StandardScaler().fit_transform(X)
# 不同perplexity值
perplexity_values = [5, 15, 30, 50]
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('t-SNE参数影响分析(Iris数据)', fontsize=16)
colors = ['red', 'green', 'blue']
for i, perplexity in enumerate(perplexity_values):
ax = axes[i//2, i%2]
# t-SNE降维
tsne = TSNE(n_components=2, perplexity=perplexity,
random_state=42, n_iter=1000)
X_tsne = tsne.fit_transform(X_scaled)
# 可视化
for j, (color, target_name) in enumerate(zip(colors, target_names)):
mask = y == j
ax.scatter(X_tsne[mask, 0], X_tsne[mask, 1],
c=color, label=target_name, alpha=0.7)
ax.set_title(f'Perplexity = {perplexity}')
ax.set_xlabel('t-SNE 1')
ax.set_ylabel('t-SNE 2')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示t-SNE降维
print("=== t-SNE降维演示 ===")
tsne_demo = TSNEDemo()
tsne_demo.theory_explanation()
tsne_demo.compare_dimensionality_reduction()
tsne_demo.tsne_parameter_analysis()
5.4 异常检测
5.4.1 异常检测概述
异常检测用于识别数据中的异常点或离群值,在欺诈检测、故障诊断等领域有重要应用。
class AnomalyDetectionDemo:
def __init__(self):
self.models = {}
def theory_explanation(self):
"""异常检测理论解释"""
print("=== 异常检测理论 ===")
print("1. 异常类型:")
print(" - 点异常:单个数据点异常")
print(" - 上下文异常:在特定上下文中异常")
print(" - 集体异常:一组数据点异常")
print("2. 检测方法:")
print(" - 统计方法:基于统计分布")
print(" - 基于距离:计算到其他点的距离")
print(" - 基于密度:局部密度异常")
print(" - 基于模型:训练正常数据模型")
print("3. 应用场景:")
print(" - 欺诈检测、网络安全、质量控制")
def create_anomaly_data(self):
"""创建包含异常的数据"""
np.random.seed(42)
# 正常数据
normal_data = np.random.multivariate_normal([0, 0], [[1, 0.5], [0.5, 1]], 200)
# 异常数据
anomaly_data = np.random.uniform(-4, 4, (20, 2))
# 合并数据
X = np.vstack([normal_data, anomaly_data])
y_true = np.hstack([np.zeros(200), np.ones(20)]) # 0=正常, 1=异常
return X, y_true
def isolation_forest_demo(self):
"""孤立森林异常检测"""
# 创建数据
X, y_true = self.create_anomaly_data()
# 孤立森林
iso_forest = IsolationForest(contamination=0.1, random_state=42)
y_pred = iso_forest.fit_predict(X)
y_pred = (y_pred == -1).astype(int) # 转换为0/1标签
# 异常分数
anomaly_scores = iso_forest.decision_function(X)
# 评估
from sklearn.metrics import classification_report, roc_auc_score
print("=== 孤立森林异常检测 ===")
print(f"ROC AUC: {roc_auc_score(y_true, -anomaly_scores):.4f}")
print("\n分类报告:")
print(classification_report(y_true, y_pred, target_names=['正常', '异常']))
# 可视化
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
# 真实标签
axes[0].scatter(X[y_true==0, 0], X[y_true==0, 1], c='blue', alpha=0.6, label='正常')
axes[0].scatter(X[y_true==1, 0], X[y_true==1, 1], c='red', alpha=0.8, label='异常')
axes[0].set_title('真实标签')
axes[0].set_xlabel('特征1')
axes[0].set_ylabel('特征2')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 预测结果
axes[1].scatter(X[y_pred==0, 0], X[y_pred==0, 1], c='blue', alpha=0.6, label='预测正常')
axes[1].scatter(X[y_pred==1, 0], X[y_pred==1, 1], c='red', alpha=0.8, label='预测异常')
axes[1].set_title('孤立森林预测')
axes[1].set_xlabel('特征1')
axes[1].set_ylabel('特征2')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# 异常分数
scatter = axes[2].scatter(X[:, 0], X[:, 1], c=anomaly_scores, cmap='viridis', alpha=0.7)
axes[2].set_title('异常分数')
axes[2].set_xlabel('特征1')
axes[2].set_ylabel('特征2')
plt.colorbar(scatter, ax=axes[2], label='异常分数')
plt.tight_layout()
plt.show()
return iso_forest
def one_class_svm_demo(self):
"""One-Class SVM异常检测"""
# 创建数据
X, y_true = self.create_anomaly_data()
# One-Class SVM
oc_svm = OneClassSVM(nu=0.1, kernel='rbf', gamma='scale')
y_pred = oc_svm.fit_predict(X)
y_pred = (y_pred == -1).astype(int)
# 决策函数值
decision_scores = oc_svm.decision_function(X)
# 评估
print("=== One-Class SVM异常检测 ===")
print(f"ROC AUC: {roc_auc_score(y_true, -decision_scores):.4f}")
print("\n分类报告:")
print(classification_report(y_true, y_pred, target_names=['正常', '异常']))
# 可视化决策边界
xx, yy = np.meshgrid(np.linspace(X[:, 0].min()-1, X[:, 0].max()+1, 100),
np.linspace(X[:, 1].min()-1, X[:, 1].max()+1, 100))
Z = oc_svm.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='Blues_r', alpha=0.6)
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
plt.scatter(X[y_true==0, 0], X[y_true==0, 1], c='blue', alpha=0.6, label='正常')
plt.scatter(X[y_true==1, 0], X[y_true==1, 1], c='red', alpha=0.8, label='异常')
plt.title('One-Class SVM决策边界')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
scatter = plt.scatter(X[:, 0], X[:, 1], c=decision_scores, cmap='viridis', alpha=0.7)
plt.title('决策函数值')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.colorbar(scatter, label='决策函数值')
plt.tight_layout()
plt.show()
return oc_svm
def local_outlier_factor_demo(self):
"""局部离群因子异常检测"""
# 创建数据
X, y_true = self.create_anomaly_data()
# LOF
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
y_pred = lof.fit_predict(X)
y_pred = (y_pred == -1).astype(int)
# 局部离群因子
lof_scores = -lof.negative_outlier_factor_
# 评估
print("=== 局部离群因子异常检测 ===")
print(f"ROC AUC: {roc_auc_score(y_true, lof_scores):.4f}")
print("\n分类报告:")
print(classification_report(y_true, y_pred, target_names=['正常', '异常']))
# 可视化
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.scatter(X[y_true==0, 0], X[y_true==0, 1], c='blue', alpha=0.6, label='正常')
plt.scatter(X[y_true==1, 0], X[y_true==1, 1], c='red', alpha=0.8, label='异常')
plt.title('真实标签')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 3, 2)
plt.scatter(X[y_pred==0, 0], X[y_pred==0, 1], c='blue', alpha=0.6, label='预测正常')
plt.scatter(X[y_pred==1, 0], X[y_pred==1, 1], c='red', alpha=0.8, label='预测异常')
plt.title('LOF预测')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 3, 3)
scatter = plt.scatter(X[:, 0], X[:, 1], c=lof_scores, cmap='viridis', alpha=0.7)
plt.title('局部离群因子')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.colorbar(scatter, label='LOF分数')
plt.tight_layout()
plt.show()
return lof
def compare_anomaly_methods(self):
"""比较不同异常检测方法"""
# 创建数据
X, y_true = self.create_anomaly_data()
# 异常检测方法
methods = {
'Isolation Forest': IsolationForest(contamination=0.1, random_state=42),
'One-Class SVM': OneClassSVM(nu=0.1, kernel='rbf', gamma='scale'),
'LOF': LocalOutlierFactor(n_neighbors=20, contamination=0.1)
}
results = {}
for name, method in methods.items():
if name == 'LOF':
y_pred = method.fit_predict(X)
scores = -method.negative_outlier_factor_
else:
y_pred = method.fit_predict(X)
scores = -method.decision_function(X) if hasattr(method, 'decision_function') else -method.score_samples(X)
y_pred = (y_pred == -1).astype(int)
auc = roc_auc_score(y_true, scores)
results[name] = {
'predictions': y_pred,
'scores': scores,
'auc': auc
}
print(f"{name}: ROC AUC = {auc:.4f}")
# 可视化比较
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('异常检测方法比较', fontsize=16)
# 真实标签
axes[0, 0].scatter(X[y_true==0, 0], X[y_true==0, 1], c='blue', alpha=0.6, label='正常')
axes[0, 0].scatter(X[y_true==1, 0], X[y_true==1, 1], c='red', alpha=0.8, label='异常')
axes[0, 0].set_title('真实标签')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 各方法预测结果
for i, (name, result) in enumerate(results.items()):
if i >= 3:
break
ax = axes[(i+1)//2, (i+1)%2]
y_pred = result['predictions']
auc = result['auc']
ax.scatter(X[y_pred==0, 0], X[y_pred==0, 1], c='blue', alpha=0.6, label='预测正常')
ax.scatter(X[y_pred==1, 0], X[y_pred==1, 1], c='red', alpha=0.8, label='预测异常')
ax.set_title(f'{name} (AUC={auc:.3f})')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return results
# 演示异常检测
print("=== 异常检测演示 ===")
anomaly_demo = AnomalyDetectionDemo()
anomaly_demo.theory_explanation()
anomaly_demo.isolation_forest_demo()
anomaly_demo.one_class_svm_demo()
anomaly_demo.local_outlier_factor_demo()
anomaly_demo.compare_anomaly_methods()
5.5 综合案例:客户细分分析
5.5.1 项目背景
我们将使用无监督学习技术对客户进行细分,结合聚类、降维和异常检测。
class CustomerSegmentationProject:
def __init__(self):
self.data = None
self.models = {}
self.results = {}
def create_customer_data(self):
"""创建客户数据集"""
np.random.seed(42)
n_customers = 1000
# 生成客户特征
age = np.random.normal(40, 15, n_customers)
age = np.clip(age, 18, 80)
income = np.random.lognormal(10, 0.5, n_customers)
income = np.clip(income, 20000, 200000)
spending_score = np.random.beta(2, 5, n_customers) * 100
# 添加一些相关性
annual_spending = income * 0.3 + spending_score * 100 + np.random.normal(0, 5000, n_customers)
annual_spending = np.clip(annual_spending, 1000, 100000)
frequency = np.random.poisson(12, n_customers)
recency = np.random.exponential(30, n_customers)
# 创建DataFrame
self.data = pd.DataFrame({
'age': age,
'income': income,
'spending_score': spending_score,
'annual_spending': annual_spending,
'frequency': frequency,
'recency': recency
})
print("=== 客户数据集信息 ===")
print(self.data.describe())
print(f"\n数据形状: {self.data.shape}")
# 数据可视化
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('客户数据分布', fontsize=16)
features = self.data.columns
for i, feature in enumerate(features):
ax = axes[i//3, i%3]
ax.hist(self.data[feature], bins=30, alpha=0.7, edgecolor='black')
ax.set_title(f'{feature}分布')
ax.set_xlabel(feature)
ax.set_ylabel('频次')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return self.data
def data_preprocessing(self):
"""数据预处理"""
if self.data is None:
self.create_customer_data()
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(self.data)
# 相关性分析
correlation_matrix = self.data.corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
square=True, linewidths=0.5)
plt.title('特征相关性矩阵')
plt.tight_layout()
plt.show()
return X_scaled, scaler
def customer_clustering(self):
"""客户聚类分析"""
X_scaled, scaler = self.data_preprocessing()
# 确定最优聚类数
k_range = range(2, 11)
inertias = []
silhouette_scores = []
for k in k_range:
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
labels = kmeans.fit_predict(X_scaled)
inertias.append(kmeans.inertia_)
silhouette_scores.append(silhouette_score(X_scaled, labels))
# 可视化选择最优k
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
axes[0].plot(k_range, inertias, 'bo-')
axes[0].set_xlabel('聚类数 (k)')
axes[0].set_ylabel('簇内平方和')
axes[0].set_title('肘部法则')
axes[0].grid(True, alpha=0.3)
axes[1].plot(k_range, silhouette_scores, 'ro-')
axes[1].set_xlabel('聚类数 (k)')
axes[1].set_ylabel('轮廓系数')
axes[1].set_title('轮廓系数法')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 选择最优k并进行聚类
optimal_k = k_range[np.argmax(silhouette_scores)]
print(f"最优聚类数: {optimal_k}")
kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
cluster_labels = kmeans.fit_predict(X_scaled)
# 添加聚类标签到原数据
self.data['cluster'] = cluster_labels
# 聚类结果分析
cluster_summary = self.data.groupby('cluster').agg({
'age': ['mean', 'std'],
'income': ['mean', 'std'],
'spending_score': ['mean', 'std'],
'annual_spending': ['mean', 'std'],
'frequency': ['mean', 'std'],
'recency': ['mean', 'std']
}).round(2)
print("\n=== 聚类结果摘要 ===")
print(cluster_summary)
self.models['kmeans'] = kmeans
return cluster_labels, optimal_k
def dimensionality_reduction_analysis(self):
"""降维分析"""
X_scaled, _ = self.data_preprocessing()
# PCA降维
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)
# t-SNE降维
tsne = TSNE(n_components=2, random_state=42, perplexity=30)
X_tsne = tsne.fit_transform(X_scaled)
# 可视化
cluster_labels = self.data['cluster']
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# PCA可视化
scatter1 = axes[0].scatter(X_pca[:, 0], X_pca[:, 1], c=cluster_labels,
cmap='viridis', alpha=0.7)
axes[0].set_title(f'PCA降维结果\n解释方差比例: {pca.explained_variance_ratio_.sum():.2%}')
axes[0].set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
axes[0].set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
plt.colorbar(scatter1, ax=axes[0], label='聚类')
# t-SNE可视化
scatter2 = axes[1].scatter(X_tsne[:, 0], X_tsne[:, 1], c=cluster_labels,
cmap='viridis', alpha=0.7)
axes[1].set_title('t-SNE降维结果')
axes[1].set_xlabel('t-SNE 1')
axes[1].set_ylabel('t-SNE 2')
plt.colorbar(scatter2, ax=axes[1], label='聚类')
plt.tight_layout()
plt.show()
self.models['pca'] = pca
self.models['tsne'] = tsne
return X_pca, X_tsne
def anomaly_detection_analysis(self):
"""异常客户检测"""
X_scaled, _ = self.data_preprocessing()
# 使用孤立森林检测异常客户
iso_forest = IsolationForest(contamination=0.05, random_state=42)
anomaly_labels = iso_forest.fit_predict(X_scaled)
anomaly_scores = iso_forest.decision_function(X_scaled)
# 添加异常标签
self.data['is_anomaly'] = (anomaly_labels == -1).astype(int)
self.data['anomaly_score'] = anomaly_scores
# 异常客户分析
print("=== 异常客户分析 ===")
print(f"异常客户数量: {sum(anomaly_labels == -1)}")
print(f"异常客户比例: {sum(anomaly_labels == -1) / len(anomaly_labels):.2%}")
# 异常客户特征分析
normal_customers = self.data[self.data['is_anomaly'] == 0]
anomaly_customers = self.data[self.data['is_anomaly'] == 1]
comparison = pd.DataFrame({
'Normal_Mean': normal_customers.select_dtypes(include=[np.number]).mean(),
'Anomaly_Mean': anomaly_customers.select_dtypes(include=[np.number]).mean()
})
comparison['Difference'] = comparison['Anomaly_Mean'] - comparison['Normal_Mean']
print("\n正常vs异常客户特征对比:")
print(comparison.round(2))
# 可视化异常检测结果
X_pca, _ = self.dimensionality_reduction_analysis()
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
normal_mask = anomaly_labels == 1
anomaly_mask = anomaly_labels == -1
plt.scatter(X_pca[normal_mask, 0], X_pca[normal_mask, 1],
c='blue', alpha=0.6, label='正常客户')
plt.scatter(X_pca[anomaly_mask, 0], X_pca[anomaly_mask, 1],
c='red', alpha=0.8, label='异常客户', s=100)
plt.title('异常客户检测结果')
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=anomaly_scores,
cmap='viridis', alpha=0.7)
plt.title('异常分数分布')
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.colorbar(scatter, label='异常分数')
plt.tight_layout()
plt.show()
self.models['isolation_forest'] = iso_forest
return anomaly_labels, anomaly_scores
def generate_insights(self):
"""生成业务洞察"""
if 'cluster' not in self.data.columns:
self.customer_clustering()
print("=== 客户细分业务洞察 ===")
# 各聚类的特征描述
for cluster_id in sorted(self.data['cluster'].unique()):
cluster_data = self.data[self.data['cluster'] == cluster_id]
print(f"\n【聚类 {cluster_id}】({len(cluster_data)}个客户)")
print(f" 平均年龄: {cluster_data['age'].mean():.1f}岁")
print(f" 平均收入: {cluster_data['income'].mean():.0f}元")
print(f" 消费评分: {cluster_data['spending_score'].mean():.1f}")
print(f" 年消费额: {cluster_data['annual_spending'].mean():.0f}元")
print(f" 购买频次: {cluster_data['frequency'].mean():.1f}次/年")
print(f" 最近购买: {cluster_data['recency'].mean():.1f}天前")
# 客户价值评估
avg_value = cluster_data['annual_spending'].mean()
avg_frequency = cluster_data['frequency'].mean()
avg_recency = cluster_data['recency'].mean()
if avg_value > self.data['annual_spending'].mean() and avg_frequency > self.data['frequency'].mean():
customer_type = "高价值客户"
elif avg_recency > self.data['recency'].mean():
customer_type = "流失风险客户"
elif avg_frequency < self.data['frequency'].mean():
customer_type = "低活跃客户"
else:
customer_type = "普通客户"
print(f" 客户类型: {customer_type}")
# 异常客户洞察
if 'is_anomaly' in self.data.columns:
anomaly_customers = self.data[self.data['is_anomaly'] == 1]
if len(anomaly_customers) > 0:
print(f"\n【异常客户分析】")
print(f" 数量: {len(anomaly_customers)}个")
print(f" 特点: 可能是VIP客户或数据异常")
print(f" 建议: 人工审核,制定个性化服务策略")
# 运行客户细分项目
print("=== 客户细分分析项目 ===")
customer_project = CustomerSegmentationProject()
customer_project.create_customer_data()
customer_project.customer_clustering()
customer_project.dimensionality_reduction_analysis()
customer_project.anomaly_detection_analysis()
customer_project.generate_insights()
5.6 小结
本章我们深入学习了无监督学习的理论和实践:
5.6.1 主要内容回顾
无监督学习概述
- 无监督学习的定义和特点
- 主要任务:聚类、降维、异常检测
- 应用场景和数据集创建
聚类算法
- K-Means:基于距离的聚类,适合球形簇
- DBSCAN:基于密度的聚类,发现任意形状簇
- 层次聚类和谱聚类
- 聚类评估指标和最优参数选择
降维技术
- PCA:线性降维,保持方差最大化
- t-SNE:非线性降维,适合数据可视化
- 其他方法:MDS、Isomap等
- 降维效果评估和参数调优
异常检测
- 孤立森林:基于随机森林的异常检测
- One-Class SVM:基于支持向量机
- LOF:基于局部密度的异常检测
- 异常检测评估和方法比较
综合案例
- 客户细分分析完整项目
- 多种无监督学习技术的综合应用
- 业务洞察和实际应用
5.6.2 算法选择指导
| 任务类型 | 推荐算法 | 适用场景 | 注意事项 |
|---|---|---|---|
| 聚类 | K-Means | 球形簇,已知簇数 | 需要标准化,对初始化敏感 |
| 聚类 | DBSCAN | 任意形状簇,有噪声 | 参数调优困难 |
| 降维 | PCA | 线性关系,需要可解释性 | 假设线性关系 |
| 降维 | t-SNE | 非线性关系,数据可视化 | 计算复杂度高 |
| 异常检测 | Isolation Forest | 高维数据,无标签 | 适合连续特征 |
| 异常检测 | LOF | 局部异常,密度变化 | 对参数敏感 |
5.6.3 最佳实践
数据预处理
- 特征标准化或归一化
- 处理缺失值和异常值
- 特征选择和工程
算法选择
- 根据数据特点选择合适算法
- 考虑数据规模和计算资源
- 结合多种方法进行验证
参数调优
- 使用网格搜索或贝叶斯优化
- 交叉验证评估稳定性
- 可视化参数影响
结果评估
- 使用多个评估指标
- 结合业务知识解释结果
- 可视化验证合理性
5.6.4 常见陷阱
聚类陷阱
- 盲目相信聚类结果
- 忽略数据标准化
- 不验证聚类稳定性
降维陷阱
- 过度降维丢失信息
- 忽略非线性关系
- 不考虑计算复杂度
异常检测陷阱
- 异常比例设置不当
- 忽略业务背景
- 不验证检测结果
5.6.5 下一步学习
- 高级聚类:谱聚类、混合高斯模型
- 深度学习降维:自编码器、变分自编码器
- 时间序列异常检测:基于时间的异常检测
- 半监督学习:结合少量标签的学习
- 强化学习:基于奖励的学习
5.6.6 练习题
基础练习
- 实现简单的K-Means算法
- 比较不同距离度量对聚类的影响
- 分析PCA主成分的业务含义
进阶练习
- 实现自适应的DBSCAN参数选择
- 结合多种降维方法进行数据探索
- 设计异常检测的评估框架
项目练习
- 使用真实数据进行客户细分
- 构建推荐系统的用户画像
- 开发异常检测监控系统
通过本章的学习,你应该能够: - 理解无监督学习的核心概念和应用场景 - 掌握主要的聚类、降维和异常检测算法 - 根据数据特点选择合适的无监督学习方法 - 进行完整的无监督学习项目分析
下一章我们将学习模型评估与选择,探讨如何科学地评估和比较机器学习模型。 “`