9.1 A Social Network Analysis System
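This section puts Neo4j to work on a concrete application: a social network analysis system with a user and content data model, a write path guarded by constraints and indexes, and read-side analytics covering mutual friends, friend recommendation, influence scoring, community detection, and content trends.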
9.1.1 System Architecture Design
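The architecture centers on a single SocialNetworkAnalyzer class that owns the Neo4j driver and exposes both write methods (users, content, relationships, interactions) and analysis queries. Enums and dataclasses make the domain model explicit before any Cypher is written: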
from neo4j import GraphDatabase
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
import json
import logging
from dataclasses import dataclass
from enum import Enum
import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict
import random
class RelationshipType(Enum):
"""关系类型"""
FRIEND = "FRIEND"
FOLLOW = "FOLLOW"
LIKE = "LIKE"
COMMENT = "COMMENT"
SHARE = "SHARE"
BLOCK = "BLOCK"
MESSAGE = "MESSAGE"
class ContentType(Enum):
"""内容类型"""
POST = "POST"
PHOTO = "PHOTO"
VIDEO = "VIDEO"
STORY = "STORY"
LIVE = "LIVE"
@dataclass
class User:
"""用户数据模型"""
user_id: str
username: str
email: str
age: int
gender: str
location: str
interests: List[str]
join_date: datetime
is_verified: bool = False
follower_count: int = 0
following_count: int = 0
@dataclass
class Content:
"""内容数据模型"""
content_id: str
author_id: str
content_type: ContentType
title: str
text: str
tags: List[str]
created_at: datetime
like_count: int = 0
comment_count: int = 0
share_count: int = 0
view_count: int = 0
class SocialNetworkAnalyzer:
"""社交网络分析系统"""
    def __init__(self, neo4j_config: Dict[str, Any]):
        # Neo4j connection
        self.driver = GraphDatabase.driver(
            neo4j_config['uri'],
            auth=(neo4j_config['username'], neo4j_config['password'])
        )
        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        # Initialize database constraints and indexes
        self._setup_database()
def _setup_database(self):
"""设置数据库约束和索引"""
with self.driver.session() as session:
            # Create uniqueness constraints
constraints = [
"CREATE CONSTRAINT user_id_unique IF NOT EXISTS FOR (u:User) REQUIRE u.user_id IS UNIQUE",
"CREATE CONSTRAINT content_id_unique IF NOT EXISTS FOR (c:Content) REQUIRE c.content_id IS UNIQUE",
"CREATE CONSTRAINT tag_name_unique IF NOT EXISTS FOR (t:Tag) REQUIRE t.name IS UNIQUE"
]
for constraint in constraints:
try:
session.run(constraint)
except Exception as e:
self.logger.warning(f"Constraint creation failed: {e}")
            # Create indexes
indexes = [
"CREATE INDEX user_username IF NOT EXISTS FOR (u:User) ON (u.username)",
"CREATE INDEX user_location IF NOT EXISTS FOR (u:User) ON (u.location)",
"CREATE INDEX content_created_at IF NOT EXISTS FOR (c:Content) ON (c.created_at)",
"CREATE INDEX content_type IF NOT EXISTS FOR (c:Content) ON (c.content_type)"
]
for index in indexes:
try:
session.run(index)
except Exception as e:
self.logger.warning(f"Index creation failed: {e}")
def create_user(self, user: User) -> bool:
"""创建用户"""
with self.driver.session() as session:
try:
cypher = """
CREATE (u:User {
user_id: $user_id,
username: $username,
email: $email,
age: $age,
gender: $gender,
location: $location,
interests: $interests,
join_date: $join_date,
is_verified: $is_verified,
follower_count: $follower_count,
following_count: $following_count
})
RETURN u.user_id as user_id
"""
result = session.run(cypher, {
'user_id': user.user_id,
'username': user.username,
'email': user.email,
'age': user.age,
'gender': user.gender,
'location': user.location,
'interests': user.interests,
'join_date': user.join_date.isoformat(),
'is_verified': user.is_verified,
'follower_count': user.follower_count,
'following_count': user.following_count
})
created_user = result.single()
if created_user:
self.logger.info(f"Created user: {created_user['user_id']}")
return True
return False
except Exception as e:
self.logger.error(f"Failed to create user: {e}")
return False
def create_content(self, content: Content) -> bool:
"""创建内容"""
with self.driver.session() as session:
try:
                # Create the content node
cypher = """
MATCH (author:User {user_id: $author_id})
CREATE (c:Content {
content_id: $content_id,
content_type: $content_type,
title: $title,
text: $text,
created_at: $created_at,
like_count: $like_count,
comment_count: $comment_count,
share_count: $share_count,
view_count: $view_count
})
CREATE (author)-[:AUTHORED]->(c)
RETURN c.content_id as content_id
"""
result = session.run(cypher, {
'author_id': content.author_id,
'content_id': content.content_id,
'content_type': content.content_type.value,
'title': content.title,
'text': content.text,
'created_at': content.created_at.isoformat(),
'like_count': content.like_count,
'comment_count': content.comment_count,
'share_count': content.share_count,
'view_count': content.view_count
})
created_content = result.single()
if created_content:
                    # Create tag relationships
self._create_content_tags(content.content_id, content.tags)
self.logger.info(f"Created content: {created_content['content_id']}")
return True
return False
except Exception as e:
self.logger.error(f"Failed to create content: {e}")
return False
def _create_content_tags(self, content_id: str, tags: List[str]):
"""为内容创建标签关系"""
with self.driver.session() as session:
for tag in tags:
cypher = """
MATCH (c:Content {content_id: $content_id})
MERGE (t:Tag {name: $tag_name})
MERGE (c)-[:TAGGED_WITH]->(t)
"""
session.run(cypher, {
'content_id': content_id,
'tag_name': tag
})
def create_relationship(self, user1_id: str, user2_id: str,
relationship_type: RelationshipType,
properties: Dict[str, Any] = None) -> bool:
"""创建用户关系"""
with self.driver.session() as session:
try:
                # Build the property string (keys are interpolated into the Cypher text, so they must come from trusted code)
if properties:
prop_items = [f"{k}: ${k}" for k in properties.keys()]
prop_str = "{" + ", ".join(prop_items) + "}"
else:
prop_str = ""
properties = {}
cypher = f"""
MATCH (u1:User {{user_id: $user1_id}})
MATCH (u2:User {{user_id: $user2_id}})
CREATE (u1)-[r:{relationship_type.value} {prop_str}]->(u2)
RETURN id(r) as rel_id
"""
result = session.run(cypher, {
**properties,
'user1_id': user1_id,
'user2_id': user2_id
})
rel = result.single()
if rel:
self.logger.info(f"Created {relationship_type.value} relationship: {user1_id} -> {user2_id}")
return True
return False
except Exception as e:
self.logger.error(f"Failed to create relationship: {e}")
return False
def create_content_interaction(self, user_id: str, content_id: str,
interaction_type: RelationshipType,
properties: Dict[str, Any] = None) -> bool:
"""创建内容交互"""
with self.driver.session() as session:
try:
if properties is None:
properties = {}
properties['timestamp'] = datetime.now().isoformat()
                # Build the property string
prop_items = [f"{k}: ${k}" for k in properties.keys()]
prop_str = "{" + ", ".join(prop_items) + "}"
cypher = f"""
MATCH (u:User {{user_id: $user_id}})
MATCH (c:Content {{content_id: $content_id}})
CREATE (u)-[r:{interaction_type.value} {prop_str}]->(c)
RETURN id(r) as rel_id
"""
result = session.run(cypher, {
**properties,
'user_id': user_id,
'content_id': content_id
})
rel = result.single()
if rel:
                    # Update content statistics
self._update_content_stats(content_id, interaction_type)
self.logger.info(f"Created {interaction_type.value} interaction: {user_id} -> {content_id}")
return True
return False
except Exception as e:
self.logger.error(f"Failed to create content interaction: {e}")
return False
def _update_content_stats(self, content_id: str, interaction_type: RelationshipType):
"""更新内容统计信息"""
with self.driver.session() as session:
if interaction_type == RelationshipType.LIKE:
cypher = """
MATCH (c:Content {content_id: $content_id})
SET c.like_count = c.like_count + 1
"""
elif interaction_type == RelationshipType.COMMENT:
cypher = """
MATCH (c:Content {content_id: $content_id})
SET c.comment_count = c.comment_count + 1
"""
elif interaction_type == RelationshipType.SHARE:
cypher = """
MATCH (c:Content {content_id: $content_id})
SET c.share_count = c.share_count + 1
"""
else:
return
session.run(cypher, {'content_id': content_id})
def find_mutual_friends(self, user1_id: str, user2_id: str) -> List[Dict[str, Any]]:
"""查找共同好友"""
with self.driver.session() as session:
cypher = """
MATCH (u1:User {user_id: $user1_id})-[:FRIEND]-(mutual:User)-[:FRIEND]-(u2:User {user_id: $user2_id})
WHERE u1 <> u2 AND u1 <> mutual AND u2 <> mutual
RETURN mutual.user_id as user_id, mutual.username as username, mutual.location as location
ORDER BY mutual.username
"""
result = session.run(cypher, {
'user1_id': user1_id,
'user2_id': user2_id
})
return [record.data() for record in result]
def recommend_friends(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
"""推荐好友"""
with self.driver.session() as session:
cypher = """
MATCH (u:User {user_id: $user_id})-[:FRIEND]-(friend:User)-[:FRIEND]-(potential:User)
WHERE u <> potential AND NOT (u)-[:FRIEND]-(potential)
WITH potential, count(*) as mutual_friends
MATCH (potential:User)
OPTIONAL MATCH (potential)-[:AUTHORED]->(content:Content)
WITH potential, mutual_friends, count(content) as content_count
RETURN potential.user_id as user_id,
potential.username as username,
potential.location as location,
potential.interests as interests,
mutual_friends,
content_count,
(mutual_friends * 2 + content_count * 0.1) as score
ORDER BY score DESC
LIMIT $limit
"""
result = session.run(cypher, {
'user_id': user_id,
'limit': limit
})
return [record.data() for record in result]
def analyze_user_influence(self, user_id: str) -> Dict[str, Any]:
"""分析用户影响力"""
with self.driver.session() as session:
            # Basic statistics
basic_stats_cypher = """
MATCH (u:User {user_id: $user_id})
OPTIONAL MATCH (u)-[:FRIEND]-(friends:User)
OPTIONAL MATCH (u)<-[:FOLLOW]-(followers:User)
OPTIONAL MATCH (u)-[:FOLLOW]->(following:User)
OPTIONAL MATCH (u)-[:AUTHORED]->(content:Content)
OPTIONAL MATCH (content)<-[:LIKE]-(likers:User)
OPTIONAL MATCH (content)<-[:COMMENT]-(commenters:User)
OPTIONAL MATCH (content)<-[:SHARE]-(sharers:User)
RETURN u.username as username,
count(DISTINCT friends) as friend_count,
count(DISTINCT followers) as follower_count,
count(DISTINCT following) as following_count,
count(DISTINCT content) as content_count,
count(DISTINCT likers) as total_likes,
count(DISTINCT commenters) as total_comments,
count(DISTINCT sharers) as total_shares
"""
basic_result = session.run(basic_stats_cypher, {'user_id': user_id})
basic_stats = basic_result.single().data()
            # Compute an influence score
influence_score = (
basic_stats['follower_count'] * 2 +
basic_stats['total_likes'] * 0.5 +
basic_stats['total_comments'] * 1.0 +
basic_stats['total_shares'] * 1.5 +
basic_stats['content_count'] * 0.3
)
            # Network reach (a simple centrality proxy)
centrality_cypher = """
MATCH (u:User {user_id: $user_id})
OPTIONAL MATCH path = shortestPath((u)-[:FRIEND*1..3]-(other:User))
WHERE u <> other
WITH u, collect(DISTINCT other) as reachable_users, count(DISTINCT other) as reach
RETURN reach
"""
centrality_result = session.run(centrality_cypher, {'user_id': user_id})
reach = centrality_result.single()['reach']
return {
'user_id': user_id,
'username': basic_stats['username'],
'basic_stats': basic_stats,
'influence_score': influence_score,
'network_reach': reach,
'engagement_rate': (
(basic_stats['total_likes'] + basic_stats['total_comments'] + basic_stats['total_shares']) /
max(basic_stats['content_count'], 1)
)
}
def detect_communities(self, algorithm: str = 'louvain') -> List[Dict[str, Any]]:
"""社区检测"""
with self.driver.session() as session:
try:
if algorithm == 'louvain':
                    # Louvain via the GDS library (the anonymous nodeProjection/relationshipProjection form below is GDS 1.x syntax; GDS 2.x projects a named graph with gds.graph.project first)
cypher = """
CALL gds.louvain.stream({
nodeProjection: 'User',
relationshipProjection: 'FRIEND'
})
YIELD nodeId, communityId
MATCH (u:User) WHERE id(u) = nodeId
RETURN u.user_id as user_id, u.username as username, communityId
ORDER BY communityId, username
"""
else:
                    # Label propagation via GDS (same GDS 1.x anonymous-projection caveat as above)
cypher = """
CALL gds.labelPropagation.stream({
nodeProjection: 'User',
relationshipProjection: 'FRIEND'
})
YIELD nodeId, communityId
MATCH (u:User) WHERE id(u) = nodeId
RETURN u.user_id as user_id, u.username as username, communityId
ORDER BY communityId, username
"""
result = session.run(cypher)
return [record.data() for record in result]
except Exception as e:
self.logger.warning(f"GDS algorithm failed, using simple clustering: {e}")
return self._simple_community_detection()
def _simple_community_detection(self) -> List[Dict[str, Any]]:
"""简单社区检测(基于连接密度)"""
with self.driver.session() as session:
cypher = """
MATCH (u:User)
OPTIONAL MATCH (u)-[:FRIEND]-(friend:User)
WITH u, count(friend) as friend_count
RETURN u.user_id as user_id,
u.username as username,
CASE
WHEN friend_count = 0 THEN 0
WHEN friend_count <= 5 THEN 1
WHEN friend_count <= 15 THEN 2
WHEN friend_count <= 50 THEN 3
ELSE 4
END as communityId
ORDER BY communityId, username
"""
result = session.run(cypher)
return [record.data() for record in result]
def analyze_content_trends(self, days: int = 30) -> Dict[str, Any]:
"""分析内容趋势"""
with self.driver.session() as session:
start_date = (datetime.now() - timedelta(days=days)).isoformat()
            # Content type distribution
content_type_cypher = """
MATCH (c:Content)
WHERE c.created_at >= $start_date
RETURN c.content_type as content_type, count(*) as count
ORDER BY count DESC
"""
content_type_result = session.run(content_type_cypher, {'start_date': start_date})
content_types = [record.data() for record in content_type_result]
            # Popular tags
popular_tags_cypher = """
MATCH (c:Content)-[:TAGGED_WITH]->(t:Tag)
WHERE c.created_at >= $start_date
RETURN t.name as tag, count(*) as usage_count
ORDER BY usage_count DESC
LIMIT 20
"""
tags_result = session.run(popular_tags_cypher, {'start_date': start_date})
popular_tags = [record.data() for record in tags_result]
            # Daily publishing volume. created_at is stored as an ISO-8601 string,
            # so parse it before truncating to a date.
            daily_content_cypher = """
            MATCH (c:Content)
            WHERE c.created_at >= $start_date
            WITH date(datetime(c.created_at)) as publish_date, count(*) as daily_count
            RETURN publish_date, daily_count
            ORDER BY publish_date
            """
daily_result = session.run(daily_content_cypher, {'start_date': start_date})
daily_content = [record.data() for record in daily_result]
            # Most engaging content
top_content_cypher = """
MATCH (c:Content)
WHERE c.created_at >= $start_date
RETURN c.content_id as content_id,
c.title as title,
c.content_type as content_type,
c.like_count as likes,
c.comment_count as comments,
c.share_count as shares,
(c.like_count + c.comment_count * 2 + c.share_count * 3) as engagement_score
ORDER BY engagement_score DESC
LIMIT 10
"""
top_content_result = session.run(top_content_cypher, {'start_date': start_date})
top_content = [record.data() for record in top_content_result]
return {
'period_days': days,
'content_type_distribution': content_types,
'popular_tags': popular_tags,
'daily_content_volume': daily_content,
'top_engaging_content': top_content
}
def generate_user_activity_report(self, user_id: str) -> Dict[str, Any]:
"""生成用户活动报告"""
with self.driver.session() as session:
            # Basic user info
user_info_cypher = """
MATCH (u:User {user_id: $user_id})
RETURN u.username as username, u.location as location,
u.interests as interests, u.join_date as join_date
"""
user_info = session.run(user_info_cypher, {'user_id': user_id}).single().data()
            # Content creation statistics
content_stats_cypher = """
MATCH (u:User {user_id: $user_id})-[:AUTHORED]->(c:Content)
RETURN c.content_type as content_type, count(*) as count,
sum(c.like_count) as total_likes,
sum(c.comment_count) as total_comments,
sum(c.share_count) as total_shares
ORDER BY count DESC
"""
content_stats = [record.data() for record in session.run(content_stats_cypher, {'user_id': user_id})]
            # Social activity statistics
social_stats_cypher = """
MATCH (u:User {user_id: $user_id})
OPTIONAL MATCH (u)-[:LIKE]->(liked_content:Content)
OPTIONAL MATCH (u)-[:COMMENT]->(commented_content:Content)
OPTIONAL MATCH (u)-[:SHARE]->(shared_content:Content)
OPTIONAL MATCH (u)-[:FRIEND]-(friends:User)
OPTIONAL MATCH (u)-[:FOLLOW]->(following:User)
OPTIONAL MATCH (u)<-[:FOLLOW]-(followers:User)
RETURN count(DISTINCT liked_content) as likes_given,
count(DISTINCT commented_content) as comments_made,
count(DISTINCT shared_content) as shares_made,
count(DISTINCT friends) as friend_count,
count(DISTINCT following) as following_count,
count(DISTINCT followers) as follower_count
"""
social_stats = session.run(social_stats_cypher, {'user_id': user_id}).single().data()
            # Recent activity
recent_activity_cypher = """
MATCH (u:User {user_id: $user_id})
OPTIONAL MATCH (u)-[r:LIKE|COMMENT|SHARE]->(c:Content)
WITH u, r, c, type(r) as activity_type
WHERE r.timestamp IS NOT NULL
RETURN activity_type, c.content_id as content_id, c.title as content_title, r.timestamp as timestamp
ORDER BY r.timestamp DESC
LIMIT 20
"""
recent_activity = [record.data() for record in session.run(recent_activity_cypher, {'user_id': user_id})]
return {
'user_info': user_info,
'content_creation': content_stats,
'social_engagement': social_stats,
'recent_activity': recent_activity,
'influence_analysis': self.analyze_user_influence(user_id)
}
def close(self):
"""关闭连接"""
if self.driver:
self.driver.close()
self.logger.info("Neo4j connection closed")
# Data generator
class SocialNetworkDataGenerator:
    """Sample-data generator for the social network."""
def __init__(self, analyzer: SocialNetworkAnalyzer):
self.analyzer = analyzer
self.users = []
self.contents = []
def generate_sample_data(self, num_users: int = 100, num_contents: int = 500):
"""生成示例数据"""
# 生成用户
self._generate_users(num_users)
# 生成内容
self._generate_contents(num_contents)
# 生成关系
self._generate_relationships()
# 生成交互
self._generate_interactions()
    def _generate_users(self, num_users: int):
        """Generate user data."""
        locations = ['北京', '上海', '广州', '深圳', '杭州', '成都', '武汉', '西安', '南京', '重庆']
        interests_pool = ['Tech', 'Music', 'Movies', 'Travel', 'Food', 'Sports', 'Reading', 'Photography', 'Gaming', 'Art']
        for i in range(num_users):
            user = User(
                user_id=f"user_{i:04d}",
                username=f"user_{i:04d}",
                email=f"user_{i:04d}@example.com",
                age=random.randint(18, 65),
                gender=random.choice(['M', 'F']),
                location=random.choice(locations),
                interests=random.sample(interests_pool, random.randint(2, 5)),
                join_date=datetime.now() - timedelta(days=random.randint(1, 365)),
                is_verified=random.random() < 0.1
            )
            self.users.append(user)
            self.analyzer.create_user(user)
    def _generate_contents(self, num_contents: int):
        """Generate content data."""
        content_titles = [
            "What a nice day today", "Sharing an interesting find", "A movie worth watching",
            "Restaurant review", "Travel diary", "A tech tip", "Thoughts on life",
            "Photography showcase", "Music picks", "Reading notes"
        ]
        tags_pool = ['Life', 'Tech', 'Entertainment', 'Food', 'Travel', 'Photography', 'Music', 'Movies', 'Reading', 'Sports']
        for i in range(num_contents):
            content = Content(
                content_id=f"content_{i:04d}",
                author_id=random.choice(self.users).user_id,
                content_type=random.choice(list(ContentType)),
                title=random.choice(content_titles),
                text=f"Detailed description for content {i}...",
                tags=random.sample(tags_pool, random.randint(1, 3)),
                created_at=datetime.now() - timedelta(days=random.randint(1, 30))
            )
            self.contents.append(content)
            self.analyzer.create_content(content)
    def _generate_relationships(self):
        """Generate user relationships."""
        # FRIEND relationships
        for user in self.users:
            num_friends = random.randint(5, 20)
            potential_friends = [u for u in self.users if u.user_id != user.user_id]
            friends = random.sample(potential_friends, min(num_friends, len(potential_friends)))
            for friend in friends:
                if random.random() < 0.7:  # 70% chance to create the friendship
                    self.analyzer.create_relationship(
                        user.user_id, friend.user_id, RelationshipType.FRIEND,
                        {'created_at': datetime.now().isoformat()}
                    )
        # FOLLOW relationships
        for user in self.users:
            num_following = random.randint(10, 50)
            potential_following = [u for u in self.users if u.user_id != user.user_id]
            following = random.sample(potential_following, min(num_following, len(potential_following)))
            for follow_user in following:
                if random.random() < 0.3:  # 30% chance to create the follow
                    self.analyzer.create_relationship(
                        user.user_id, follow_user.user_id, RelationshipType.FOLLOW,
                        {'created_at': datetime.now().isoformat()}
                    )
    def _generate_interactions(self):
        """Generate content interactions."""
        for content in self.contents:
            # Randomly pick users to interact with this content
            interacting_users = random.sample(self.users, random.randint(5, 30))
            for user in interacting_users:
                # Like
                if random.random() < 0.6:
                    self.analyzer.create_content_interaction(
                        user.user_id, content.content_id, RelationshipType.LIKE
                    )
                # Comment
                if random.random() < 0.2:
                    self.analyzer.create_content_interaction(
                        user.user_id, content.content_id, RelationshipType.COMMENT,
                        {'comment_text': f"Comment from {user.username}"}
                    )
                # Share
                if random.random() < 0.1:
                    self.analyzer.create_content_interaction(
                        user.user_id, content.content_id, RelationshipType.SHARE
                    )
# Usage example
neo4j_config = {
'uri': 'bolt://localhost:7687',
'username': 'neo4j',
'password': 'password'
}
# Create the analyzer
analyzer = SocialNetworkAnalyzer(neo4j_config)
# Generate sample data
data_generator = SocialNetworkDataGenerator(analyzer)
data_generator.generate_sample_data(num_users=50, num_contents=200)
# Analysis examples
print("=== User Influence Analysis ===")
influence = analyzer.analyze_user_influence('user_0001')
print(f"User: {influence['username']}")
print(f"Influence score: {influence['influence_score']:.2f}")
print(f"Network reach: {influence['network_reach']} users")
print(f"Engagement rate: {influence['engagement_rate']:.2f}")
print("\n=== Friend Recommendations ===")
recommendations = analyzer.recommend_friends('user_0001', limit=5)
for rec in recommendations:
    print(f"Recommended user: {rec['username']}, mutual friends: {rec['mutual_friends']}, score: {rec['score']:.2f}")
print("\n=== Content Trend Analysis ===")
trends = analyzer.analyze_content_trends(days=30)
print("Content type distribution:")
for ct in trends['content_type_distribution']:
    print(f"  {ct['content_type']}: {ct['count']} items")
print("\nPopular tags:")
for tag in trends['popular_tags'][:5]:
    print(f"  {tag['tag']}: {tag['usage_count']} uses")
print("\n=== Community Detection ===")
communities = analyzer.detect_communities()
community_stats = {}
for member in communities:
    community_id = member['communityId']
    if community_id not in community_stats:
        community_stats[community_id] = 0
    community_stats[community_id] += 1
print("Community distribution:")
for community_id, size in sorted(community_stats.items()):
    print(f"  Community {community_id}: {size} members")
# Generate a user activity report
print("\n=== User Activity Report ===")
report = analyzer.generate_user_activity_report('user_0001')
print(f"User: {report['user_info']['username']}")
print(f"Location: {report['user_info']['location']}")
print(f"Interests: {', '.join(report['user_info']['interests'])}")
print(f"Friends: {report['social_engagement']['friend_count']}")
print(f"Following: {report['social_engagement']['following_count']}")
print(f"Followers: {report['social_engagement']['follower_count']}")
# Close the connection
analyzer.close()
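Since the analyzer owns a driver connection, closing it manually as above leaks the connection if any call raises. A minimal hardening sketch (assuming the classes defined above): contextlib.closing calls close() on exit, even when an exception escapes.

from contextlib import closing

with closing(SocialNetworkAnalyzer(neo4j_config)) as safe_analyzer:
    # close() runs automatically when this block exits
    influence = safe_analyzer.analyze_user_influence('user_0001')
    print(f"Influence score: {influence['influence_score']:.2f}")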
9.1.2 Visualization and Analysis
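With data loaded, the next step is visual inspection. The visualizer below pulls friendship edges out of Neo4j into a NetworkX graph for static matplotlib charts, and builds an interactive Plotly figure in which node size encodes degree and node color encodes follower count.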
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
from matplotlib.patches import Patch
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
class SocialNetworkVisualizer:
"""社交网络可视化工具"""
def __init__(self, analyzer: SocialNetworkAnalyzer):
self.analyzer = analyzer
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
def visualize_network_graph(self, max_users: int = 50, save_path: str = None):
"""可视化网络图"""
with self.analyzer.driver.session() as session:
            # Fetch users and relationships
cypher = f"""
MATCH (u1:User)-[:FRIEND]-(u2:User)
RETURN u1.user_id as user1, u1.username as username1,
u2.user_id as user2, u2.username as username2
LIMIT {max_users * 10}
"""
result = session.run(cypher)
edges = [(record['user1'], record['user2']) for record in result]
if not edges:
print("没有找到好友关系数据")
return
            # Build a NetworkX graph
G = nx.Graph()
G.add_edges_from(edges)
# 限制节点数量
if len(G.nodes()) > max_users:
                # Keep the highest-degree nodes
degree_dict = dict(G.degree())
top_nodes = sorted(degree_dict.items(), key=lambda x: x[1], reverse=True)[:max_users]
top_node_ids = [node[0] for node in top_nodes]
G = G.subgraph(top_node_ids)
            # Compute the layout
pos = nx.spring_layout(G, k=1, iterations=50)
            # Draw the figure
plt.figure(figsize=(15, 12))
            # Draw edges
nx.draw_networkx_edges(G, pos, alpha=0.5, width=0.5, edge_color='gray')
            # Draw nodes
node_sizes = [G.degree(node) * 100 + 200 for node in G.nodes()]
nx.draw_networkx_nodes(G, pos, node_size=node_sizes,
node_color='lightblue', alpha=0.8)
            # Add labels
labels = {node: node.split('_')[1] for node in G.nodes()}
nx.draw_networkx_labels(G, pos, labels, font_size=8)
            plt.title('Social Network Graph', fontsize=16, fontweight='bold')
plt.axis('off')
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
            # Network statistics
            print(f"Nodes: {len(G.nodes())}")
            print(f"Edges: {len(G.edges())}")
            print(f"Average degree: {sum(dict(G.degree()).values()) / len(G.nodes()):.2f}")
            print(f"Clustering coefficient: {nx.average_clustering(G):.3f}")
def plot_user_influence_distribution(self, save_path: str = None):
"""绘制用户影响力分布"""
with self.analyzer.driver.session() as session:
cypher = """
MATCH (u:User)
OPTIONAL MATCH (u)<-[:FOLLOW]-(followers:User)
OPTIONAL MATCH (u)-[:AUTHORED]->(content:Content)
OPTIONAL MATCH (content)<-[:LIKE]-(likers:User)
WITH u, count(DISTINCT followers) as follower_count,
count(DISTINCT content) as content_count,
count(DISTINCT likers) as total_likes
RETURN u.username as username,
follower_count,
content_count,
total_likes,
(follower_count * 2 + total_likes * 0.5 + content_count * 0.3) as influence_score
ORDER BY influence_score DESC
"""
result = session.run(cypher)
data = [record.data() for record in result]
if not data:
print("没有找到用户数据")
return
            # Create subplots
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
            # Influence score distribution
            influence_scores = [d['influence_score'] for d in data]
            axes[0, 0].hist(influence_scores, bins=20, alpha=0.7, color='skyblue')
            axes[0, 0].set_title('Influence Score Distribution')
            axes[0, 0].set_xlabel('Influence score')
            axes[0, 0].set_ylabel('Number of users')
            # Follower count distribution
            follower_counts = [d['follower_count'] for d in data]
            axes[0, 1].hist(follower_counts, bins=20, alpha=0.7, color='lightgreen')
            axes[0, 1].set_title('Follower Count Distribution')
            axes[0, 1].set_xlabel('Followers')
            axes[0, 1].set_ylabel('Number of users')
            # Content count distribution
            content_counts = [d['content_count'] for d in data]
            axes[1, 0].hist(content_counts, bins=20, alpha=0.7, color='salmon')
            axes[1, 0].set_title('Published Content Distribution')
            axes[1, 0].set_xlabel('Content count')
            axes[1, 0].set_ylabel('Number of users')
            # Top 10 users by influence
            top_users = data[:10]
            usernames = [u['username'] for u in top_users]
            scores = [u['influence_score'] for u in top_users]
            axes[1, 1].barh(usernames, scores, color='gold')
            axes[1, 1].set_title('Top 10 Users by Influence')
            axes[1, 1].set_xlabel('Influence score')
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
def plot_content_trends(self, days: int = 30, save_path: str = None):
"""绘制内容趋势"""
trends = self.analyzer.analyze_content_trends(days)
        # Create subplots
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        # Content type distribution
        content_types = [ct['content_type'] for ct in trends['content_type_distribution']]
        content_counts = [ct['count'] for ct in trends['content_type_distribution']]
        axes[0, 0].pie(content_counts, labels=content_types, autopct='%1.1f%%')
        axes[0, 0].set_title('Content Type Distribution')
        # Popular tags
        if trends['popular_tags']:
            tags = [tag['tag'] for tag in trends['popular_tags'][:10]]
            tag_counts = [tag['usage_count'] for tag in trends['popular_tags'][:10]]
            axes[0, 1].barh(tags, tag_counts, color='lightcoral')
            axes[0, 1].set_title('Top 10 Tags')
            axes[0, 1].set_xlabel('Uses')
        # Daily publishing volume
        if trends['daily_content_volume']:
            dates = [d['publish_date'] for d in trends['daily_content_volume']]
            daily_counts = [d['daily_count'] for d in trends['daily_content_volume']]
            axes[1, 0].plot(dates, daily_counts, marker='o', linewidth=2, markersize=4)
            axes[1, 0].set_title('Daily Content Volume')
            axes[1, 0].set_xlabel('Date')
            axes[1, 0].set_ylabel('Posts')
            axes[1, 0].tick_params(axis='x', rotation=45)
        # Most engaging content
        if trends['top_engaging_content']:
            content_titles = [c['title'][:20] + '...' if len(c['title']) > 20 else c['title']
                              for c in trends['top_engaging_content'][:10]]
            engagement_scores = [c['engagement_score'] for c in trends['top_engaging_content'][:10]]
            axes[1, 1].barh(content_titles, engagement_scores, color='mediumpurple')
            axes[1, 1].set_title('Top 10 Engaging Content')
            axes[1, 1].set_xlabel('Engagement score')
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
def create_interactive_network(self, max_users: int = 100):
"""创建交互式网络图"""
with self.analyzer.driver.session() as session:
            # Fetch user data
user_cypher = f"""
MATCH (u:User)
OPTIONAL MATCH (u)<-[:FOLLOW]-(followers:User)
OPTIONAL MATCH (u)-[:AUTHORED]->(content:Content)
WITH u, count(DISTINCT followers) as follower_count,
count(DISTINCT content) as content_count
RETURN u.user_id as user_id, u.username as username,
u.location as location, follower_count, content_count
LIMIT {max_users}
"""
users_result = session.run(user_cypher)
users_data = {record['user_id']: record.data() for record in users_result}
            # Fetch relationship data
edge_cypher = """
MATCH (u1:User)-[:FRIEND]-(u2:User)
WHERE u1.user_id IN $user_ids AND u2.user_id IN $user_ids
RETURN u1.user_id as source, u2.user_id as target
"""
user_ids = list(users_data.keys())
edges_result = session.run(edge_cypher, {'user_ids': user_ids})
edges_data = [record.data() for record in edges_result]
if not edges_data:
print("没有找到关系数据")
return
            # Build a NetworkX graph
G = nx.Graph()
for user_id, user_info in users_data.items():
G.add_node(user_id, **user_info)
for edge in edges_data:
G.add_edge(edge['source'], edge['target'])
            # Compute the layout
pos = nx.spring_layout(G, k=1, iterations=50)
            # Prepare Plotly traces
edge_x = []
edge_y = []
for edge in G.edges():
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
edge_trace = go.Scatter(x=edge_x, y=edge_y,
line=dict(width=0.5, color='#888'),
hoverinfo='none',
mode='lines')
node_x = []
node_y = []
node_text = []
node_size = []
node_color = []
            for node in G.nodes():
                x, y = pos[node]
                node_x.append(x)
                node_y.append(y)
                node_info = G.nodes[node]
                node_text.append(f"User: {node_info['username']}<br>"
                                 f"Location: {node_info['location']}<br>"
                                 f"Followers: {node_info['follower_count']}<br>"
                                 f"Content: {node_info['content_count']}")
                # Node size scales with degree
                node_size.append(G.degree(node) * 5 + 10)
                # Node color encodes follower count
                node_color.append(node_info['follower_count'])
node_trace = go.Scatter(x=node_x, y=node_y,
mode='markers',
hoverinfo='text',
text=node_text,
marker=dict(showscale=True,
colorscale='YlOrRd',
reversescale=True,
color=node_color,
size=node_size,
colorbar=dict(
thickness=15,
len=0.5,
x=1.02,
title="粉丝数"
),
line=dict(width=2)))
            # Assemble the figure (title_font_size replaces the long-deprecated titlefont_size)
            fig = go.Figure(data=[edge_trace, node_trace],
                            layout=go.Layout(
                                title='Interactive Social Network Graph',
                                title_font_size=16,
                                showlegend=False,
                                hovermode='closest',
                                margin=dict(b=20, l=5, r=5, t=40),
                                annotations=[dict(
                                    text="Node size = number of connections; color = follower count",
                                    showarrow=False,
                                    xref="paper", yref="paper",
                                    x=0.005, y=-0.002,
                                    xanchor="left", yanchor="bottom",
                                    font=dict(size=12)
                                )],
                                xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                                yaxis=dict(showgrid=False, zeroline=False, showticklabels=False))
                            )
fig.show()
# Visualization examples
visualizer = SocialNetworkVisualizer(analyzer)
# Draw the network graph
print("Rendering network graph...")
visualizer.visualize_network_graph(max_users=30)
# Plot user influence distributions
print("\nRendering influence analysis...")
visualizer.plot_user_influence_distribution()
# Plot content trends
print("\nRendering content trends...")
visualizer.plot_content_trends(days=30)
# Create the interactive network graph
print("\nRendering interactive network graph...")
visualizer.create_interactive_network(max_users=50)
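One practical note: nx.spring_layout is randomized, so the renderings above shuffle node positions on every run. NetworkX accepts a seed parameter that makes layouts reproducible; a tiny illustration (karate_club_graph stands in for the friendship graph):

import networkx as nx

demo_graph = nx.karate_club_graph()  # stand-in for the friendship graph
pos = nx.spring_layout(demo_graph, k=1, iterations=50, seed=42)  # identical layout on every run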
9.2 Recommendation Systems
9.2.1 Graph-Based Recommendation Algorithms
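The engine below generates candidates from several independent signals (mutual friends, interest overlap, and shared location for people; collaborative filtering, tag overlap, friends' activity, and trending popularity for content), then merges them with fixed weights and attaches a human-readable reason to every result.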
from typing import Dict, List, Tuple, Set, Any  # Any is needed for RecommendationResult.metadata
import math
from collections import defaultdict, Counter
from dataclasses import dataclass
from enum import Enum
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
class RecommendationType(Enum):
"""推荐类型"""
COLLABORATIVE_FILTERING = "collaborative_filtering"
CONTENT_BASED = "content_based"
GRAPH_BASED = "graph_based"
HYBRID = "hybrid"
@dataclass
class RecommendationResult:
"""推荐结果"""
item_id: str
item_type: str
score: float
reason: str
metadata: Dict[str, Any] = None
class GraphRecommendationEngine:
"""基于图的推荐引擎"""
def __init__(self, neo4j_config: Dict[str, Any]):
self.driver = GraphDatabase.driver(
neo4j_config['uri'],
auth=(neo4j_config['username'], neo4j_config['password'])
)
        # Recommendation parameters
self.recommendation_params = {
'collaborative_weight': 0.4,
'content_weight': 0.3,
'graph_weight': 0.3,
'diversity_factor': 0.1,
'popularity_penalty': 0.05
}
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def recommend_friends(self, user_id: str, limit: int = 10) -> List[RecommendationResult]:
"""推荐好友"""
with self.driver.session() as session:
            # Candidates from mutual friends
mutual_friends_cypher = """
MATCH (u:User {user_id: $user_id})-[:FRIEND]-(friend:User)-[:FRIEND]-(potential:User)
WHERE u <> potential AND NOT (u)-[:FRIEND]-(potential)
WITH potential, count(*) as mutual_count
RETURN potential.user_id as user_id,
potential.username as username,
potential.location as location,
potential.interests as interests,
mutual_count
ORDER BY mutual_count DESC
LIMIT $limit
"""
mutual_result = session.run(mutual_friends_cypher, {
'user_id': user_id, 'limit': limit * 2
})
mutual_candidates = [record.data() for record in mutual_result]
            # Candidates from interest similarity
interest_similarity_cypher = """
MATCH (u:User {user_id: $user_id})
MATCH (potential:User)
WHERE u <> potential AND NOT (u)-[:FRIEND]-(potential)
WITH u, potential,
[interest IN u.interests WHERE interest IN potential.interests] as common_interests,
size(u.interests) as u_interests_count,
size(potential.interests) as p_interests_count
WHERE size(common_interests) > 0
WITH potential, common_interests,
(size(common_interests) * 2.0) / (u_interests_count + p_interests_count) as similarity
RETURN potential.user_id as user_id,
potential.username as username,
potential.location as location,
potential.interests as interests,
similarity,
common_interests
ORDER BY similarity DESC
LIMIT $limit
"""
interest_result = session.run(interest_similarity_cypher, {
'user_id': user_id, 'limit': limit * 2
})
interest_candidates = [record.data() for record in interest_result]
            # Candidates from shared location
location_cypher = """
MATCH (u:User {user_id: $user_id})
MATCH (potential:User {location: u.location})
WHERE u <> potential AND NOT (u)-[:FRIEND]-(potential)
OPTIONAL MATCH (potential)<-[:FOLLOW]-(followers:User)
WITH potential, count(followers) as popularity
RETURN potential.user_id as user_id,
potential.username as username,
potential.location as location,
potential.interests as interests,
popularity
ORDER BY popularity DESC
LIMIT $limit
"""
location_result = session.run(location_cypher, {
'user_id': user_id, 'limit': limit
})
location_candidates = [record.data() for record in location_result]
            # Merge and score
recommendations = self._merge_friend_recommendations(
mutual_candidates, interest_candidates, location_candidates, limit
)
return recommendations
def _merge_friend_recommendations(self, mutual_candidates: List[Dict],
interest_candidates: List[Dict],
location_candidates: List[Dict],
limit: int) -> List[RecommendationResult]:
"""合并好友推荐结果"""
candidate_scores = defaultdict(lambda: {'score': 0, 'reasons': [], 'data': None})
        # Mutual-friend score
for candidate in mutual_candidates:
user_id = candidate['user_id']
score = candidate['mutual_count'] * 0.5
candidate_scores[user_id]['score'] += score
candidate_scores[user_id]['reasons'].append(f"有 {candidate['mutual_count']} 个共同好友")
candidate_scores[user_id]['data'] = candidate
        # Interest-similarity score
for candidate in interest_candidates:
user_id = candidate['user_id']
score = candidate['similarity'] * 0.3
candidate_scores[user_id]['score'] += score
common_interests = ', '.join(candidate['common_interests'])
candidate_scores[user_id]['reasons'].append(f"共同兴趣: {common_interests}")
if candidate_scores[user_id]['data'] is None:
candidate_scores[user_id]['data'] = candidate
        # Location score
for candidate in location_candidates:
user_id = candidate['user_id']
score = 0.2 + (candidate['popularity'] * 0.01)
candidate_scores[user_id]['score'] += score
candidate_scores[user_id]['reasons'].append(f"同城用户 ({candidate['location']})")
if candidate_scores[user_id]['data'] is None:
candidate_scores[user_id]['data'] = candidate
        # Sort and return the results
sorted_candidates = sorted(candidate_scores.items(),
key=lambda x: x[1]['score'], reverse=True)
recommendations = []
for user_id, info in sorted_candidates[:limit]:
recommendations.append(RecommendationResult(
item_id=user_id,
item_type='user',
score=info['score'],
reason='; '.join(info['reasons']),
metadata=info['data']
))
return recommendations
def recommend_content(self, user_id: str, limit: int = 10) -> List[RecommendationResult]:
"""推荐内容"""
with self.driver.session() as session:
            # Collaborative filtering from the user's interaction history
collaborative_cypher = """
MATCH (u:User {user_id: $user_id})-[:LIKE|COMMENT|SHARE]->(liked_content:Content)
MATCH (liked_content)<-[:LIKE|COMMENT|SHARE]-(similar_user:User)
WHERE u <> similar_user
MATCH (similar_user)-[:LIKE|COMMENT|SHARE]->(recommended_content:Content)
WHERE NOT (u)-[:LIKE|COMMENT|SHARE]->(recommended_content)
WITH recommended_content, count(*) as recommendation_strength,
collect(DISTINCT similar_user.username) as recommenders
RETURN recommended_content.content_id as content_id,
recommended_content.title as title,
recommended_content.content_type as content_type,
recommendation_strength,
recommenders
ORDER BY recommendation_strength DESC
LIMIT $limit
"""
collaborative_result = session.run(collaborative_cypher, {
'user_id': user_id, 'limit': limit * 2
})
collaborative_candidates = [record.data() for record in collaborative_result]
            # Content-based candidates from tags
content_based_cypher = """
MATCH (u:User {user_id: $user_id})-[:LIKE|COMMENT|SHARE]->(liked_content:Content)-[:TAGGED_WITH]->(tag:Tag)
WITH u, collect(DISTINCT tag.name) as user_tags
MATCH (recommended_content:Content)-[:TAGGED_WITH]->(tag:Tag)
WHERE tag.name IN user_tags AND NOT (u)-[:LIKE|COMMENT|SHARE]->(recommended_content)
WITH recommended_content,
[tag_name IN collect(DISTINCT tag.name) WHERE tag_name IN user_tags] as matching_tags,
count(DISTINCT tag) as tag_match_count
RETURN recommended_content.content_id as content_id,
recommended_content.title as title,
recommended_content.content_type as content_type,
tag_match_count,
matching_tags
ORDER BY tag_match_count DESC
LIMIT $limit
"""
content_result = session.run(content_based_cypher, {
'user_id': user_id, 'limit': limit * 2
})
content_candidates = [record.data() for record in content_result]
            # Candidates from friends' activity
social_cypher = """
MATCH (u:User {user_id: $user_id})-[:FRIEND]-(friend:User)
MATCH (friend)-[:LIKE|COMMENT|SHARE]->(friend_content:Content)
WHERE NOT (u)-[:LIKE|COMMENT|SHARE]->(friend_content)
WITH friend_content, count(*) as friend_interactions,
collect(DISTINCT friend.username) as active_friends
RETURN friend_content.content_id as content_id,
friend_content.title as title,
friend_content.content_type as content_type,
friend_interactions,
active_friends
ORDER BY friend_interactions DESC
LIMIT $limit
"""
social_result = session.run(social_cypher, {
'user_id': user_id, 'limit': limit * 2
})
social_candidates = [record.data() for record in social_result]
            # Trending content. created_at is stored as an ISO-8601 string, so the
            # seven-day cutoff is computed in Python and compared as a string.
            trending_cypher = """
            MATCH (content:Content)
            WHERE NOT (:User {user_id: $user_id})-[:LIKE|COMMENT|SHARE]->(content)
            AND content.created_at >= $cutoff
            WITH content,
            (content.like_count + content.comment_count * 2 + content.share_count * 3) as popularity_score
            RETURN content.content_id as content_id,
            content.title as title,
            content.content_type as content_type,
            popularity_score
            ORDER BY popularity_score DESC
            LIMIT $limit
            """
            trending_result = session.run(trending_cypher, {
                'user_id': user_id, 'limit': limit,
                'cutoff': (datetime.now() - timedelta(days=7)).isoformat()
            })
trending_candidates = [record.data() for record in trending_result]
            # Merge and score
recommendations = self._merge_content_recommendations(
collaborative_candidates, content_candidates,
social_candidates, trending_candidates, limit
)
return recommendations
def _merge_content_recommendations(self, collaborative_candidates: List[Dict],
content_candidates: List[Dict],
social_candidates: List[Dict],
trending_candidates: List[Dict],
limit: int) -> List[RecommendationResult]:
"""合并内容推荐结果"""
candidate_scores = defaultdict(lambda: {'score': 0, 'reasons': [], 'data': None})
        # Collaborative-filtering score
for candidate in collaborative_candidates:
content_id = candidate['content_id']
score = candidate['recommendation_strength'] * 0.4
candidate_scores[content_id]['score'] += score
recommenders = ', '.join(candidate['recommenders'][:3])
candidate_scores[content_id]['reasons'].append(f"相似用户推荐 ({recommenders})")
candidate_scores[content_id]['data'] = candidate
        # Content-based score
for candidate in content_candidates:
content_id = candidate['content_id']
score = candidate['tag_match_count'] * 0.3
candidate_scores[content_id]['score'] += score
matching_tags = ', '.join(candidate['matching_tags'])
candidate_scores[content_id]['reasons'].append(f"标签匹配: {matching_tags}")
if candidate_scores[content_id]['data'] is None:
candidate_scores[content_id]['data'] = candidate
        # Social score
for candidate in social_candidates:
content_id = candidate['content_id']
score = candidate['friend_interactions'] * 0.25
candidate_scores[content_id]['score'] += score
active_friends = ', '.join(candidate['active_friends'][:3])
candidate_scores[content_id]['reasons'].append(f"好友互动 ({active_friends})")
if candidate_scores[content_id]['data'] is None:
candidate_scores[content_id]['data'] = candidate
        # Trending score
for candidate in trending_candidates:
content_id = candidate['content_id']
score = math.log(candidate['popularity_score'] + 1) * 0.15
candidate_scores[content_id]['score'] += score
candidate_scores[content_id]['reasons'].append("热门内容")
if candidate_scores[content_id]['data'] is None:
candidate_scores[content_id]['data'] = candidate
        # Diversity adjustment
self._apply_diversity_adjustment(candidate_scores)
        # Sort and return the results
sorted_candidates = sorted(candidate_scores.items(),
key=lambda x: x[1]['score'], reverse=True)
recommendations = []
for content_id, info in sorted_candidates[:limit]:
recommendations.append(RecommendationResult(
item_id=content_id,
item_type='content',
score=info['score'],
reason='; '.join(info['reasons']),
metadata=info['data']
))
return recommendations
    def _apply_diversity_adjustment(self, candidate_scores: Dict):
        """Penalize over-represented content types to keep results diverse."""
        content_types = defaultdict(int)
        # Count content types among the candidates
        for content_id, info in candidate_scores.items():
            if info['data'] and 'content_type' in info['data']:
                content_type = info['data']['content_type']
                content_types[content_type] += 1
        # Penalize over-concentrated types
        for content_id, info in candidate_scores.items():
            if info['data'] and 'content_type' in info['data']:
                content_type = info['data']['content_type']
                type_count = content_types[content_type]
                if type_count > 3:  # too many candidates of one type
                    penalty = (type_count - 3) * 0.1
                    info['score'] *= (1 - penalty)
def recommend_tags(self, user_id: str, limit: int = 10) -> List[RecommendationResult]:
"""推荐标签"""
with self.driver.session() as session:
cypher = """
MATCH (u:User {user_id: $user_id})-[:LIKE|COMMENT|SHARE]->(content:Content)-[:TAGGED_WITH]->(user_tag:Tag)
WITH u, collect(DISTINCT user_tag.name) as user_tags
MATCH (similar_user:User)-[:LIKE|COMMENT|SHARE]->(similar_content:Content)-[:TAGGED_WITH]->(similar_tag:Tag)
WHERE similar_user <> u
AND any(tag IN user_tags WHERE tag IN
[(similar_user)-[:LIKE|COMMENT|SHARE]->(sc:Content)-[:TAGGED_WITH]->(st:Tag) | st.name])
AND NOT similar_tag.name IN user_tags
WITH similar_tag, count(*) as recommendation_strength
MATCH (similar_tag)<-[:TAGGED_WITH]-(tagged_content:Content)
WITH similar_tag, recommendation_strength, count(tagged_content) as tag_popularity
RETURN similar_tag.name as tag_name,
recommendation_strength,
tag_popularity,
(recommendation_strength * 0.7 + log(tag_popularity) * 0.3) as score
ORDER BY score DESC
LIMIT $limit
"""
result = session.run(cypher, {'user_id': user_id, 'limit': limit})
recommendations = []
for record in result:
recommendations.append(RecommendationResult(
item_id=record['tag_name'],
item_type='tag',
score=record['score'],
reason=f"基于相似用户兴趣,使用频率: {record['tag_popularity']}",
metadata={
'recommendation_strength': record['recommendation_strength'],
'popularity': record['tag_popularity']
}
))
return recommendations
def get_recommendation_explanation(self, user_id: str, item_id: str,
item_type: str) -> Dict[str, Any]:
"""获取推荐解释"""
with self.driver.session() as session:
if item_type == 'user':
return self._explain_user_recommendation(session, user_id, item_id)
elif item_type == 'content':
return self._explain_content_recommendation(session, user_id, item_id)
else:
                return {'explanation': 'Unknown recommendation type'}
def _explain_user_recommendation(self, session, user_id: str, recommended_user_id: str) -> Dict[str, Any]:
"""解释用户推荐"""
        # Mutual friends
mutual_friends_cypher = """
MATCH (u:User {user_id: $user_id})-[:FRIEND]-(mutual:User)-[:FRIEND]-(recommended:User {user_id: $recommended_user_id})
RETURN collect(mutual.username) as mutual_friends
"""
mutual_result = session.run(mutual_friends_cypher, {
'user_id': user_id, 'recommended_user_id': recommended_user_id
})
mutual_friends = mutual_result.single()['mutual_friends']
        # Shared interests
common_interests_cypher = """
MATCH (u:User {user_id: $user_id}), (recommended:User {user_id: $recommended_user_id})
RETURN [interest IN u.interests WHERE interest IN recommended.interests] as common_interests,
u.interests as user_interests,
recommended.interests as recommended_interests
"""
interests_result = session.run(common_interests_cypher, {
'user_id': user_id, 'recommended_user_id': recommended_user_id
})
interests_data = interests_result.single()
return {
'mutual_friends': mutual_friends,
'common_interests': interests_data['common_interests'],
'user_interests': interests_data['user_interests'],
'recommended_interests': interests_data['recommended_interests'],
'explanation': self._generate_user_explanation(mutual_friends, interests_data)
}
def _explain_content_recommendation(self, session, user_id: str, content_id: str) -> Dict[str, Any]:
"""解释内容推荐"""
        # Similar users' behavior
similar_users_cypher = """
MATCH (u:User {user_id: $user_id})-[:LIKE|COMMENT|SHARE]->(liked:Content)
MATCH (liked)<-[:LIKE|COMMENT|SHARE]-(similar:User)
MATCH (similar)-[:LIKE|COMMENT|SHARE]->(recommended:Content {content_id: $content_id})
WHERE u <> similar
RETURN collect(DISTINCT similar.username) as similar_users
"""
similar_result = session.run(similar_users_cypher, {
'user_id': user_id, 'content_id': content_id
})
similar_users = similar_result.single()['similar_users']
        # Tag matches
tag_match_cypher = """
MATCH (u:User {user_id: $user_id})-[:LIKE|COMMENT|SHARE]->(liked:Content)-[:TAGGED_WITH]->(user_tag:Tag)
MATCH (recommended:Content {content_id: $content_id})-[:TAGGED_WITH]->(content_tag:Tag)
WHERE user_tag = content_tag
RETURN collect(DISTINCT user_tag.name) as matching_tags
"""
tag_result = session.run(tag_match_cypher, {
'user_id': user_id, 'content_id': content_id
})
matching_tags = tag_result.single()['matching_tags']
        # Friend activity
friend_activity_cypher = """
MATCH (u:User {user_id: $user_id})-[:FRIEND]-(friend:User)
MATCH (friend)-[:LIKE|COMMENT|SHARE]->(recommended:Content {content_id: $content_id})
RETURN collect(DISTINCT friend.username) as active_friends
"""
friend_result = session.run(friend_activity_cypher, {
'user_id': user_id, 'content_id': content_id
})
active_friends = friend_result.single()['active_friends']
return {
'similar_users': similar_users,
'matching_tags': matching_tags,
'active_friends': active_friends,
'explanation': self._generate_content_explanation(similar_users, matching_tags, active_friends)
}
    def _generate_user_explanation(self, mutual_friends: List[str], interests_data: Dict) -> str:
        """Generate a human-readable explanation for a friend recommendation."""
        explanations = []
        if mutual_friends:
            explanations.append(f"You have {len(mutual_friends)} mutual friends: {', '.join(mutual_friends[:3])}")
        if interests_data['common_interests']:
            common = ', '.join(interests_data['common_interests'])
            explanations.append(f"You share these interests: {common}")
        if not explanations:
            explanations.append("Recommended based on similar behavior patterns")
        return '; '.join(explanations)
    def _generate_content_explanation(self, similar_users: List[str],
                                      matching_tags: List[str],
                                      active_friends: List[str]) -> str:
        """Generate a human-readable explanation for a content recommendation."""
        explanations = []
        if similar_users:
            explanations.append(f"Users with similar interests ({', '.join(similar_users[:3])}) also liked this content")
        if matching_tags:
            explanations.append(f"Contains tags you are interested in: {', '.join(matching_tags)}")
        if active_friends:
            explanations.append(f"Your friends ({', '.join(active_friends[:3])}) interacted with this content")
        if not explanations:
            explanations.append("Recommended from trending content")
        return '; '.join(explanations)
def evaluate_recommendations(self, user_id: str, recommendations: List[RecommendationResult],
actual_interactions: List[str]) -> Dict[str, float]:
"""评估推荐效果"""
if not recommendations or not actual_interactions:
return {'precision': 0.0, 'recall': 0.0, 'f1': 0.0}
recommended_items = {rec.item_id for rec in recommendations}
actual_items = set(actual_interactions)
        # Compute precision, recall, and F1
true_positives = len(recommended_items & actual_items)
precision = true_positives / len(recommended_items) if recommended_items else 0.0
recall = true_positives / len(actual_items) if actual_items else 0.0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
return {
'precision': precision,
'recall': recall,
'f1': f1,
'true_positives': true_positives,
'recommended_count': len(recommended_items),
'actual_count': len(actual_items)
}
def close(self):
"""关闭连接"""
if self.driver:
self.driver.close()
# Recommendation engine usage example
recommendation_engine = GraphRecommendationEngine(neo4j_config)
print("=== Friend Recommendations ===")
friend_recommendations = recommendation_engine.recommend_friends('user_0001', limit=5)
for rec in friend_recommendations:
    print(f"Recommended user: {rec.metadata['username']} (score: {rec.score:.2f})")
    print(f"  Reason: {rec.reason}")
    print(f"  Location: {rec.metadata['location']}")
    print()
print("=== Content Recommendations ===")
content_recommendations = recommendation_engine.recommend_content('user_0001', limit=5)
for rec in content_recommendations:
    print(f"Recommended content: {rec.metadata['title']} (score: {rec.score:.2f})")
    print(f"  Type: {rec.metadata['content_type']}")
    print(f"  Reason: {rec.reason}")
    print()
print("=== Tag Recommendations ===")
tag_recommendations = recommendation_engine.recommend_tags('user_0001', limit=5)
for rec in tag_recommendations:
    print(f"Recommended tag: {rec.item_id} (score: {rec.score:.2f})")
    print(f"  Reason: {rec.reason}")
    print()
# Recommendation explanation
if friend_recommendations:
    print("=== Recommendation Explanation ===")
    explanation = recommendation_engine.get_recommendation_explanation(
        'user_0001', friend_recommendations[0].item_id, 'user'
    )
    print(f"Why {friend_recommendations[0].metadata['username']} was recommended:")
    print(explanation['explanation'])
recommendation_engine.close()
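evaluate_recommendations is defined above but never exercised. It needs no database access (it is pure set arithmetic over item ids), so it still works after close(). A minimal sketch, with a hypothetical ground-truth list of items the user actually interacted with later:

# 'actual' is hypothetical ground truth, for illustration only
actual = ['content_0003', 'content_0010', 'content_0042']
metrics = recommendation_engine.evaluate_recommendations('user_0001', content_recommendations, actual)
print(f"precision={metrics['precision']:.2f} recall={metrics['recall']:.2f} f1={metrics['f1']:.2f}")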
9.2.2 A Real-Time Recommendation System
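This layer adds a cache-aside path on top of the graph engine: recommendations are served from Redis while a fresh copy exists (one-hour TTL), regenerated in a thread pool otherwise, and invalidated whenever a Kafka user event (like, comment, share, follow) signals that the underlying graph has changed.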
import asyncio
import aioredis
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime, timedelta
from typing import AsyncGenerator
import threading
from concurrent.futures import ThreadPoolExecutor
class RealTimeRecommendationSystem:
"""实时推荐系统"""
def __init__(self, neo4j_config: Dict[str, Any], redis_config: Dict[str, Any],
kafka_config: Dict[str, Any]):
self.recommendation_engine = GraphRecommendationEngine(neo4j_config)
self.redis_config = redis_config
self.kafka_config = kafka_config
        # Kafka producer (the consumer is started separately)
self.producer = KafkaProducer(
bootstrap_servers=kafka_config['bootstrap_servers'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
        # Recommendation cache settings
        self.cache_ttl = 3600  # one hour
self.batch_size = 100
        # Thread pool
self.executor = ThreadPoolExecutor(max_workers=10)
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
async def get_redis_connection(self):
"""获取Redis连接"""
return await aioredis.from_url(
f"redis://{self.redis_config['host']}:{self.redis_config['port']}",
password=self.redis_config.get('password'),
db=self.redis_config.get('db', 0)
)
async def get_cached_recommendations(self, user_id: str,
recommendation_type: str) -> List[RecommendationResult]:
"""获取缓存的推荐结果"""
redis = await self.get_redis_connection()
try:
cache_key = f"recommendations:{user_id}:{recommendation_type}"
cached_data = await redis.get(cache_key)
if cached_data:
data = json.loads(cached_data)
recommendations = []
for item in data:
recommendations.append(RecommendationResult(
item_id=item['item_id'],
item_type=item['item_type'],
score=item['score'],
reason=item['reason'],
metadata=item.get('metadata')
))
return recommendations
return None
finally:
await redis.close()
async def cache_recommendations(self, user_id: str, recommendation_type: str,
recommendations: List[RecommendationResult]):
"""缓存推荐结果"""
redis = await self.get_redis_connection()
try:
cache_key = f"recommendations:{user_id}:{recommendation_type}"
            # Serialize the recommendations
data = []
for rec in recommendations:
data.append({
'item_id': rec.item_id,
'item_type': rec.item_type,
'score': rec.score,
'reason': rec.reason,
'metadata': rec.metadata
})
await redis.setex(cache_key, self.cache_ttl, json.dumps(data))
finally:
await redis.close()
async def get_real_time_recommendations(self, user_id: str,
recommendation_type: str,
limit: int = 10) -> List[RecommendationResult]:
"""获取实时推荐"""
# 首先检查缓存
cached_recommendations = await self.get_cached_recommendations(
user_id, recommendation_type
)
if cached_recommendations:
self.logger.info(f"返回用户 {user_id} 的缓存推荐")
return cached_recommendations[:limit]
        # Generate fresh recommendations
        self.logger.info(f"Generating fresh recommendations for user {user_id}")
if recommendation_type == 'friends':
recommendations = await asyncio.get_event_loop().run_in_executor(
self.executor,
self.recommendation_engine.recommend_friends,
user_id, limit
)
elif recommendation_type == 'content':
recommendations = await asyncio.get_event_loop().run_in_executor(
self.executor,
self.recommendation_engine.recommend_content,
user_id, limit
)
elif recommendation_type == 'tags':
recommendations = await asyncio.get_event_loop().run_in_executor(
self.executor,
self.recommendation_engine.recommend_tags,
user_id, limit
)
else:
return []
        # Cache the result
await self.cache_recommendations(user_id, recommendation_type, recommendations)
return recommendations
def publish_user_event(self, event_type: str, user_id: str,
item_id: str = None, metadata: Dict = None):
"""发布用户事件"""
event = {
'event_type': event_type,
'user_id': user_id,
'item_id': item_id,
'metadata': metadata or {},
'timestamp': datetime.now().isoformat()
}
self.producer.send('user_events', value=event)
self.logger.info(f"发布事件: {event_type} for user {user_id}")
async def invalidate_user_cache(self, user_id: str):
"""使用户缓存失效"""
redis = await self.get_redis_connection()
try:
            # Delete every cached recommendation for this user
pattern = f"recommendations:{user_id}:*"
keys = await redis.keys(pattern)
if keys:
await redis.delete(*keys)
self.logger.info(f"清除用户 {user_id} 的推荐缓存")
finally:
await redis.close()
def start_event_consumer(self):
"""启动事件消费者"""
def consume_events():
consumer = KafkaConsumer(
'user_events',
bootstrap_servers=self.kafka_config['bootstrap_servers'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='recommendation_system'
)
            for message in consumer:
                event = message.value
                # asyncio.run creates a fresh event loop per message, which is
                # fine for a demo; a long-lived loop would suit production better
                asyncio.run(self.process_user_event(event))
        # Run the consumer in a daemon thread
        consumer_thread = threading.Thread(target=consume_events, daemon=True)
        consumer_thread.start()
self.logger.info("事件消费者已启动")
async def process_user_event(self, event: Dict):
"""处理用户事件"""
event_type = event['event_type']
user_id = event['user_id']
        # Decide from the event type whether recommendations must be refreshed
if event_type in ['like', 'comment', 'share', 'follow', 'friend_request_accepted']:
await self.invalidate_user_cache(user_id)
            # For social events, also invalidate the other user's cache
if event_type in ['follow', 'friend_request_accepted'] and 'target_user_id' in event:
await self.invalidate_user_cache(event['target_user_id'])
self.logger.info(f"处理事件: {event_type} for user {user_id}")
async def batch_update_recommendations(self, user_ids: List[str]):
"""批量更新推荐"""
tasks = []
for user_id in user_ids:
            # Generate every recommendation type for each user
for rec_type in ['friends', 'content', 'tags']:
task = self.get_real_time_recommendations(user_id, rec_type)
tasks.append(task)
        # Run all tasks concurrently
await asyncio.gather(*tasks, return_exceptions=True)
self.logger.info(f"批量更新了 {len(user_ids)} 个用户的推荐")
def close(self):
"""关闭连接"""
self.producer.close()
self.recommendation_engine.close()
self.executor.shutdown(wait=True)
# Real-time recommendation system demo
async def real_time_recommendation_demo():
    # Configuration
    redis_config = {
        'host': 'localhost',
        'port': 6379,
        'password': None,
        'db': 0
    }
    kafka_config = {
        'bootstrap_servers': ['localhost:9092']
    }
    # Create the real-time recommendation system
    real_time_system = RealTimeRecommendationSystem(
        neo4j_config, redis_config, kafka_config
    )
    # Start the event consumer
    real_time_system.start_event_consumer()
    # Fetch real-time recommendations
    print("=== Real-Time Friend Recommendations ===")
    friend_recs = await real_time_system.get_real_time_recommendations(
        'user_0001', 'friends', limit=5
    )
    for rec in friend_recs:
        print(f"Recommended: {rec.metadata['username']} (score: {rec.score:.2f})")
    # Simulate a user event
    real_time_system.publish_user_event(
        'like', 'user_0001', 'content_0001',
        {'content_type': 'post', 'author': 'user_0002'}
    )
    # Wait for event processing
    await asyncio.sleep(1)
    # Fetch recommendations again (regenerated after the cache invalidation)
    print("\n=== Content Recommendations After the Event ===")
    content_recs = await real_time_system.get_real_time_recommendations(
        'user_0001', 'content', limit=5
    )
    for rec in content_recs:
        print(f"Recommended: {rec.metadata['title']} (score: {rec.score:.2f})")
    # Batch-refresh recommendations
    await real_time_system.batch_update_recommendations(['user_0001', 'user_0002'])
    real_time_system.close()
# Run the real-time demo
# asyncio.run(real_time_recommendation_demo())
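A packaging note: the standalone aioredis project has been folded into redis-py, so on redis-py 4.2 or later the import at the top of this example can be swapped without further code changes:

import redis.asyncio as aioredis  # drop-in replacement for 'import aioredis' on redis-py >= 4.2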
9.3 Knowledge Graphs
9.3.1 Knowledge Graph Construction
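Knowledge graph construction starts from plain text: named entities are extracted with spaCy (or, alternatively, NLTK's chunker), candidate relations are matched against a small set of regular-expression patterns, and the results are written to Neo4j under uniqueness constraints, with each relation carrying a confidence score.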
from typing import Dict, List, Tuple, Optional, Set, Any  # Any is needed for the dataclass property dicts
from dataclasses import dataclass
from enum import Enum
import spacy
import re
from collections import defaultdict
import requests
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.tag import pos_tag
from nltk.chunk import ne_chunk
from nltk.tree import Tree
class EntityType(Enum):
"""实体类型"""
PERSON = "Person"
ORGANIZATION = "Organization"
LOCATION = "Location"
EVENT = "Event"
CONCEPT = "Concept"
PRODUCT = "Product"
DATE = "Date"
UNKNOWN = "Unknown"
class RelationType(Enum):
"""关系类型"""
WORKS_FOR = "WORKS_FOR"
LOCATED_IN = "LOCATED_IN"
FOUNDED_BY = "FOUNDED_BY"
PARTICIPATED_IN = "PARTICIPATED_IN"
RELATED_TO = "RELATED_TO"
INSTANCE_OF = "INSTANCE_OF"
SUBCLASS_OF = "SUBCLASS_OF"
BORN_IN = "BORN_IN"
DIED_IN = "DIED_IN"
CREATED_BY = "CREATED_BY"
@dataclass
class Entity:
"""实体"""
id: str
name: str
entity_type: EntityType
properties: Dict[str, Any] = None
aliases: List[str] = None
description: str = None
@dataclass
class Relation:
"""关系"""
source_entity: str
target_entity: str
relation_type: RelationType
properties: Optional[Dict[str, Any]] = None
confidence: float = 1.0
class KnowledgeGraphBuilder:
"""知识图谱构建器"""
def __init__(self, neo4j_config: Dict[str, Any]):
self.driver = GraphDatabase.driver(
neo4j_config['uri'],
auth=(neo4j_config['username'], neo4j_config['password'])
)
# Load the spaCy NLP model
try:
self.nlp = spacy.load("en_core_web_sm")
except OSError:
print("请安装spaCy英文模型: python -m spacy download en_core_web_sm")
self.nlp = None
# Download required NLTK data
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
nltk.download('punkt')
try:
nltk.data.find('taggers/averaged_perceptron_tagger')
except LookupError:
nltk.download('averaged_perceptron_tagger')
try:
nltk.data.find('chunkers/maxent_ne_chunker')
except LookupError:
nltk.download('maxent_ne_chunker')
try:
nltk.data.find('corpora/words')
except LookupError:
nltk.download('words')
# In-memory stores for extracted entities and relations
self.entities = {}
self.relations = []
# Predefined relation extraction patterns (matched per sentence)
self.relation_patterns = {
r'(.+) works? (?:for|at) (.+)': RelationType.WORKS_FOR,
# "born in" only: "founded in <year>" names a date, not a birthplace
r'(.+) (?:is|was) born in (.+)': RelationType.BORN_IN,
r'(.+) (?:is|was) located in (.+)': RelationType.LOCATED_IN,
r'(.+) (?:founded|established|created) (.+)': RelationType.FOUNDED_BY,
r'(.+) participated in (.+)': RelationType.PARTICIPATED_IN,
}
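# Example: the sentence "Tim Cook works for Apple" matches the first
# pattern above and, if both sides were recognized as entities, yields
# (person_tim_cook)-[:WORKS_FOR]->(organization_apple_inc)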
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def setup_constraints_and_indexes(self):
"""设置约束和索引"""
with self.driver.session() as session:
# Create uniqueness constraints
constraints = [
"CREATE CONSTRAINT entity_id_unique IF NOT EXISTS FOR (e:Entity) REQUIRE e.id IS UNIQUE",
"CREATE CONSTRAINT person_id_unique IF NOT EXISTS FOR (p:Person) REQUIRE p.id IS UNIQUE",
"CREATE CONSTRAINT organization_id_unique IF NOT EXISTS FOR (o:Organization) REQUIRE o.id IS UNIQUE",
"CREATE CONSTRAINT location_id_unique IF NOT EXISTS FOR (l:Location) REQUIRE l.id IS UNIQUE",
"CREATE CONSTRAINT concept_id_unique IF NOT EXISTS FOR (c:Concept) REQUIRE c.id IS UNIQUE"
]
for constraint in constraints:
try:
session.run(constraint)
except Exception as e:
self.logger.warning(f"约束创建失败: {e}")
# 创建索引
indexes = [
"CREATE INDEX entity_name_index IF NOT EXISTS FOR (e:Entity) ON (e.name)",
"CREATE INDEX entity_type_index IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)"
]
for index in indexes:
try:
session.run(index)
except Exception as e:
self.logger.warning(f"索引创建失败: {e}")
def extract_entities_spacy(self, text: str) -> List[Entity]:
"""使用spaCy提取实体"""
if not self.nlp:
return []
doc = self.nlp(text)
entities = []
for ent in doc.ents:
entity_type = self._map_spacy_label_to_entity_type(ent.label_)
entity_id = self._generate_entity_id(ent.text, entity_type)
entity = Entity(
id=entity_id,
name=ent.text.strip(),
entity_type=entity_type,
properties={
'spacy_label': ent.label_,
'start_char': ent.start_char,
'end_char': ent.end_char
}
)
entities.append(entity)
return entities
def extract_entities_nltk(self, text: str) -> List[Entity]:
"""使用NLTK提取实体"""
sentences = sent_tokenize(text)
entities = []
for sentence in sentences:
tokens = word_tokenize(sentence)
pos_tags = pos_tag(tokens)
chunks = ne_chunk(pos_tags)
for chunk in chunks:
if isinstance(chunk, Tree):
entity_name = ' '.join([token for token, pos in chunk.leaves()])
entity_type = self._map_nltk_label_to_entity_type(chunk.label())
entity_id = self._generate_entity_id(entity_name, entity_type)
entity = Entity(
id=entity_id,
name=entity_name.strip(),
entity_type=entity_type,
properties={'nltk_label': chunk.label()}
)
entities.append(entity)
return entities
def _map_spacy_label_to_entity_type(self, label: str) -> EntityType:
"""映射spaCy标签到实体类型"""
mapping = {
'PERSON': EntityType.PERSON,
'ORG': EntityType.ORGANIZATION,
'GPE': EntityType.LOCATION, # Geopolitical entity
'LOC': EntityType.LOCATION,
'EVENT': EntityType.EVENT,
'PRODUCT': EntityType.PRODUCT,
'DATE': EntityType.DATE,
'TIME': EntityType.DATE
}
return mapping.get(label, EntityType.UNKNOWN)
def _map_nltk_label_to_entity_type(self, label: str) -> EntityType:
"""映射NLTK标签到实体类型"""
mapping = {
'PERSON': EntityType.PERSON,
'ORGANIZATION': EntityType.ORGANIZATION,
'GPE': EntityType.LOCATION,
'LOCATION': EntityType.LOCATION
}
return mapping.get(label, EntityType.UNKNOWN)
def _generate_entity_id(self, name: str, entity_type: EntityType) -> str:
"""生成实体ID"""
clean_name = re.sub(r'[^a-zA-Z0-9\s]', '', name).replace(' ', '_').lower()
return f"{entity_type.value.lower()}_{clean_name}"
def extract_relations_pattern_based(self, text: str, entities: List[Entity]) -> List[Relation]:
"""基于模式提取关系"""
relations = []
entity_names = {entity.name.lower(): entity.id for entity in entities}
sentences = sent_tokenize(text)
for sentence in sentences:
sentence_lower = sentence.lower()
for pattern, relation_type in self.relation_patterns.items():
matches = re.finditer(pattern, sentence_lower)
for match in matches:
entity1_text = match.group(1).strip()
entity2_text = match.group(2).strip()
# Look up the matched entities
entity1_id = self._find_entity_by_text(entity1_text, entity_names)
entity2_id = self._find_entity_by_text(entity2_text, entity_names)
if entity1_id and entity2_id:
# FOUNDED_BY should point from the organization to its founder,
# so swap the matched groups for that pattern
if relation_type == RelationType.FOUNDED_BY:
entity1_id, entity2_id = entity2_id, entity1_id
relation = Relation(
source_entity=entity1_id,
target_entity=entity2_id,
relation_type=relation_type,
properties={'extracted_from': sentence},
confidence=0.8
)
relations.append(relation)
return relations
def _find_entity_by_text(self, text: str, entity_names: Dict[str, str]) -> Optional[str]:
"""根据文本查找实体ID"""
text = text.strip().lower()
# 精确匹配
if text in entity_names:
return entity_names[text]
# 部分匹配
for entity_name, entity_id in entity_names.items():
if text in entity_name or entity_name in text:
return entity_id
return None
def extract_relations_dependency_parsing(self, text: str, entities: List[Entity]) -> List[Relation]:
"""基于依存句法分析提取关系"""
if not self.nlp:
return []
doc = self.nlp(text)
relations = []
# Character offsets come from the spaCy extraction step and are stored
# in entity.properties, not on the Entity dataclass itself
entity_spans = {}
for ent in entities:
props = ent.properties or {}
if 'start_char' in props and 'end_char' in props:
entity_spans[(props['start_char'], props['end_char'])] = ent
for sent in doc.sents:
for token in sent:
# Look for verbs with a subject and an object
if token.pos_ == 'VERB':
subject = None
obj = None
for child in token.children:
if child.dep_ in ['nsubj', 'nsubjpass']:
subject = child
elif child.dep_ in ['dobj', 'pobj']:
obj = child
if subject and obj:
# Map the tokens back to extracted entities
subject_entity = self._find_entity_by_token(subject, entity_spans)
obj_entity = self._find_entity_by_token(obj, entity_spans)
if subject_entity and obj_entity:
relation_type = self._infer_relation_type(
token.lemma_, subject_entity, obj_entity
)
relation = Relation(
source_entity=subject_entity.id,
target_entity=obj_entity.id,
relation_type=relation_type,
properties={
'verb': token.lemma_,
'sentence': sent.text
},
confidence=0.6
)
relations.append(relation)
return relations
def _find_entity_by_token(self, token, entity_spans: Dict) -> Optional[Entity]:
"""根据token查找实体"""
for (start, end), entity in entity_spans.items():
if start <= token.idx < end:
return entity
return None
def _infer_relation_type(self, verb: str, subject_entity: Entity,
obj_entity: Entity) -> RelationType:
"""推断关系类型"""
verb_mappings = {
'work': RelationType.WORKS_FOR,
'found': RelationType.FOUNDED_BY,
'create': RelationType.CREATED_BY,
'locate': RelationType.LOCATED_IN,
'born': RelationType.BORN_IN,
'participate': RelationType.PARTICIPATED_IN
}
return verb_mappings.get(verb, RelationType.RELATED_TO)
def process_text(self, text: str) -> Tuple[List[Entity], List[Relation]]:
"""处理文本,提取实体和关系"""
# 提取实体
spacy_entities = self.extract_entities_spacy(text)
nltk_entities = self.extract_entities_nltk(text)
# Merge entities (deduplicate)
all_entities = self._merge_entities(spacy_entities + nltk_entities)
# Extract relations
pattern_relations = self.extract_relations_pattern_based(text, all_entities)
dependency_relations = self.extract_relations_dependency_parsing(text, all_entities)
all_relations = pattern_relations + dependency_relations
return all_entities, all_relations
def _merge_entities(self, entities: List[Entity]) -> List[Entity]:
"""合并重复实体"""
entity_dict = {}
for entity in entities:
key = (entity.name.lower(), entity.entity_type)
if key not in entity_dict:
entity_dict[key] = entity
else:
# Merge properties
existing_entity = entity_dict[key]
if existing_entity.properties:
existing_entity.properties.update(entity.properties or {})
else:
existing_entity.properties = entity.properties
return list(entity_dict.values())
def save_to_neo4j(self, entities: List[Entity], relations: List[Relation]):
"""保存到Neo4j"""
with self.driver.session() as session:
# 保存实体
for entity in entities:
self._create_entity(session, entity)
# 保存关系
for relation in relations:
self._create_relation(session, relation)
def _create_entity(self, session, entity: Entity):
"""创建实体节点"""
labels = ['Entity', entity.entity_type.value]
label_str = ':'.join(labels)
cypher = f"""
MERGE (e:{label_str} {{id: $id}})
SET e.name = $name,
e.entity_type = $entity_type,
e.description = $description,
e.aliases = $aliases,
e.properties = $properties,
e.updated_at = datetime()
"""
session.run(cypher, {
'id': entity.id,
'name': entity.name,
'entity_type': entity.entity_type.value,
'description': entity.description,
'aliases': entity.aliases or [],
'properties': entity.properties or {}
})
def _create_relation(self, session, relation: Relation):
"""创建关系"""
cypher = f"""
MATCH (source:Entity {{id: $source_id}})
MATCH (target:Entity {{id: $target_id}})
MERGE (source)-[r:{relation.relation_type.value}]->(target)
SET r.confidence = $confidence,
r.properties = $properties,
r.created_at = datetime()
"""
session.run(cypher, {
'source_id': relation.source_entity,
'target_id': relation.target_entity,
'confidence': relation.confidence,
'properties': relation.properties or {}
})
def build_knowledge_graph_from_text(self, text: str):
"""从文本构建知识图谱"""
self.logger.info("开始从文本构建知识图谱")
# 设置约束和索引
self.setup_constraints_and_indexes()
# 处理文本
entities, relations = self.process_text(text)
self.logger.info(f"提取到 {len(entities)} 个实体和 {len(relations)} 个关系")
# 保存到Neo4j
self.save_to_neo4j(entities, relations)
self.logger.info("知识图谱构建完成")
return entities, relations
def build_knowledge_graph_from_web(self, urls: List[str]):
"""从网页构建知识图谱"""
all_entities = []
all_relations = []
for url in urls:
try:
self.logger.info(f"处理网页: {url}")
text = self._extract_text_from_url(url)
if text:
entities, relations = self.process_text(text)
all_entities.extend(entities)
all_relations.extend(relations)
except Exception as e:
self.logger.error(f"处理网页 {url} 时出错: {e}")
# 合并所有实体和关系
merged_entities = self._merge_entities(all_entities)
# 保存到Neo4j
self.save_to_neo4j(merged_entities, all_relations)
return merged_entities, all_relations
def _extract_text_from_url(self, url: str) -> str:
"""从URL提取文本"""
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Get the text
text = soup.get_text()
# Normalize whitespace
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
return text
except Exception as e:
self.logger.error(f"从 {url} 提取文本失败: {e}")
return ""
def close(self):
"""关闭连接"""
if self.driver:
self.driver.close()
# Knowledge graph construction example
# (assumes the neo4j_config defined earlier in this chapter)
kg_builder = KnowledgeGraphBuilder(neo4j_config)
# Sample text
sample_text = """
Apple Inc. is an American multinational technology company headquartered in Cupertino, California.
Tim Cook is the CEO of Apple Inc. The company was founded by Steve Jobs, Steve Wozniak, and Ronald Wayne in 1976.
Apple is known for products like the iPhone, iPad, and Mac computers.
Steve Jobs was born in San Francisco, California and died in Palo Alto, California.
Apple participated in the development of the personal computer revolution.
"""
print("=== 从文本构建知识图谱 ===")
entities, relations = kg_builder.build_knowledge_graph_from_text(sample_text)
print(f"\n提取的实体 ({len(entities)} 个):")
for entity in entities[:10]: # 显示前10个
print(f" {entity.name} ({entity.entity_type.value})")
print(f"\n提取的关系 ({len(relations)} 个):")
for relation in relations[:10]: # 显示前10个
print(f" {relation.source_entity} --{relation.relation_type.value}--> {relation.target_entity}")
kg_builder.close()
9.3.2 Knowledge Graph Querying and Reasoning
from neo4j import GraphDatabase
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from collections import defaultdict
import logging
import networkx as nx
from itertools import combinations
@dataclass
class QueryResult:
"""查询结果"""
entity_id: str
entity_name: str
entity_type: str
properties: Dict[str, Any]
score: float = 0.0
path: Optional[List[str]] = None
explanation: Optional[str] = None
@dataclass
class InferenceResult:
"""推理结果"""
source_entity: str
target_entity: str
inferred_relation: str
confidence: float
reasoning_path: List[str]
evidence: List[Dict[str, Any]]
class KnowledgeGraphQueryEngine:
"""知识图谱查询引擎"""
def __init__(self, neo4j_config: Dict[str, Any]):
self.driver = GraphDatabase.driver(
neo4j_config['uri'],
auth=(neo4j_config['username'], neo4j_config['password'])
)
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def find_entity_by_name(self, name: str, entity_type: str = None) -> List[QueryResult]:
"""根据名称查找实体"""
with self.driver.session() as session:
if entity_type:
cypher = """
MATCH (e:Entity)
WHERE e.name CONTAINS $name AND e.entity_type = $entity_type
RETURN e.id as id, e.name as name, e.entity_type as type,
e.properties as properties, e.description as description
ORDER BY e.name
"""
result = session.run(cypher, {'name': name, 'entity_type': entity_type})
else:
cypher = """
MATCH (e:Entity)
WHERE e.name CONTAINS $name
RETURN e.id as id, e.name as name, e.entity_type as type,
e.properties as properties, e.description as description
ORDER BY e.name
"""
result = session.run(cypher, {'name': name})
entities = []
for record in result:
entities.append(QueryResult(
entity_id=record['id'],
entity_name=record['name'],
entity_type=record['type'],
properties=record['properties'] or {}
))
return entities
def find_related_entities(self, entity_id: str, relation_type: str = None,
direction: str = 'both', max_depth: int = 2) -> List[QueryResult]:
"""查找相关实体"""
with self.driver.session() as session:
if relation_type:
if direction == 'outgoing':
cypher = f"""
MATCH (source:Entity {{id: $entity_id}})-[r:{relation_type}*1..{max_depth}]->(target:Entity)
RETURN DISTINCT target.id as id, target.name as name, target.entity_type as type,
target.properties as properties, length(r) as distance
ORDER BY distance, target.name
"""
elif direction == 'incoming':
cypher = f"""
MATCH (source:Entity {{id: $entity_id}})<-[r:{relation_type}*1..{max_depth}]-(target:Entity)
RETURN DISTINCT target.id as id, target.name as name, target.entity_type as type,
target.properties as properties, length(r) as distance
ORDER BY distance, target.name
"""
else: # both
cypher = f"""
MATCH (source:Entity {{id: $entity_id}})-[r:{relation_type}*1..{max_depth}]-(target:Entity)
RETURN DISTINCT target.id as id, target.name as name, target.entity_type as type,
target.properties as properties, length(r) as distance
ORDER BY distance, target.name
"""
else:
if direction == 'outgoing':
cypher = f"""
MATCH (source:Entity {{id: $entity_id}})-[r*1..{max_depth}]->(target:Entity)
RETURN DISTINCT target.id as id, target.name as name, target.entity_type as type,
target.properties as properties, length(r) as distance
ORDER BY distance, target.name
"""
elif direction == 'incoming':
cypher = f"""
MATCH (source:Entity {{id: $entity_id}})<-[r*1..{max_depth}]-(target:Entity)
RETURN DISTINCT target.id as id, target.name as name, target.entity_type as type,
target.properties as properties, length(r) as distance
ORDER BY distance, target.name
"""
else: # both
cypher = f"""
MATCH (source:Entity {{id: $entity_id}})-[r*1..{max_depth}]-(target:Entity)
RETURN DISTINCT target.id as id, target.name as name, target.entity_type as type,
target.properties as properties, length(r) as distance
ORDER BY distance, target.name
"""
result = session.run(cypher, {'entity_id': entity_id})
entities = []
for record in result:
entities.append(QueryResult(
entity_id=record['id'],
entity_name=record['name'],
entity_type=record['type'],
properties=record['properties'] or {},
score=1.0 / (record['distance'] + 1) # 距离越近分数越高
))
return entities
def find_shortest_path(self, source_id: str, target_id: str,
max_length: int = 6) -> Optional[QueryResult]:
"""查找最短路径"""
with self.driver.session() as session:
cypher = f"""
MATCH path = shortestPath((source:Entity {{id: $source_id}})-[*1..{max_length}]-(target:Entity {{id: $target_id}}))
RETURN path,
[node in nodes(path) | {{id: node.id, name: node.name, type: node.entity_type}}] as nodes,
[rel in relationships(path) | type(rel)] as relations,
length(path) as path_length
"""
result = session.run(cypher, {
'source_id': source_id,
'target_id': target_id
})
record = result.single()
if record:
nodes = record['nodes']
relations = record['relations']
path_length = record['path_length']
# Build a human-readable path description
path_description = []
for i in range(len(nodes) - 1):
path_description.append(f"{nodes[i]['name']} --{relations[i]}--> {nodes[i+1]['name']}")
return QueryResult(
entity_id=target_id,
entity_name=nodes[-1]['name'],
entity_type=nodes[-1]['type'],
properties={},
score=1.0 / (path_length + 1),
path=path_description,
explanation=f"通过 {path_length} 步路径连接"
)
return None
def semantic_search(self, query: str, entity_types: List[str] = None,
limit: int = 10) -> List[QueryResult]:
"""语义搜索"""
with self.driver.session() as session:
# 简单的文本匹配搜索(实际应用中可以使用向量搜索)
query_words = query.lower().split()
if entity_types:
type_filter = "AND e.entity_type IN $entity_types"
else:
type_filter = ""
cypher = f"""
MATCH (e:Entity)
WHERE any(word in $query_words WHERE e.name CONTAINS word OR
any(alias in e.aliases WHERE alias CONTAINS word) OR
e.description CONTAINS word)
{type_filter}
RETURN e.id as id, e.name as name, e.entity_type as type,
e.properties as properties, e.description as description
ORDER BY e.name
LIMIT $limit
"""
result = session.run(cypher, {
'query_words': query_words,
'entity_types': entity_types,
'limit': limit
})
entities = []
for record in result:
# Compute a relevance score
score = self._calculate_relevance_score(
query_words, record['name'], record['description']
)
entities.append(QueryResult(
entity_id=record['id'],
entity_name=record['name'],
entity_type=record['type'],
properties=record['properties'] or {},
score=score
))
# Sort by score
entities.sort(key=lambda x: x.score, reverse=True)
return entities
def _calculate_relevance_score(self, query_words: List[str],
name: str, description: str) -> float:
"""计算相关性分数"""
score = 0.0
name_lower = name.lower()
desc_lower = (description or "").lower()
for word in query_words:
if word in name_lower:
score += 2.0 # name matches carry more weight
if word in desc_lower:
score += 1.0
return score / max(len(query_words), 1)
def infer_relationships(self, entity_id: str, max_depth: int = 3) -> List[InferenceResult]:
"""推理潜在关系"""
inferences = []
# 基于共同邻居推理
common_neighbor_inferences = self._infer_by_common_neighbors(entity_id, max_depth)
inferences.extend(common_neighbor_inferences)
# 基于路径模式推理
path_pattern_inferences = self._infer_by_path_patterns(entity_id, max_depth)
inferences.extend(path_pattern_inferences)
# 基于类型规则推理
type_rule_inferences = self._infer_by_type_rules(entity_id)
inferences.extend(type_rule_inferences)
# 按置信度排序
inferences.sort(key=lambda x: x.confidence, reverse=True)
return inferences
def _infer_by_common_neighbors(self, entity_id: str, max_depth: int) -> List[InferenceResult]:
"""基于共同邻居推理关系"""
with self.driver.session() as session:
cypher = f"""
MATCH (source:Entity {{id: $entity_id}})-[r1]-(common)-[r2]-(target:Entity)
WHERE source <> target AND NOT (source)-[]-(target)
WITH source, target, common, type(r1) as rel1_type, type(r2) as rel2_type,
count(*) as common_neighbors
WHERE common_neighbors >= 2
RETURN target.id as target_id, target.name as target_name,
rel1_type, rel2_type, common_neighbors,
collect(common.name) as common_entities
ORDER BY common_neighbors DESC
LIMIT 20
"""
result = session.run(cypher, {'entity_id': entity_id})
inferences = []
for record in result:
# 推理关系类型
inferred_relation = self._infer_relation_from_pattern(
record['rel1_type'], record['rel2_type']
)
if inferred_relation:
confidence = min(0.8, record['common_neighbors'] * 0.1)
inference = InferenceResult(
source_entity=entity_id,
target_entity=record['target_id'],
inferred_relation=inferred_relation,
confidence=confidence,
reasoning_path=[f"共同邻居: {', '.join(record['common_entities'][:3])}"],
evidence=[{
'type': 'common_neighbors',
'count': record['common_neighbors'],
'entities': record['common_entities']
}]
)
inferences.append(inference)
return inferences
def _infer_by_path_patterns(self, entity_id: str, max_depth: int) -> List[InferenceResult]:
"""基于路径模式推理关系"""
with self.driver.session() as session:
# Look for specific path patterns, e.g. A-WORKS_FOR->B-LOCATED_IN->C suggests A-LOCATED_IN->C
cypher = """
MATCH (source:Entity {id: $entity_id})-[r1:WORKS_FOR]->(org:Organization)-[r2:LOCATED_IN]->(location:Location)
WHERE NOT (source)-[:LOCATED_IN]->(location)
RETURN location.id as target_id, location.name as target_name,
org.name as intermediate_org
"""
result = session.run(cypher, {'entity_id': entity_id})
inferences = []
for record in result:
inference = InferenceResult(
source_entity=entity_id,
target_entity=record['target_id'],
inferred_relation='LOCATED_IN',
confidence=0.7,
reasoning_path=[f"工作于 {record['intermediate_org']},该组织位于目标位置"],
evidence=[{
'type': 'path_pattern',
'pattern': 'WORKS_FOR -> LOCATED_IN',
'intermediate': record['intermediate_org']
}]
)
inferences.append(inference)
return inferences
def _infer_by_type_rules(self, entity_id: str) -> List[InferenceResult]:
"""基于类型规则推理关系"""
with self.driver.session() as session:
# Fetch the entity's type
cypher = """
MATCH (e:Entity {id: $entity_id})
RETURN e.entity_type as entity_type, e.name as entity_name
"""
result = session.run(cypher, {'entity_id': entity_id})
record = result.single()
if not record:
return []
entity_type = record['entity_type']
inferences = []
# Type-specific inference rules
if entity_type == 'Person':
# People working for the same organization likely know each other
cypher = """
MATCH (person:Person {id: $entity_id})-[:WORKS_FOR]->(org:Organization)<-[:WORKS_FOR]-(colleague:Person)
WHERE person <> colleague AND NOT (person)-[:KNOWS]-(colleague)
RETURN colleague.id as target_id, colleague.name as target_name, org.name as org_name
LIMIT 10
"""
result = session.run(cypher, {'entity_id': entity_id})
for record in result:
inference = InferenceResult(
source_entity=entity_id,
target_entity=record['target_id'],
inferred_relation='KNOWS',
confidence=0.6,
reasoning_path=[f"同在 {record['org_name']} 工作的同事"],
evidence=[{
'type': 'type_rule',
'rule': 'colleagues_know_each_other',
'organization': record['org_name']
}]
)
inferences.append(inference)
return inferences
def _infer_relation_from_pattern(self, rel1_type: str, rel2_type: str) -> Optional[str]:
"""从关系模式推理新关系"""
patterns = {
('WORKS_FOR', 'LOCATED_IN'): 'LOCATED_IN',
('FOUNDED_BY', 'BORN_IN'): 'RELATED_TO',
('WORKS_FOR', 'WORKS_FOR'): 'KNOWS', # colleagues
}
return patterns.get((rel1_type, rel2_type))
def explain_relationship(self, source_id: str, target_id: str) -> Dict[str, Any]:
"""解释两个实体之间的关系"""
with self.driver.session() as session:
# 查找直接关系
cypher = """
MATCH (source:Entity {id: $source_id})-[r]-(target:Entity {id: $target_id})
RETURN type(r) as relation_type, r.confidence as confidence,
r.properties as properties
"""
result = session.run(cypher, {
'source_id': source_id,
'target_id': target_id
})
direct_relations = []
for record in result:
direct_relations.append({
'type': record['relation_type'],
'confidence': record['confidence'],
'properties': record['properties'] or {}
})
# Indirect path
shortest_path = self.find_shortest_path(source_id, target_id)
# Common neighbors
cypher = """
MATCH (source:Entity {id: $source_id})-[]-(common)-[]-(target:Entity {id: $target_id})
RETURN common.id as common_id, common.name as common_name,
common.entity_type as common_type
LIMIT 10
"""
result = session.run(cypher, {
'source_id': source_id,
'target_id': target_id
})
common_neighbors = []
for record in result:
common_neighbors.append({
'id': record['common_id'],
'name': record['common_name'],
'type': record['common_type']
})
return {
'direct_relations': direct_relations,
'shortest_path': shortest_path,
'common_neighbors': common_neighbors,
'explanation': self._generate_relationship_explanation(
direct_relations, shortest_path, common_neighbors
)
}
def _generate_relationship_explanation(self, direct_relations: List[Dict],
shortest_path: Optional[QueryResult],
common_neighbors: List[Dict]) -> str:
"""生成关系解释"""
explanations = []
if direct_relations:
rel_types = [rel['type'] for rel in direct_relations]
explanations.append(f"直接关系: {', '.join(rel_types)}")
if shortest_path and shortest_path.path:
explanations.append(f"间接关系: {' -> '.join(shortest_path.path)}")
if common_neighbors:
neighbor_names = [neighbor['name'] for neighbor in common_neighbors[:3]]
explanations.append(f"共同邻居: {', '.join(neighbor_names)}")
if not explanations:
return "未发现明显关系"
return "; ".join(explanations)
def close(self):
"""关闭连接"""
if self.driver:
self.driver.close()
# Knowledge graph query examples
# (assumes the neo4j_config defined earlier in this chapter)
query_engine = KnowledgeGraphQueryEngine(neo4j_config)
print("=== Knowledge Graph Query Examples ===")
# 1. Entity search
print("\n1. Search for Apple-related entities:")
apple_entities = query_engine.find_entity_by_name("Apple")
for entity in apple_entities:
print(f" {entity.entity_name} ({entity.entity_type})")
# 2. Related entities
if apple_entities:
apple_id = apple_entities[0].entity_id
print(f"\n2. Entities related to {apple_entities[0].entity_name}:")
related_entities = query_engine.find_related_entities(apple_id, max_depth=2)
for entity in related_entities[:5]:
print(f" {entity.entity_name} ({entity.entity_type}) - score: {entity.score:.2f}")
# 3. Semantic search
print("\n3. Semantic search for 'technology company':")
search_results = query_engine.semantic_search("technology company", limit=5)
for result in search_results:
print(f" {result.entity_name} ({result.entity_type}) - score: {result.score:.2f}")
# 4. Relationship inference
if apple_entities:
print(f"\n4. Inferring potential relationships for {apple_entities[0].entity_name}:")
inferences = query_engine.infer_relationships(apple_id, max_depth=2)
for inference in inferences[:3]:
print(f" Possible {inference.inferred_relation} relation with {inference.target_entity}")
print(f" Confidence: {inference.confidence:.2f}")
print(f" Reasoning: {', '.join(inference.reasoning_path)}")
query_engine.close()
9.4 Chapter Summary
9.4.1 Core Concepts
Social Network Analysis
- User relationship modeling and analysis
- Applying community detection algorithms
- Influence analysis and propagation models
- Network visualization techniques
Recommendation Systems
- Graph-based collaborative filtering
- Recommendation algorithms for multiple item types
- Real-time recommendation architecture
- Recommendation explanation and evaluation
Knowledge Graphs
- Entity recognition and relation extraction
- The knowledge graph construction pipeline
- Semantic querying and reasoning
- Knowledge fusion and quality control
9.4.2 Best Practices
Data Modeling
- Design node and relationship types deliberately
- Store metadata in well-chosen properties
- Account for the time dimension of your data (see the sketch after this list)
- Establish data quality checks
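To make the metadata and time-dimension bullets concrete, here is a minimal sketch of a relationship that records when and how it was created. The labels, property names, and connection details are illustrative, not taken from this chapter's systems:

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    # Recording `since` on creation keeps time-windowed queries possible later,
    # e.g. "who started following whom last month?"
    session.run(
        """
        MATCH (a:User {user_id: $a}), (b:User {user_id: $b})
        MERGE (a)-[f:FOLLOW]->(b)
        ON CREATE SET f.since = datetime(), f.source = $source
        """,
        {'a': 'user_0001', 'b': 'user_0002', 'source': 'suggestion_click'}
    )
driver.close()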
Performance Optimization
- Create the necessary indexes and constraints (see the PROFILE sketch after this list)
- Optimize Cypher queries
- Use caching to improve response times
- Implement data sharding and load balancing
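For the first two bullets, PROFILE is the quickest way to verify that a query actually hits an index. A minimal sketch with the official Python driver (connection details are illustrative; the profiled plan is exposed as a nested dictionary on the result summary):

from neo4j import GraphDatabase

def operator_names(plan):
    """Recursively collect operator names from a PROFILE plan."""
    yield plan['operatorType']
    for child in plan.get('children', []):
        yield from operator_names(child)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    summary = session.run(
        "PROFILE MATCH (e:Entity {name: $name}) RETURN e", {'name': 'Apple Inc.'}
    ).consume()
    # NodeIndexSeek here means the index is used; NodeByLabelScan means it is not
    print(list(operator_names(summary.profile)))
driver.close()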
System Architecture
- Adopt a microservice architecture
- Implement asynchronous processing
- Set up monitoring and alerting (see the sketch after this list)
- Plan for data backup and recovery
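As one lightweight take on the monitoring bullet, query latency can be tracked without extra infrastructure by wrapping session.run; a sketch whose helper name and threshold are made up for illustration:

import time
import logging

def timed_run(session, cypher, params=None, slow_ms=200):
    """Run a query and log it if it exceeds slow_ms milliseconds."""
    start = time.perf_counter()
    records = list(session.run(cypher, params or {}))  # materialize to time fetching too
    elapsed_ms = (time.perf_counter() - start) * 1000
    if elapsed_ms > slow_ms:
        logging.getLogger("neo4j.slow_query").warning(
            "slow query (%.0f ms): %s", elapsed_ms, cypher)
    return records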
9.4.3 Technology Stack Integration
Data Processing
- Apache Kafka for real-time data streams
- Redis for caching and session management
- Elasticsearch for full-text search
- Apache Spark for large-scale data processing
Machine Learning
- scikit-learn for classical machine learning
- PyTorch/TensorFlow for deep learning
- NetworkX for graph analysis (see the sketch after this list)
- spaCy/NLTK for natural language processing
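For graph algorithms Neo4j does not provide out of the box, exporting a subgraph into NetworkX is often the shortest path. A minimal sketch, assuming the neo4j_config dictionary used throughout this chapter:

import networkx as nx
from neo4j import GraphDatabase

driver = GraphDatabase.driver(neo4j_config['uri'],
                              auth=(neo4j_config['username'], neo4j_config['password']))
G = nx.DiGraph()
with driver.session() as session:
    result = session.run(
        "MATCH (a:Entity)-[r]->(b:Entity) "
        "RETURN a.id AS src, b.id AS dst, type(r) AS rel"
    )
    for record in result:
        G.add_edge(record['src'], record['dst'], rel=record['rel'])
driver.close()
# e.g. rank entities by structural importance
print(sorted(nx.pagerank(G).items(), key=lambda kv: kv[1], reverse=True)[:5])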
Visualization
- D3.js for interactive graph visualization
- Plotly for data visualization
- Gephi for network analysis
- Cytoscape for biological network analysis
9.4.4 Exercises
Basic Exercises
Social network analysis
- Implement a user similarity algorithm
- Build a friend recommendation system
- Analyze user activity patterns
- Detect fake accounts and anomalous behavior
Recommendation systems
- Implement a content-based recommendation algorithm
- Build a hybrid recommender
- Evaluate recommendation quality
- Handle the cold-start problem
Knowledge graphs
- Build a knowledge graph from structured data
- Implement an entity linking algorithm
- Build a domain ontology
- Implement a knowledge-graph-backed question answering system
Advanced Exercises
Real-time systems
- Build a real-time social network analysis system
- Implement streaming recommendation algorithms
- Build a real-time anomaly detection system
- Optimize system latency and throughput
Large-scale processing
- Handle social networks with millions of users
- Implement distributed graph algorithms
- Optimize memory usage and query performance
- Implement a data sharding strategy
Intelligent analysis
- Implement graph neural network models
- Build a multimodal knowledge graph
- Implement causal reasoning algorithms
- Establish a quality evaluation framework for knowledge graphs
Project Exercises
End-to-end system development
- Develop a social media analytics platform
- Build an e-commerce recommendation system
- Build an enterprise knowledge management system
- Implement an intelligent question answering system
Industry applications
- Financial risk analysis systems
- Medical knowledge graph applications
- Supply chain network analysis
- Relationship mining over academic publications
By studying and working through these hands-on cases, you will learn how to apply Neo4j in real projects, understand where graph databases shine in different scenarios, and be able to design and implement complex graph data applications.