3.1 Cypher简介
什么是Cypher
Cypher是Neo4j的声明式图查询语言,专门为图数据库设计。它的语法直观,接近自然语言,使用ASCII艺术来表示图模式。
Cypher特点
- 声明式:描述想要什么,而不是如何获取
- 模式匹配:使用图模式来查询数据
- 表达力强:支持复杂的图遍历和分析
- 可读性好:语法接近自然语言
- 功能丰富:支持CRUD操作、聚合、排序等
基本语法结构
-- 基本查询结构
MATCH (pattern)
WHERE condition
RETURN result
ORDER BY field
LIMIT count
3.2 节点和关系语法
节点语法
-- 匿名节点
()
-- 带变量的节点
(n)
-- 带标签的节点
(n:Person)
-- 多标签节点
(n:Person:Employee)
-- 带属性的节点
(n:Person {name: 'Alice'})
-- 带变量和属性的节点
(alice:Person {name: 'Alice', age: 30})
关系语法
-- 无方向关系
(a)--(b)
-- 有方向关系
(a)-->(b)
(a)<--(b)
-- 带类型的关系
(a)-[:KNOWS]->(b)
-- 带变量的关系
(a)-[r:KNOWS]->(b)
-- 带属性的关系
(a)-[:KNOWS {since: '2020-01-01'}]->(b)
-- 多类型关系
(a)-[:KNOWS|FRIENDS]->(b)
-- 可变长度关系
(a)-[:KNOWS*1..3]->(b) -- 1到3跳
(a)-[:KNOWS*]->(b) -- 任意长度
(a)-[:KNOWS*2]->(b) -- 恰好2跳
3.3 基本CRUD操作
CREATE - 创建数据
-- 创建单个节点
CREATE (alice:Person {name: 'Alice', age: 30, city: 'Beijing'})
-- 创建多个节点
CREATE
(alice:Person {name: 'Alice', age: 30}),
(bob:Person {name: 'Bob', age: 25}),
(company:Company {name: 'TechCorp'})
-- 创建节点和关系
CREATE (alice:Person {name: 'Alice'})-[:WORKS_FOR {since: '2020-01-01'}]->(company:Company {name: 'TechCorp'})
-- 基于现有节点创建关系
MATCH (alice:Person {name: 'Alice'})
MATCH (bob:Person {name: 'Bob'})
CREATE (alice)-[:KNOWS {since: '2023-01-01'}]->(bob)
MATCH - 查询数据
-- 查询所有节点
MATCH (n) RETURN n
-- 查询特定标签的节点
MATCH (p:Person) RETURN p
-- 查询带条件的节点
MATCH (p:Person {name: 'Alice'}) RETURN p
-- 查询关系
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN p.name, c.name
-- 查询路径
MATCH path = (p:Person)-[:KNOWS*1..2]->(friend:Person)
RETURN path
SET - 更新数据
-- 设置属性
MATCH (p:Person {name: 'Alice'})
SET p.age = 31
-- 设置多个属性
MATCH (p:Person {name: 'Alice'})
SET p.age = 31, p.city = 'Shanghai'
-- 使用映射设置属性
MATCH (p:Person {name: 'Alice'})
SET p += {age: 31, email: 'alice@example.com'}
-- 添加标签
MATCH (p:Person {name: 'Alice'})
SET p:Employee
-- 设置关系属性
MATCH (p:Person {name: 'Alice'})-[r:WORKS_FOR]->(c:Company)
SET r.position = 'Senior Developer'
DELETE - 删除数据
-- 删除节点(DETACH DELETE 会同时删除该节点的所有关系;普通 DELETE 要求节点已没有任何关系)
MATCH (p:Person {name: 'Alice'})
DETACH DELETE p
-- 删除关系
MATCH (p:Person {name: 'Alice'})-[r:KNOWS]->()
DELETE r
-- 删除属性
MATCH (p:Person {name: 'Alice'})
REMOVE p.age
-- 删除标签
MATCH (p:Person {name: 'Alice'})
REMOVE p:Employee
3.4 WHERE子句
基本条件
-- 等值条件
MATCH (p:Person)
WHERE p.age = 30
RETURN p
-- 比较条件
MATCH (p:Person)
WHERE p.age > 25 AND p.age < 35
RETURN p
-- 字符串条件
MATCH (p:Person)
WHERE p.name STARTS WITH 'A'
RETURN p
MATCH (p:Person)
WHERE p.name ENDS WITH 'e'
RETURN p
MATCH (p:Person)
WHERE p.name CONTAINS 'lic'
RETURN p
正则表达式
-- 正则匹配
// The backslash is doubled on purpose: "\." is not a valid escape sequence
// inside a Cypher string literal, so a literal dot in the regex is written "\\.".
MATCH (p:Person)
WHERE p.email =~ '.*@gmail\\.com'
RETURN p
-- 不区分大小写
MATCH (p:Person)
WHERE p.name =~ '(?i)alice.*'
RETURN p
列表和范围
-- IN操作符
MATCH (p:Person)
WHERE p.name IN ['Alice', 'Bob', 'Charlie']
RETURN p
-- 范围检查
MATCH (p:Person)
WHERE p.age IN range(25, 35)
RETURN p
空值检查
-- 检查属性是否存在
MATCH (p:Person)
WHERE p.email IS NOT NULL
RETURN p
-- 检查属性是否不存在
MATCH (p:Person)
WHERE p.phone IS NULL
RETURN p
路径条件
-- 检查路径是否存在
MATCH (p:Person {name: 'Alice'})
WHERE EXISTS((p)-[:KNOWS]->(:Person {name: 'Bob'}))
RETURN p
-- 路径长度条件
// Bind the whole pattern to a path variable. A variable placed inside
// -[...*]- is a list of relationships, whereas length() expects a path.
MATCH path = (p:Person)-[:KNOWS*]->(friend:Person)
WHERE length(path) <= 3
RETURN p, friend
3.5 RETURN子句
基本返回
-- 返回节点
MATCH (p:Person) RETURN p
-- 返回属性
MATCH (p:Person) RETURN p.name, p.age
-- 返回关系
MATCH (p:Person)-[r:WORKS_FOR]->(c:Company)
RETURN p.name, r.since, c.name
别名和表达式
-- 使用别名
MATCH (p:Person)
RETURN p.name AS person_name, p.age AS person_age
-- 计算表达式
MATCH (p:Person)
RETURN p.name, p.age, p.age + 10 AS age_plus_ten
-- 字符串连接
MATCH (p:Person)
RETURN p.name + ' (' + toString(p.age) + ')' AS name_with_age
聚合函数
-- 计数
MATCH (p:Person) RETURN count(p)
-- 求和
MATCH (p:Person) RETURN sum(p.age)
-- 平均值
MATCH (p:Person) RETURN avg(p.age)
-- 最大值和最小值
MATCH (p:Person) RETURN max(p.age), min(p.age)
-- 收集到列表
MATCH (p:Person) RETURN collect(p.name)
DISTINCT和排序
-- 去重
MATCH (p:Person) RETURN DISTINCT p.city
-- 排序
MATCH (p:Person)
RETURN p.name, p.age
ORDER BY p.age DESC
-- 多字段排序
MATCH (p:Person)
RETURN p.name, p.age, p.city
ORDER BY p.city ASC, p.age DESC
-- 限制结果数量
MATCH (p:Person)
RETURN p.name, p.age
ORDER BY p.age DESC
LIMIT 5
-- 跳过和限制
MATCH (p:Person)
RETURN p.name, p.age
ORDER BY p.age DESC
SKIP 5 LIMIT 10
3.6 WITH子句
管道操作
-- 基本WITH用法
MATCH (p:Person)
WITH p, p.age AS age
WHERE age > 25
RETURN p.name, age
-- 聚合后过滤
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
WITH c, count(p) AS employee_count
WHERE employee_count > 5
RETURN c.name, employee_count
排序和限制
-- 中间排序
MATCH (p:Person)
WITH p ORDER BY p.age DESC LIMIT 3
MATCH (p)-[:WORKS_FOR]->(c:Company)
RETURN p.name, c.name
变量重命名
-- 重命名变量
MATCH (person:Person)
WITH person AS p
MATCH (p)-[:KNOWS]->(friend:Person)
RETURN p.name, friend.name
3.7 UNION操作
-- 合并结果集
MATCH (p:Person {city: 'Beijing'})
RETURN p.name AS name, 'Beijing' AS location
UNION
MATCH (p:Person {city: 'Shanghai'})
RETURN p.name AS name, 'Shanghai' AS location
-- UNION ALL(包含重复)
MATCH (p:Person)
RETURN p.name
UNION ALL
MATCH (c:Company)
RETURN c.name
3.8 Python中使用Cypher
基础查询类
from neo4j import GraphDatabase
from typing import List, Dict, Any, Optional
import logging
class CypherQueryExecutor:
    """Thin wrapper around a Neo4j driver for running Cypher statements.

    The instance owns the driver created in ``__init__``. Release it with
    ``close()`` or by using the executor as a context manager::

        with CypherQueryExecutor(uri, user, password) as executor:
            executor.execute_query("RETURN 1 AS x")
    """

    def __init__(self, uri: str, username: str, password: str):
        """Create a driver for *uri* authenticated with (username, password)."""
        self.driver = GraphDatabase.driver(uri, auth=(username, password))
        self.logger = logging.getLogger(__name__)

    def __enter__(self) -> "CypherQueryExecutor":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Always release the driver; exceptions from the body propagate.
        self.close()

    def close(self):
        """Close the database connection."""
        if self.driver:
            self.driver.close()

    def execute_query(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Run *query* and return every record as a plain dict.

        Args:
            query: Cypher text, using ``$name`` placeholders for values.
            parameters: Values for the placeholders; ``None`` means no parameters.

        Returns:
            One ``record.data()`` dict per result row.

        Raises:
            Exception: Whatever the driver raised; it is logged and re-raised.
        """
        with self.driver.session() as session:
            try:
                result = session.run(query, parameters or {})
                # Materialise inside the session: records cannot be read
                # once the session has been closed.
                return [record.data() for record in result]
            except Exception as e:
                self.logger.error("查询执行失败: %s", e)
                self.logger.error("查询语句: %s", query)
                self.logger.error("参数: %s", parameters)
                raise

    def execute_write(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Run a write statement and return its update counters.

        Args:
            query: Cypher text, using ``$name`` placeholders for values.
            parameters: Values for the placeholders; ``None`` means no parameters.

        Returns:
            A dict with the keys ``nodes_created``, ``nodes_deleted``,
            ``relationships_created``, ``relationships_deleted`` and
            ``properties_set``, taken from the result summary counters.

        Raises:
            Exception: Whatever the driver raised; it is logged and re-raised.
        """
        with self.driver.session() as session:
            try:
                result = session.run(query, parameters or {})
                counters = result.consume().counters
                return {
                    'nodes_created': counters.nodes_created,
                    'nodes_deleted': counters.nodes_deleted,
                    'relationships_created': counters.relationships_created,
                    'relationships_deleted': counters.relationships_deleted,
                    'properties_set': counters.properties_set,
                }
            except Exception as e:
                self.logger.error("写操作执行失败: %s", e)
                raise

    def execute_transaction(self, queries: List[tuple]) -> List[Dict[str, Any]]:
        """Run several statements inside one managed write transaction.

        Args:
            queries: ``(query, parameters)`` pairs, executed in order.
                ``parameters`` may be ``None``.

        Returns:
            One list of record dicts per statement, in input order.
        """
        def transaction_function(tx):
            results = []
            for query, parameters in queries:
                result = tx.run(query, parameters or {})
                # Consume each result before the next statement runs.
                results.append([record.data() for record in result])
            return results

        with self.driver.session() as session:
            # Driver 5.x renamed write_transaction to execute_write and
            # deprecated the old name; use the new one when it exists and
            # fall back so that older driver versions keep working.
            run_in_transaction = getattr(session, "execute_write", None)
            if run_in_transaction is None:
                run_in_transaction = session.write_transaction
            return run_in_transaction(transaction_function)
# Usage example: build an executor from a Bolt URI, a username and a password
executor = CypherQueryExecutor("bolt://localhost:7687", "neo4j", "password")
CRUD操作封装
class PersonManager:
    """CRUD, friendship and recommendation helpers for ``Person`` nodes.

    Every database call goes through the injected executor's
    ``execute_query`` / ``execute_write`` methods, and every value is sent
    as a query parameter rather than being spliced into the Cypher text.
    """

    def __init__(self, executor: "CypherQueryExecutor"):
        """Store the executor used for all queries."""
        self.executor = executor

    def create_person(self, name: str, age: int, city: Optional[str] = None, **kwargs) -> Optional[Dict[str, Any]]:
        """Create a ``Person`` node.

        Args:
            name: Stored as the ``name`` property.
            age: Stored as the ``age`` property.
            city: Stored as ``city`` when truthy; omitted otherwise.
            **kwargs: Extra properties stored on the node as given.

        Returns:
            The created node as returned by the executor, or ``None`` when
            the executor returned no rows.
        """
        properties = {'name': name, 'age': age}
        if city:
            properties['city'] = city
        properties.update(kwargs)

        query = """
        CREATE (p:Person $properties)
        RETURN p
        """
        rows = self.executor.execute_query(query, {'properties': properties})
        return rows[0]['p'] if rows else None

    def find_person(self, name: Optional[str] = None, age: Optional[int] = None, city: Optional[str] = None) -> List[Dict[str, Any]]:
        """Find people matching every filter that was supplied.

        Args:
            name: Exact ``name`` to match; empty or ``None`` means no filter.
            age: Exact ``age`` to match; ``None`` means no filter.
            city: Exact ``city`` to match; empty or ``None`` means no filter.

        Returns:
            The matching nodes; all ``Person`` nodes when no filter is given.
        """
        conditions = []
        parameters: Dict[str, Any] = {}
        if name:
            conditions.append("p.name = $name")
            parameters['name'] = name
        # 0 is a legitimate age, so test against None rather than truthiness.
        if age is not None:
            conditions.append("p.age = $age")
            parameters['age'] = age
        if city:
            conditions.append("p.city = $city")
            parameters['city'] = city

        # Only the fixed condition strings above are interpolated here.
        where_clause = " AND ".join(conditions) if conditions else "true"
        query = f"""
        MATCH (p:Person)
        WHERE {where_clause}
        RETURN p
        """
        rows = self.executor.execute_query(query, parameters)
        return [row['p'] for row in rows]

    def update_person(self, name: str, **updates) -> bool:
        """Set properties on the person(s) named *name*.

        Args:
            name: ``name`` property of the node(s) to update.
            **updates: Property names and their new values.

        Returns:
            ``True`` when at least one node matched, ``False`` when no
            updates were given or nothing matched.
        """
        if not updates:
            return False

        # The whole map travels as one parameter, so property names supplied
        # by the caller never become part of the query text.
        query = """
        MATCH (p:Person {name: $name})
        SET p += $updates
        RETURN p
        """
        rows = self.executor.execute_query(query, {'name': name, 'updates': updates})
        return len(rows) > 0

    def delete_person(self, name: str) -> bool:
        """Delete the person(s) named *name* together with their relationships.

        Returns:
            ``True`` when the executor reports at least one deleted node.
        """
        query = """
        MATCH (p:Person {name: $name})
        DETACH DELETE p
        """
        summary = self.executor.execute_write(query, {'name': name})
        return summary['nodes_deleted'] > 0

    def create_friendship(self, person1: str, person2: str, since: Optional[str] = None) -> bool:
        """Create ``KNOWS`` relationships in both directions between two people.

        The query uses ``CREATE``, so calling this again for the same pair
        adds another pair of relationships.

        Args:
            person1: ``name`` of the first person.
            person2: ``name`` of the second person.
            since: Stored as the ``since`` property when truthy.

        Returns:
            ``True`` when the executor reports at least one new relationship.
        """
        properties = {}
        if since:
            properties['since'] = since

        query = """
        MATCH (p1:Person {name: $person1})
        MATCH (p2:Person {name: $person2})
        CREATE (p1)-[:KNOWS $properties]->(p2)
        CREATE (p2)-[:KNOWS $properties]->(p1)
        """
        summary = self.executor.execute_write(query, {
            'person1': person1,
            'person2': person2,
            'properties': properties,
        })
        return summary['relationships_created'] > 0

    def find_friends(self, name: str, max_depth: int = 1) -> List[Dict[str, Any]]:
        """Find people reachable over ``KNOWS`` within *max_depth* hops.

        Args:
            name: ``name`` of the starting person.
            max_depth: Maximum number of hops; must be an integer >= 1.

        Returns:
            A list of ``{'friend': node, 'distance': hops}`` dicts ordered by
            distance and then by friend name.

        Raises:
            ValueError: If *max_depth* is not a positive integer.
        """
        # Variable-length bounds cannot be query parameters, so the value is
        # interpolated and therefore has to be checked first.
        if isinstance(max_depth, bool) or not isinstance(max_depth, int) or max_depth < 1:
            raise ValueError(f"max_depth must be a positive integer, got {max_depth!r}")

        # The hop count is computed in Cypher with length(path). Measuring
        # the returned path in Python would count its nodes as well as its
        # relationships once the record has been converted to plain data.
        query = f"""
        MATCH (p:Person {{name: $name}})-[:KNOWS*1..{max_depth}]-(friend:Person)
        WHERE friend.name <> $name
        WITH DISTINCT p, friend
        MATCH path = shortestPath((p)-[:KNOWS*..{max_depth}]-(friend))
        RETURN friend, length(path) AS distance
        ORDER BY distance, friend.name
        """
        rows = self.executor.execute_query(query, {'name': name})
        return [{
            'friend': row['friend'],
            'distance': row['distance'],
        } for row in rows]

    def recommend_friends(self, name: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Recommend friends-of-friends who are not yet direct friends.

        Args:
            name: ``name`` of the person to recommend for.
            limit: Maximum number of recommendations.

        Returns:
            A list of ``{'person': node, 'mutual_friends': count}`` dicts,
            ordered by mutual friend count (descending) and then by name.
        """
        query = """
        MATCH (p:Person {name: $name})-[:KNOWS]-(friend)-[:KNOWS]-(recommendation)
        WHERE NOT (p)-[:KNOWS]-(recommendation) AND p <> recommendation
        WITH recommendation, count(*) as mutual_friends
        RETURN recommendation, mutual_friends
        ORDER BY mutual_friends DESC, recommendation.name
        LIMIT $limit
        """
        rows = self.executor.execute_query(query, {
            'name': name,
            'limit': limit,
        })
        return [{
            'person': row['recommendation'],
            'mutual_friends': row['mutual_friends'],
        } for row in rows]
# Usage example
person_manager = PersonManager(executor)
# Create people
person_manager.create_person("Alice", 30, "Beijing")
person_manager.create_person("Bob", 25, "Shanghai")
person_manager.create_person("Charlie", 35, "Guangzhou")
# Create friendships
person_manager.create_friendship("Alice", "Bob", "2020-01-01")
person_manager.create_friendship("Bob", "Charlie", "2021-01-01")
# Look up friends within two hops
friends = person_manager.find_friends("Alice", max_depth=2)
print("Alice的朋友:", friends)
# Friend recommendations
recommendations = person_manager.recommend_friends("Alice")
print("推荐给Alice的朋友:", recommendations)
查询构建器
class CypherQueryBuilder:
    """Fluent builder for simple read queries.

    Clauses are emitted in a fixed order: MATCH, WHERE, WITH, RETURN,
    ORDER BY, SKIP, LIMIT. Pattern, condition and expression strings are
    inserted verbatim, so pass values through ``where(..., **params)``
    instead of formatting them into those strings.
    """

    # Sort directions accepted by order_by(), compared case-insensitively.
    _DIRECTIONS = ("ASC", "DESC", "ASCENDING", "DESCENDING")

    def __init__(self):
        self.reset()

    def reset(self) -> 'CypherQueryBuilder':
        """Clear every clause and parameter so the builder can be reused."""
        self._match_clauses = []
        self._where_clauses = []
        self._with_clauses = []
        self._return_clauses = []
        self._order_by_clauses = []
        self._limit_clause = None
        self._skip_clause = None
        self._parameters = {}
        return self

    @staticmethod
    def _checked_count(clause: str, count: int) -> int:
        """Return *count* if it is a non-negative int, else raise ValueError."""
        # bool is a subclass of int; reject it so True never renders as text.
        if isinstance(count, bool) or not isinstance(count, int) or count < 0:
            raise ValueError(f"{clause} requires a non-negative integer, got {count!r}")
        return count

    def match(self, pattern: str) -> 'CypherQueryBuilder':
        """Add a pattern; several patterns share one comma-separated MATCH."""
        self._match_clauses.append(pattern)
        return self

    def where(self, condition: str, **params) -> 'CypherQueryBuilder':
        """Add a condition (AND-ed with the others) and its parameter values."""
        self._where_clauses.append(condition)
        self._parameters.update(params)
        return self

    def with_clause(self, expression: str) -> 'CypherQueryBuilder':
        """Add an expression to the WITH clause."""
        self._with_clauses.append(expression)
        return self

    def return_clause(self, expression: str) -> 'CypherQueryBuilder':
        """Add an expression to the RETURN clause."""
        self._return_clauses.append(expression)
        return self

    def order_by(self, field: str, direction: str = 'ASC') -> 'CypherQueryBuilder':
        """Add a sort key.

        Raises:
            ValueError: If *direction* is not ASC, DESC, ASCENDING or
                DESCENDING (case-insensitive).
        """
        normalized = str(direction).strip().upper()
        # The direction becomes query text, so only known keywords pass.
        if normalized not in self._DIRECTIONS:
            raise ValueError(f"invalid sort direction: {direction!r}")
        self._order_by_clauses.append(f"{field} {normalized}")
        return self

    def limit(self, count: int) -> 'CypherQueryBuilder':
        """Set the LIMIT value.

        Raises:
            ValueError: If *count* is not a non-negative integer.
        """
        self._limit_clause = self._checked_count("LIMIT", count)
        return self

    def skip(self, count: int) -> 'CypherQueryBuilder':
        """Set the SKIP value.

        Raises:
            ValueError: If *count* is not a non-negative integer.
        """
        self._skip_clause = self._checked_count("SKIP", count)
        return self

    def build(self) -> tuple:
        """Assemble the query.

        Returns:
            ``(query, parameters)``: the clauses joined by newlines, and a
            copy of the collected parameters.
        """
        query_parts = []
        if self._match_clauses:
            query_parts.append("MATCH " + ", ".join(self._match_clauses))
        if self._where_clauses:
            query_parts.append("WHERE " + " AND ".join(self._where_clauses))
        if self._with_clauses:
            query_parts.append("WITH " + ", ".join(self._with_clauses))
        if self._return_clauses:
            query_parts.append("RETURN " + ", ".join(self._return_clauses))
        if self._order_by_clauses:
            query_parts.append("ORDER BY " + ", ".join(self._order_by_clauses))
        if self._skip_clause is not None:
            query_parts.append(f"SKIP {self._skip_clause}")
        if self._limit_clause is not None:
            query_parts.append(f"LIMIT {self._limit_clause}")

        # Hand out a copy so later where() calls on this builder cannot
        # change a parameter dict that was already returned.
        return "\n".join(query_parts), dict(self._parameters)
# Usage example
builder = CypherQueryBuilder()
# Build a query with two AND-ed conditions, a sort key and a limit
query, params = (builder
.match("(p:Person)")
.where("p.age > $min_age", min_age=25)
.where("p.city = $city", city="Beijing")
.return_clause("p.name, p.age")
.order_by("p.age", "DESC")
.limit(10)
.build())
print("生成的查询:")
print(query)
print("参数:", params)
# Run the generated query through the executor
result = executor.execute_query(query, params)
print("查询结果:", result)
3.9 性能优化技巧
索引使用
-- 创建索引
CREATE INDEX person_name_index FOR (p:Person) ON (p.name)
CREATE INDEX person_age_index FOR (p:Person) ON (p.age)
-- 复合索引
CREATE INDEX person_name_age_index FOR (p:Person) ON (p.name, p.age)
-- 查看索引使用情况
EXPLAIN MATCH (p:Person {name: 'Alice'}) RETURN p
查询优化
class QueryOptimizer:
    """Helpers for inspecting query plans and for bulk node creation."""

    def __init__(self, executor: "CypherQueryExecutor"):
        """Store the executor whose driver and write helper are used."""
        self.executor = executor

    @staticmethod
    def _require_identifier(kind: str, value: str) -> str:
        """Return *value* if it is a plain identifier, else raise ValueError."""
        # Labels and relationship types cannot be sent as query parameters,
        # so they are interpolated; restrict them to identifier characters.
        if not isinstance(value, str) or not value.isidentifier():
            raise ValueError(f"invalid {kind}: {value!r}")
        return value

    def _run_for_summary(self, query: str, parameters: Optional[Dict[str, Any]]):
        """Run *query* and return its consumed result summary."""
        with self.executor.driver.session() as session:
            return session.run(query, parameters or {}).consume()

    def explain_query(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Return the execution plan of *query* without running it.

        The plan is read from the result summary; an EXPLAIN statement
        yields no records, so reading the records would always give an
        empty list.

        Returns:
            The summary's ``plan`` dict, or ``{}`` when none is reported.
        """
        return self._run_for_summary(f"EXPLAIN {query}", parameters).plan or {}

    def profile_query(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Run *query* under PROFILE and return the profiled plan.

        Returns:
            The summary's ``profile`` dict, or ``{}`` when none is reported.
        """
        return self._run_for_summary(f"PROFILE {query}", parameters).profile or {}

    def optimize_relationship_query(self, start_label: str, rel_type: str, end_label: str) -> str:
        """Build a query that anchors on the start node before traversing.

        The returned text expects a ``$start_id`` parameter.

        Raises:
            ValueError: If a label or the relationship type is not a plain
                identifier.
        """
        start_label = self._require_identifier("label", start_label)
        rel_type = self._require_identifier("relationship type", rel_type)
        end_label = self._require_identifier("label", end_label)
        return f"""
        MATCH (start:{start_label})
        WHERE start.id = $start_id
        MATCH (start)-[:{rel_type}]->(end:{end_label})
        RETURN end
        """

    def batch_create_nodes(self, label: str, nodes: List[Dict[str, Any]], batch_size: int = 1000) -> int:
        """Create one node per dict in *nodes*, *batch_size* nodes per query.

        Args:
            label: Label given to every created node.
            nodes: Property maps, one per node.
            batch_size: Number of nodes sent per statement; must be >= 1.

        Returns:
            The sum of ``nodes_created`` reported for each batch.

        Raises:
            ValueError: If *label* is not a plain identifier or
                *batch_size* is less than 1.
        """
        label = self._require_identifier("label", label)
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

        # The statement is identical for every batch; only the parameter differs.
        query = f"""
        UNWIND $nodes AS node
        CREATE (n:{label})
        SET n = node
        """
        total_created = 0
        for start in range(0, len(nodes), batch_size):
            batch = nodes[start:start + batch_size]
            summary = self.executor.execute_write(query, {'nodes': batch})
            total_created += summary['nodes_created']
        return total_created
# Usage example
optimizer = QueryOptimizer(executor)
# Inspect the execution plan of a query
query = "MATCH (p:Person {name: 'Alice'})-[:KNOWS]->(friend) RETURN friend"
explain_result = optimizer.explain_query(query)
print("查询执行计划:", explain_result)
3.10 章节总结
核心知识点
- Cypher语法:节点、关系、标签、属性的表示方法
- CRUD操作:CREATE、MATCH、SET、DELETE的使用
- 查询子句:WHERE、RETURN、WITH、ORDER BY、LIMIT
- 模式匹配:图模式的表达和匹配
- 聚合函数:count、sum、avg、collect等
- Python集成:使用neo4j驱动执行Cypher查询
最佳实践
- 参数化查询:避免Cypher注入,并可复用查询计划以提高性能
- 索引优化:为常用查询字段创建索引
- 批量操作:大量数据操作时使用批处理
- 查询分析:使用EXPLAIN和PROFILE分析性能
- 事务管理:合理使用事务保证数据一致性
练习题
- 设计一个电影推荐系统的Cypher查询
- 实现一个社交网络的好友推荐算法
- 编写批量数据导入的Python脚本
- 优化复杂图遍历查询的性能
- 实现一个通用的Cypher查询构建器
下一章预告:在下一章中,我们将学习图数据建模的方法和最佳实践。