5.1 Advanced Cypher Query Techniques
5.1.1 Conditional Expressions and CASE Statements
// Generic CASE expression
MATCH (p:Person)
RETURN p.name,
       CASE
           WHEN p.age < 18 THEN 'minor'
           WHEN p.age < 65 THEN 'adult'
           ELSE 'senior'
       END AS age_group

// Simple CASE expression
MATCH (p:Person)
RETURN p.name,
       CASE p.status
           WHEN 'active' THEN 'Active'
           WHEN 'inactive' THEN 'Inactive'
           ELSE 'Unknown'
       END AS status_label

// Conditional aggregation: count() skips the NULL produced when no WHEN matches
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN c.name,
       count(p) AS total_employees,
       count(CASE WHEN p.age < 30 THEN 1 END) AS young_employees,
       count(CASE WHEN p.salary > 100000 THEN 1 END) AS high_earners
5.1.2 List Operations and UNWIND
// Creating and slicing lists
MATCH (p:Person)
WITH collect(p.name) AS names
RETURN names,
       size(names) AS count,
       head(names) AS first_name,
       last(names) AS last_name,
       names[0..3] AS first_three

// Expanding a list into rows with UNWIND
WITH ['Alice', 'Bob', 'Charlie'] AS names
UNWIND names AS name
MATCH (p:Person {name: name})
RETURN p

// List comprehension
MATCH (p:Person)
RETURN [x IN p.skills WHERE x CONTAINS 'Python'] AS python_skills

// Filtering and transforming lists in one expression
MATCH (p:Person)
RETURN p.name,
       [skill IN p.skills WHERE skill STARTS WITH 'Java'] AS java_skills,
       [age IN [p.age] WHERE age > 25 | age * 12] AS months_if_over_25
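Comprehensions map and filter; to fold a list into a single value, Cypher also provides reduce(). A minimal sketch (the salary property is illustrative):

// reduce() folds a list: the accumulator starts at 0 and absorbs each element
MATCH (p:Person)
WITH collect(coalesce(p.salary, 0)) AS salaries
RETURN reduce(total = 0, s IN salaries | total + s) AS total_salary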
5.1.3 Subqueries and EXISTS
// EXISTS subquery (Neo4j 5 syntax)
MATCH (p:Person)
WHERE EXISTS {
    MATCH (p)-[:KNOWS]->(friend:Person)
    WHERE friend.age > p.age
}
RETURN p.name, p.age

// COUNT subquery
MATCH (p:Person)
WHERE COUNT {
    (p)-[:KNOWS]->(friend:Person)
} > 5
RETURN p.name, p.age

// COLLECT subquery (Neo4j 5.6+)
MATCH (company:Company)
RETURN company.name,
       COLLECT {
           MATCH (company)<-[:WORKS_FOR]-(employee:Person)
           WHERE employee.salary > 80000
           RETURN employee.name
       } AS high_earners
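Neo4j also supports general-purpose CALL subqueries, which run once per incoming row and must import outer variables with WITH; a minimal sketch:

// CALL subquery: the top three earners per company
MATCH (company:Company)
CALL {
    WITH company
    MATCH (company)<-[:WORKS_FOR]-(employee:Person)
    RETURN employee.name AS name, employee.salary AS salary
    ORDER BY salary DESC
    LIMIT 3
}
RETURN company.name, collect(name) AS top_earners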
5.1.4 Path Queries and Variable-Length Relationships
// Shortest path (the end node is named target because END is a reserved word in Cypher)
MATCH path = shortestPath((start:Person {name: 'Alice'})-[*]-(target:Person {name: 'Bob'}))
RETURN path, length(path)

// All shortest paths
MATCH path = allShortestPaths((start:Person {name: 'Alice'})-[*]-(target:Person {name: 'Bob'}))
RETURN path, length(path)

// Variable-length path query
MATCH (start:Person {name: 'Alice'})-[:KNOWS*1..3]-(connected:Person)
RETURN DISTINCT connected.name,
       shortestPath((start)-[:KNOWS*]-(connected)) AS path

// Path filtering
MATCH path = (start:Person)-[:KNOWS*2..4]-(target:Person)
WHERE start.name = 'Alice'
  AND target.name <> 'Alice'
  AND ALL(person IN nodes(path) WHERE person.age > 18)
RETURN path, length(path)

// Nodes and relationships along a path
MATCH path = (start:Person {name: 'Alice'})-[:KNOWS*1..3]-(target:Person)
RETURN nodes(path) AS path_nodes,
       relationships(path) AS path_relationships,
       [node IN nodes(path) | node.name] AS names_in_path
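Path functions return ordinary lists, so the list techniques from 5.1.2 compose with path queries. A minimal sketch that totals edge weights along each path (the weight property is illustrative; it mirrors the total_weight field used in the Python code below):

// Total weight per path, treating a missing weight as 1.0
MATCH path = (start:Person {name: 'Alice'})-[:KNOWS*1..3]-(other:Person)
RETURN [node IN nodes(path) | node.name] AS names,
       reduce(total = 0.0, r IN relationships(path) | total + coalesce(r.weight, 1.0)) AS total_weight
ORDER BY total_weight
LIMIT 5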
5.2 Advanced Query Implementation in Python
5.2.1 An Advanced Query Class
from neo4j import GraphDatabase
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PathResult:
    """Result of a path query."""
    nodes: List[Dict[str, Any]]
    relationships: List[Dict[str, Any]]
    length: int
    total_weight: float = 0.0

@dataclass
class QueryStats:
    """Basic query execution statistics."""
    execution_time: float
    nodes_scanned: int
    relationships_scanned: int
    result_count: int
class AdvancedQueryExecutor:
    """Executes advanced Cypher queries."""

    def __init__(self, uri: str, username: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        if self.driver:
            self.driver.close()

    def find_shortest_paths(self, start_node: Dict[str, Any], end_node: Dict[str, Any],
                            relationship_types: List[str] = None, max_length: int = 10) -> List[PathResult]:
        """Find all shortest paths between two nodes."""
        # Build the relationship-type filter; alternatives are OR-ed with "|", not ":"
        rel_filter = ""
        if relationship_types:
            rel_filter = ":" + "|".join(relationship_types)

        # Build the node matching conditions
        start_conditions = " AND ".join([f"start.{k} = ${k}_start" for k in start_node.keys()])
        end_conditions = " AND ".join([f"target.{k} = ${k}_end" for k in end_node.keys()])

        query = f"""
        MATCH (start), (target)
        WHERE {start_conditions} AND {end_conditions}
        MATCH path = allShortestPaths((start)-[{rel_filter}*1..{max_length}]-(target))
        RETURN path,
               nodes(path) as path_nodes,
               relationships(path) as path_relationships,
               length(path) as path_length
        ORDER BY path_length
        """

        # Assemble the parameter map
        parameters = {}
        for k, v in start_node.items():
            parameters[f"{k}_start"] = v
        for k, v in end_node.items():
            parameters[f"{k}_end"] = v

        with self.driver.session() as session:
            result = session.run(query, parameters)
            paths = []
            for record in result:
                path_result = PathResult(
                    nodes=[dict(node) for node in record['path_nodes']],
                    relationships=[dict(rel) for rel in record['path_relationships']],
                    length=record['path_length']
                )
                paths.append(path_result)
            return paths
    def find_paths_with_conditions(self, start_node: Dict[str, Any],
                                   path_conditions: List[str],
                                   max_length: int = 5) -> List[PathResult]:
        """Find paths satisfying caller-supplied Cypher predicates
        (which may reference start, target, and path)."""
        start_conditions = " AND ".join([f"start.{k} = ${k}" for k in start_node.keys()])
        path_condition_str = " AND ".join(path_conditions)

        query = f"""
        MATCH (start)
        WHERE {start_conditions}
        MATCH path = (start)-[*1..{max_length}]-(target)
        WHERE {path_condition_str}
        RETURN path,
               nodes(path) as path_nodes,
               relationships(path) as path_relationships,
               length(path) as path_length
        ORDER BY path_length
        LIMIT 100
        """

        with self.driver.session() as session:
            result = session.run(query, start_node)
            paths = []
            for record in result:
                path_result = PathResult(
                    nodes=[dict(node) for node in record['path_nodes']],
                    relationships=[dict(rel) for rel in record['path_relationships']],
                    length=record['path_length']
                )
                paths.append(path_result)
            return paths
    def execute_conditional_aggregation(self, base_query: str,
                                        conditions: Dict[str, str],
                                        parameters: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Run a query extended with conditional aggregation expressions."""
        # Build one conditional count per named condition
        agg_expressions = []
        for condition_name, condition_expr in conditions.items():
            agg_expressions.append(f"count(CASE WHEN {condition_expr} THEN 1 END) AS {condition_name}")
        agg_clause = ", ".join(agg_expressions)

        # Replace (or append) the RETURN clause; locate RETURN case-insensitively
        # without uppercasing the query, which would break labels and properties
        return_pos = base_query.upper().find("RETURN")
        if return_pos >= 0:
            query = f"{base_query[:return_pos]} RETURN {agg_clause}"
        else:
            query = f"{base_query} RETURN {agg_clause}"

        with self.driver.session() as session:
            result = session.run(query, parameters or {})
            return [record.data() for record in result]
    def execute_list_operations(self, collection_query: str,
                                operations: List[str],
                                parameters: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Append a RETURN clause that evaluates a list of Cypher expressions."""
        # Each operation becomes one aliased RETURN item
        operation_expressions = []
        for i, operation in enumerate(operations):
            operation_expressions.append(f"({operation}) AS operation_{i+1}")
        operations_clause = ", ".join(operation_expressions)

        query = f"""
        {collection_query}
        RETURN {operations_clause}
        """

        with self.driver.session() as session:
            result = session.run(query, parameters or {})
            return [record.data() for record in result]
    def profile_query(self, query: str, parameters: Dict[str, Any] = None) -> Tuple[List[Dict[str, Any]], QueryStats]:
        """Profile a query and collect basic statistics."""
        profiled = f"PROFILE {query}"

        with self.driver.session() as session:
            start_time = datetime.now()
            result = session.run(profiled, parameters or {})
            records = [record.data() for record in result]
            end_time = datetime.now()

            summary = result.consume()
            # summary.profile is a plan dictionary; dbHits covers the root operator
            plan = summary.profile or {}
            stats = QueryStats(
                execution_time=(end_time - start_time).total_seconds(),
                nodes_scanned=plan.get('dbHits', 0),
                relationships_scanned=0,  # Neo4j does not report this separately
                result_count=len(records)
            )
            return records, stats
# Usage example
executor = AdvancedQueryExecutor("bolt://localhost:7687", "neo4j", "password")

# Find shortest paths
start = {"name": "Alice"}
end = {"name": "Bob"}
paths = executor.find_shortest_paths(start, end, ["KNOWS", "FRIENDS"])
print(f"Found {len(paths)} paths")
for i, path in enumerate(paths[:3]):
    print(f"Path {i+1}: length {path.length}, {len(path.nodes)} nodes")

# Conditional aggregation
base_query = "MATCH (p:Person)-[:WORKS_FOR]->(c:Company)"
conditions = {
    "young_employees": "p.age < 30",
    "senior_employees": "p.age >= 50",
    "high_earners": "p.salary > 100000"
}
result = executor.execute_conditional_aggregation(base_query, conditions)
print("Aggregation result:", result)
5.2.2 Graph Algorithm Implementations
from collections import defaultdict, deque
import random
class GraphAlgorithms:
    """Client-side graph algorithm implementations."""

    def __init__(self, executor: AdvancedQueryExecutor):
        self.executor = executor

    def pagerank(self, node_label: str = None, relationship_type: str = None,
                 iterations: int = 20, damping_factor: float = 0.85) -> Dict[str, float]:
        """PageRank via power iteration.

        Update rule: PR(v) = (1 - d) / N + d * sum(PR(u) / out_degree(u))
        over all in-neighbours u of v, with damping factor d.
        """
        # Build query filters
        node_filter = f":{node_label}" if node_label else ""
        rel_filter = f":{relationship_type}" if relationship_type else ""

        # Fetch all directed edges
        query = f"""
        MATCH (n{node_filter})-[r{rel_filter}]->(m{node_filter})
        RETURN id(n) as source_id, id(m) as target_id
        """

        with self.executor.driver.session() as session:
            result = session.run(query)
            edges = [(record['source_id'], record['target_id']) for record in result]

            # Fetch all nodes
            nodes_query = f"MATCH (n{node_filter}) RETURN id(n) as node_id, n.name as name"
            result = session.run(nodes_query)
            nodes = {record['node_id']: record['name'] for record in result}

        if not nodes:
            return {}

        # Build the incoming adjacency list and out-degrees
        incoming = defaultdict(list)
        out_degree = defaultdict(int)
        for source, target in edges:
            incoming[target].append(source)
            out_degree[source] += 1

        # Initialise all scores uniformly
        pagerank_scores = {node_id: 1.0 / len(nodes) for node_id in nodes}

        # Power iteration: contributions flow along incoming edges only,
        # avoiding the O(N^2) scan of checking every node pair
        for _ in range(iterations):
            new_scores = {}
            for node_id in nodes:
                score = (1 - damping_factor) / len(nodes)
                for source_id in incoming[node_id]:
                    if out_degree[source_id] > 0:
                        score += damping_factor * pagerank_scores[source_id] / out_degree[source_id]
                new_scores[node_id] = score
            pagerank_scores = new_scores

        # Map internal ids back to node names
        return {nodes[node_id]: score for node_id, score in pagerank_scores.items()}
    def detect_communities_louvain(self, node_label: str = None,
                                   relationship_type: str = None) -> Dict[str, int]:
        """Louvain community detection (simplified, local-moving phase only)."""
        node_filter = f":{node_label}" if node_label else ""
        rel_filter = f":{relationship_type}" if relationship_type else ""

        # Fetch the undirected graph; id(n) < id(m) keeps each edge once
        # instead of returning it in both directions
        query = f"""
        MATCH (n{node_filter})-[r{rel_filter}]-(m{node_filter})
        WHERE id(n) < id(m)
        RETURN id(n) as node1, id(m) as node2,
               coalesce(r.weight, 1.0) as weight
        """

        with self.executor.driver.session() as session:
            result = session.run(query)
            edges = []
            nodes = set()
            for record in result:
                node1, node2 = record['node1'], record['node2']
                weight = record['weight']
                edges.append((node1, node2, weight))
                nodes.add(node1)
                nodes.add(node2)

            # Map internal ids to names
            nodes_query = f"MATCH (n{node_filter}) RETURN id(n) as node_id, n.name as name"
            result = session.run(nodes_query)
            node_names = {record['node_id']: record['name'] for record in result}

        if not nodes:
            return {}

        # Each node starts in its own community
        communities = {node: i for i, node in enumerate(nodes)}

        # Build the adjacency list (both directions; the graph is undirected)
        graph = defaultdict(list)
        for node1, node2, weight in edges:
            graph[node1].append((node2, weight))
            graph[node2].append((node1, weight))

        # Simplified local-moving phase; the pass cap guards against the
        # oscillation that full modularity bookkeeping would prevent
        improved = True
        passes = 0
        while improved and passes < 20:
            passes += 1
            improved = False
            for node in nodes:
                current_community = communities[node]
                best_community = current_community
                # Baseline is the gain of staying put, so a move must be strictly better
                best_gain = self._calculate_modularity_gain(node, current_community, communities, graph)

                # Candidate communities are those of the node's neighbours
                neighbor_communities = set()
                for neighbor, weight in graph[node]:
                    neighbor_communities.add(communities[neighbor])

                for community in neighbor_communities:
                    if community != current_community:
                        # Simplified modularity gain
                        gain = self._calculate_modularity_gain(node, community, communities, graph)
                        if gain > best_gain:
                            best_gain = gain
                            best_community = community

                if best_community != current_community:
                    communities[node] = best_community
                    improved = True

        # Map internal ids back to node names
        return {node_names[node_id]: community for node_id, community in communities.items()}
    def _calculate_modularity_gain(self, node: int, target_community: int,
                                   communities: Dict[int, int],
                                   graph: Dict[int, List[Tuple[int, float]]]) -> float:
        """Simplified modularity gain: the fraction of this node's edge weight
        that falls inside the target community. Real Louvain uses the full
        modularity formula, which also accounts for community degree sums."""
        internal_edges = 0
        total_edges = 0

        for neighbor, weight in graph[node]:
            total_edges += weight
            if communities[neighbor] == target_community:
                internal_edges += weight

        return internal_edges / total_edges if total_edges > 0 else 0
    def find_strongly_connected_components(self, node_label: str = None,
                                           relationship_type: str = None) -> List[List[str]]:
        """Find strongly connected components with Kosaraju's algorithm."""
        node_filter = f":{node_label}" if node_label else ""
        rel_filter = f":{relationship_type}" if relationship_type else ""

        # Fetch the directed graph
        query = f"""
        MATCH (n{node_filter})-[r{rel_filter}]->(m{node_filter})
        RETURN id(n) as source, n.name as source_name,
               id(m) as target, m.name as target_name
        """

        with self.executor.driver.session() as session:
            result = session.run(query)
            edges = []
            nodes = set()
            node_names = {}
            for record in result:
                source, target = record['source'], record['target']
                edges.append((source, target))
                nodes.add(source)
                nodes.add(target)
                node_names[source] = record['source_name']
                node_names[target] = record['target_name']

        if not nodes:
            return []

        # Build forward and reverse adjacency lists
        graph = defaultdict(list)
        reverse_graph = defaultdict(list)
        for source, target in edges:
            graph[source].append(target)
            reverse_graph[target].append(source)

        # Kosaraju's algorithm. Note: the recursive DFS below can hit Python's
        # recursion limit on deep graphs; an iterative variant follows this class.
        visited = set()
        finish_order = []

        def dfs1(node):
            visited.add(node)
            for neighbor in graph[node]:
                if neighbor not in visited:
                    dfs1(neighbor)
            finish_order.append(node)

        # First pass: record finish times on the forward graph
        for node in nodes:
            if node not in visited:
                dfs1(node)

        # Second pass: explore the reverse graph in decreasing finish order
        visited = set()
        components = []

        def dfs2(node, component):
            visited.add(node)
            component.append(node_names[node])
            for neighbor in reverse_graph[node]:
                if neighbor not in visited:
                    dfs2(neighbor, component)

        for node in reversed(finish_order):
            if node not in visited:
                component = []
                dfs2(node, component)
                components.append(component)

        return components
    def calculate_centrality_measures(self, node_label: str = None) -> Dict[str, Dict[str, float]]:
        """Compute degree, closeness, and (simplified) betweenness centrality."""
        node_filter = f":{node_label}" if node_label else ""

        # Degree centrality
        degree_query = f"""
        MATCH (n{node_filter})
        OPTIONAL MATCH (n)-[r]-()
        WITH n, count(r) as degree
        RETURN n.name as name, degree
        """

        # Closeness centrality: reachable nodes divided by total shortest-path distance
        closeness_query = f"""
        MATCH (n{node_filter}), (m{node_filter})
        WHERE n <> m
        OPTIONAL MATCH path = shortestPath((n)-[*]-(m))
        WITH n, collect(length(path)) as distances
        WITH n,
             CASE WHEN size(distances) > 0
                  THEN size(distances) / toFloat(reduce(total = 0, d IN distances | total + d))
                  ELSE 0.0
             END as closeness
        RETURN n.name as name, closeness
        """

        # Betweenness centrality (simplified): count shortest paths through n;
        # the hop cap keeps this exhaustive query tractable on small graphs only
        betweenness_query = f"""
        MATCH (n{node_filter})
        MATCH (a{node_filter}), (b{node_filter})
        WHERE id(a) < id(b) AND a <> n AND b <> n
        OPTIONAL MATCH path = allShortestPaths((a)-[*..6]-(b))
        WHERE n IN nodes(path)
        WITH n, count(path) as paths_through_node
        RETURN n.name as name, paths_through_node as betweenness
        """

        centrality_measures = {}

        with self.executor.driver.session() as session:
            # Degree centrality
            result = session.run(degree_query)
            for record in result:
                centrality_measures.setdefault(record['name'], {})['degree'] = record['degree']

            # Closeness centrality
            result = session.run(closeness_query)
            for record in result:
                centrality_measures.setdefault(record['name'], {})['closeness'] = record['closeness']

            # Betweenness centrality
            result = session.run(betweenness_query)
            for record in result:
                centrality_measures.setdefault(record['name'], {})['betweenness'] = record['betweenness']

        return centrality_measures
# Usage example
algorithms = GraphAlgorithms(executor)

# PageRank analysis
print("PageRank analysis:")
pagerank_scores = algorithms.pagerank("Person", "KNOWS")
for name, score in sorted(pagerank_scores.items(), key=lambda x: x[1], reverse=True)[:5]:
    print(f"  {name}: {score:.4f}")

# Community detection
print("\nCommunity detection:")
communities = algorithms.detect_communities_louvain("Person", "KNOWS")
community_groups = defaultdict(list)
for name, community in communities.items():
    community_groups[community].append(name)
for community_id, members in community_groups.items():
    print(f"  Community {community_id}: {', '.join(members)}")

# Centrality analysis
print("\nCentrality analysis:")
centrality = algorithms.calculate_centrality_measures("Person")
for name, measures in list(centrality.items())[:5]:
    print(f"  {name}: degree={measures.get('degree', 0)}, closeness={measures.get('closeness', 0):.4f}, betweenness={measures.get('betweenness', 0)}")
5.3 Graph Database Algorithm Libraries
5.3.1 The Neo4j Graph Data Science Library (GDS)
class Neo4jGDSWrapper:
    """Thin wrapper around the Neo4j Graph Data Science (GDS) library."""

    def __init__(self, executor: AdvancedQueryExecutor):
        self.executor = executor

    def create_graph_projection(self, graph_name: str, node_labels: List[str],
                                relationship_types: List[str]) -> bool:
        """Create an in-memory graph projection."""
        # Pass labels and types as parameters: gds.graph.project accepts lists
        # directly, and Cypher map literals cannot use quoted string keys
        query = """
        CALL gds.graph.project($graph_name, $node_labels, $relationship_types)
        YIELD graphName, nodeCount, relationshipCount
        RETURN graphName, nodeCount, relationshipCount
        """

        try:
            with self.executor.driver.session() as session:
                result = session.run(query, {
                    'graph_name': graph_name,
                    'node_labels': node_labels,
                    'relationship_types': relationship_types
                })
                record = result.single()
                print(f"Projection created: {record['graphName']}, "
                      f"{record['nodeCount']} nodes, {record['relationshipCount']} relationships")
                return True
        except Exception as e:
            print(f"Failed to create projection: {e}")
            return False
    def run_pagerank(self, graph_name: str, max_iterations: int = 20,
                     damping_factor: float = 0.85) -> List[Dict[str, Any]]:
        """Run PageRank in stream mode."""
        query = f"""
        CALL gds.pageRank.stream('{graph_name}', {{
            maxIterations: {max_iterations},
            dampingFactor: {damping_factor}
        }})
        YIELD nodeId, score
        RETURN gds.util.asNode(nodeId).name AS name, score
        ORDER BY score DESC
        """

        with self.executor.driver.session() as session:
            result = session.run(query)
            return [record.data() for record in result]
    def run_louvain(self, graph_name: str) -> List[Dict[str, Any]]:
        """Run Louvain community detection in stream mode."""
        query = f"""
        CALL gds.louvain.stream('{graph_name}')
        YIELD nodeId, communityId
        RETURN gds.util.asNode(nodeId).name AS name, communityId
        ORDER BY communityId, name
        """

        with self.executor.driver.session() as session:
            result = session.run(query)
            return [record.data() for record in result]

    def run_betweenness_centrality(self, graph_name: str) -> List[Dict[str, Any]]:
        """Run betweenness centrality in stream mode."""
        query = f"""
        CALL gds.betweenness.stream('{graph_name}')
        YIELD nodeId, score
        RETURN gds.util.asNode(nodeId).name AS name, score
        ORDER BY score DESC
        """

        with self.executor.driver.session() as session:
            result = session.run(query)
            return [record.data() for record in result]
    def run_shortest_path(self, graph_name: str, source_node: str, target_node: str) -> Dict[str, Any]:
        """Run Dijkstra source-target shortest path."""
        query = f"""
        MATCH (source {{name: $source_name}}), (target {{name: $target_name}})
        CALL gds.shortestPath.dijkstra.stream('{graph_name}', {{
            sourceNode: source,
            targetNode: target
        }})
        YIELD index, sourceNode, targetNode, totalCost, nodeIds, costs, path
        RETURN
            gds.util.asNode(sourceNode).name AS source,
            gds.util.asNode(targetNode).name AS target,
            totalCost,
            [nodeId IN nodeIds | gds.util.asNode(nodeId).name] AS pathNodes,
            costs
        """

        with self.executor.driver.session() as session:
            result = session.run(query, {'source_name': source_node, 'target_name': target_node})
            record = result.single()
            return record.data() if record else {}
    def run_node_similarity(self, graph_name: str, similarity_cutoff: float = 0.1) -> List[Dict[str, Any]]:
        """Run node similarity (Jaccard over shared neighbours) in stream mode."""
        query = f"""
        CALL gds.nodeSimilarity.stream('{graph_name}', {{
            similarityCutoff: {similarity_cutoff}
        }})
        YIELD node1, node2, similarity
        RETURN
            gds.util.asNode(node1).name AS node1_name,
            gds.util.asNode(node2).name AS node2_name,
            similarity
        ORDER BY similarity DESC
        """

        with self.executor.driver.session() as session:
            result = session.run(query)
            return [record.data() for record in result]

    def drop_graph_projection(self, graph_name: str) -> bool:
        """Drop an in-memory graph projection."""
        query = "CALL gds.graph.drop($graph_name)"

        try:
            with self.executor.driver.session() as session:
                session.run(query, {'graph_name': graph_name}).consume()
                print(f"Projection {graph_name} dropped")
                return True
        except Exception as e:
            print(f"Failed to drop projection: {e}")
            return False
# Usage example
gds = Neo4jGDSWrapper(executor)

# Create the graph projection
if gds.create_graph_projection("social_network", ["Person"], ["KNOWS", "FRIENDS"]):
    # Run PageRank
    print("\nPageRank results:")
    pagerank_results = gds.run_pagerank("social_network")
    for result in pagerank_results[:5]:
        print(f"  {result['name']}: {result['score']:.4f}")

    # Run community detection
    print("\nLouvain community detection:")
    community_results = gds.run_louvain("social_network")
    communities = defaultdict(list)
    for result in community_results:
        communities[result['communityId']].append(result['name'])
    for community_id, members in list(communities.items())[:3]:
        print(f"  Community {community_id}: {', '.join(members)}")

    # Run betweenness centrality
    print("\nBetweenness centrality:")
    betweenness_results = gds.run_betweenness_centrality("social_network")
    for result in betweenness_results[:5]:
        print(f"  {result['name']}: {result['score']:.4f}")

    # Clean up the projection
    gds.drop_graph_projection("social_network")
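Stream mode returns results to the client; GDS algorithms also offer a write mode that persists scores as node properties. A minimal sketch, assuming GDS 2.x, to be run while the projection still exists (i.e., before the drop above):

# Persist PageRank scores back onto the database nodes
with executor.driver.session() as session:
    summary = session.run("""
        CALL gds.pageRank.write('social_network', {writeProperty: 'pagerank'})
        YIELD nodePropertiesWritten, ranIterations
        RETURN nodePropertiesWritten, ranIterations
    """).single()
    print(f"Wrote {summary['nodePropertiesWritten']} scores in {summary['ranIterations']} iterations")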
5.3.2 Custom Graph Algorithms
class CustomGraphAlgorithms:
    """Custom graph algorithm implementations."""

    def __init__(self, executor: AdvancedQueryExecutor):
        self.executor = executor

    def influence_maximization(self, node_label: str, relationship_type: str,
                               k: int = 5, model: str = 'independent_cascade') -> List[str]:
        """Greedy influence maximization: pick k seeds with the largest
        marginal gain in simulated influence spread."""
        # Fetch the influence graph; a missing probability defaults to 0.1
        query = f"""
        MATCH (n:{node_label})-[r:{relationship_type}]->(m:{node_label})
        RETURN n.name as source, m.name as target,
               coalesce(r.influence_prob, 0.1) as prob
        """

        with self.executor.driver.session() as session:
            result = session.run(query)
            edges = [(record['source'], record['target'], record['prob']) for record in result]

            # Fetch all nodes
            nodes_query = f"MATCH (n:{node_label}) RETURN n.name as name"
            result = session.run(nodes_query)
            nodes = [record['name'] for record in result]

        if not nodes or not edges:
            return []

        # Build the influence adjacency list
        influence_graph = defaultdict(list)
        for source, target, prob in edges:
            influence_graph[source].append((target, prob))

        # Greedy seed selection
        selected_seeds = []
        for _ in range(min(k, len(nodes))):
            best_node = None
            best_influence = 0

            for candidate in nodes:
                if candidate not in selected_seeds:
                    # Marginal gain of adding this candidate to the seed set
                    current_seeds = selected_seeds + [candidate]
                    influence = self._simulate_influence_spread(current_seeds, influence_graph, model)
                    if influence > best_influence:
                        best_influence = influence
                        best_node = candidate

            if best_node:
                selected_seeds.append(best_node)

        return selected_seeds
    def _simulate_influence_spread(self, seeds: List[str], influence_graph: Dict[str, List[Tuple[str, float]]],
                                   model: str, simulations: int = 100) -> float:
        """Monte Carlo estimate of the expected number of influenced nodes."""
        total_influenced = 0

        for _ in range(simulations):
            influenced = set(seeds)
            queue = deque(seeds)

            while queue:
                current = queue.popleft()
                for neighbor, prob in influence_graph.get(current, []):
                    if neighbor not in influenced:
                        # The propagation model decides whether the neighbour activates
                        if model == 'independent_cascade':
                            if random.random() < prob:
                                influenced.add(neighbor)
                                queue.append(neighbor)
                        elif model == 'linear_threshold':
                            # Simplified linear threshold: activate once the summed
                            # influence between the neighbour and active nodes reaches 0.5
                            influence_sum = sum(p for n, p in influence_graph.get(neighbor, []) if n in influenced)
                            if influence_sum >= 0.5:
                                influenced.add(neighbor)
                                queue.append(neighbor)

            total_influenced += len(influenced)

        return total_influenced / simulations
    def link_prediction(self, node_label: str, relationship_type: str,
                        method: str = 'common_neighbors') -> List[Tuple[str, str, float]]:
        """Score unconnected node pairs as candidate links."""
        # Candidate pairs: nodes with no existing relationship;
        # n.name < m.name keeps each unordered pair exactly once
        candidates_query = f"""
        MATCH (n:{node_label}), (m:{node_label})
        WHERE n.name < m.name AND NOT (n)-[:{relationship_type}]-(m)
        RETURN n.name as node1, m.name as node2
        """

        with self.executor.driver.session() as session:
            result = session.run(candidates_query)
            candidates = [(record['node1'], record['node2']) for record in result]

        predictions = []
        for node1, node2 in candidates:
            if method == 'common_neighbors':
                score = self._common_neighbors_score(node1, node2, node_label, relationship_type)
            elif method == 'jaccard':
                score = self._jaccard_score(node1, node2, node_label, relationship_type)
            elif method == 'adamic_adar':
                score = self._adamic_adar_score(node1, node2, node_label, relationship_type)
            else:
                score = 0.0

            if score > 0:
                predictions.append((node1, node2, score))

        # Highest score first
        predictions.sort(key=lambda x: x[2], reverse=True)
        return predictions
    def _common_neighbors_score(self, node1: str, node2: str, node_label: str, relationship_type: str) -> float:
        """Common-neighbours score: the number of shared neighbours."""
        query = f"""
        MATCH (n1:{node_label} {{name: $node1}})-[:{relationship_type}]-(common:{node_label})-[:{relationship_type}]-(n2:{node_label} {{name: $node2}})
        RETURN count(DISTINCT common) as common_count
        """

        with self.executor.driver.session() as session:
            result = session.run(query, {'node1': node1, 'node2': node2})
            record = result.single()
            return float(record['common_count']) if record else 0.0

    def _jaccard_score(self, node1: str, node2: str, node_label: str, relationship_type: str) -> float:
        """Jaccard similarity: |intersection| / |union| of the neighbour sets."""
        # The union list is named "combined" because UNION is a Cypher keyword
        query = f"""
        MATCH (n1:{node_label} {{name: $node1}})-[:{relationship_type}]-(neighbor1:{node_label})
        MATCH (n2:{node_label} {{name: $node2}})-[:{relationship_type}]-(neighbor2:{node_label})
        WITH collect(DISTINCT neighbor1.name) as neighbors1, collect(DISTINCT neighbor2.name) as neighbors2
        WITH neighbors1, neighbors2,
             [n IN neighbors1 WHERE n IN neighbors2] as intersection,
             neighbors1 + [n IN neighbors2 WHERE NOT n IN neighbors1] as combined
        RETURN
            CASE WHEN size(combined) > 0
                 THEN toFloat(size(intersection)) / size(combined)
                 ELSE 0.0
            END as jaccard_score
        """

        with self.executor.driver.session() as session:
            result = session.run(query, {'node1': node1, 'node2': node2})
            record = result.single()
            return record['jaccard_score'] if record else 0.0
    def _adamic_adar_score(self, node1: str, node2: str, node_label: str, relationship_type: str) -> float:
        """Adamic-Adar score: sum of 1 / log(degree) over common neighbours."""
        # degree > 1 is required because log(1) = 0 would divide by zero
        query = f"""
        MATCH (n1:{node_label} {{name: $node1}})-[:{relationship_type}]-(common:{node_label})-[:{relationship_type}]-(n2:{node_label} {{name: $node2}})
        MATCH (common)-[:{relationship_type}]-(neighbor:{node_label})
        WITH common, count(DISTINCT neighbor) as degree
        WHERE degree > 1
        RETURN sum(1.0 / log(degree)) as adamic_adar_score
        """

        with self.executor.driver.session() as session:
            result = session.run(query, {'node1': node1, 'node2': node2})
            record = result.single()
            return record['adamic_adar_score'] if record else 0.0
    def anomaly_detection(self, node_label: str, relationship_type: str,
                          threshold: float = 2.0) -> List[Dict[str, Any]]:
        """Flag nodes whose structural features deviate from the mean
        by more than `threshold` standard deviations (z-score)."""
        # Compute per-node structural features
        features_query = f"""
        MATCH (n:{node_label})
        OPTIONAL MATCH (n)-[r:{relationship_type}]-(neighbor:{node_label})
        WITH n, count(DISTINCT neighbor) as degree
        OPTIONAL MATCH (n)-[:{relationship_type}*2]-(second_neighbor:{node_label})
        WHERE second_neighbor <> n
        WITH n, degree, count(DISTINCT second_neighbor) as second_degree
        RETURN n.name as name, degree, second_degree,
               CASE WHEN degree > 0 THEN toFloat(second_degree) / degree ELSE 0.0 END as clustering_ratio
        """

        with self.executor.driver.session() as session:
            result = session.run(features_query)
            nodes_features = [record.data() for record in result]

        if not nodes_features:
            return []

        # Mean and standard deviation of each feature
        degrees = [f['degree'] for f in nodes_features]
        clustering_ratios = [f['clustering_ratio'] for f in nodes_features]

        degree_mean = sum(degrees) / len(degrees)
        degree_std = (sum((d - degree_mean) ** 2 for d in degrees) / len(degrees)) ** 0.5
        clustering_mean = sum(clustering_ratios) / len(clustering_ratios)
        clustering_std = (sum((c - clustering_mean) ** 2 for c in clustering_ratios) / len(clustering_ratios)) ** 0.5

        # Flag nodes whose worst z-score exceeds the threshold
        anomalies = []
        for features in nodes_features:
            degree_z_score = abs(features['degree'] - degree_mean) / degree_std if degree_std > 0 else 0
            clustering_z_score = abs(features['clustering_ratio'] - clustering_mean) / clustering_std if clustering_std > 0 else 0

            max_z_score = max(degree_z_score, clustering_z_score)
            if max_z_score > threshold:
                anomalies.append({
                    'name': features['name'],
                    'degree': features['degree'],
                    'clustering_ratio': features['clustering_ratio'],
                    'degree_z_score': degree_z_score,
                    'clustering_z_score': clustering_z_score,
                    'anomaly_score': max_z_score
                })

        # Most anomalous first
        anomalies.sort(key=lambda x: x['anomaly_score'], reverse=True)
        return anomalies
# Usage example
custom_algorithms = CustomGraphAlgorithms(executor)

# Influence maximization
print("Influence maximization:")
influential_nodes = custom_algorithms.influence_maximization("Person", "KNOWS", k=3)
print(f"  Most influential nodes: {influential_nodes}")

# Link prediction
print("\nLink prediction:")
link_predictions = custom_algorithms.link_prediction("Person", "KNOWS", method="common_neighbors")
for node1, node2, score in link_predictions[:5]:
    print(f"  {node1} - {node2}: {score:.4f}")

# Anomaly detection
print("\nAnomaly detection:")
anomalies = custom_algorithms.anomaly_detection("Person", "KNOWS", threshold=1.5)
for anomaly in anomalies[:3]:
    print(f"  {anomaly['name']}: anomaly score {anomaly['anomaly_score']:.4f}")
5.4 Chapter Summary
Key Concepts
- Advanced Cypher: conditional expressions, list operations, subqueries, path queries
- Graph algorithms: PageRank, community detection, centrality analysis, shortest paths
- Neo4j GDS: using and integrating the Graph Data Science library
- Custom algorithms: influence maximization, link prediction, anomaly detection
- Performance: query profiling, algorithm selection, parameter tuning
Best Practices
- Algorithm choice: pick the algorithm that fits the problem at hand
- Performance: optimize and parallelize algorithms for large graphs
- Validation: verify and interpret algorithm results before acting on them
- Parameter tuning: adjust algorithm parameters to the characteristics of the data
- Scalability: design algorithm frameworks that can grow with the data
Exercises
- Implement a custom community detection algorithm
- Design a real-time anomaly detection system
- Optimize PageRank computation for large-scale graphs
- Implement an analysis algorithm for multilayer networks
- Design a feature extractor for a graph neural network
Next chapter preview: in the next chapter, we cover Neo4j performance optimization and monitoring techniques.