4.1 图数据建模基础
建模原则
图数据建模与关系型数据库建模有本质区别,需要遵循以下原则:
- 以关系为中心:重点关注实体间的关系
- 白板友好:模型应该直观易懂
- 查询驱动:根据查询需求设计模型
- 演化友好:支持模式的灵活变更
- 性能优先:优化常用查询路径
建模步骤
import re
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Set, Tuple
class ModelingStep(Enum):
    """Named steps of the graph data modeling process.

    Each member's value is the step's display label.
    """

    IDENTIFY_ENTITIES = "识别实体"
    DEFINE_RELATIONSHIPS = "定义关系"
    ADD_PROPERTIES = "添加属性"
    OPTIMIZE_QUERIES = "优化查询"
    VALIDATE_MODEL = "验证模型"
@dataclass
class Entity:
    """Definition of an entity in the graph model."""

    name: str  # identifier of the entity within a model
    labels: List[str]  # labels applied to nodes of this entity
    properties: Dict[str, str]  # property name -> property type
    description: str = ""  # optional free-text description
@dataclass
class Relationship:
    """Definition of a relationship between two entities of the graph model."""

    name: str  # identifier of the relationship within a model
    from_entity: str  # name of the entity the relationship starts at
    to_entity: str  # name of the entity the relationship points to
    properties: Dict[str, str]  # property name -> property type
    cardinality: str  # 1:1, 1:N, N:M
    description: str = ""  # optional free-text description
class GraphDataModeler:
    """Registry for the entities, relationships and query requirements of a graph model.

    Entities and relationships are stored in dictionaries keyed by their
    ``name``; adding an item under an existing name replaces the earlier one.
    """

    def __init__(self):
        self.entities: Dict[str, Entity] = {}
        self.relationships: Dict[str, Relationship] = {}
        self.queries: List[str] = []

    def add_entity(self, entity: Entity) -> None:
        """Register ``entity`` under ``entity.name``."""
        self.entities[entity.name] = entity

    def add_relationship(self, relationship: Relationship) -> None:
        """Register ``relationship`` under ``relationship.name``."""
        self.relationships[relationship.name] = relationship

    def add_query_requirement(self, query: str) -> None:
        """Record a query the model is expected to support."""
        self.queries.append(query)

    def validate_model(self) -> List[str]:
        """Check the model for dangling endpoints and isolated entities.

        Returns:
            A list of human-readable issue messages; empty when nothing
            was found.
        """
        issues = []
        # Every relationship endpoint must name a registered entity.
        for rel_name, rel in self.relationships.items():
            if rel.from_entity not in self.entities:
                issues.append(f"关系 {rel_name} 的起始实体 {rel.from_entity} 不存在")
            if rel.to_entity not in self.entities:
                issues.append(f"关系 {rel_name} 的目标实体 {rel.to_entity} 不存在")
        # Entities that no relationship touches are reported as isolated.
        connected_entities = set()
        for rel in self.relationships.values():
            connected_entities.update((rel.from_entity, rel.to_entity))
        isolated_entities = set(self.entities) - connected_entities
        if isolated_entities:
            issues.append(f"发现孤立实体: {isolated_entities}")
        return issues

    def generate_cypher_schema(self) -> str:
        """Build the Cypher statements that create constraints and indexes.

        The first property of every entity is assumed to be its unique
        identifier and gets a uniqueness constraint on each of the entity's
        labels; every other property gets a plain index. Entities without
        properties contribute nothing.

        Schema objects are named after the label (not the entity) and are
        emitted once per (label, property) pair: an entity with several
        labels, or several entities sharing a label, would otherwise yield
        duplicate names or equivalent schema objects. A property that already
        has a uniqueness constraint on a label gets no additional index there.

        Returns:
            All constraint statements followed by all index statements,
            joined with ``";\\n"``; an empty string for an empty model.
        """
        # Dicts are used as insertion-ordered sets of (label, property).
        unique_targets: Dict[Tuple[str, str], None] = {}
        index_targets: Dict[Tuple[str, str], None] = {}
        for entity in self.entities.values():
            prop_names = list(entity.properties)
            if not prop_names:
                continue
            key_prop, *other_props = prop_names
            for label in entity.labels:
                unique_targets.setdefault((label, key_prop))
                for prop in other_props:
                    index_targets.setdefault((label, prop))

        statements = [
            f"CREATE CONSTRAINT {label.lower()}_{prop}_unique "
            f"FOR (n:{label}) REQUIRE n.{prop} IS UNIQUE"
            for label, prop in unique_targets
        ]
        statements.extend(
            f"CREATE INDEX {label.lower()}_{prop}_index "
            f"FOR (n:{label}) ON (n.{prop})"
            for label, prop in index_targets
            if (label, prop) not in unique_targets
        )
        return ";\n".join(statements)

    def export_to_dict(self) -> Dict:
        """Export the model as a plain dictionary.

        The returned structure has the keys ``entities``, ``relationships``
        and ``queries``. Label lists, property dicts and the query list are
        the model's own objects, not copies.
        """
        return {
            'entities': {name: {
                'labels': entity.labels,
                'properties': entity.properties,
                'description': entity.description
            } for name, entity in self.entities.items()},
            'relationships': {name: {
                'from_entity': rel.from_entity,
                'to_entity': rel.to_entity,
                'properties': rel.properties,
                'cardinality': rel.cardinality,
                'description': rel.description
            } for name, rel in self.relationships.items()},
            'queries': self.queries
        }
# Usage example: build a small User/Post model step by step.
modeler = GraphDataModeler()
# Define entities. The first property of each entity is the one
# generate_cypher_schema treats as the unique identifier.
user_entity = Entity(
    name="User",
    labels=["User", "Person"],
    properties={
        "user_id": "string",
        "username": "string",
        "email": "string",
        "created_at": "datetime",
        "last_login": "datetime"
    },
    description="系统用户"
)
post_entity = Entity(
    name="Post",
    labels=["Post", "Content"],
    properties={
        "post_id": "string",
        "title": "string",
        "content": "string",
        "created_at": "datetime",
        "view_count": "integer"
    },
    description="用户发布的内容"
)
modeler.add_entity(user_entity)
modeler.add_entity(post_entity)
# Define the relationship that connects the two entities.
authored_rel = Relationship(
    name="AUTHORED",
    from_entity="User",
    to_entity="Post",
    properties={
        "created_at": "datetime"
    },
    cardinality="1:N",
    description="用户创作内容的关系"
)
modeler.add_relationship(authored_rel)
4.2 常见建模模式
4.2.1 社交网络模型
class SocialNetworkModeler:
    """Ready-made graph model for a social network.

    Construction creates a ``GraphDataModeler`` (available as ``modeler``)
    and fills it with the User, Post, Comment and Hashtag entities and the
    relationships between them.
    """

    def __init__(self):
        self.modeler = GraphDataModeler()
        self._setup_social_model()

    def _setup_social_model(self):
        """Register the social-network entities and relationships on ``self.modeler``."""
        entity_definitions = (
            # Users
            Entity(
                name="User",
                labels=["User", "Person"],
                properties={
                    "user_id": "string",
                    "username": "string",
                    "email": "string",
                    "full_name": "string",
                    "bio": "string",
                    "avatar_url": "string",
                    "birth_date": "date",
                    "location": "string",
                    "created_at": "datetime",
                    "last_active": "datetime",
                    "follower_count": "integer",
                    "following_count": "integer",
                    "post_count": "integer",
                },
                description="社交网络用户",
            ),
            # Posts
            Entity(
                name="Post",
                labels=["Post", "Content"],
                properties={
                    "post_id": "string",
                    "content": "string",
                    "media_urls": "list",
                    "hashtags": "list",
                    "mentions": "list",
                    "created_at": "datetime",
                    "updated_at": "datetime",
                    "like_count": "integer",
                    "comment_count": "integer",
                    "share_count": "integer",
                    "visibility": "string",  # public, private, friends
                },
                description="用户发布的内容",
            ),
            # Comments
            Entity(
                name="Comment",
                labels=["Comment", "Content"],
                properties={
                    "comment_id": "string",
                    "content": "string",
                    "created_at": "datetime",
                    "updated_at": "datetime",
                    "like_count": "integer",
                    "reply_count": "integer",
                },
                description="帖子评论",
            ),
            # Hashtags
            Entity(
                name="Hashtag",
                labels=["Hashtag", "Tag"],
                properties={
                    "tag_id": "string",
                    "name": "string",
                    "created_at": "datetime",
                    "usage_count": "integer",
                    "trending_score": "float",
                },
                description="内容标签",
            ),
        )
        for definition in entity_definitions:
            self.modeler.add_entity(definition)

        # Each row: (name, from_entity, to_entity, properties, cardinality, description)
        relationship_rows = (
            (
                "FOLLOWS", "User", "User",
                {"followed_at": "datetime", "notification_enabled": "boolean"},
                "N:M", "用户关注关系",
            ),
            ("POSTED", "User", "Post", {"posted_at": "datetime"}, "1:N", "用户发布帖子"),
            ("LIKED", "User", "Post", {"liked_at": "datetime"}, "N:M", "用户点赞帖子"),
            ("COMMENTED", "User", "Comment", {"commented_at": "datetime"}, "1:N", "用户发表评论"),
            ("COMMENT_ON", "Comment", "Post", {"commented_at": "datetime"}, "N:1", "评论所属帖子"),
            ("REPLY_TO", "Comment", "Comment", {"replied_at": "datetime"}, "N:1", "评论回复关系"),
            ("TAGGED", "Post", "Hashtag", {"tagged_at": "datetime"}, "N:M", "帖子标签关系"),
            ("MENTIONED", "Post", "User", {"mentioned_at": "datetime"}, "N:M", "帖子提及用户"),
        )
        for rel_name, source, target, props, multiplicity, text in relationship_rows:
            self.modeler.add_relationship(
                Relationship(
                    name=rel_name,
                    from_entity=source,
                    to_entity=target,
                    properties=props,
                    cardinality=multiplicity,
                    description=text,
                )
            )

    def generate_sample_queries(self) -> List[str]:
        """Return four sample Cypher queries for the social-network model.

        In order: a user's timeline, follow recommendations, trending
        hashtags, and a per-user influence score.
        """
        # Posts of the last 7 days from users that $username follows.
        timeline_query = """
MATCH (user:User {username: $username})-[:FOLLOWS]->(followed:User)
MATCH (followed)-[:POSTED]->(post:Post)
WHERE post.created_at > datetime() - duration('P7D')
RETURN post, followed
ORDER BY post.created_at DESC
LIMIT 20
"""
        # Users followed by the people $username follows, ranked by overlap.
        recommendation_query = """
MATCH (user:User {username: $username})-[:FOLLOWS]->(friend)-[:FOLLOWS]->(recommendation)
WHERE NOT (user)-[:FOLLOWS]->(recommendation) AND user <> recommendation
WITH recommendation, count(*) as mutual_friends
RETURN recommendation, mutual_friends
ORDER BY mutual_friends DESC
LIMIT 10
"""
        # Hashtags ranked by the number of posts tagged within the last day.
        trending_query = """
MATCH (post:Post)-[:TAGGED]->(hashtag:Hashtag)
WHERE post.created_at > datetime() - duration('P1D')
WITH hashtag, count(post) as usage_today
RETURN hashtag.name, usage_today
ORDER BY usage_today DESC
LIMIT 10
"""
        # Influence score = likes + 2 * comments over all of a user's posts.
        influence_query = """
MATCH (user:User)-[:POSTED]->(post:Post)
OPTIONAL MATCH (post)<-[:LIKED]-(liker:User)
OPTIONAL MATCH (post)<-[:COMMENT_ON]-(comment:Comment)
WITH user,
count(DISTINCT post) as post_count,
count(DISTINCT liker) as total_likes,
count(DISTINCT comment) as total_comments
RETURN user.username,
post_count,
total_likes,
total_comments,
(total_likes + total_comments * 2) as influence_score
ORDER BY influence_score DESC
"""
        return [timeline_query, recommendation_query, trending_query, influence_query]
# Usage example: validate the social-network model and print its sample queries.
social_modeler = SocialNetworkModeler()
print("社交网络模型验证:", social_modeler.modeler.validate_model())
print("\n示例查询:")
# Queries are numbered from 1 and printed without surrounding whitespace.
for i, query in enumerate(social_modeler.generate_sample_queries(), 1):
    print(f"\n查询 {i}:")
    print(query.strip())
4.2.2 电商推荐系统模型
class EcommerceModeler:
    """Ready-made graph model for an e-commerce recommendation system.

    Construction creates a ``GraphDataModeler`` (available as ``modeler``)
    and fills it with the User, Product, Category, Order, Review and Cart
    entities and the relationships between them.
    """

    def __init__(self):
        self.modeler = GraphDataModeler()
        self._setup_ecommerce_model()

    def _setup_ecommerce_model(self):
        """Register the e-commerce entities and relationships on ``self.modeler``."""
        entity_definitions = (
            # Users
            Entity(
                name="User",
                labels=["User", "Customer"],
                properties={
                    "user_id": "string",
                    "email": "string",
                    "username": "string",
                    "full_name": "string",
                    "phone": "string",
                    "birth_date": "date",
                    "gender": "string",
                    "registration_date": "datetime",
                    "last_login": "datetime",
                    "total_orders": "integer",
                    "total_spent": "float",
                    "loyalty_level": "string",  # bronze, silver, gold, platinum
                },
                description="电商平台用户",
            ),
            # Products
            Entity(
                name="Product",
                labels=["Product", "Item"],
                properties={
                    "product_id": "string",
                    "name": "string",
                    "description": "string",
                    "price": "float",
                    "cost": "float",
                    "sku": "string",
                    "brand": "string",
                    "weight": "float",
                    "dimensions": "string",
                    "color": "string",
                    "size": "string",
                    "material": "string",
                    "created_at": "datetime",
                    "updated_at": "datetime",
                    "stock_quantity": "integer",
                    "rating_avg": "float",
                    "rating_count": "integer",
                    "view_count": "integer",
                    "sales_count": "integer",
                },
                description="商品产品",
            ),
            # Categories
            Entity(
                name="Category",
                labels=["Category"],
                properties={
                    "category_id": "string",
                    "name": "string",
                    "description": "string",
                    "level": "integer",
                    "path": "string",
                    "created_at": "datetime",
                },
                description="商品分类",
            ),
            # Orders
            Entity(
                name="Order",
                labels=["Order", "Transaction"],
                properties={
                    "order_id": "string",
                    "order_number": "string",
                    "status": "string",  # pending, confirmed, shipped, delivered, cancelled
                    "total_amount": "float",
                    "discount_amount": "float",
                    "tax_amount": "float",
                    "shipping_cost": "float",
                    "payment_method": "string",
                    "created_at": "datetime",
                    "updated_at": "datetime",
                    "shipped_at": "datetime",
                    "delivered_at": "datetime",
                },
                description="用户订单",
            ),
            # Reviews
            Entity(
                name="Review",
                labels=["Review", "Feedback"],
                properties={
                    "review_id": "string",
                    "rating": "integer",  # 1-5
                    "title": "string",
                    "content": "string",
                    "helpful_count": "integer",
                    "verified_purchase": "boolean",
                    "created_at": "datetime",
                    "updated_at": "datetime",
                },
                description="商品评价",
            ),
            # Shopping carts
            Entity(
                name="Cart",
                labels=["Cart", "ShoppingCart"],
                properties={
                    "cart_id": "string",
                    "session_id": "string",
                    "created_at": "datetime",
                    "updated_at": "datetime",
                    "total_items": "integer",
                    "total_amount": "float",
                },
                description="购物车",
            ),
        )
        for definition in entity_definitions:
            self.modeler.add_entity(definition)

        # Each row: (name, from_entity, to_entity, properties, cardinality, description)
        relationship_rows = (
            # User behaviour
            (
                "VIEWED", "User", "Product",
                {
                    "viewed_at": "datetime",
                    "duration": "integer",  # viewing time in seconds
                    "source": "string",  # search, recommendation, category
                },
                "N:M", "用户浏览商品",
            ),
            (
                "PURCHASED", "User", "Product",
                {
                    "purchased_at": "datetime",
                    "quantity": "integer",
                    "unit_price": "float",
                    "total_price": "float",
                },
                "N:M", "用户购买商品",
            ),
            (
                "ADDED_TO_CART", "User", "Product",
                {"added_at": "datetime", "quantity": "integer", "removed_at": "datetime"},
                "N:M", "用户添加到购物车",
            ),
            ("REVIEWED", "User", "Review", {"reviewed_at": "datetime"}, "1:N", "用户发表评价"),
            ("REVIEW_FOR", "Review", "Product", {"reviewed_at": "datetime"}, "N:1", "评价对应商品"),
            # Product relationships
            ("BELONGS_TO", "Product", "Category", {"assigned_at": "datetime"}, "N:M", "商品所属分类"),
            (
                "SIMILAR_TO", "Product", "Product",
                {
                    "similarity_score": "float",
                    "calculated_at": "datetime",
                    "algorithm": "string",  # content_based, collaborative, hybrid
                },
                "N:M", "商品相似性",
            ),
            (
                "FREQUENTLY_BOUGHT_WITH", "Product", "Product",
                {
                    "confidence": "float",
                    "support": "float",
                    "lift": "float",
                    "calculated_at": "datetime",
                },
                "N:M", "商品关联购买",
            ),
            # Orders
            ("PLACED", "User", "Order", {"placed_at": "datetime"}, "1:N", "用户下单"),
            (
                "CONTAINS", "Order", "Product",
                {"quantity": "integer", "unit_price": "float", "total_price": "float"},
                "N:M", "订单包含商品",
            ),
            # Category hierarchy
            ("PARENT_OF", "Category", "Category", {"created_at": "datetime"}, "1:N", "分类父子关系"),
        )
        for rel_name, source, target, props, multiplicity, text in relationship_rows:
            self.modeler.add_relationship(
                Relationship(
                    name=rel_name,
                    from_entity=source,
                    to_entity=target,
                    properties=props,
                    cardinality=multiplicity,
                    description=text,
                )
            )

    def generate_recommendation_queries(self) -> Dict[str, str]:
        """Return the recommendation queries of the e-commerce model.

        Returns:
            A mapping from query name to Cypher text with the keys
            ``collaborative_filtering``, ``content_based``,
            ``frequently_bought_together``, ``trending_in_category`` and
            ``user_behavior_analysis``. Every query takes a ``$user_id``
            parameter.
        """
        return {
            "collaborative_filtering": """
// 协同过滤推荐
MATCH (user:User {user_id: $user_id})-[:PURCHASED]->(product:Product)
MATCH (product)<-[:PURCHASED]-(other_user:User)-[:PURCHASED]->(recommendation:Product)
WHERE NOT (user)-[:PURCHASED]->(recommendation)
WITH recommendation, count(*) as common_purchases
RETURN recommendation, common_purchases
ORDER BY common_purchases DESC
LIMIT 10
""",
            # similarity_score is a property of the SIMILAR_TO relationship in
            # this model, so it is read from the bound relationship `s`; reading
            # it from the Product node would always yield null.
            "content_based": """
// 基于内容的推荐
MATCH (user:User {user_id: $user_id})-[:PURCHASED]->(purchased:Product)
MATCH (purchased)-[s:SIMILAR_TO]->(similar:Product)
WHERE NOT (user)-[:PURCHASED]->(similar)
WITH similar, avg(similar.rating_avg) as avg_rating, sum(s.similarity_score) as total_similarity
RETURN similar, avg_rating, total_similarity
ORDER BY total_similarity DESC, avg_rating DESC
LIMIT 10
""",
            # confidence lives on the FREQUENTLY_BOUGHT_WITH relationship; the
            # max() collapses a product reached from several cart items into a
            # single row carrying its best confidence.
            "frequently_bought_together": """
// 经常一起购买的商品
MATCH (user:User {user_id: $user_id})-[:ADDED_TO_CART]->(product:Product)
MATCH (product)-[f:FREQUENTLY_BOUGHT_WITH]->(related:Product)
WHERE NOT (user)-[:ADDED_TO_CART]->(related)
WITH related, max(f.confidence) as confidence
RETURN related, confidence
ORDER BY confidence DESC
LIMIT 5
""",
            "trending_in_category": """
// 分类热门商品
MATCH (user:User {user_id: $user_id})-[:VIEWED]->(viewed:Product)-[:BELONGS_TO]->(category:Category)
MATCH (category)<-[:BELONGS_TO]-(trending:Product)
WHERE trending.created_at > datetime() - duration('P30D')
AND NOT (user)-[:PURCHASED]->(trending)
WITH trending, count(*) as category_views
RETURN trending, category_views, trending.sales_count
ORDER BY trending.sales_count DESC, category_views DESC
LIMIT 10
""",
            "user_behavior_analysis": """
// 用户行为分析
MATCH (user:User {user_id: $user_id})
OPTIONAL MATCH (user)-[v:VIEWED]->(viewed:Product)
OPTIONAL MATCH (user)-[p:PURCHASED]->(purchased:Product)
OPTIONAL MATCH (user)-[c:ADDED_TO_CART]->(cart_item:Product)
WITH user,
count(DISTINCT viewed) as viewed_count,
count(DISTINCT purchased) as purchased_count,
count(DISTINCT cart_item) as cart_count,
avg(purchased.price) as avg_purchase_price
RETURN user.user_id,
viewed_count,
purchased_count,
cart_count,
avg_purchase_price,
CASE
WHEN purchased_count > 10 THEN 'high_value'
WHEN purchased_count > 3 THEN 'medium_value'
ELSE 'low_value'
END as customer_segment
""",
        }
# Usage example: validate the e-commerce model and print its recommendation queries.
ecommerce_modeler = EcommerceModeler()
print("电商模型验证:", ecommerce_modeler.modeler.validate_model())
print("\n推荐查询:")
# Each query is printed under its dictionary key, without surrounding whitespace.
for name, query in ecommerce_modeler.generate_recommendation_queries().items():
    print(f"\n{name}:")
    print(query.strip())
4.2.3 知识图谱模型
class KnowledgeGraphModeler:
    """Ready-made graph model for a knowledge graph.

    Construction creates a ``GraphDataModeler`` (available as ``modeler``)
    and fills it with the Person, Organization, Location, Event and Concept
    entities (all of which also carry the ``Entity`` label) and the
    relationships between them.
    """

    def __init__(self):
        self.modeler = GraphDataModeler()
        self._setup_knowledge_model()

    def _setup_knowledge_model(self):
        """Register the knowledge-graph entities and relationships on ``self.modeler``."""
        # Entity types
        entities = [
            Entity(
                name="Person",
                labels=["Person", "Entity"],
                properties={
                    "entity_id": "string",
                    "name": "string",
                    "birth_date": "date",
                    "death_date": "date",
                    "nationality": "string",
                    "occupation": "list",
                    "description": "string",
                    "aliases": "list",
                    "confidence": "float",
                },
                description="人物实体",
            ),
            Entity(
                name="Organization",
                labels=["Organization", "Entity"],
                properties={
                    "entity_id": "string",
                    "name": "string",
                    "founded_date": "date",
                    "dissolved_date": "date",
                    "headquarters": "string",
                    "industry": "string",
                    "type": "string",  # company, university, government
                    "description": "string",
                    "aliases": "list",
                    "confidence": "float",
                },
                description="组织机构实体",
            ),
            Entity(
                name="Location",
                labels=["Location", "Entity"],
                properties={
                    "entity_id": "string",
                    "name": "string",
                    "type": "string",  # country, city, landmark
                    "coordinates": "point",
                    "population": "integer",
                    "area": "float",
                    "timezone": "string",
                    "description": "string",
                    "aliases": "list",
                    "confidence": "float",
                },
                description="地理位置实体",
            ),
            Entity(
                name="Event",
                labels=["Event", "Entity"],
                properties={
                    "entity_id": "string",
                    "name": "string",
                    "start_date": "datetime",
                    "end_date": "datetime",
                    "type": "string",  # historical, cultural, sports
                    "description": "string",
                    "significance": "string",
                    "aliases": "list",
                    "confidence": "float",
                },
                description="事件实体",
            ),
            Entity(
                name="Concept",
                labels=["Concept", "Entity"],
                properties={
                    "entity_id": "string",
                    "name": "string",
                    "definition": "string",
                    "domain": "string",  # science, technology, philosophy
                    "type": "string",
                    "description": "string",
                    "aliases": "list",
                    "confidence": "float",
                },
                description="概念实体",
            ),
        ]
        for entity in entities:
            self.modeler.add_entity(entity)

        # Each row: (name, from_entity, to_entity, properties, cardinality, description)
        relationship_rows = (
            # Person relationships
            (
                "BORN_IN", "Person", "Location",
                {"date": "date", "confidence": "float", "source": "string"},
                "N:1", "出生地关系",
            ),
            (
                "WORKS_FOR", "Person", "Organization",
                {
                    "start_date": "date",
                    "end_date": "date",
                    "position": "string",
                    "confidence": "float",
                    "source": "string",
                },
                "N:M", "工作关系",
            ),
            (
                "MARRIED_TO", "Person", "Person",
                {
                    "marriage_date": "date",
                    "divorce_date": "date",
                    "confidence": "float",
                    "source": "string",
                },
                "N:M", "婚姻关系",
            ),
            (
                "PARENT_OF", "Person", "Person",
                {"confidence": "float", "source": "string"},
                "N:M", "父母子女关系",
            ),
            # Organization relationships
            (
                "LOCATED_IN", "Organization", "Location",
                {
                    "start_date": "date",
                    "end_date": "date",
                    "confidence": "float",
                    "source": "string",
                },
                "N:M", "组织位置关系",
            ),
            (
                "SUBSIDIARY_OF", "Organization", "Organization",
                {
                    "start_date": "date",
                    "end_date": "date",
                    "ownership_percentage": "float",
                    "confidence": "float",
                    "source": "string",
                },
                "N:1", "子公司关系",
            ),
            # Event relationships
            (
                "PARTICIPATED_IN", "Person", "Event",
                {"role": "string", "confidence": "float", "source": "string"},
                "N:M", "参与事件",
            ),
            (
                "OCCURRED_IN", "Event", "Location",
                {"confidence": "float", "source": "string"},
                "N:M", "事件发生地",
            ),
            # Concept relationships
            (
                "IS_A", "Concept", "Concept",
                {"confidence": "float", "source": "string"},
                "N:M", "概念层次关系",
            ),
            (
                "RELATED_TO", "Concept", "Concept",
                {
                    "relation_type": "string",
                    "strength": "float",
                    "confidence": "float",
                    "source": "string",
                },
                "N:M", "概念关联关系",
            ),
        )
        for rel_name, source, target, props, multiplicity, text in relationship_rows:
            self.modeler.add_relationship(
                Relationship(
                    name=rel_name,
                    from_entity=source,
                    to_entity=target,
                    properties=props,
                    cardinality=multiplicity,
                    description=text,
                )
            )

    def generate_knowledge_queries(self) -> Dict[str, str]:
        """Return the sample queries of the knowledge-graph model.

        Returns:
            A mapping from query name to Cypher text with the keys
            ``entity_search``, ``relationship_path``, ``person_profile``,
            ``organization_network``, ``concept_hierarchy`` and
            ``temporal_analysis``.
        """
        return {
            "entity_search": """
// 实体搜索
MATCH (e:Entity)
WHERE e.name CONTAINS $search_term OR any(alias IN e.aliases WHERE alias CONTAINS $search_term)
RETURN e, labels(e) as entity_types
ORDER BY e.confidence DESC
LIMIT 20
""",
            "relationship_path": """
// 实体关系路径
MATCH path = shortestPath((start:Entity {name: $start_entity})-[*1..4]-(end:Entity {name: $end_entity}))
RETURN path, length(path) as path_length
ORDER BY path_length
LIMIT 5
""",
            # MARRIED_TO is symmetric but stored as one directed relationship,
            # so it is matched without a direction; a directed match would miss
            # marriages recorded from the spouse's side.
            "person_profile": """
// 人物档案
MATCH (person:Person {name: $person_name})
OPTIONAL MATCH (person)-[:BORN_IN]->(birthplace:Location)
OPTIONAL MATCH (person)-[:WORKS_FOR]->(org:Organization)
OPTIONAL MATCH (person)-[:MARRIED_TO]-(spouse:Person)
OPTIONAL MATCH (person)-[:PARENT_OF]->(child:Person)
OPTIONAL MATCH (person)-[:PARTICIPATED_IN]->(event:Event)
RETURN person,
birthplace,
collect(DISTINCT org) as organizations,
collect(DISTINCT spouse) as spouses,
collect(DISTINCT child) as children,
collect(DISTINCT event) as events
""",
            "organization_network": """
// 组织网络分析
MATCH (org:Organization {name: $org_name})
OPTIONAL MATCH (org)-[:SUBSIDIARY_OF*1..3]->(parent:Organization)
OPTIONAL MATCH (org)<-[:SUBSIDIARY_OF*1..3]-(subsidiary:Organization)
OPTIONAL MATCH (org)<-[:WORKS_FOR]-(employee:Person)
OPTIONAL MATCH (org)-[:LOCATED_IN]->(location:Location)
RETURN org,
collect(DISTINCT parent) as parent_companies,
collect(DISTINCT subsidiary) as subsidiaries,
count(DISTINCT employee) as employee_count,
collect(DISTINCT location) as locations
""",
            "concept_hierarchy": """
// 概念层次结构
MATCH (concept:Concept {name: $concept_name})
OPTIONAL MATCH (concept)-[:IS_A*1..3]->(parent:Concept)
OPTIONAL MATCH (concept)<-[:IS_A*1..3]-(child:Concept)
OPTIONAL MATCH (concept)-[:RELATED_TO]-(related:Concept)
RETURN concept,
collect(DISTINCT parent) as parent_concepts,
collect(DISTINCT child) as child_concepts,
collect(DISTINCT related) as related_concepts
""",
            "temporal_analysis": """
// 时间线分析
MATCH (entity:Entity)-[r]-(related:Entity)
WHERE r.start_date IS NOT NULL OR r.date IS NOT NULL
WITH entity, related, r,
CASE
WHEN r.start_date IS NOT NULL THEN r.start_date
WHEN r.date IS NOT NULL THEN r.date
ELSE null
END as event_date
WHERE event_date >= date($start_date) AND event_date <= date($end_date)
RETURN entity, related, type(r) as relationship_type, event_date
ORDER BY event_date
""",
        }
# Usage example: validate the knowledge-graph model and print its queries.
knowledge_modeler = KnowledgeGraphModeler()
print("知识图谱模型验证:", knowledge_modeler.modeler.validate_model())
print("\n知识图谱查询:")
# Each query is printed under its dictionary key, without surrounding whitespace.
for name, query in knowledge_modeler.generate_knowledge_queries().items():
    print(f"\n{name}:")
    print(query.strip())
4.3 模型优化策略
4.3.1 性能优化
class ModelOptimizer:
    """Heuristic advisor for query tuning, index selection and model structure.

    ``executor`` is only used by ``analyze_query_performance`` and must offer
    an ``execute_query(query)`` method; the other methods are pure functions
    of their arguments.
    """

    # Lower-cased property names that suggest_indexes treats as unique keys,
    # in addition to anything that looks like an identifier (see
    # _is_identifying_property).
    _UNIQUE_PROPERTY_NAMES = ('id', 'name', 'email')
    # Lower-cased property names that suggest_indexes proposes plain indexes for.
    _INDEXED_PROPERTY_NAMES = ('created_at', 'updated_at', 'status', 'type')

    def __init__(self, executor):
        self.executor = executor

    def analyze_query_performance(self, queries: List[str]) -> Dict[str, Dict]:
        """Run every query under ``PROFILE`` and attach tuning hints.

        Args:
            queries: Cypher queries to analyse.

        Returns:
            A dict keyed ``query_1``, ``query_2``, ... in input order. Each
            entry holds ``query``, ``profile`` (whatever the executor
            returned) and ``recommendations``; if profiling a query raised,
            the entry holds ``query`` and ``error`` instead, and the
            remaining queries are still processed.
        """
        results = {}
        for position, query in enumerate(queries, start=1):
            key = f"query_{position}"
            try:
                profile_result = self.executor.execute_query(f"PROFILE {query}")
                results[key] = {
                    "query": query,
                    "profile": profile_result,
                    "recommendations": self._generate_optimization_recommendations(query)
                }
            except Exception as e:
                # Best effort: one failing query must not abort the batch.
                results[key] = {
                    "query": query,
                    "error": str(e)
                }
        return results

    def _generate_optimization_recommendations(self, query: str) -> List[str]:
        """Derive tuning hints from the query text alone.

        The checks are textual heuristics, not a Cypher parser. Leading
        whitespace and ``//`` comment lines are ignored, so queries written
        as indented or commented triple-quoted strings are analysed properly.
        """
        recommendations = []
        code_lines = [line.strip() for line in query.splitlines()]
        normalized = " ".join(
            line for line in code_lines if line and not line.startswith("//")
        ).upper()

        # Equality filters usually benefit from an index.
        if "WHERE" in normalized and "=" in query:
            recommendations.append("考虑为WHERE子句中的属性创建索引")
        # A first node pattern with neither label nor properties scans all nodes.
        if re.match(r"MATCH\s*\(\s*\w*\s*\)", normalized):
            recommendations.append("避免全图扫描,使用标签或属性过滤")
        # Results are returned without any LIMIT.
        if "RETURN" in normalized and "LIMIT" not in normalized:
            recommendations.append("考虑添加LIMIT子句限制结果数量")
        # Variable-length relationship pattern, e.g. [*], [:REL*2] or [*1..4].
        # Requiring the '*' inside brackets keeps count(*) and list slices
        # such as nodes[1..] from triggering the hint.
        if re.search(r"\[[^\]]*\*[^\]]*\]", normalized):
            recommendations.append("复杂路径查询可能影响性能,考虑限制路径长度")
        return recommendations

    @classmethod
    def _is_identifying_property(cls, prop: str) -> bool:
        """Tell whether ``prop`` looks like a unique key of its entity.

        Matches ``id``, ``name``, ``email`` and identifier-style names such as
        ``user_id``, ``userId`` or ``userID``. A plain substring test for
        ``id`` would also match unrelated names like ``confidence`` or
        ``width``.
        """
        if prop.lower() in cls._UNIQUE_PROPERTY_NAMES:
            return True
        return prop.lower().endswith('_id') or prop.endswith(('Id', 'ID'))

    def suggest_indexes(self, model: GraphDataModeler) -> List[str]:
        """Suggest uniqueness constraints and indexes for a model.

        Identifying properties (see ``_is_identifying_property``) get a
        uniqueness constraint on every label of their entity; the remaining
        ``created_at``/``updated_at``/``status``/``type`` properties get a
        plain index.

        Schema objects are named after the label and emitted once per
        (label, property) pair, so entities with several labels or labels
        shared between entities produce neither duplicate names nor
        equivalent schema objects.

        Returns:
            All constraint statements followed by all index statements.
        """
        # Dicts are used as insertion-ordered sets of (label, property).
        unique_targets: Dict[Tuple[str, str], None] = {}
        index_targets: Dict[Tuple[str, str], None] = {}
        for entity in model.entities.values():
            key_props = [
                prop for prop in entity.properties
                if self._is_identifying_property(prop)
            ]
            indexed_props = [
                prop for prop in entity.properties
                if prop not in key_props
                and prop.lower() in self._INDEXED_PROPERTY_NAMES
            ]
            for label in entity.labels:
                for prop in key_props:
                    unique_targets.setdefault((label, prop))
                for prop in indexed_props:
                    index_targets.setdefault((label, prop))

        index_suggestions = [
            f"CREATE CONSTRAINT {label.lower()}_{prop}_unique "
            f"FOR (n:{label}) REQUIRE n.{prop} IS UNIQUE"
            for label, prop in unique_targets
        ]
        index_suggestions.extend(
            f"CREATE INDEX {label.lower()}_{prop}_index "
            f"FOR (n:{label}) ON (n.{prop})"
            for label, prop in index_targets
            if (label, prop) not in unique_targets
        )
        return index_suggestions

    def optimize_model_structure(self, model: GraphDataModeler) -> Dict[str, List[str]]:
        """Suggest structural changes to a model.

        Returns:
            A dict with the keys ``denormalization`` (N:1 relationships with
            more than 3 properties), ``relationship_optimization`` (entities
            that are the source of more than 5 relationships) and
            ``property_optimization`` (entities with more than 15
            properties); each maps to a list of messages.
        """
        suggestions = {
            "denormalization": [],
            "relationship_optimization": [],
            "property_optimization": []
        }
        # Property-heavy N:1 relationships are denormalization candidates.
        for rel_name, rel in model.relationships.items():
            if rel.cardinality == "N:1" and len(rel.properties) > 3:
                suggestions["denormalization"].append(
                    f"考虑将关系 {rel_name} 的属性移到目标节点 {rel.to_entity} 中"
                )
        # Group relationships by the entity they start from.
        entity_relationships = {}
        for rel in model.relationships.values():
            entity_relationships.setdefault(rel.from_entity, []).append(rel)
        for entity, rels in entity_relationships.items():
            if len(rels) > 5:
                suggestions["relationship_optimization"].append(
                    f"实体 {entity} 有 {len(rels)} 个关系,考虑重新设计模型结构"
                )
        # Very wide entities are candidates for splitting.
        for entity_name, entity in model.entities.items():
            if len(entity.properties) > 15:
                suggestions["property_optimization"].append(
                    f"实体 {entity_name} 有 {len(entity.properties)} 个属性,考虑拆分为多个实体"
                )
        return suggestions
# Usage example
# NOTE(review): `executor` is not defined in this section; presumably it is the
# query executor created in an earlier chapter -- confirm.
optimizer = ModelOptimizer(executor)
# Analyse the social-network model.
social_model = SocialNetworkModeler().modeler
index_suggestions = optimizer.suggest_indexes(social_model)
print("索引建议:")
for suggestion in index_suggestions[:5]:  # show only the first 5 suggestions
    print(f" {suggestion}")
structure_suggestions = optimizer.optimize_model_structure(social_model)
print("\n结构优化建议:")
# Categories without any suggestion are skipped.
for category, suggestions in structure_suggestions.items():
    if suggestions:
        print(f" {category}:")
        for suggestion in suggestions:
            print(f" - {suggestion}")
4.3.2 数据质量管理
class DataQualityManager:
    """Data-quality checks and clean-up helpers for a graph database.

    ``executor`` must offer ``execute_query(query)``, returning an iterable
    of records that can be indexed by column name, and
    ``execute_write(query)``.
    """

    def __init__(self, executor):
        self.executor = executor

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a backtick-quoted Cypher identifier.

        Embedded backticks are doubled, so a label or property name can
        neither break out of the identifier nor inject Cypher.
        """
        return "`" + name.replace("`", "``") + "`"

    def validate_data_integrity(self) -> Dict[str, List[str]]:
        """Run the integrity checks and collect their findings.

        Returns:
            A dict with the keys ``missing_properties``, ``orphaned_nodes``,
            ``invalid_relationships`` and ``duplicate_entities``, each mapping
            to a list of messages. No check fills ``invalid_relationships``
            yet; the key is kept so the result shape stays stable.
        """
        issues = {
            "missing_properties": [],
            "orphaned_nodes": [],
            "invalid_relationships": [],
            "duplicate_entities": []
        }
        # Nodes lacking an `id` or `name` property, grouped by label set.
        missing_props_query = """
MATCH (n)
WHERE n.id IS NULL OR n.name IS NULL
RETURN labels(n) as node_labels, count(*) as count
"""
        for record in self.executor.execute_query(missing_props_query):
            issues["missing_properties"].append(
                f"{record['node_labels']} 节点缺失必要属性: {record['count']} 个"
            )
        # Nodes without any relationship; :Singleton nodes are exempt.
        orphaned_query = """
MATCH (n)
WHERE NOT (n)--() AND NOT n:Singleton
RETURN labels(n) as node_labels, count(*) as count
"""
        for record in self.executor.execute_query(orphaned_query):
            issues["orphaned_nodes"].append(
                f"{record['node_labels']} 孤立节点: {record['count']} 个"
            )
        # Nodes sharing both their name and their label set.
        duplicate_query = """
MATCH (n)
WITH n.name as name, labels(n) as labels, collect(n) as nodes
WHERE size(nodes) > 1 AND name IS NOT NULL
RETURN name, labels, size(nodes) as count
"""
        for record in self.executor.execute_query(duplicate_query):
            issues["duplicate_entities"].append(
                f"重复实体 '{record['name']}' ({record['labels']}): {record['count']} 个"
            )
        return issues

    def clean_duplicate_entities(self, entity_label: str, merge_property: str) -> int:
        """Merge nodes of one label that share a value of ``merge_property``.

        For every group of duplicates the first collected node is kept, the
        relationships of the others are moved onto it with direction, type
        and properties preserved, and the other nodes are removed. The kept
        node's own property values win.

        Merging is delegated to ``apoc.refactor.mergeNodes``, so the APOC
        plugin must be installed on the server. Cypher cannot create a
        relationship whose type is taken from an expression such as
        ``type(r)`` without the dynamic-type syntax of recent Neo4j versions.

        Args:
            entity_label: Label of the nodes to de-duplicate.
            merge_property: Property whose equal, non-null values mark
                duplicates.

        Returns:
            The ``nodes_deleted`` entry of the executor's result, or 0 if it
            is absent.
        """
        label = self._quote_identifier(entity_label)
        prop = self._quote_identifier(merge_property)
        merge_query = f"""
MATCH (n:{label})
WITH n.{prop} as prop_value, collect(n) as nodes
WHERE size(nodes) > 1 AND prop_value IS NOT NULL
CALL apoc.refactor.mergeNodes(nodes, {{properties: 'discard', mergeRels: true}})
YIELD node
RETURN count(node) as merged_count
"""
        result = self.executor.execute_write(merge_query)
        # NOTE(review): assumes execute_write returns a dict of update
        # counters and that they include deletions done by the procedure --
        # confirm against the executor.
        return result.get('nodes_deleted', 0)

    def add_data_validation_constraints(self) -> List[str]:
        """Create the standard validation constraints.

        Neo4j constraints can express uniqueness, existence, node keys and
        property types, but not value ranges. A rule such as ``price >= 0``
        can therefore not be declared (the statement is rejected) and has to
        be enforced by the application; this method requires the price and
        total amount to be present instead.

        Returns:
            The statements that were executed successfully. Failures are
            printed and skipped so the remaining constraints are still tried.
        """
        constraints = [
            # User constraints
            "CREATE CONSTRAINT user_email_unique FOR (u:User) REQUIRE u.email IS UNIQUE",
            "CREATE CONSTRAINT user_id_exists FOR (u:User) REQUIRE u.user_id IS NOT NULL",
            # Product constraints
            "CREATE CONSTRAINT product_id_unique FOR (p:Product) REQUIRE p.product_id IS UNIQUE",
            "CREATE CONSTRAINT product_price_exists FOR (p:Product) REQUIRE p.price IS NOT NULL",
            # Order constraints
            "CREATE CONSTRAINT order_id_unique FOR (o:Order) REQUIRE o.order_id IS UNIQUE",
            "CREATE CONSTRAINT order_total_exists FOR (o:Order) REQUIRE o.total_amount IS NOT NULL"
        ]
        created_constraints = []
        for constraint in constraints:
            try:
                self.executor.execute_write(constraint)
            except Exception as e:
                # Best effort: report the failure and go on with the next one.
                print(f"约束创建失败: {constraint}, 错误: {e}")
            else:
                created_constraints.append(constraint)
        return created_constraints

    def generate_data_quality_report(self) -> Dict[str, Any]:
        """Assemble a data-quality report for the whole database.

        Returns:
            A dict with ``timestamp`` (ISO-8601 string of the local time),
            ``node_statistics`` (stringified label list -> node count),
            ``relationship_statistics`` (relationship type -> count),
            ``integrity_issues`` (see ``validate_data_integrity``) and
            ``recommendations`` (list of messages).
        """
        report = {
            "timestamp": datetime.now().isoformat(),
            "node_statistics": {},
            "relationship_statistics": {},
            "integrity_issues": {},
            "recommendations": []
        }
        # Node counts per label set.
        node_stats_query = """
MATCH (n)
RETURN labels(n) as node_labels, count(*) as count
ORDER BY count DESC
"""
        result = self.executor.execute_query(node_stats_query)
        report["node_statistics"] = {str(record['node_labels']): record['count'] for record in result}
        # Relationship counts per type. The pattern is directed on purpose:
        # an undirected match returns every relationship once per direction
        # and would double all counts.
        rel_stats_query = """
MATCH ()-[r]->()
RETURN type(r) as relationship_type, count(*) as count
ORDER BY count DESC
"""
        result = self.executor.execute_query(rel_stats_query)
        report["relationship_statistics"] = {record['relationship_type']: record['count'] for record in result}
        # Integrity findings
        report["integrity_issues"] = self.validate_data_integrity()
        # Recommendations derived from the figures above
        total_issues = sum(len(issues) for issues in report["integrity_issues"].values())
        if total_issues > 0:
            report["recommendations"].append(f"发现 {total_issues} 个数据质量问题,建议进行数据清理")
        if report["node_statistics"]:
            max_nodes = max(report["node_statistics"].values())
            if max_nodes > 100000:
                report["recommendations"].append("大量节点可能影响性能,考虑数据分区或归档")
        return report
# Usage example
# NOTE(review): `executor` is not defined in this section; presumably it is the
# query executor created in an earlier chapter -- confirm.
quality_manager = DataQualityManager(executor)
# Generate the data-quality report and print each of its sections.
quality_report = quality_manager.generate_data_quality_report()
print("数据质量报告:")
print(f"节点统计: {quality_report['node_statistics']}")
print(f"关系统计: {quality_report['relationship_statistics']}")
print(f"完整性问题: {quality_report['integrity_issues']}")
print(f"建议: {quality_report['recommendations']}")
4.4 章节总结
核心知识点
- 建模原则:以关系为中心、查询驱动、演化友好
- 建模步骤:识别实体、定义关系、添加属性、优化查询、验证模型
- 常见模式:社交网络、电商推荐、知识图谱
- 性能优化:索引策略、查询优化、结构调整
- 数据质量:完整性验证、重复清理、约束管理
最佳实践
- 模型设计:从查询需求出发设计模型
- 性能考虑:合理使用索引和约束
- 数据质量:建立数据验证和清理机制
- 文档化:详细记录模型设计决策
- 迭代优化:根据使用情况持续优化模型
练习题
- 设计一个在线教育平台的图数据模型
- 为金融风控系统建立知识图谱模型
- 优化现有电商模型的查询性能
- 实现一个通用的数据质量检查工具
- 设计一个支持多租户的SaaS应用图模型
下一章预告:在下一章中,我们将学习Neo4j的高级查询技术和算法应用。