学习目标

通过本章学习,你将掌握: - 函数的定义、调用和参数传递 - 作用域和命名空间的概念 - 装饰器和闭包的使用 - 模块和包的创建与使用 - Python标准库的常用模块 - 第三方库的安装和使用

6.1 函数基础

函数的定义和调用

# 基本函数定义
def greet(name):
    """问候函数"""
    return f"Hello, {name}!"

# 函数调用
message = greet("Alice")
print(message)  # 输出: Hello, Alice!

# 无参数函数
def get_current_time():
    """获取当前时间"""
    from datetime import datetime
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

print(f"当前时间: {get_current_time()}")

# 无返回值函数
def print_separator(char='=', length=50):
    """打印分隔符"""
    print(char * length)

print_separator()
print_separator('-', 30)

# 多返回值函数
def get_name_age():
    """获取姓名和年龄"""
    return "Alice", 25

name, age = get_name_age()
print(f"姓名: {name}, 年龄: {age}")

# 也可以作为元组接收
result = get_name_age()
print(f"结果: {result}")
print(f"类型: {type(result)}")

参数传递

# 1. 位置参数
def add(a, b, c):
    """三个数相加"""
    return a + b + c

result = add(1, 2, 3)
print(f"1 + 2 + 3 = {result}")

# 2. 关键字参数
def create_person(name, age, city):
    """创建人员信息"""
    return {
        'name': name,
        'age': age,
        'city': city
    }

# 使用关键字参数(顺序可以改变)
person1 = create_person(name="Alice", city="Beijing", age=25)
person2 = create_person("Bob", age=30, city="Shanghai")
print(f"Person1: {person1}")
print(f"Person2: {person2}")

# 3. 默认参数
def greet_with_title(name, title="Mr."):
    """带称谓的问候"""
    return f"Hello, {title} {name}!"

print(greet_with_title("Smith"))  # 使用默认值
print(greet_with_title("Johnson", "Dr."))  # 指定值
print(greet_with_title("Brown", title="Ms."))  # 关键字参数

# 4. 可变位置参数 (*args)
def sum_all(*numbers):
    """计算所有数字的和"""
    total = 0
    for num in numbers:
        total += num
    return total

print(f"sum_all(1, 2, 3) = {sum_all(1, 2, 3)}")
print(f"sum_all(1, 2, 3, 4, 5) = {sum_all(1, 2, 3, 4, 5)}")
print(f"sum_all() = {sum_all()}")

# 传递列表给*args
numbers = [1, 2, 3, 4, 5]
print(f"sum_all(*numbers) = {sum_all(*numbers)}")

# 5. 可变关键字参数 (**kwargs)
def create_profile(**info):
    """创建用户档案"""
    profile = {}
    for key, value in info.items():
        profile[key] = value
    return profile

profile1 = create_profile(name="Alice", age=25, city="Beijing")
profile2 = create_profile(name="Bob", occupation="Engineer", salary=50000)
print(f"Profile1: {profile1}")
print(f"Profile2: {profile2}")

# 传递字典给**kwargs
user_data = {'name': 'Charlie', 'age': 35, 'email': 'charlie@example.com'}
profile3 = create_profile(**user_data)
print(f"Profile3: {profile3}")

# 6. 混合参数类型
def complex_function(required_arg, default_arg="default", *args, **kwargs):
    """复杂参数函数"""
    print(f"必需参数: {required_arg}")
    print(f"默认参数: {default_arg}")
    print(f"位置参数: {args}")
    print(f"关键字参数: {kwargs}")
    return f"处理完成"

print("=== 复杂函数调用 ===")
result = complex_function(
    "必需值",
    "自定义默认值",
    1, 2, 3,
    name="Alice",
    age=25
)
print(f"返回值: {result}")

# 7. 强制关键字参数 (Python 3+)
def create_user(name, *, age, email):
    """创建用户(age和email必须使用关键字参数)"""
    return {
        'name': name,
        'age': age,
        'email': email
    }

# 正确调用
user = create_user("Alice", age=25, email="alice@example.com")
print(f"用户: {user}")

# 错误调用(会报错)
# user = create_user("Alice", 25, "alice@example.com")  # TypeError

# 8. 仅位置参数 (Python 3.8+)
def divide(dividend, divisor, /):
    """除法函数(参数只能按位置传递)"""
    if divisor == 0:
        return "除数不能为零"
    return dividend / divisor

# 正确调用
result = divide(10, 2)
print(f"10 / 2 = {result}")

# 错误调用(会报错)
# result = divide(dividend=10, divisor=2)  # TypeError

函数作为对象

# 函数是第一类对象
def square(x):
    """计算平方"""
    return x ** 2

def cube(x):
    """计算立方"""
    return x ** 3

# 函数可以赋值给变量
my_function = square
print(f"my_function(5) = {my_function(5)}")

# 函数可以作为参数传递
def apply_function(func, value):
    """应用函数到值"""
    return func(value)

result1 = apply_function(square, 4)
result2 = apply_function(cube, 3)
print(f"square(4) = {result1}")
print(f"cube(3) = {result2}")

# 函数可以存储在数据结构中
operations = {
    'square': square,
    'cube': cube,
    'double': lambda x: x * 2
}

for name, func in operations.items():
    result = func(3)
    print(f"{name}(3) = {result}")

# 函数可以作为返回值
def get_operation(op_type):
    """根据类型返回对应的操作函数"""
    if op_type == 'square':
        return square
    elif op_type == 'cube':
        return cube
    else:
        return lambda x: x

op = get_operation('square')
print(f"动态获取的函数: {op(6)}")

# 高阶函数示例
def create_multiplier(factor):
    """创建乘法器函数"""
    def multiplier(x):
        return x * factor
    return multiplier

# 创建不同的乘法器
double = create_multiplier(2)
triple = create_multiplier(3)

print(f"double(5) = {double(5)}")
print(f"triple(5) = {triple(5)}")

# 内置高阶函数
numbers = [1, 2, 3, 4, 5]

# map: 对每个元素应用函数
squared = list(map(square, numbers))
print(f"原数组: {numbers}")
print(f"平方后: {squared}")

# filter: 过滤元素
def is_even(x):
    return x % 2 == 0

even_numbers = list(filter(is_even, numbers))
print(f"偶数: {even_numbers}")

# 使用lambda表达式
odd_numbers = list(filter(lambda x: x % 2 == 1, numbers))
print(f"奇数: {odd_numbers}")

# reduce: 累积操作
from functools import reduce

product = reduce(lambda x, y: x * y, numbers)
print(f"所有数字的乘积: {product}")

# 排序中使用函数
students = [
    {'name': 'Alice', 'grade': 85},
    {'name': 'Bob', 'grade': 90},
    {'name': 'Charlie', 'grade': 78}
]

# 按成绩排序
sorted_by_grade = sorted(students, key=lambda s: s['grade'])
print("按成绩排序:")
for student in sorted_by_grade:
    print(f"  {student['name']}: {student['grade']}")

# 按姓名排序
sorted_by_name = sorted(students, key=lambda s: s['name'])
print("按姓名排序:")
for student in sorted_by_name:
    print(f"  {student['name']}: {student['grade']}")

运行函数基础示例:

python function_basics.py

6.2 作用域和命名空间

作用域规则 (LEGB)

# LEGB规则:Local -> Enclosing -> Global -> Built-in

# 全局变量
global_var = "我是全局变量"

def outer_function():
    # 封闭作用域变量
    enclosing_var = "我是封闭作用域变量"
    
    def inner_function():
        # 局部变量
        local_var = "我是局部变量"
        
        print(f"局部变量: {local_var}")
        print(f"封闭变量: {enclosing_var}")
        print(f"全局变量: {global_var}")
        print(f"内置函数: {len([1, 2, 3])}")
    
    inner_function()

print("=== LEGB规则演示 ===")
outer_function()

# 变量遮蔽
def variable_shadowing():
    global_var = "局部的global_var"  # 遮蔽全局变量
    print(f"函数内的global_var: {global_var}")

print(f"\n函数外的global_var: {global_var}")
variable_shadowing()
print(f"函数调用后的global_var: {global_var}")

# global关键字
counter = 0

def increment():
    global counter  # 声明使用全局变量
    counter += 1
    print(f"计数器: {counter}")

print("\n=== global关键字 ===")
print(f"初始计数器: {counter}")
increment()
increment()
increment()

# nonlocal关键字
def create_counter():
    count = 0
    
    def increment():
        nonlocal count  # 声明使用封闭作用域变量
        count += 1
        return count
    
    def decrement():
        nonlocal count
        count -= 1
        return count
    
    def get_count():
        return count
    
    return increment, decrement, get_count

print("\n=== nonlocal关键字 ===")
inc, dec, get = create_counter()
print(f"初始值: {get()}")
print(f"递增: {inc()}")
print(f"递增: {inc()}")
print(f"递减: {dec()}")
print(f"当前值: {get()}")

# 命名空间查看
def namespace_demo():
    local_var = "局部变量"
    
    print("局部命名空间:")
    print(locals())
    
    print("\n全局命名空间的部分内容:")
    global_ns = globals()
    for key in ['global_var', 'counter', '__name__']:
        if key in global_ns:
            print(f"  {key}: {global_ns[key]}")

print("\n=== 命名空间演示 ===")
namespace_demo()

# 内置命名空间
print("\n=== 内置命名空间 ===")
import builtins
print(f"内置函数数量: {len(dir(builtins))}")
print("部分内置函数:", [name for name in dir(builtins) if not name.startswith('_')][:10])

# 作用域陷阱
print("\n=== 作用域陷阱 ===")

# 陷阱1:循环变量的作用域
functions = []
for i in range(3):
    functions.append(lambda: i)  # 错误:所有lambda都引用同一个i

print("错误的闭包:")
for func in functions:
    print(f"  结果: {func()}")  # 都输出2

# 正确的做法
functions_correct = []
for i in range(3):
    functions_correct.append(lambda x=i: x)  # 正确:使用默认参数捕获当前值

print("正确的闭包:")
for func in functions_correct:
    print(f"  结果: {func()}")  # 输出0, 1, 2

# 陷阱2:可变默认参数
def append_to_list(item, target_list=[]):  # 危险!
    target_list.append(item)
    return target_list

print("\n可变默认参数陷阱:")
list1 = append_to_list(1)
list2 = append_to_list(2)
print(f"list1: {list1}")  # [1, 2]
print(f"list2: {list2}")  # [1, 2] - 意外!

# 正确的做法
def append_to_list_correct(item, target_list=None):
    if target_list is None:
        target_list = []
    target_list.append(item)
    return target_list

print("正确的可变默认参数:")
list3 = append_to_list_correct(1)
list4 = append_to_list_correct(2)
print(f"list3: {list3}")  # [1]
print(f"list4: {list4}")  # [2]

# 作用域的实际应用
class ConfigManager:
    """配置管理器"""
    _instance = None
    _config = {}
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    @classmethod
    def set_config(cls, key, value):
        cls._config[key] = value
    
    @classmethod
    def get_config(cls, key, default=None):
        return cls._config.get(key, default)
    
    @classmethod
    def get_all_config(cls):
        return cls._config.copy()

# 使用配置管理器
print("\n=== 配置管理器示例 ===")
config1 = ConfigManager()
config2 = ConfigManager()

print(f"是否为同一实例: {config1 is config2}")

ConfigManager.set_config('database_url', 'localhost:5432')
ConfigManager.set_config('debug', True)

print(f"数据库URL: {ConfigManager.get_config('database_url')}")
print(f"调试模式: {ConfigManager.get_config('debug')}")
print(f"所有配置: {ConfigManager.get_all_config()}")

闭包详解

# 闭包的基本概念
def outer_function(x):
    """外部函数"""
    
    def inner_function(y):
        """内部函数 - 这是一个闭包"""
        return x + y  # 访问外部函数的变量
    
    return inner_function

print("=== 闭包基础 ===")
# 创建闭包
add_10 = outer_function(10)
add_20 = outer_function(20)

print(f"add_10(5) = {add_10(5)}")  # 15
print(f"add_20(5) = {add_20(5)}")  # 25

# 检查闭包
print(f"add_10是否为闭包: {add_10.__closure__ is not None}")
print(f"闭包变量: {add_10.__closure__[0].cell_contents}")

# 实用的闭包示例
def create_validator(min_length, max_length):
    """创建字符串长度验证器"""
    
    def validate(text):
        """验证字符串长度"""
        length = len(text)
        if length < min_length:
            return False, f"长度不能少于{min_length}个字符"
        elif length > max_length:
            return False, f"长度不能超过{max_length}个字符"
        else:
            return True, "验证通过"
    
    return validate

print("\n=== 验证器闭包 ===")
# 创建不同的验证器
username_validator = create_validator(3, 20)
password_validator = create_validator(8, 50)

# 测试用户名验证
test_usernames = ["ab", "alice", "very_long_username_that_exceeds_limit"]
for username in test_usernames:
    is_valid, message = username_validator(username)
    print(f"用户名 '{username}': {message}")

print()
# 测试密码验证
test_passwords = ["123", "password123", "very_long_password_that_definitely_exceeds_the_maximum_limit"]
for password in test_passwords:
    is_valid, message = password_validator(password)
    print(f"密码 '{password}': {message}")

# 闭包实现计数器
def create_counter(initial=0, step=1):
    """创建计数器闭包"""
    count = initial
    
    def counter():
        nonlocal count
        count += step
        return count
    
    def reset():
        nonlocal count
        count = initial
    
    def get_current():
        return count
    
    # 返回多个函数
    counter.reset = reset
    counter.get_current = get_current
    
    return counter

print("\n=== 计数器闭包 ===")
counter1 = create_counter(0, 1)
counter2 = create_counter(100, 5)

print(f"counter1: {counter1()}")
print(f"counter1: {counter1()}")
print(f"counter1当前值: {counter1.get_current()}")

print(f"counter2: {counter2()}")
print(f"counter2: {counter2()}")
print(f"counter2当前值: {counter2.get_current()}")

counter1.reset()
print(f"counter1重置后: {counter1.get_current()}")

# 闭包实现缓存
def create_cache(max_size=100):
    """创建缓存闭包"""
    cache = {}
    access_order = []
    
    def get(key):
        """获取缓存值"""
        if key in cache:
            # 更新访问顺序
            access_order.remove(key)
            access_order.append(key)
            return cache[key]
        return None
    
    def set(key, value):
        """设置缓存值"""
        if key in cache:
            # 更新现有值
            access_order.remove(key)
        elif len(cache) >= max_size:
            # 删除最久未使用的项
            oldest_key = access_order.pop(0)
            del cache[oldest_key]
        
        cache[key] = value
        access_order.append(key)
    
    def clear():
        """清空缓存"""
        cache.clear()
        access_order.clear()
    
    def size():
        """获取缓存大小"""
        return len(cache)
    
    def keys():
        """获取所有键"""
        return list(cache.keys())
    
    # 创建缓存对象
    cache_obj = type('Cache', (), {
        'get': get,
        'set': set,
        'clear': clear,
        'size': size,
        'keys': keys
    })()
    
    return cache_obj

print("\n=== 缓存闭包 ===")
cache = create_cache(max_size=3)

# 添加缓存项
cache.set('user:1', {'name': 'Alice', 'age': 25})
cache.set('user:2', {'name': 'Bob', 'age': 30})
cache.set('user:3', {'name': 'Charlie', 'age': 35})

print(f"缓存大小: {cache.size()}")
print(f"缓存键: {cache.keys()}")

# 访问缓存
user1 = cache.get('user:1')
print(f"用户1: {user1}")

# 添加新项(会删除最久未使用的)
cache.set('user:4', {'name': 'David', 'age': 28})
print(f"添加user:4后的键: {cache.keys()}")

# 装饰器形式的闭包
def memoize(func):
    """记忆化装饰器"""
    cache = {}
    
    def wrapper(*args, **kwargs):
        # 创建缓存键
        key = str(args) + str(sorted(kwargs.items()))
        
        if key not in cache:
            cache[key] = func(*args, **kwargs)
            print(f"计算并缓存: {func.__name__}{args}")
        else:
            print(f"从缓存获取: {func.__name__}{args}")
        
        return cache[key]
    
    wrapper.cache = cache
    wrapper.clear_cache = lambda: cache.clear()
    
    return wrapper

@memoize
def fibonacci(n):
    """斐波那契数列(递归实现)"""
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

print("\n=== 记忆化装饰器 ===")
print(f"fibonacci(10) = {fibonacci(10)}")
print(f"fibonacci(10) = {fibonacci(10)}")  # 第二次调用会使用缓存
print(f"缓存大小: {len(fibonacci.cache)}")

运行作用域和闭包示例:

python scope_closure.py

6.3 装饰器

装饰器基础

import time
import functools
from datetime import datetime

# 简单装饰器
def simple_decorator(func):
    """简单装饰器示例"""
    def wrapper():
        print("函数执行前")
        result = func()
        print("函数执行后")
        return result
    return wrapper

@simple_decorator
def say_hello():
    """问候函数"""
    print("Hello, World!")
    return "greeting_done"

print("=== 简单装饰器 ===")
result = say_hello()
print(f"返回值: {result}")

# 带参数的装饰器
def timing_decorator(func):
    """计时装饰器"""
    @functools.wraps(func)  # 保持原函数的元数据
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} 执行时间: {end_time - start_time:.4f}秒")
        return result
    return wrapper

@timing_decorator
def slow_function(n):
    """模拟耗时函数"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

print("\n=== 计时装饰器 ===")
result = slow_function(100000)
print(f"计算结果: {result}")

# 带参数的装饰器
def retry(max_attempts=3, delay=1):
    """重试装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        print(f"函数 {func.__name__} 在 {max_attempts} 次尝试后仍然失败")
                        raise e
                    else:
                        print(f"第 {attempt + 1} 次尝试失败: {e},{delay}秒后重试...")
                        time.sleep(delay)
        return wrapper
    return decorator

# 模拟不稳定的函数
import random

@retry(max_attempts=3, delay=0.5)
def unstable_function():
    """不稳定的函数"""
    if random.random() < 0.7:  # 70%的概率失败
        raise Exception("随机失败")
    return "成功执行"

print("\n=== 重试装饰器 ===")
try:
    result = unstable_function()
    print(f"最终结果: {result}")
except Exception as e:
    print(f"最终失败: {e}")

# 日志装饰器
def log_calls(log_args=True, log_result=True):
    """日志装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 记录函数调用
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            log_msg = f"[{timestamp}] 调用 {func.__name__}"
            
            if log_args and (args or kwargs):
                log_msg += f" 参数: args={args}, kwargs={kwargs}"
            
            print(log_msg)
            
            # 执行函数
            try:
                result = func(*args, **kwargs)
                
                if log_result:
                    print(f"[{timestamp}] {func.__name__} 返回: {result}")
                
                return result
            except Exception as e:
                print(f"[{timestamp}] {func.__name__} 异常: {e}")
                raise
        
        return wrapper
    return decorator

@log_calls(log_args=True, log_result=True)
def calculate_area(length, width):
    """计算矩形面积"""
    return length * width

@log_calls(log_args=False, log_result=True)
def get_user_info(user_id):
    """获取用户信息"""
    # 模拟数据库查询
    users = {
        1: {'name': 'Alice', 'email': 'alice@example.com'},
        2: {'name': 'Bob', 'email': 'bob@example.com'}
    }
    return users.get(user_id, None)

print("\n=== 日志装饰器 ===")
area = calculate_area(5, 3)
user = get_user_info(1)

# 缓存装饰器
def cache_result(max_size=128):
    """结果缓存装饰器"""
    def decorator(func):
        cache = {}
        access_order = []
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 创建缓存键
            key = str(args) + str(sorted(kwargs.items()))
            
            if key in cache:
                # 缓存命中
                access_order.remove(key)
                access_order.append(key)
                print(f"缓存命中: {func.__name__}{args}")
                return cache[key]
            
            # 缓存未命中,执行函数
            result = func(*args, **kwargs)
            
            # 添加到缓存
            if len(cache) >= max_size:
                # 删除最久未使用的项
                oldest_key = access_order.pop(0)
                del cache[oldest_key]
            
            cache[key] = result
            access_order.append(key)
            print(f"缓存存储: {func.__name__}{args}")
            
            return result
        
        # 添加缓存管理方法
        wrapper.cache_info = lambda: {'size': len(cache), 'keys': list(cache.keys())}
        wrapper.cache_clear = lambda: (cache.clear(), access_order.clear())
        
        return wrapper
    return decorator

@cache_result(max_size=5)
def expensive_calculation(n):
    """昂贵的计算"""
    print(f"正在计算 {n} 的阶乘...")
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result

print("\n=== 缓存装饰器 ===")
print(f"5! = {expensive_calculation(5)}")
print(f"3! = {expensive_calculation(3)}")
print(f"5! = {expensive_calculation(5)}")  # 缓存命中
print(f"7! = {expensive_calculation(7)}")
print(f"缓存信息: {expensive_calculation.cache_info()}")

# 权限检查装饰器
def require_permission(permission):
    """权限检查装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 模拟当前用户(实际应用中可能从session或token获取)
            current_user = getattr(wrapper, 'current_user', None)
            
            if not current_user:
                raise PermissionError("用户未登录")
            
            user_permissions = current_user.get('permissions', [])
            if permission not in user_permissions:
                raise PermissionError(f"用户缺少权限: {permission}")
            
            return func(*args, **kwargs)
        
        return wrapper
    return decorator

@require_permission('admin')
def delete_user(user_id):
    """删除用户(需要管理员权限)"""
    return f"用户 {user_id} 已删除"

@require_permission('read')
def view_user(user_id):
    """查看用户(需要读取权限)"""
    return f"用户 {user_id} 的信息"

print("\n=== 权限检查装饰器 ===")

# 设置当前用户
delete_user.current_user = {'id': 1, 'name': 'admin', 'permissions': ['read', 'write', 'admin']}
view_user.current_user = {'id': 2, 'name': 'user', 'permissions': ['read']}

try:
    result = delete_user(123)
    print(f"删除结果: {result}")
except PermissionError as e:
    print(f"权限错误: {e}")

try:
    result = view_user(123)
    print(f"查看结果: {result}")
except PermissionError as e:
    print(f"权限错误: {e}")

# 类装饰器
class CountCalls:
    """计数装饰器类"""
    
    def __init__(self, func):
        self.func = func
        self.count = 0
        functools.update_wrapper(self, func)
    
    def __call__(self, *args, **kwargs):
        self.count += 1
        print(f"{self.func.__name__} 被调用了 {self.count} 次")
        return self.func(*args, **kwargs)
    
    def reset_count(self):
        """重置计数"""
        self.count = 0

@CountCalls
def greet(name):
    """问候函数"""
    return f"Hello, {name}!"

print("\n=== 类装饰器 ===")
print(greet("Alice"))
print(greet("Bob"))
print(greet("Charlie"))
greet.reset_count()
print(greet("David"))

# 多个装饰器的组合
@log_calls(log_args=True, log_result=False)
@timing_decorator
@cache_result(max_size=3)
def complex_function(x, y):
    """复杂函数(应用多个装饰器)"""
    time.sleep(0.1)  # 模拟耗时操作
    return x ** y

print("\n=== 多装饰器组合 ===")
result1 = complex_function(2, 3)
result2 = complex_function(3, 2)
result3 = complex_function(2, 3)  # 缓存命中

高级装饰器模式

import asyncio
import threading
import queue
from contextlib import contextmanager

# 单例装饰器
def singleton(cls):
    """单例装饰器"""
    instances = {}
    lock = threading.Lock()
    
    def get_instance(*args, **kwargs):
        if cls not in instances:
            with lock:
                if cls not in instances:
                    instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    
    return get_instance

@singleton
class DatabaseConnection:
    """数据库连接类"""
    
    def __init__(self):
        self.connection_id = id(self)
        print(f"创建数据库连接: {self.connection_id}")
    
    def query(self, sql):
        return f"执行查询: {sql} (连接ID: {self.connection_id})"

print("=== 单例装饰器 ===")
db1 = DatabaseConnection()
db2 = DatabaseConnection()
print(f"db1 is db2: {db1 is db2}")
print(db1.query("SELECT * FROM users"))

# 属性装饰器
def property_decorator(func):
    """属性装饰器"""
    attr_name = f"_{func.__name__}"
    
    def getter(self):
        return getattr(self, attr_name, None)
    
    def setter(self, value):
        if func(self, value):
            setattr(self, attr_name, value)
        else:
            raise ValueError(f"无效的值: {value}")
    
    return property(getter, setter)

class Person:
    """人员类"""
    
    @property_decorator
    def age(self, value):
        """年龄验证"""
        return isinstance(value, int) and 0 <= value <= 150
    
    @property_decorator
    def email(self, value):
        """邮箱验证"""
        return isinstance(value, str) and '@' in value

print("\n=== 属性装饰器 ===")
person = Person()
try:
    person.age = 25
    person.email = "alice@example.com"
    print(f"年龄: {person.age}, 邮箱: {person.email}")
    
    person.age = -5  # 会抛出异常
except ValueError as e:
    print(f"验证错误: {e}")

# 异步装饰器
def async_retry(max_attempts=3, delay=1):
    """异步重试装饰器"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        print(f"异步函数 {func.__name__} 在 {max_attempts} 次尝试后仍然失败")
                        raise e
                    else:
                        print(f"第 {attempt + 1} 次尝试失败: {e},{delay}秒后重试...")
                        await asyncio.sleep(delay)
        return wrapper
    return decorator

@async_retry(max_attempts=3, delay=0.5)
async def async_unstable_function():
    """异步不稳定函数"""
    import random
    if random.random() < 0.7:
        raise Exception("异步随机失败")
    return "异步成功执行"

# 上下文管理装饰器
def with_context(context_manager):
    """上下文管理装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with context_manager:
                return func(*args, **kwargs)
        return wrapper
    return decorator

@contextmanager
def database_transaction():
    """数据库事务上下文管理器"""
    print("开始事务")
    try:
        yield
        print("提交事务")
    except Exception as e:
        print(f"回滚事务: {e}")
        raise

@with_context(database_transaction())
def update_user_data(user_id, data):
    """更新用户数据"""
    print(f"更新用户 {user_id}: {data}")
    # 模拟可能的异常
    if user_id < 0:
        raise ValueError("无效的用户ID")
    return "更新成功"

print("\n=== 上下文管理装饰器 ===")
try:
    result = update_user_data(1, {'name': 'Alice'})
    print(f"结果: {result}")
except Exception as e:
    print(f"错误: {e}")

try:
    result = update_user_data(-1, {'name': 'Bob'})
except Exception as e:
    print(f"错误: {e}")

# 装饰器工厂
class DecoratorFactory:
    """装饰器工厂类"""
    
    @staticmethod
    def rate_limit(max_calls, time_window):
        """速率限制装饰器"""
        def decorator(func):
            calls = queue.Queue(maxsize=max_calls)
            lock = threading.Lock()
            
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                current_time = time.time()
                
                with lock:
                    # 清理过期的调用记录
                    while not calls.empty():
                        if current_time - calls.queue[0] > time_window:
                            calls.get()
                        else:
                            break
                    
                    # 检查是否超过限制
                    if calls.full():
                        raise Exception(f"速率限制: 在{time_window}秒内最多调用{max_calls}次")
                    
                    calls.put(current_time)
                
                return func(*args, **kwargs)
            
            return wrapper
        return decorator
    
    @staticmethod
    def validate_types(**type_checks):
        """类型验证装饰器"""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                # 获取函数签名
                import inspect
                sig = inspect.signature(func)
                bound_args = sig.bind(*args, **kwargs)
                bound_args.apply_defaults()
                
                # 验证类型
                for param_name, expected_type in type_checks.items():
                    if param_name in bound_args.arguments:
                        value = bound_args.arguments[param_name]
                        if not isinstance(value, expected_type):
                            raise TypeError(
                                f"参数 {param_name} 期望类型 {expected_type.__name__}, "
                                f"实际类型 {type(value).__name__}"
                            )
                
                return func(*args, **kwargs)
            
            return wrapper
        return decorator

@DecoratorFactory.rate_limit(max_calls=3, time_window=5)
def api_call(endpoint):
    """API调用函数"""
    return f"调用API: {endpoint}"

@DecoratorFactory.validate_types(name=str, age=int, active=bool)
def create_user(name, age, active=True):
    """创建用户"""
    return {'name': name, 'age': age, 'active': active}

print("\n=== 装饰器工厂 ===")

# 测试速率限制
for i in range(5):
    try:
        result = api_call(f"/users/{i}")
        print(f"第{i+1}次调用: {result}")
    except Exception as e:
        print(f"第{i+1}次调用失败: {e}")

# 测试类型验证
try:
    user1 = create_user("Alice", 25, True)
    print(f"用户1: {user1}")
    
    user2 = create_user("Bob", "30", True)  # 错误的类型
except TypeError as e:
    print(f"类型错误: {e}")

# 运行异步示例
async def test_async():
    """测试异步装饰器"""
    print("\n=== 异步装饰器 ===")
    try:
        result = await async_unstable_function()
        print(f"异步结果: {result}")
    except Exception as e:
        print(f"异步最终失败: {e}")

# 如果在支持asyncio的环境中运行
if __name__ == "__main__":
    try:
        asyncio.run(test_async())
    except Exception as e:
        print(f"异步测试跳过: {e}")

运行装饰器示例:

python decorators.py

本章小结

本章我们深入学习了Python的函数和作用域:

  1. 函数基础:定义、调用、参数传递、函数作为对象
  2. 作用域和命名空间:LEGB规则、global/nonlocal关键字、作用域陷阱
  3. 闭包:概念、实现、实际应用
  4. 装饰器:基础装饰器、带参数装饰器、类装饰器、高级模式

下一章预告

下一章我们将学习《模块和包》,内容包括: - 模块的创建和导入 - 包的组织和管理 - Python标准库介绍 - 第三方库的使用 - 虚拟环境管理

练习题

基础练习

  1. 函数设计

    • 编写一个计算器函数,支持四则运算
    • 实现一个参数验证装饰器
    • 创建一个简单的缓存系统
  2. 闭包应用

    • 实现一个配置管理器
    • 创建一个事件监听器
    • 编写一个状态机

进阶练习

  1. 装饰器实战

    • 实现一个性能监控装饰器
    • 创建一个权限控制系统
    • 编写一个API限流装饰器
  2. 高级应用

    • 实现一个简单的ORM装饰器
    • 创建一个异步任务调度器
    • 编写一个代码执行时间分析器

提示:函数是代码复用的基础,装饰器是Python的强大特性。多练习不同场景下的应用,掌握函数式编程的思想。