3.1 Cypher简介

什么是Cypher

Cypher是Neo4j的声明式图查询语言,专门为图数据库设计。它的语法直观,接近自然语言,使用ASCII艺术来表示图模式。

Cypher特点

  1. 声明式:描述想要什么,而不是如何获取
  2. 模式匹配:使用图模式来查询数据
  3. 表达力强:支持复杂的图遍历和分析
  4. 可读性好:语法接近自然语言
  5. 功能丰富:支持CRUD操作、聚合、排序等

基本语法结构

-- 基本查询结构
MATCH (pattern)
WHERE condition
RETURN result
ORDER BY field
LIMIT count

3.2 节点和关系语法

节点语法

-- 匿名节点
()

-- 带变量的节点
(n)

-- 带标签的节点
(n:Person)

-- 多标签节点
(n:Person:Employee)

-- 带属性的节点
(n:Person {name: 'Alice'})

-- 带变量和属性的节点
(alice:Person {name: 'Alice', age: 30})

关系语法

-- 无方向关系
(a)--(b)

-- 有方向关系
(a)-->(b)
(a)<--(b)

-- 带类型的关系
(a)-[:KNOWS]->(b)

-- 带变量的关系
(a)-[r:KNOWS]->(b)

-- 带属性的关系
(a)-[:KNOWS {since: '2020-01-01'}]->(b)

-- 多类型关系
(a)-[:KNOWS|FRIENDS]->(b)

-- 可变长度关系
(a)-[:KNOWS*1..3]->(b)  // 1到3跳
(a)-[:KNOWS*]->(b)      // 任意长度
(a)-[:KNOWS*2]->(b)     // 恰好2跳

3.3 基本CRUD操作

CREATE - 创建数据

-- 创建单个节点
CREATE (alice:Person {name: 'Alice', age: 30, city: 'Beijing'})

-- 创建多个节点
CREATE 
  (alice:Person {name: 'Alice', age: 30}),
  (bob:Person {name: 'Bob', age: 25}),
  (company:Company {name: 'TechCorp'})

-- 创建节点和关系
CREATE (alice:Person {name: 'Alice'})-[:WORKS_FOR {since: '2020-01-01'}]->(company:Company {name: 'TechCorp'})

-- 基于现有节点创建关系
MATCH (alice:Person {name: 'Alice'})
MATCH (bob:Person {name: 'Bob'})
CREATE (alice)-[:KNOWS {since: '2023-01-01'}]->(bob)

MATCH - 查询数据

-- 查询所有节点
MATCH (n) RETURN n

-- 查询特定标签的节点
MATCH (p:Person) RETURN p

-- 查询带条件的节点
MATCH (p:Person {name: 'Alice'}) RETURN p

-- 查询关系
MATCH (p:Person)-[:WORKS_FOR]->(c:Company) 
RETURN p.name, c.name

-- 查询路径
MATCH path = (p:Person)-[:KNOWS*1..2]->(friend:Person)
RETURN path

SET - 更新数据

-- 设置属性
MATCH (p:Person {name: 'Alice'})
SET p.age = 31

-- 设置多个属性
MATCH (p:Person {name: 'Alice'})
SET p.age = 31, p.city = 'Shanghai'

-- 使用映射设置属性
MATCH (p:Person {name: 'Alice'})
SET p += {age: 31, email: 'alice@example.com'}

-- 添加标签
MATCH (p:Person {name: 'Alice'})
SET p:Employee

-- 设置关系属性
MATCH (p:Person {name: 'Alice'})-[r:WORKS_FOR]->(c:Company)
SET r.position = 'Senior Developer'

DELETE - 删除数据

-- 删除节点及其所有关系(DETACH DELETE 会自动删除关联关系;普通 DELETE 要求节点已没有任何关系)
MATCH (p:Person {name: 'Alice'})
DETACH DELETE p

-- 删除关系
MATCH (p:Person {name: 'Alice'})-[r:KNOWS]->()
DELETE r

-- 删除属性
MATCH (p:Person {name: 'Alice'})
REMOVE p.age

-- 删除标签
MATCH (p:Person {name: 'Alice'})
REMOVE p:Employee

3.4 WHERE子句

基本条件

-- 等值条件
MATCH (p:Person)
WHERE p.age = 30
RETURN p

-- 比较条件
MATCH (p:Person)
WHERE p.age > 25 AND p.age < 35
RETURN p

-- 字符串条件
MATCH (p:Person)
WHERE p.name STARTS WITH 'A'
RETURN p

MATCH (p:Person)
WHERE p.name ENDS WITH 'e'
RETURN p

MATCH (p:Person)
WHERE p.name CONTAINS 'lic'
RETURN p

正则表达式

-- 正则匹配
MATCH (p:Person)
WHERE p.email =~ '.*@gmail\.com'
RETURN p

-- 不区分大小写
MATCH (p:Person)
WHERE p.name =~ '(?i)alice.*'
RETURN p

列表和范围

-- IN操作符
MATCH (p:Person)
WHERE p.name IN ['Alice', 'Bob', 'Charlie']
RETURN p

-- 范围检查
MATCH (p:Person)
WHERE p.age IN range(25, 35)
RETURN p

空值检查

-- 检查属性是否存在
MATCH (p:Person)
WHERE p.email IS NOT NULL
RETURN p

-- 检查属性是否不存在
MATCH (p:Person)
WHERE p.phone IS NULL
RETURN p

路径条件

-- 检查路径是否存在
MATCH (p:Person {name: 'Alice'})
WHERE EXISTS((p)-[:KNOWS]->(:Person {name: 'Bob'}))
RETURN p

-- 路径长度条件
MATCH path = (p:Person)-[:KNOWS*]->(friend:Person)
WHERE length(path) <= 3
RETURN p, friend

3.5 RETURN子句

基本返回

-- 返回节点
MATCH (p:Person) RETURN p

-- 返回属性
MATCH (p:Person) RETURN p.name, p.age

-- 返回关系
MATCH (p:Person)-[r:WORKS_FOR]->(c:Company)
RETURN p.name, r.since, c.name

别名和表达式

-- 使用别名
MATCH (p:Person)
RETURN p.name AS person_name, p.age AS person_age

-- 计算表达式
MATCH (p:Person)
RETURN p.name, p.age, p.age + 10 AS age_plus_ten

-- 字符串连接
MATCH (p:Person)
RETURN p.name + ' (' + toString(p.age) + ')' AS name_with_age

聚合函数

-- 计数
MATCH (p:Person) RETURN count(p)

-- 求和
MATCH (p:Person) RETURN sum(p.age)

-- 平均值
MATCH (p:Person) RETURN avg(p.age)

-- 最大值和最小值
MATCH (p:Person) RETURN max(p.age), min(p.age)

-- 收集到列表
MATCH (p:Person) RETURN collect(p.name)

DISTINCT和排序

-- 去重
MATCH (p:Person) RETURN DISTINCT p.city

-- 排序
MATCH (p:Person) 
RETURN p.name, p.age 
ORDER BY p.age DESC

-- 多字段排序
MATCH (p:Person) 
RETURN p.name, p.age, p.city
ORDER BY p.city ASC, p.age DESC

-- 限制结果数量
MATCH (p:Person) 
RETURN p.name, p.age 
ORDER BY p.age DESC 
LIMIT 5

-- 跳过和限制
MATCH (p:Person) 
RETURN p.name, p.age 
ORDER BY p.age DESC 
SKIP 5 LIMIT 10

3.6 WITH子句

管道操作

-- 基本WITH用法
MATCH (p:Person)
WITH p, p.age AS age
WHERE age > 25
RETURN p.name, age

-- 聚合后过滤
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
WITH c, count(p) AS employee_count
WHERE employee_count > 5
RETURN c.name, employee_count

排序和限制

-- 中间排序
MATCH (p:Person)
WITH p ORDER BY p.age DESC LIMIT 3
MATCH (p)-[:WORKS_FOR]->(c:Company)
RETURN p.name, c.name

变量重命名

-- 重命名变量
MATCH (person:Person)
WITH person AS p
MATCH (p)-[:KNOWS]->(friend:Person)
RETURN p.name, friend.name

3.7 UNION操作

-- 合并结果集
MATCH (p:Person {city: 'Beijing'})
RETURN p.name AS name, 'Beijing' AS location
UNION
MATCH (p:Person {city: 'Shanghai'})
RETURN p.name AS name, 'Shanghai' AS location

-- UNION ALL(包含重复)
MATCH (p:Person)
RETURN p.name
UNION ALL
MATCH (c:Company)
RETURN c.name

3.8 Python中使用Cypher

基础查询类

from neo4j import GraphDatabase
from typing import List, Dict, Any, Optional
import logging

class CypherQueryExecutor:
    """Small helper that runs Cypher statements through a Neo4j driver.

    Every ``execute_*`` call opens its own session from ``self.driver`` and
    leaves it when the call returns. The instance can also be used as a
    context manager, in which case ``close()`` is called on exit.
    """

    def __init__(self, uri: str, username: str, password: str):
        """Create the driver.

        Args:
            uri: Connection URI handed to ``GraphDatabase.driver``.
            username: First element of the ``auth`` tuple.
            password: Second element of the ``auth`` tuple.
        """
        self.driver = GraphDatabase.driver(uri, auth=(username, password))
        self.logger = logging.getLogger(__name__)

    def __enter__(self) -> "CypherQueryExecutor":
        """Return the executor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the driver when the ``with`` block ends."""
        self.close()

    def close(self):
        """Close the underlying driver, if one is set."""
        if self.driver:
            self.driver.close()

    def execute_query(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Run ``query`` and return every record converted with ``record.data()``.

        Args:
            query: Cypher text to run.
            parameters: Query parameters; ``None`` is sent as an empty dict.

        Returns:
            One dict per record, in the order the result yields them.

        Raises:
            Exception: Whatever the driver raised; it is logged and re-raised.
        """
        with self.driver.session() as session:
            try:
                result = session.run(query, parameters or {})
                return [record.data() for record in result]
            except Exception as e:
                # Log the failing statement and its parameters, then let the
                # caller decide how to handle the error.
                self.logger.error("查询执行失败: %s", e)
                self.logger.error("查询语句: %s", query)
                self.logger.error("参数: %s", parameters)
                raise

    def execute_write(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Run a writing statement and return its update counters.

        Args:
            query: Cypher text to run.
            parameters: Query parameters; ``None`` is sent as an empty dict.

        Returns:
            A dict with the keys ``nodes_created``, ``nodes_deleted``,
            ``relationships_created``, ``relationships_deleted`` and
            ``properties_set`` read from the result summary's counters.

        Raises:
            Exception: Whatever the driver raised; it is logged and re-raised.
        """
        with self.driver.session() as session:
            try:
                result = session.run(query, parameters or {})
                counters = result.consume().counters
                return {
                    'nodes_created': counters.nodes_created,
                    'nodes_deleted': counters.nodes_deleted,
                    'relationships_created': counters.relationships_created,
                    'relationships_deleted': counters.relationships_deleted,
                    'properties_set': counters.properties_set
                }
            except Exception as e:
                self.logger.error("写操作执行失败: %s", e)
                raise

    def execute_transaction(self, queries: List[tuple]) -> List[List[Dict[str, Any]]]:
        """Run several statements inside one managed write transaction.

        Args:
            queries: ``(query, parameters)`` pairs; ``parameters`` may be
                ``None``.

        Returns:
            One list of record dicts per statement, in input order.
        """
        def transaction_function(tx):
            # The result list is built inside the function so that a repeated
            # invocation (the driver documentation says managed transactions
            # may be retried) starts from a clean state.
            results = []
            for query, parameters in queries:
                result = tx.run(query, parameters or {})
                results.append([record.data() for record in result])
            return results

        with self.driver.session() as session:
            # ``write_transaction`` was deprecated in favour of
            # ``execute_write`` in driver 5.x; prefer the new name and fall
            # back to the old one for older drivers.
            run_in_transaction = getattr(session, "execute_write", None)
            if run_in_transaction is None:
                run_in_transaction = session.write_transaction
            return run_in_transaction(transaction_function)

# Usage example: build an executor for the Bolt endpoint on localhost:7687.
executor = CypherQueryExecutor("bolt://localhost:7687", "neo4j", "password")

CRUD操作封装

class PersonManager:
    """CRUD and friendship helpers for ``Person`` nodes.

    All database access goes through the executor given at construction,
    using its ``execute_query`` and ``execute_write`` methods.
    """

    def __init__(self, executor: "CypherQueryExecutor"):
        """Store the executor used for every query."""
        self.executor = executor

    def create_person(self, name: str, age: int, city: Optional[str] = None, **kwargs) -> Optional[Dict[str, Any]]:
        """Create a ``Person`` node.

        Args:
            name: Stored as the ``name`` property.
            age: Stored as the ``age`` property.
            city: Stored as ``city`` when truthy; omitted otherwise.
            **kwargs: Extra properties merged into the node.

        Returns:
            The ``p`` value of the first returned record, or ``None`` when
            the executor returned no records.
        """
        properties = {'name': name, 'age': age}
        if city:
            properties['city'] = city
        properties.update(kwargs)

        query = """
        CREATE (p:Person $properties)
        RETURN p
        """

        result = self.executor.execute_query(query, {'properties': properties})
        return result[0]['p'] if result else None

    def find_person(self, name: Optional[str] = None, age: Optional[int] = None, city: Optional[str] = None) -> List[Dict[str, Any]]:
        """Find ``Person`` nodes matching every filter that is not ``None``.

        Args:
            name: Exact ``name`` to match.
            age: Exact ``age`` to match; ``0`` is a valid filter.
            city: Exact ``city`` to match.

        Returns:
            The ``p`` value of every returned record. With no filters the
            query matches every ``Person``.
        """
        # Compare against None rather than truthiness so that age=0 or an
        # empty string is applied as a filter instead of being dropped.
        filters = {'name': name, 'age': age, 'city': city}
        parameters = {key: value for key, value in filters.items() if value is not None}
        conditions = [f"p.{key} = ${key}" for key in parameters]

        where_clause = " AND ".join(conditions) if conditions else "true"

        query = f"""
        MATCH (p:Person)
        WHERE {where_clause}
        RETURN p
        """

        result = self.executor.execute_query(query, parameters)
        return [record['p'] for record in result]

    def update_person(self, name: str, **updates) -> bool:
        """Set properties on the ``Person`` nodes named ``name``.

        Args:
            name: ``name`` of the node(s) to update.
            **updates: Property names and their new values.

        Returns:
            ``False`` when no updates were given or nothing matched,
            ``True`` when at least one record came back.
        """
        if not updates:
            return False

        # Pass the whole map as a parameter instead of splicing property
        # names into the Cypher text: names that are not plain identifiers
        # then work, and nothing caller-supplied ends up in the query string.
        query = """
        MATCH (p:Person {name: $name})
        SET p += $updates
        RETURN p
        """

        result = self.executor.execute_query(query, {'name': name, 'updates': updates})
        return len(result) > 0

    def delete_person(self, name: str) -> bool:
        """Delete the ``Person`` nodes named ``name`` with their relationships.

        Returns:
            ``True`` when the summary reports at least one deleted node.
        """
        query = """
        MATCH (p:Person {name: $name})
        DETACH DELETE p
        """

        summary = self.executor.execute_write(query, {'name': name})
        return summary['nodes_deleted'] > 0

    def create_friendship(self, person1: str, person2: str, since: Optional[str] = None) -> bool:
        """Create a ``KNOWS`` relationship in both directions.

        Args:
            person1: ``name`` of the first person.
            person2: ``name`` of the second person.
            since: Stored as ``since`` on both relationships when truthy.

        Returns:
            ``True`` when the summary reports at least one created
            relationship.
        """
        properties = {}
        if since:
            properties['since'] = since

        query = """
        MATCH (p1:Person {name: $person1})
        MATCH (p2:Person {name: $person2})
        CREATE (p1)-[:KNOWS $properties]->(p2)
        CREATE (p2)-[:KNOWS $properties]->(p1)
        """

        summary = self.executor.execute_write(query, {
            'person1': person1,
            'person2': person2,
            'properties': properties
        })

        return summary['relationships_created'] > 0

    def find_friends(self, name: str, max_depth: int = 1) -> List[Dict[str, Any]]:
        """Find people reachable over ``KNOWS`` within ``max_depth`` hops.

        Args:
            name: ``name`` of the starting person.
            max_depth: Maximum number of hops. It is converted with
                ``int()`` because it has to be written into the query text.

        Returns:
            Dicts with ``friend`` and ``distance`` (hop count of the shortest
            path) as returned by the query, ordered by distance then name.
            An empty list when ``max_depth`` is below 1.

        Raises:
            ValueError, TypeError: If ``max_depth`` cannot be converted to
                an integer.
        """
        # Variable-length bounds are written into the query text here, so
        # force an integer before interpolating.
        depth = int(max_depth)
        if depth < 1:
            return []

        # The hop count is computed in Cypher with length(path). Taking
        # len() of the exported path on the Python side would count nodes
        # as well as relationships.
        query = f"""
        MATCH (p:Person {{name: $name}})-[:KNOWS*1..{depth}]-(friend:Person)
        WHERE friend.name <> $name
        WITH DISTINCT p, friend
        MATCH path = shortestPath((p)-[:KNOWS*..{depth}]-(friend))
        RETURN friend, length(path) AS distance
        ORDER BY distance, friend.name
        """

        result = self.executor.execute_query(query, {'name': name})
        return [{
            'friend': record['friend'],
            'distance': record['distance']
        } for record in result]

    def recommend_friends(self, name: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Recommend friends-of-friends who are not yet direct friends.

        Args:
            name: ``name`` of the person to recommend for.
            limit: Maximum number of recommendations.

        Returns:
            Dicts with ``person`` and ``mutual_friends``, ordered by the
            number of mutual friends (descending) then name.
        """
        # count(DISTINCT friend) rather than count(*): create_friendship
        # stores KNOWS in both directions, so the undirected pattern matches
        # the same intermediate friend through several relationship
        # combinations and count(*) would inflate the number.
        query = """
        MATCH (p:Person {name: $name})-[:KNOWS]-(friend)-[:KNOWS]-(recommendation)
        WHERE NOT (p)-[:KNOWS]-(recommendation) AND p <> recommendation
        WITH recommendation, count(DISTINCT friend) as mutual_friends
        RETURN recommendation, mutual_friends
        ORDER BY mutual_friends DESC, recommendation.name
        LIMIT $limit
        """

        result = self.executor.execute_query(query, {
            'name': name,
            'limit': limit
        })

        return [{
            'person': record['recommendation'],
            'mutual_friends': record['mutual_friends']
        } for record in result]

# Usage example
person_manager = PersonManager(executor)

# Create people
person_manager.create_person("Alice", 30, "Beijing")
person_manager.create_person("Bob", 25, "Shanghai")
person_manager.create_person("Charlie", 35, "Guangzhou")

# Create friendships
person_manager.create_friendship("Alice", "Bob", "2020-01-01")
person_manager.create_friendship("Bob", "Charlie", "2021-01-01")

# Look up Alice's friends within two hops
friends = person_manager.find_friends("Alice", max_depth=2)
print("Alice的朋友:", friends)

# Friend recommendations for Alice
recommendations = person_manager.recommend_friends("Alice")
print("推荐给Alice的朋友:", recommendations)

查询构建器

class CypherQueryBuilder:
    """Fluent builder for simple single-part Cypher read queries.

    Clauses are collected through chained calls and rendered by ``build()``
    in a fixed order: MATCH, WHERE, WITH, RETURN, ORDER BY, SKIP, LIMIT.
    """

    # Sort keywords accepted by order_by(); anything else is rejected so
    # arbitrary text cannot be spliced into the ORDER BY clause.
    _SORT_DIRECTIONS = ('ASC', 'DESC', 'ASCENDING', 'DESCENDING')

    def __init__(self):
        """Start with an empty query."""
        self.reset()

    def reset(self):
        """Clear every collected clause and parameter; returns ``self``."""
        self._match_clauses = []
        self._where_clauses = []
        self._with_clauses = []
        self._return_clauses = []
        self._order_by_clauses = []
        self._limit_clause = None
        self._skip_clause = None
        self._parameters = {}
        return self

    def match(self, pattern: str) -> 'CypherQueryBuilder':
        """Add a pattern; multiple patterns are comma-joined in one MATCH."""
        self._match_clauses.append(pattern)
        return self

    def where(self, condition: str, **params) -> 'CypherQueryBuilder':
        """Add a condition (AND-ed with the others) and its parameters."""
        self._where_clauses.append(condition)
        self._parameters.update(params)
        return self

    def with_clause(self, expression: str) -> 'CypherQueryBuilder':
        """Add an expression to the WITH clause."""
        self._with_clauses.append(expression)
        return self

    def return_clause(self, expression: str) -> 'CypherQueryBuilder':
        """Add an expression to the RETURN clause."""
        self._return_clauses.append(expression)
        return self

    def order_by(self, field: str, direction: str = 'ASC') -> 'CypherQueryBuilder':
        """Add a sort key.

        Args:
            field: Expression to sort by.
            direction: ``ASC``/``DESC`` (or ``ASCENDING``/``DESCENDING``),
                case-insensitive.

        Raises:
            ValueError: If ``direction`` is not one of the accepted keywords.
        """
        keyword = direction.strip().upper()
        if keyword not in self._SORT_DIRECTIONS:
            raise ValueError(f"invalid sort direction: {direction!r}")
        self._order_by_clauses.append(f"{field} {keyword}")
        return self

    def limit(self, count: int) -> 'CypherQueryBuilder':
        """Set the LIMIT value (the last call wins)."""
        self._limit_clause = count
        return self

    def skip(self, count: int) -> 'CypherQueryBuilder':
        """Set the SKIP value (the last call wins)."""
        self._skip_clause = count
        return self

    def build(self) -> tuple:
        """Render the query.

        Returns:
            ``(query, parameters)`` where ``query`` has one clause per line
            and ``parameters`` is a copy of the collected parameters, so the
            caller can change it without affecting the builder.
        """
        # (keyword, items, separator) for every list-valued clause, in the
        # order they are emitted.
        list_clauses = (
            ("MATCH", self._match_clauses, ", "),
            ("WHERE", self._where_clauses, " AND "),
            ("WITH", self._with_clauses, ", "),
            ("RETURN", self._return_clauses, ", "),
            ("ORDER BY", self._order_by_clauses, ", "),
        )
        query_parts = [
            f"{keyword} {separator.join(items)}"
            for keyword, items, separator in list_clauses
            if items
        ]

        # SKIP must come before LIMIT.
        if self._skip_clause is not None:
            query_parts.append(f"SKIP {self._skip_clause}")
        if self._limit_clause is not None:
            query_parts.append(f"LIMIT {self._limit_clause}")

        return "\n".join(query_parts), dict(self._parameters)

# Usage example
builder = CypherQueryBuilder()

# Build a query with two filters, a sort and a limit
query, params = (builder
    .match("(p:Person)")
    .where("p.age > $min_age", min_age=25)
    .where("p.city = $city", city="Beijing")
    .return_clause("p.name, p.age")
    .order_by("p.age", "DESC")
    .limit(10)
    .build())

print("生成的查询:")
print(query)
print("参数:", params)

# Run the generated query through the executor
result = executor.execute_query(query, params)
print("查询结果:", result)

3.9 性能优化技巧

索引使用

-- 创建索引
CREATE INDEX person_name_index FOR (p:Person) ON (p.name)
CREATE INDEX person_age_index FOR (p:Person) ON (p.age)

-- 复合索引
CREATE INDEX person_name_age_index FOR (p:Person) ON (p.name, p.age)

-- 查看索引使用情况
EXPLAIN MATCH (p:Person {name: 'Alice'}) RETURN p

查询优化

class QueryOptimizer:
    """Helpers for inspecting query plans and writing nodes in batches.

    All database access goes through the executor given at construction.
    """

    def __init__(self, executor: "CypherQueryExecutor"):
        """Store the executor used for every query."""
        self.executor = executor

    def explain_query(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Run ``query`` prefixed with ``EXPLAIN``.

        Returns:
            Whatever ``executor.execute_query`` returns for the statement.
            NOTE(review): the plan itself is presumably only available on
            the result summary, which ``execute_query`` does not expose, so
            this list is likely empty. Confirm against the driver docs.
        """
        explain_query = f"EXPLAIN {query}"
        return self.executor.execute_query(explain_query, parameters)

    def profile_query(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Run ``query`` prefixed with ``PROFILE``.

        Returns:
            Whatever ``executor.execute_query`` returns for the statement.
            NOTE(review): profiling statistics presumably live on the result
            summary rather than in these records. Confirm.
        """
        profile_query = f"PROFILE {query}"
        return self.executor.execute_query(profile_query, parameters)

    def optimize_relationship_query(self, start_label: str, rel_type: str, end_label: str) -> str:
        """Return a query template that anchors on ``start.id`` first.

        The labels and relationship type are written into the query text
        as-is, so they must come from trusted code. The returned query
        expects a ``$start_id`` parameter.
        """
        # Match the start node by property first, then expand from it.
        return f"""
        MATCH (start:{start_label})
        WHERE start.id = $start_id
        MATCH (start)-[:{rel_type}]->(end:{end_label})
        RETURN end
        """

    def batch_create_nodes(self, label: str, nodes: List[Dict[str, Any]], batch_size: int = 1000) -> int:
        """Create one node per dict in ``nodes``, ``batch_size`` at a time.

        Args:
            label: Node label; written into the query text as-is, so it
                must come from trusted code.
            nodes: Property maps, one per node to create.
            batch_size: Number of nodes sent per statement; must be >= 1.

        Returns:
            Sum of ``nodes_created`` reported for every batch.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1. A negative value
                would otherwise skip every batch and silently create nothing.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")

        # The statement is the same for every batch; build it once.
        query = f"""
            UNWIND $nodes AS node
            CREATE (n:{label})
            SET n = node
            """

        total_created = 0
        for start in range(0, len(nodes), batch_size):
            batch = nodes[start:start + batch_size]
            summary = self.executor.execute_write(query, {'nodes': batch})
            total_created += summary['nodes_created']

        return total_created

# Usage example
optimizer = QueryOptimizer(executor)

# Ask for the plan of a query
query = "MATCH (p:Person {name: 'Alice'})-[:KNOWS]->(friend) RETURN friend"
explain_result = optimizer.explain_query(query)
print("查询执行计划:", explain_result)

3.10 章节总结

核心知识点

  1. Cypher语法:节点、关系、标签、属性的表示方法
  2. CRUD操作:CREATE、MATCH、SET、DELETE的使用
  3. 查询子句:WHERE、RETURN、WITH、ORDER BY、LIMIT
  4. 模式匹配:图模式的表达和匹配
  5. 聚合函数:count、sum、avg、collect等
  6. Python集成:使用neo4j驱动执行Cypher查询

最佳实践

  1. 参数化查询:避免Cypher注入,提高性能
  2. 索引优化:为常用查询字段创建索引
  3. 批量操作:大量数据操作时使用批处理
  4. 查询分析:使用EXPLAIN和PROFILE分析性能
  5. 事务管理:合理使用事务保证数据一致性

练习题

  1. 设计一个电影推荐系统的Cypher查询
  2. 实现一个社交网络的好友推荐算法
  3. 编写批量数据导入的Python脚本
  4. 优化复杂图遍历查询的性能
  5. 实现一个通用的Cypher查询构建器

下一章预告:在下一章中,我们将学习图数据建模的方法和最佳实践。