学习目标
通过本章学习,你将掌握: - 函数的定义、调用和参数传递 - 作用域和命名空间的概念 - 装饰器和闭包的使用 - 模块和包的创建与使用 - Python标准库的常用模块 - 第三方库的安装和使用
6.1 函数基础
函数的定义和调用
# 基本函数定义
def greet(name):
"""问候函数"""
return f"Hello, {name}!"
# 函数调用
message = greet("Alice")
print(message) # 输出: Hello, Alice!
# 无参数函数
def get_current_time():
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"当前时间: {get_current_time()}")
# 无返回值函数
def print_separator(char='=', length=50):
"""打印分隔符"""
print(char * length)
print_separator()
print_separator('-', 30)
# 多返回值函数
def get_name_age():
"""获取姓名和年龄"""
return "Alice", 25
name, age = get_name_age()
print(f"姓名: {name}, 年龄: {age}")
# 也可以作为元组接收
result = get_name_age()
print(f"结果: {result}")
print(f"类型: {type(result)}")
参数传递
# 1. 位置参数
def add(a, b, c):
"""三个数相加"""
return a + b + c
result = add(1, 2, 3)
print(f"1 + 2 + 3 = {result}")
# 2. 关键字参数
def create_person(name, age, city):
"""创建人员信息"""
return {
'name': name,
'age': age,
'city': city
}
# 使用关键字参数(顺序可以改变)
person1 = create_person(name="Alice", city="Beijing", age=25)
person2 = create_person("Bob", age=30, city="Shanghai")
print(f"Person1: {person1}")
print(f"Person2: {person2}")
# 3. 默认参数
def greet_with_title(name, title="Mr."):
"""带称谓的问候"""
return f"Hello, {title} {name}!"
print(greet_with_title("Smith")) # 使用默认值
print(greet_with_title("Johnson", "Dr.")) # 指定值
print(greet_with_title("Brown", title="Ms.")) # 关键字参数
# 4. 可变位置参数 (*args)
def sum_all(*numbers):
"""计算所有数字的和"""
total = 0
for num in numbers:
total += num
return total
print(f"sum_all(1, 2, 3) = {sum_all(1, 2, 3)}")
print(f"sum_all(1, 2, 3, 4, 5) = {sum_all(1, 2, 3, 4, 5)}")
print(f"sum_all() = {sum_all()}")
# 传递列表给*args
numbers = [1, 2, 3, 4, 5]
print(f"sum_all(*numbers) = {sum_all(*numbers)}")
# 5. 可变关键字参数 (**kwargs)
def create_profile(**info):
"""创建用户档案"""
profile = {}
for key, value in info.items():
profile[key] = value
return profile
profile1 = create_profile(name="Alice", age=25, city="Beijing")
profile2 = create_profile(name="Bob", occupation="Engineer", salary=50000)
print(f"Profile1: {profile1}")
print(f"Profile2: {profile2}")
# 传递字典给**kwargs
user_data = {'name': 'Charlie', 'age': 35, 'email': 'charlie@example.com'}
profile3 = create_profile(**user_data)
print(f"Profile3: {profile3}")
# 6. 混合参数类型
def complex_function(required_arg, default_arg="default", *args, **kwargs):
"""复杂参数函数"""
print(f"必需参数: {required_arg}")
print(f"默认参数: {default_arg}")
print(f"位置参数: {args}")
print(f"关键字参数: {kwargs}")
return f"处理完成"
print("=== 复杂函数调用 ===")
result = complex_function(
"必需值",
"自定义默认值",
1, 2, 3,
name="Alice",
age=25
)
print(f"返回值: {result}")
# 7. 强制关键字参数 (Python 3+)
def create_user(name, *, age, email):
"""创建用户(age和email必须使用关键字参数)"""
return {
'name': name,
'age': age,
'email': email
}
# 正确调用
user = create_user("Alice", age=25, email="alice@example.com")
print(f"用户: {user}")
# 错误调用(会报错)
# user = create_user("Alice", 25, "alice@example.com") # TypeError
# 8. 仅位置参数 (Python 3.8+)
def divide(dividend, divisor, /):
"""除法函数(参数只能按位置传递)"""
if divisor == 0:
return "除数不能为零"
return dividend / divisor
# 正确调用
result = divide(10, 2)
print(f"10 / 2 = {result}")
# 错误调用(会报错)
# result = divide(dividend=10, divisor=2) # TypeError
函数作为对象
# 函数是第一类对象
def square(x):
"""计算平方"""
return x ** 2
def cube(x):
"""计算立方"""
return x ** 3
# 函数可以赋值给变量
my_function = square
print(f"my_function(5) = {my_function(5)}")
# 函数可以作为参数传递
def apply_function(func, value):
"""应用函数到值"""
return func(value)
result1 = apply_function(square, 4)
result2 = apply_function(cube, 3)
print(f"square(4) = {result1}")
print(f"cube(3) = {result2}")
# 函数可以存储在数据结构中
operations = {
'square': square,
'cube': cube,
'double': lambda x: x * 2
}
for name, func in operations.items():
result = func(3)
print(f"{name}(3) = {result}")
# 函数可以作为返回值
def get_operation(op_type):
"""根据类型返回对应的操作函数"""
if op_type == 'square':
return square
elif op_type == 'cube':
return cube
else:
return lambda x: x
op = get_operation('square')
print(f"动态获取的函数: {op(6)}")
# 高阶函数示例
def create_multiplier(factor):
"""创建乘法器函数"""
def multiplier(x):
return x * factor
return multiplier
# 创建不同的乘法器
double = create_multiplier(2)
triple = create_multiplier(3)
print(f"double(5) = {double(5)}")
print(f"triple(5) = {triple(5)}")
# 内置高阶函数
numbers = [1, 2, 3, 4, 5]
# map: 对每个元素应用函数
squared = list(map(square, numbers))
print(f"原数组: {numbers}")
print(f"平方后: {squared}")
# filter: 过滤元素
def is_even(x):
return x % 2 == 0
even_numbers = list(filter(is_even, numbers))
print(f"偶数: {even_numbers}")
# 使用lambda表达式
odd_numbers = list(filter(lambda x: x % 2 == 1, numbers))
print(f"奇数: {odd_numbers}")
# reduce: 累积操作
from functools import reduce
product = reduce(lambda x, y: x * y, numbers)
print(f"所有数字的乘积: {product}")
# 排序中使用函数
students = [
{'name': 'Alice', 'grade': 85},
{'name': 'Bob', 'grade': 90},
{'name': 'Charlie', 'grade': 78}
]
# 按成绩排序
sorted_by_grade = sorted(students, key=lambda s: s['grade'])
print("按成绩排序:")
for student in sorted_by_grade:
print(f" {student['name']}: {student['grade']}")
# 按姓名排序
sorted_by_name = sorted(students, key=lambda s: s['name'])
print("按姓名排序:")
for student in sorted_by_name:
print(f" {student['name']}: {student['grade']}")
运行函数基础示例:
python function_basics.py
6.2 作用域和命名空间
作用域规则 (LEGB)
# LEGB规则:Local -> Enclosing -> Global -> Built-in
# 全局变量
global_var = "我是全局变量"
def outer_function():
# 封闭作用域变量
enclosing_var = "我是封闭作用域变量"
def inner_function():
# 局部变量
local_var = "我是局部变量"
print(f"局部变量: {local_var}")
print(f"封闭变量: {enclosing_var}")
print(f"全局变量: {global_var}")
print(f"内置函数: {len([1, 2, 3])}")
inner_function()
print("=== LEGB规则演示 ===")
outer_function()
# 变量遮蔽
def variable_shadowing():
global_var = "局部的global_var" # 遮蔽全局变量
print(f"函数内的global_var: {global_var}")
print(f"\n函数外的global_var: {global_var}")
variable_shadowing()
print(f"函数调用后的global_var: {global_var}")
# global关键字
counter = 0
def increment():
global counter # 声明使用全局变量
counter += 1
print(f"计数器: {counter}")
print("\n=== global关键字 ===")
print(f"初始计数器: {counter}")
increment()
increment()
increment()
# nonlocal关键字
def create_counter():
count = 0
def increment():
nonlocal count # 声明使用封闭作用域变量
count += 1
return count
def decrement():
nonlocal count
count -= 1
return count
def get_count():
return count
return increment, decrement, get_count
print("\n=== nonlocal关键字 ===")
inc, dec, get = create_counter()
print(f"初始值: {get()}")
print(f"递增: {inc()}")
print(f"递增: {inc()}")
print(f"递减: {dec()}")
print(f"当前值: {get()}")
# 命名空间查看
def namespace_demo():
local_var = "局部变量"
print("局部命名空间:")
print(locals())
print("\n全局命名空间的部分内容:")
global_ns = globals()
for key in ['global_var', 'counter', '__name__']:
if key in global_ns:
print(f" {key}: {global_ns[key]}")
print("\n=== 命名空间演示 ===")
namespace_demo()
# 内置命名空间
print("\n=== 内置命名空间 ===")
import builtins
print(f"内置函数数量: {len(dir(builtins))}")
print("部分内置函数:", [name for name in dir(builtins) if not name.startswith('_')][:10])
# 作用域陷阱
print("\n=== 作用域陷阱 ===")
# 陷阱1:循环变量的作用域
functions = []
for i in range(3):
functions.append(lambda: i) # 错误:所有lambda都引用同一个i
print("错误的闭包:")
for func in functions:
print(f" 结果: {func()}") # 都输出2
# 正确的做法
functions_correct = []
for i in range(3):
functions_correct.append(lambda x=i: x) # 正确:使用默认参数捕获当前值
print("正确的闭包:")
for func in functions_correct:
print(f" 结果: {func()}") # 输出0, 1, 2
# 陷阱2:可变默认参数
def append_to_list(item, target_list=[]): # 危险!
target_list.append(item)
return target_list
print("\n可变默认参数陷阱:")
list1 = append_to_list(1)
list2 = append_to_list(2)
print(f"list1: {list1}") # [1, 2]
print(f"list2: {list2}") # [1, 2] - 意外!
# 正确的做法
def append_to_list_correct(item, target_list=None):
if target_list is None:
target_list = []
target_list.append(item)
return target_list
print("正确的可变默认参数:")
list3 = append_to_list_correct(1)
list4 = append_to_list_correct(2)
print(f"list3: {list3}") # [1]
print(f"list4: {list4}") # [2]
# 作用域的实际应用
class ConfigManager:
"""配置管理器"""
_instance = None
_config = {}
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
def set_config(cls, key, value):
cls._config[key] = value
@classmethod
def get_config(cls, key, default=None):
return cls._config.get(key, default)
@classmethod
def get_all_config(cls):
return cls._config.copy()
# 使用配置管理器
print("\n=== 配置管理器示例 ===")
config1 = ConfigManager()
config2 = ConfigManager()
print(f"是否为同一实例: {config1 is config2}")
ConfigManager.set_config('database_url', 'localhost:5432')
ConfigManager.set_config('debug', True)
print(f"数据库URL: {ConfigManager.get_config('database_url')}")
print(f"调试模式: {ConfigManager.get_config('debug')}")
print(f"所有配置: {ConfigManager.get_all_config()}")
闭包详解
# 闭包的基本概念
def outer_function(x):
"""外部函数"""
def inner_function(y):
"""内部函数 - 这是一个闭包"""
return x + y # 访问外部函数的变量
return inner_function
print("=== 闭包基础 ===")
# 创建闭包
add_10 = outer_function(10)
add_20 = outer_function(20)
print(f"add_10(5) = {add_10(5)}") # 15
print(f"add_20(5) = {add_20(5)}") # 25
# 检查闭包
print(f"add_10是否为闭包: {add_10.__closure__ is not None}")
print(f"闭包变量: {add_10.__closure__[0].cell_contents}")
# 实用的闭包示例
def create_validator(min_length, max_length):
"""创建字符串长度验证器"""
def validate(text):
"""验证字符串长度"""
length = len(text)
if length < min_length:
return False, f"长度不能少于{min_length}个字符"
elif length > max_length:
return False, f"长度不能超过{max_length}个字符"
else:
return True, "验证通过"
return validate
print("\n=== 验证器闭包 ===")
# 创建不同的验证器
username_validator = create_validator(3, 20)
password_validator = create_validator(8, 50)
# 测试用户名验证
test_usernames = ["ab", "alice", "very_long_username_that_exceeds_limit"]
for username in test_usernames:
is_valid, message = username_validator(username)
print(f"用户名 '{username}': {message}")
print()
# 测试密码验证
test_passwords = ["123", "password123", "very_long_password_that_definitely_exceeds_the_maximum_limit"]
for password in test_passwords:
is_valid, message = password_validator(password)
print(f"密码 '{password}': {message}")
# 闭包实现计数器
def create_counter(initial=0, step=1):
"""创建计数器闭包"""
count = initial
def counter():
nonlocal count
count += step
return count
def reset():
nonlocal count
count = initial
def get_current():
return count
# 返回多个函数
counter.reset = reset
counter.get_current = get_current
return counter
print("\n=== 计数器闭包 ===")
counter1 = create_counter(0, 1)
counter2 = create_counter(100, 5)
print(f"counter1: {counter1()}")
print(f"counter1: {counter1()}")
print(f"counter1当前值: {counter1.get_current()}")
print(f"counter2: {counter2()}")
print(f"counter2: {counter2()}")
print(f"counter2当前值: {counter2.get_current()}")
counter1.reset()
print(f"counter1重置后: {counter1.get_current()}")
# 闭包实现缓存
def create_cache(max_size=100):
"""创建缓存闭包"""
cache = {}
access_order = []
def get(key):
"""获取缓存值"""
if key in cache:
# 更新访问顺序
access_order.remove(key)
access_order.append(key)
return cache[key]
return None
def set(key, value):
"""设置缓存值"""
if key in cache:
# 更新现有值
access_order.remove(key)
elif len(cache) >= max_size:
# 删除最久未使用的项
oldest_key = access_order.pop(0)
del cache[oldest_key]
cache[key] = value
access_order.append(key)
def clear():
"""清空缓存"""
cache.clear()
access_order.clear()
def size():
"""获取缓存大小"""
return len(cache)
def keys():
"""获取所有键"""
return list(cache.keys())
# 创建缓存对象
cache_obj = type('Cache', (), {
'get': get,
'set': set,
'clear': clear,
'size': size,
'keys': keys
})()
return cache_obj
print("\n=== 缓存闭包 ===")
cache = create_cache(max_size=3)
# 添加缓存项
cache.set('user:1', {'name': 'Alice', 'age': 25})
cache.set('user:2', {'name': 'Bob', 'age': 30})
cache.set('user:3', {'name': 'Charlie', 'age': 35})
print(f"缓存大小: {cache.size()}")
print(f"缓存键: {cache.keys()}")
# 访问缓存
user1 = cache.get('user:1')
print(f"用户1: {user1}")
# 添加新项(会删除最久未使用的)
cache.set('user:4', {'name': 'David', 'age': 28})
print(f"添加user:4后的键: {cache.keys()}")
# 装饰器形式的闭包
def memoize(func):
"""记忆化装饰器"""
cache = {}
def wrapper(*args, **kwargs):
# 创建缓存键
key = str(args) + str(sorted(kwargs.items()))
if key not in cache:
cache[key] = func(*args, **kwargs)
print(f"计算并缓存: {func.__name__}{args}")
else:
print(f"从缓存获取: {func.__name__}{args}")
return cache[key]
wrapper.cache = cache
wrapper.clear_cache = lambda: cache.clear()
return wrapper
@memoize
def fibonacci(n):
"""斐波那契数列(递归实现)"""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
print("\n=== 记忆化装饰器 ===")
print(f"fibonacci(10) = {fibonacci(10)}")
print(f"fibonacci(10) = {fibonacci(10)}") # 第二次调用会使用缓存
print(f"缓存大小: {len(fibonacci.cache)}")
运行作用域和闭包示例:
python scope_closure.py
6.3 装饰器
装饰器基础
import time
import functools
from datetime import datetime
# 简单装饰器
def simple_decorator(func):
"""简单装饰器示例"""
def wrapper():
print("函数执行前")
result = func()
print("函数执行后")
return result
return wrapper
@simple_decorator
def say_hello():
"""问候函数"""
print("Hello, World!")
return "greeting_done"
print("=== 简单装饰器 ===")
result = say_hello()
print(f"返回值: {result}")
# 带参数的装饰器
def timing_decorator(func):
"""计时装饰器"""
@functools.wraps(func) # 保持原函数的元数据
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} 执行时间: {end_time - start_time:.4f}秒")
return result
return wrapper
@timing_decorator
def slow_function(n):
"""模拟耗时函数"""
total = 0
for i in range(n):
total += i ** 2
return total
print("\n=== 计时装饰器 ===")
result = slow_function(100000)
print(f"计算结果: {result}")
# 带参数的装饰器
def retry(max_attempts=3, delay=1):
"""重试装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
print(f"函数 {func.__name__} 在 {max_attempts} 次尝试后仍然失败")
raise e
else:
print(f"第 {attempt + 1} 次尝试失败: {e},{delay}秒后重试...")
time.sleep(delay)
return wrapper
return decorator
# 模拟不稳定的函数
import random
@retry(max_attempts=3, delay=0.5)
def unstable_function():
"""不稳定的函数"""
if random.random() < 0.7: # 70%的概率失败
raise Exception("随机失败")
return "成功执行"
print("\n=== 重试装饰器 ===")
try:
result = unstable_function()
print(f"最终结果: {result}")
except Exception as e:
print(f"最终失败: {e}")
# 日志装饰器
def log_calls(log_args=True, log_result=True):
"""日志装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 记录函数调用
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_msg = f"[{timestamp}] 调用 {func.__name__}"
if log_args and (args or kwargs):
log_msg += f" 参数: args={args}, kwargs={kwargs}"
print(log_msg)
# 执行函数
try:
result = func(*args, **kwargs)
if log_result:
print(f"[{timestamp}] {func.__name__} 返回: {result}")
return result
except Exception as e:
print(f"[{timestamp}] {func.__name__} 异常: {e}")
raise
return wrapper
return decorator
@log_calls(log_args=True, log_result=True)
def calculate_area(length, width):
"""计算矩形面积"""
return length * width
@log_calls(log_args=False, log_result=True)
def get_user_info(user_id):
"""获取用户信息"""
# 模拟数据库查询
users = {
1: {'name': 'Alice', 'email': 'alice@example.com'},
2: {'name': 'Bob', 'email': 'bob@example.com'}
}
return users.get(user_id, None)
print("\n=== 日志装饰器 ===")
area = calculate_area(5, 3)
user = get_user_info(1)
# 缓存装饰器
def cache_result(max_size=128):
"""结果缓存装饰器"""
def decorator(func):
cache = {}
access_order = []
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 创建缓存键
key = str(args) + str(sorted(kwargs.items()))
if key in cache:
# 缓存命中
access_order.remove(key)
access_order.append(key)
print(f"缓存命中: {func.__name__}{args}")
return cache[key]
# 缓存未命中,执行函数
result = func(*args, **kwargs)
# 添加到缓存
if len(cache) >= max_size:
# 删除最久未使用的项
oldest_key = access_order.pop(0)
del cache[oldest_key]
cache[key] = result
access_order.append(key)
print(f"缓存存储: {func.__name__}{args}")
return result
# 添加缓存管理方法
wrapper.cache_info = lambda: {'size': len(cache), 'keys': list(cache.keys())}
wrapper.cache_clear = lambda: (cache.clear(), access_order.clear())
return wrapper
return decorator
@cache_result(max_size=5)
def expensive_calculation(n):
"""昂贵的计算"""
print(f"正在计算 {n} 的阶乘...")
result = 1
for i in range(1, n + 1):
result *= i
return result
print("\n=== 缓存装饰器 ===")
print(f"5! = {expensive_calculation(5)}")
print(f"3! = {expensive_calculation(3)}")
print(f"5! = {expensive_calculation(5)}") # 缓存命中
print(f"7! = {expensive_calculation(7)}")
print(f"缓存信息: {expensive_calculation.cache_info()}")
# 权限检查装饰器
def require_permission(permission):
"""权限检查装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 模拟当前用户(实际应用中可能从session或token获取)
current_user = getattr(wrapper, 'current_user', None)
if not current_user:
raise PermissionError("用户未登录")
user_permissions = current_user.get('permissions', [])
if permission not in user_permissions:
raise PermissionError(f"用户缺少权限: {permission}")
return func(*args, **kwargs)
return wrapper
return decorator
@require_permission('admin')
def delete_user(user_id):
"""删除用户(需要管理员权限)"""
return f"用户 {user_id} 已删除"
@require_permission('read')
def view_user(user_id):
"""查看用户(需要读取权限)"""
return f"用户 {user_id} 的信息"
print("\n=== 权限检查装饰器 ===")
# 设置当前用户
delete_user.current_user = {'id': 1, 'name': 'admin', 'permissions': ['read', 'write', 'admin']}
view_user.current_user = {'id': 2, 'name': 'user', 'permissions': ['read']}
try:
result = delete_user(123)
print(f"删除结果: {result}")
except PermissionError as e:
print(f"权限错误: {e}")
try:
result = view_user(123)
print(f"查看结果: {result}")
except PermissionError as e:
print(f"权限错误: {e}")
# 类装饰器
class CountCalls:
"""计数装饰器类"""
def __init__(self, func):
self.func = func
self.count = 0
functools.update_wrapper(self, func)
def __call__(self, *args, **kwargs):
self.count += 1
print(f"{self.func.__name__} 被调用了 {self.count} 次")
return self.func(*args, **kwargs)
def reset_count(self):
"""重置计数"""
self.count = 0
@CountCalls
def greet(name):
"""问候函数"""
return f"Hello, {name}!"
print("\n=== 类装饰器 ===")
print(greet("Alice"))
print(greet("Bob"))
print(greet("Charlie"))
greet.reset_count()
print(greet("David"))
# 多个装饰器的组合
@log_calls(log_args=True, log_result=False)
@timing_decorator
@cache_result(max_size=3)
def complex_function(x, y):
"""复杂函数(应用多个装饰器)"""
time.sleep(0.1) # 模拟耗时操作
return x ** y
print("\n=== 多装饰器组合 ===")
result1 = complex_function(2, 3)
result2 = complex_function(3, 2)
result3 = complex_function(2, 3) # 缓存命中
高级装饰器模式
import asyncio
import threading
import queue
from contextlib import contextmanager
# 单例装饰器
def singleton(cls):
"""单例装饰器"""
instances = {}
lock = threading.Lock()
def get_instance(*args, **kwargs):
if cls not in instances:
with lock:
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
@singleton
class DatabaseConnection:
"""数据库连接类"""
def __init__(self):
self.connection_id = id(self)
print(f"创建数据库连接: {self.connection_id}")
def query(self, sql):
return f"执行查询: {sql} (连接ID: {self.connection_id})"
print("=== 单例装饰器 ===")
db1 = DatabaseConnection()
db2 = DatabaseConnection()
print(f"db1 is db2: {db1 is db2}")
print(db1.query("SELECT * FROM users"))
# 属性装饰器
def property_decorator(func):
"""属性装饰器"""
attr_name = f"_{func.__name__}"
def getter(self):
return getattr(self, attr_name, None)
def setter(self, value):
if func(self, value):
setattr(self, attr_name, value)
else:
raise ValueError(f"无效的值: {value}")
return property(getter, setter)
class Person:
"""人员类"""
@property_decorator
def age(self, value):
"""年龄验证"""
return isinstance(value, int) and 0 <= value <= 150
@property_decorator
def email(self, value):
"""邮箱验证"""
return isinstance(value, str) and '@' in value
print("\n=== 属性装饰器 ===")
person = Person()
try:
person.age = 25
person.email = "alice@example.com"
print(f"年龄: {person.age}, 邮箱: {person.email}")
person.age = -5 # 会抛出异常
except ValueError as e:
print(f"验证错误: {e}")
# 异步装饰器
def async_retry(max_attempts=3, delay=1):
"""异步重试装饰器"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
print(f"异步函数 {func.__name__} 在 {max_attempts} 次尝试后仍然失败")
raise e
else:
print(f"第 {attempt + 1} 次尝试失败: {e},{delay}秒后重试...")
await asyncio.sleep(delay)
return wrapper
return decorator
@async_retry(max_attempts=3, delay=0.5)
async def async_unstable_function():
"""异步不稳定函数"""
import random
if random.random() < 0.7:
raise Exception("异步随机失败")
return "异步成功执行"
# 上下文管理装饰器
def with_context(context_manager):
"""上下文管理装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with context_manager:
return func(*args, **kwargs)
return wrapper
return decorator
@contextmanager
def database_transaction():
"""数据库事务上下文管理器"""
print("开始事务")
try:
yield
print("提交事务")
except Exception as e:
print(f"回滚事务: {e}")
raise
@with_context(database_transaction())
def update_user_data(user_id, data):
"""更新用户数据"""
print(f"更新用户 {user_id}: {data}")
# 模拟可能的异常
if user_id < 0:
raise ValueError("无效的用户ID")
return "更新成功"
print("\n=== 上下文管理装饰器 ===")
try:
result = update_user_data(1, {'name': 'Alice'})
print(f"结果: {result}")
except Exception as e:
print(f"错误: {e}")
try:
result = update_user_data(-1, {'name': 'Bob'})
except Exception as e:
print(f"错误: {e}")
# 装饰器工厂
class DecoratorFactory:
"""装饰器工厂类"""
@staticmethod
def rate_limit(max_calls, time_window):
"""速率限制装饰器"""
def decorator(func):
calls = queue.Queue(maxsize=max_calls)
lock = threading.Lock()
@functools.wraps(func)
def wrapper(*args, **kwargs):
current_time = time.time()
with lock:
# 清理过期的调用记录
while not calls.empty():
if current_time - calls.queue[0] > time_window:
calls.get()
else:
break
# 检查是否超过限制
if calls.full():
raise Exception(f"速率限制: 在{time_window}秒内最多调用{max_calls}次")
calls.put(current_time)
return func(*args, **kwargs)
return wrapper
return decorator
@staticmethod
def validate_types(**type_checks):
"""类型验证装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 获取函数签名
import inspect
sig = inspect.signature(func)
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
# 验证类型
for param_name, expected_type in type_checks.items():
if param_name in bound_args.arguments:
value = bound_args.arguments[param_name]
if not isinstance(value, expected_type):
raise TypeError(
f"参数 {param_name} 期望类型 {expected_type.__name__}, "
f"实际类型 {type(value).__name__}"
)
return func(*args, **kwargs)
return wrapper
return decorator
@DecoratorFactory.rate_limit(max_calls=3, time_window=5)
def api_call(endpoint):
"""API调用函数"""
return f"调用API: {endpoint}"
@DecoratorFactory.validate_types(name=str, age=int, active=bool)
def create_user(name, age, active=True):
"""创建用户"""
return {'name': name, 'age': age, 'active': active}
print("\n=== 装饰器工厂 ===")
# 测试速率限制
for i in range(5):
try:
result = api_call(f"/users/{i}")
print(f"第{i+1}次调用: {result}")
except Exception as e:
print(f"第{i+1}次调用失败: {e}")
# 测试类型验证
try:
user1 = create_user("Alice", 25, True)
print(f"用户1: {user1}")
user2 = create_user("Bob", "30", True) # 错误的类型
except TypeError as e:
print(f"类型错误: {e}")
# 运行异步示例
async def test_async():
"""测试异步装饰器"""
print("\n=== 异步装饰器 ===")
try:
result = await async_unstable_function()
print(f"异步结果: {result}")
except Exception as e:
print(f"异步最终失败: {e}")
# 如果在支持asyncio的环境中运行
if __name__ == "__main__":
try:
asyncio.run(test_async())
except Exception as e:
print(f"异步测试跳过: {e}")
运行装饰器示例:
python decorators.py
本章小结
本章我们深入学习了Python的函数和作用域:
- 函数基础:定义、调用、参数传递、函数作为对象
- 作用域和命名空间:LEGB规则、global/nonlocal关键字、作用域陷阱
- 闭包:概念、实现、实际应用
- 装饰器:基础装饰器、带参数装饰器、类装饰器、高级模式
下一章预告
下一章我们将学习《模块和包》,内容包括: - 模块的创建和导入 - 包的组织和管理 - Python标准库介绍 - 第三方库的使用 - 虚拟环境管理
练习题
基础练习
函数设计:
- 编写一个计算器函数,支持四则运算
- 实现一个参数验证装饰器
- 创建一个简单的缓存系统
闭包应用:
- 实现一个配置管理器
- 创建一个事件监听器
- 编写一个状态机
进阶练习
装饰器实战:
- 实现一个性能监控装饰器
- 创建一个权限控制系统
- 编写一个API限流装饰器
高级应用:
- 实现一个简单的ORM装饰器
- 创建一个异步任务调度器
- 编写一个代码执行时间分析器
提示:函数是代码复用的基础,装饰器是Python的强大特性。多练习不同场景下的应用,掌握函数式编程的思想。