学习目标
通过本章学习,你将掌握: - 异常的概念和类型 - try/except/finally语句的使用 - 自定义异常的创建 - 调试技巧和工具 - 日志记录的最佳实践 - 单元测试的编写
8.1 异常处理基础
异常的概念
# 常见异常示例
print("=== 常见异常类型 ===")
# 1. ZeroDivisionError - 除零错误
try:
result = 10 / 0
except ZeroDivisionError as e:
print(f"除零错误: {e}")
# 2. TypeError - 类型错误
try:
result = "hello" + 5
except TypeError as e:
print(f"类型错误: {e}")
# 3. ValueError - 值错误
try:
number = int("hello")
except ValueError as e:
print(f"值错误: {e}")
# 4. IndexError - 索引错误
try:
my_list = [1, 2, 3]
item = my_list[10]
except IndexError as e:
print(f"索引错误: {e}")
# 5. KeyError - 键错误
try:
my_dict = {'a': 1, 'b': 2}
value = my_dict['c']
except KeyError as e:
print(f"键错误: {e}")
# 6. FileNotFoundError - 文件未找到错误
try:
with open('nonexistent_file.txt', 'r') as f:
content = f.read()
except FileNotFoundError as e:
print(f"文件未找到: {e}")
# 7. AttributeError - 属性错误
try:
my_string = "hello"
my_string.nonexistent_method()
except AttributeError as e:
print(f"属性错误: {e}")
# 8. ImportError - 导入错误
try:
import nonexistent_module
except ImportError as e:
print(f"导入错误: {e}")
基本异常处理语法
# try/except 基本语法
def safe_divide(a, b):
"""安全除法函数"""
try:
result = a / b
return result
except ZeroDivisionError:
print("错误:不能除以零")
return None
except TypeError:
print("错误:参数必须是数字")
return None
print("\n=== 基本异常处理 ===")
print(f"10 / 2 = {safe_divide(10, 2)}")
print(f"10 / 0 = {safe_divide(10, 0)}")
print(f"'10' / 2 = {safe_divide('10', 2)}")
# 捕获多种异常
def safe_operation(operation, a, b):
"""安全运算函数"""
try:
if operation == 'add':
return a + b
elif operation == 'subtract':
return a - b
elif operation == 'multiply':
return a * b
elif operation == 'divide':
return a / b
else:
raise ValueError(f"不支持的运算: {operation}")
except (TypeError, ValueError) as e:
print(f"运算错误: {e}")
return None
except ZeroDivisionError:
print("错误:除数不能为零")
return None
except Exception as e:
print(f"未知错误: {e}")
return None
print("\n=== 多种异常处理 ===")
print(f"加法: {safe_operation('add', 5, 3)}")
print(f"除法: {safe_operation('divide', 10, 2)}")
print(f"除零: {safe_operation('divide', 10, 0)}")
print(f"类型错误: {safe_operation('add', '5', 3)}")
print(f"不支持的运算: {safe_operation('power', 2, 3)}")
# try/except/else/finally 完整语法
def read_file_safely(filename):
"""安全读取文件"""
file_handle = None
try:
print(f"尝试打开文件: {filename}")
file_handle = open(filename, 'r', encoding='utf-8')
content = file_handle.read()
print(f"文件读取成功,内容长度: {len(content)}")
return content
except FileNotFoundError:
print(f"文件不存在: {filename}")
return None
except PermissionError:
print(f"没有权限访问文件: {filename}")
return None
except UnicodeDecodeError:
print(f"文件编码错误: {filename}")
return None
else:
# 只有在没有异常时才执行
print("文件读取操作成功完成")
finally:
# 无论是否有异常都会执行
if file_handle and not file_handle.closed:
file_handle.close()
print("文件已关闭")
print("清理操作完成")
print("\n=== 完整异常处理语法 ===")
# 创建一个测试文件
with open('test_file.txt', 'w', encoding='utf-8') as f:
f.write("这是一个测试文件\n包含一些中文内容")
# 测试正常情况
content = read_file_safely('test_file.txt')
print()
# 测试文件不存在
content = read_file_safely('nonexistent.txt')
# 清理测试文件
import os
if os.path.exists('test_file.txt'):
os.remove('test_file.txt')
异常信息获取
import traceback
import sys
def detailed_exception_handling():
"""详细异常信息处理"""
def problematic_function(x, y):
"""有问题的函数"""
if x == 0:
raise ValueError("x 不能为零")
if y == 0:
raise ZeroDivisionError("y 不能为零")
return x / y
def another_function(a, b):
"""另一个函数"""
return problematic_function(a, b) * 2
try:
result = another_function(10, 0)
except Exception as e:
print("=== 异常信息详解 ===")
# 1. 异常类型和消息
print(f"异常类型: {type(e).__name__}")
print(f"异常消息: {str(e)}")
print(f"异常参数: {e.args}")
# 2. 异常发生的位置
exc_type, exc_value, exc_traceback = sys.exc_info()
print(f"\n异常发生在:")
print(f" 文件: {exc_traceback.tb_frame.f_code.co_filename}")
print(f" 函数: {exc_traceback.tb_frame.f_code.co_name}")
print(f" 行号: {exc_traceback.tb_lineno}")
# 3. 完整的调用栈
print("\n完整调用栈:")
traceback.print_exc()
# 4. 格式化的调用栈
print("\n格式化调用栈:")
tb_lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
for line in tb_lines:
print(line.rstrip())
# 5. 只获取调用栈字符串
print("\n调用栈字符串:")
tb_str = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
print(tb_str)
detailed_exception_handling()
# 异常链
def exception_chaining():
"""异常链示例"""
def database_operation():
"""模拟数据库操作"""
raise ConnectionError("数据库连接失败")
def business_logic():
"""业务逻辑"""
try:
database_operation()
except ConnectionError as e:
# 抛出新异常,保留原异常信息
raise RuntimeError("业务操作失败") from e
def api_handler():
"""API处理器"""
try:
business_logic()
except RuntimeError as e:
print("=== 异常链 ===")
print(f"当前异常: {e}")
print(f"原始异常: {e.__cause__}")
# 打印完整的异常链
print("\n完整异常链:")
traceback.print_exc()
api_handler()
exception_chaining()
# 异常重新抛出
def exception_reraising():
"""异常重新抛出示例"""
def risky_operation(value):
"""有风险的操作"""
if value < 0:
raise ValueError("值不能为负数")
return value ** 0.5
def logged_operation(value):
"""带日志的操作"""
try:
result = risky_operation(value)
print(f"操作成功: sqrt({value}) = {result}")
return result
except ValueError as e:
print(f"记录错误: {e}")
# 重新抛出异常
raise
try:
logged_operation(16) # 正常情况
logged_operation(-4) # 异常情况
except ValueError as e:
print(f"最终捕获异常: {e}")
print("\n=== 异常重新抛出 ===")
exception_reraising()
运行异常处理基础示例:
python exception_basics.py
8.2 自定义异常
创建自定义异常类
# 基本自定义异常
class CustomError(Exception):
"""自定义异常基类"""
pass
class ValidationError(CustomError):
"""验证错误"""
def __init__(self, message, field=None, value=None):
super().__init__(message)
self.field = field
self.value = value
self.message = message
def __str__(self):
if self.field:
return f"验证错误 [{self.field}]: {self.message} (值: {self.value})"
return f"验证错误: {self.message}"
class BusinessLogicError(CustomError):
"""业务逻辑错误"""
def __init__(self, message, error_code=None):
super().__init__(message)
self.error_code = error_code
self.message = message
def __str__(self):
if self.error_code:
return f"业务错误 [{self.error_code}]: {self.message}"
return f"业务错误: {self.message}"
class ConfigurationError(CustomError):
"""配置错误"""
def __init__(self, message, config_key=None, config_file=None):
super().__init__(message)
self.config_key = config_key
self.config_file = config_file
self.message = message
def __str__(self):
parts = ["配置错误"]
if self.config_file:
parts.append(f"文件: {self.config_file}")
if self.config_key:
parts.append(f"键: {self.config_key}")
parts.append(self.message)
return " - ".join(parts)
print("=== 自定义异常示例 ===")
# 使用自定义异常
def validate_user_data(user_data):
"""验证用户数据"""
# 检查必需字段
required_fields = ['name', 'email', 'age']
for field in required_fields:
if field not in user_data:
raise ValidationError(f"缺少必需字段", field=field)
# 验证姓名
name = user_data['name']
if not isinstance(name, str) or len(name.strip()) == 0:
raise ValidationError("姓名不能为空", field='name', value=name)
if len(name) > 50:
raise ValidationError("姓名长度不能超过50个字符", field='name', value=name)
# 验证邮箱
email = user_data['email']
if not isinstance(email, str) or '@' not in email:
raise ValidationError("邮箱格式无效", field='email', value=email)
# 验证年龄
age = user_data['age']
if not isinstance(age, int):
raise ValidationError("年龄必须是整数", field='age', value=age)
if age < 0 or age > 150:
raise ValidationError("年龄必须在0-150之间", field='age', value=age)
return True
# 测试验证函数
test_users = [
{'name': 'Alice', 'email': 'alice@example.com', 'age': 25}, # 正常
{'name': '', 'email': 'bob@example.com', 'age': 30}, # 姓名为空
{'name': 'Charlie', 'email': 'invalid-email', 'age': 35}, # 邮箱无效
{'name': 'David', 'email': 'david@example.com', 'age': -5}, # 年龄无效
{'email': 'eve@example.com', 'age': 28}, # 缺少姓名
]
for i, user in enumerate(test_users):
try:
validate_user_data(user)
print(f"用户 {i+1}: 验证通过")
except ValidationError as e:
print(f"用户 {i+1}: {e}")
# 业务逻辑异常示例
class BankAccount:
"""银行账户类"""
def __init__(self, account_number, initial_balance=0):
self.account_number = account_number
self.balance = initial_balance
self.is_frozen = False
def withdraw(self, amount):
"""取款"""
if self.is_frozen:
raise BusinessLogicError(
"账户已冻结,无法进行取款操作",
error_code="ACCOUNT_FROZEN"
)
if amount <= 0:
raise ValidationError(
"取款金额必须大于零",
field="amount",
value=amount
)
if amount > self.balance:
raise BusinessLogicError(
f"余额不足,当前余额: {self.balance},尝试取款: {amount}",
error_code="INSUFFICIENT_FUNDS"
)
self.balance -= amount
return self.balance
def freeze_account(self):
"""冻结账户"""
self.is_frozen = True
print("\n=== 业务逻辑异常 ===")
account = BankAccount("123456789", 1000)
# 正常取款
try:
new_balance = account.withdraw(200)
print(f"取款成功,余额: {new_balance}")
except (ValidationError, BusinessLogicError) as e:
print(f"取款失败: {e}")
# 余额不足
try:
account.withdraw(2000)
except BusinessLogicError as e:
print(f"取款失败: {e}")
print(f"错误代码: {e.error_code}")
# 账户冻结
account.freeze_account()
try:
account.withdraw(100)
except BusinessLogicError as e:
print(f"取款失败: {e}")
print(f"错误代码: {e.error_code}")
# 配置错误示例
class ConfigManager:
"""配置管理器"""
def __init__(self, config_file):
self.config_file = config_file
self.config = {}
def load_config(self):
"""加载配置"""
try:
# 模拟配置文件不存在
if self.config_file == "missing.conf":
raise FileNotFoundError(f"配置文件不存在: {self.config_file}")
# 模拟配置内容
self.config = {
'database_url': 'localhost:5432',
'debug': True,
'max_connections': 100
}
except FileNotFoundError as e:
raise ConfigurationError(
"无法加载配置文件",
config_file=self.config_file
) from e
def get_config(self, key, required=True):
"""获取配置值"""
if key not in self.config:
if required:
raise ConfigurationError(
"配置项不存在",
config_key=key,
config_file=self.config_file
)
return None
return self.config[key]
def validate_config(self):
"""验证配置"""
required_keys = ['database_url', 'debug']
for key in required_keys:
if key not in self.config:
raise ConfigurationError(
f"缺少必需的配置项",
config_key=key,
config_file=self.config_file
)
# 验证数据库URL格式
db_url = self.config['database_url']
if ':' not in db_url:
raise ConfigurationError(
"数据库URL格式无效,应为 host:port",
config_key='database_url',
config_file=self.config_file
)
print("\n=== 配置错误示例 ===")
# 正常配置
try:
config_mgr = ConfigManager("app.conf")
config_mgr.load_config()
config_mgr.validate_config()
print(f"数据库URL: {config_mgr.get_config('database_url')}")
except ConfigurationError as e:
print(f"配置错误: {e}")
# 配置文件不存在
try:
config_mgr = ConfigManager("missing.conf")
config_mgr.load_config()
except ConfigurationError as e:
print(f"配置错误: {e}")
print(f"原始异常: {e.__cause__}")
异常层次结构
# 设计良好的异常层次结构
class ApplicationError(Exception):
"""应用程序异常基类"""
def __init__(self, message, error_code=None, details=None):
super().__init__(message)
self.message = message
self.error_code = error_code
self.details = details or {}
self.timestamp = self._get_timestamp()
def _get_timestamp(self):
"""获取时间戳"""
from datetime import datetime
return datetime.now().isoformat()
def to_dict(self):
"""转换为字典"""
return {
'error_type': self.__class__.__name__,
'message': self.message,
'error_code': self.error_code,
'details': self.details,
'timestamp': self.timestamp
}
def __str__(self):
parts = [self.message]
if self.error_code:
parts.append(f"(代码: {self.error_code})")
return " ".join(parts)
# 数据相关异常
class DataError(ApplicationError):
"""数据异常基类"""
pass
class ValidationError(DataError):
"""数据验证异常"""
def __init__(self, message, field=None, value=None, **kwargs):
super().__init__(message, **kwargs)
self.field = field
self.value = value
if field:
self.details.update({'field': field, 'value': str(value)})
class DataNotFoundError(DataError):
"""数据未找到异常"""
def __init__(self, message, resource_type=None, resource_id=None, **kwargs):
super().__init__(message, **kwargs)
self.resource_type = resource_type
self.resource_id = resource_id
if resource_type:
self.details.update({
'resource_type': resource_type,
'resource_id': str(resource_id)
})
class DataIntegrityError(DataError):
"""数据完整性异常"""
pass
# 业务逻辑异常
class BusinessError(ApplicationError):
"""业务逻辑异常基类"""
pass
class AuthenticationError(BusinessError):
"""认证异常"""
pass
class AuthorizationError(BusinessError):
"""授权异常"""
def __init__(self, message, required_permission=None, **kwargs):
super().__init__(message, **kwargs)
self.required_permission = required_permission
if required_permission:
self.details['required_permission'] = required_permission
class BusinessRuleError(BusinessError):
"""业务规则异常"""
def __init__(self, message, rule_name=None, **kwargs):
super().__init__(message, **kwargs)
self.rule_name = rule_name
if rule_name:
self.details['rule_name'] = rule_name
# 系统异常
class SystemError(ApplicationError):
"""系统异常基类"""
pass
class ExternalServiceError(SystemError):
"""外部服务异常"""
def __init__(self, message, service_name=None, status_code=None, **kwargs):
super().__init__(message, **kwargs)
self.service_name = service_name
self.status_code = status_code
if service_name:
self.details['service_name'] = service_name
if status_code:
self.details['status_code'] = status_code
class ConfigurationError(SystemError):
"""配置异常"""
def __init__(self, message, config_key=None, config_file=None, **kwargs):
super().__init__(message, **kwargs)
self.config_key = config_key
self.config_file = config_file
if config_key:
self.details['config_key'] = config_key
if config_file:
self.details['config_file'] = config_file
# 使用异常层次结构的示例
class UserService:
"""用户服务"""
def __init__(self):
self.users = {
1: {'name': 'Alice', 'email': 'alice@example.com', 'role': 'admin'},
2: {'name': 'Bob', 'email': 'bob@example.com', 'role': 'user'},
}
self.current_user = None
def authenticate(self, user_id, password):
"""用户认证"""
if user_id not in self.users:
raise AuthenticationError(
"用户不存在",
error_code="USER_NOT_FOUND",
details={'user_id': user_id}
)
# 模拟密码验证
if password != "correct_password":
raise AuthenticationError(
"密码错误",
error_code="INVALID_PASSWORD"
)
self.current_user = self.users[user_id]
return True
def get_user(self, user_id):
"""获取用户信息"""
if not self.current_user:
raise AuthenticationError(
"用户未登录",
error_code="NOT_AUTHENTICATED"
)
if user_id not in self.users:
raise DataNotFoundError(
"用户不存在",
resource_type="User",
resource_id=user_id,
error_code="USER_NOT_FOUND"
)
return self.users[user_id]
def delete_user(self, user_id):
"""删除用户"""
if not self.current_user:
raise AuthenticationError(
"用户未登录",
error_code="NOT_AUTHENTICATED"
)
if self.current_user['role'] != 'admin':
raise AuthorizationError(
"权限不足,需要管理员权限",
required_permission="admin",
error_code="INSUFFICIENT_PRIVILEGES"
)
if user_id not in self.users:
raise DataNotFoundError(
"用户不存在",
resource_type="User",
resource_id=user_id,
error_code="USER_NOT_FOUND"
)
if user_id == 1: # 假设用户1是系统管理员
raise BusinessRuleError(
"不能删除系统管理员",
rule_name="PROTECT_SYSTEM_ADMIN",
error_code="CANNOT_DELETE_ADMIN"
)
del self.users[user_id]
return True
def update_user(self, user_id, user_data):
"""更新用户信息"""
if not self.current_user:
raise AuthenticationError(
"用户未登录",
error_code="NOT_AUTHENTICATED"
)
if user_id not in self.users:
raise DataNotFoundError(
"用户不存在",
resource_type="User",
resource_id=user_id,
error_code="USER_NOT_FOUND"
)
# 验证数据
if 'email' in user_data:
email = user_data['email']
if not isinstance(email, str) or '@' not in email:
raise ValidationError(
"邮箱格式无效",
field="email",
value=email,
error_code="INVALID_EMAIL_FORMAT"
)
# 检查邮箱是否已存在
for uid, user in self.users.items():
if uid != user_id and user['email'] == email:
raise DataIntegrityError(
"邮箱已存在",
error_code="EMAIL_ALREADY_EXISTS",
details={'email': email, 'existing_user_id': uid}
)
self.users[user_id].update(user_data)
return self.users[user_id]
print("\n=== 异常层次结构示例 ===")
user_service = UserService()
# 测试各种异常情况
test_cases = [
# 认证测试
lambda: user_service.authenticate(999, "wrong_password"), # 用户不存在
lambda: user_service.authenticate(1, "wrong_password"), # 密码错误
lambda: user_service.get_user(1), # 未登录
# 正常登录
lambda: user_service.authenticate(1, "correct_password"), # 正常登录
# 数据操作测试
lambda: user_service.get_user(999), # 用户不存在
lambda: user_service.update_user(2, {'email': 'invalid'}), # 邮箱格式错误
lambda: user_service.update_user(2, {'email': 'alice@example.com'}), # 邮箱已存在
lambda: user_service.delete_user(1), # 不能删除管理员
]
for i, test_case in enumerate(test_cases):
try:
result = test_case()
print(f"测试 {i+1}: 成功 - {result}")
except ApplicationError as e:
print(f"测试 {i+1}: {e.__class__.__name__} - {e}")
print(f" 详细信息: {e.to_dict()}")
except Exception as e:
print(f"测试 {i+1}: 未知错误 - {e}")
运行自定义异常示例:
python custom_exceptions.py
8.3 调试技巧和工具
使用pdb调试器
import pdb
def debugging_example():
"""调试示例函数"""
def calculate_average(numbers):
"""计算平均值"""
total = 0
count = 0
for num in numbers:
# 设置断点
# pdb.set_trace() # 取消注释以启用调试
if isinstance(num, (int, float)):
total += num
count += 1
else:
print(f"跳过非数字值: {num}")
if count == 0:
raise ValueError("没有有效的数字")
average = total / count
return average
def process_data(data_list):
"""处理数据列表"""
results = []
for i, data in enumerate(data_list):
try:
# 在这里可以设置条件断点
if i == 2: # 在处理第3个元素时调试
# pdb.set_trace() # 取消注释以启用调试
pass
avg = calculate_average(data)
results.append({
'index': i,
'data': data,
'average': avg,
'status': 'success'
})
except Exception as e:
results.append({
'index': i,
'data': data,
'error': str(e),
'status': 'error'
})
return results
# 测试数据
test_data = [
[1, 2, 3, 4, 5], # 正常数据
[10, 20, 30], # 正常数据
[1, 'invalid', 3], # 包含无效数据
[], # 空列表
[100, 200, 300, 400] # 正常数据
]
print("=== 调试示例 ===")
results = process_data(test_data)
for result in results:
if result['status'] == 'success':
print(f"数据 {result['index']}: 平均值 = {result['average']:.2f}")
else:
print(f"数据 {result['index']}: 错误 - {result['error']}")
# 运行调试示例
debugging_example()
# 调试技巧和最佳实践
class DebugHelper:
"""调试辅助类"""
@staticmethod
def print_debug_info(func):
"""打印调试信息的装饰器"""
import functools
@functools.wraps(func)
def wrapper(*args, **kwargs):
print(f"\n=== 调试信息: {func.__name__} ===")
print(f"参数: args={args}, kwargs={kwargs}")
try:
result = func(*args, **kwargs)
print(f"返回值: {result}")
print(f"返回值类型: {type(result)}")
return result
except Exception as e:
print(f"异常: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
raise
finally:
print(f"=== 结束: {func.__name__} ===")
return wrapper
@staticmethod
def trace_calls(func):
"""跟踪函数调用的装饰器"""
import functools
@functools.wraps(func)
def wrapper(*args, **kwargs):
indent = getattr(wrapper, 'call_depth', 0) * ' '
print(f"{indent}-> 调用 {func.__name__}")
wrapper.call_depth = getattr(wrapper, 'call_depth', 0) + 1
try:
result = func(*args, **kwargs)
print(f"{indent}<- 返回 {func.__name__}: {result}")
return result
except Exception as e:
print(f"{indent}<- 异常 {func.__name__}: {e}")
raise
finally:
wrapper.call_depth -= 1
return wrapper
@staticmethod
def profile_performance(func):
"""性能分析装饰器"""
import functools
import time
import psutil
import os
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 获取进程信息
process = psutil.Process(os.getpid())
# 记录开始状态
start_time = time.time()
start_memory = process.memory_info().rss / 1024 / 1024 # MB
try:
result = func(*args, **kwargs)
# 记录结束状态
end_time = time.time()
end_memory = process.memory_info().rss / 1024 / 1024 # MB
# 计算性能指标
duration = end_time - start_time
memory_delta = end_memory - start_memory
print(f"\n=== 性能分析: {func.__name__} ===")
print(f"执行时间: {duration:.4f} 秒")
print(f"内存使用: {start_memory:.2f} MB -> {end_memory:.2f} MB (变化: {memory_delta:+.2f} MB)")
print(f"CPU使用率: {process.cpu_percent()}%")
return result
except Exception as e:
end_time = time.time()
duration = end_time - start_time
print(f"\n=== 性能分析 (异常): {func.__name__} ===")
print(f"执行时间: {duration:.4f} 秒")
print(f"异常: {e}")
raise
return wrapper
# 使用调试装饰器的示例
class Calculator:
"""计算器类(用于演示调试)"""
@DebugHelper.print_debug_info
def add(self, a, b):
"""加法"""
return a + b
@DebugHelper.trace_calls
def factorial(self, n):
"""阶乘(递归实现)"""
if n <= 1:
return 1
return n * self.factorial(n - 1)
@DebugHelper.profile_performance
def heavy_calculation(self, n):
"""重计算(用于性能分析)"""
result = 0
for i in range(n):
result += i ** 2
return result
print("\n=== 调试装饰器示例 ===")
calc = Calculator()
# 测试调试信息装饰器
result = calc.add(5, 3)
# 测试调用跟踪装饰器
result = calc.factorial(5)
# 测试性能分析装饰器(需要安装psutil: pip install psutil)
try:
result = calc.heavy_calculation(100000)
except ImportError:
print("跳过性能分析(需要安装psutil)")
# 变量检查工具
def inspect_variables():
"""检查变量的工具函数"""
def analyze_variable(var, name="variable"):
"""分析变量"""
print(f"\n=== 变量分析: {name} ===")
print(f"值: {var}")
print(f"类型: {type(var)}")
print(f"大小: {len(var) if hasattr(var, '__len__') else 'N/A'}")
print(f"内存地址: {id(var)}")
if hasattr(var, '__dict__'):
print(f"属性: {list(var.__dict__.keys())}")
if hasattr(var, '__class__'):
print(f"类: {var.__class__.__name__}")
print(f"MRO: {[cls.__name__ for cls in var.__class__.__mro__]}")
# 对于容器类型,显示内容摘要
if isinstance(var, (list, tuple)):
print(f"前5个元素: {var[:5]}")
elif isinstance(var, dict):
keys = list(var.keys())[:5]
print(f"前5个键: {keys}")
elif isinstance(var, str):
print(f"前50个字符: {repr(var[:50])}")
# 测试不同类型的变量
test_vars = {
'integer': 42,
'string': "Hello, World!" * 10,
'list': list(range(100)),
'dict': {f'key_{i}': f'value_{i}' for i in range(20)},
'calculator': Calculator()
}
for name, var in test_vars.items():
analyze_variable(var, name)
print("\n=== 变量检查工具 ===")
inspect_variables()
日志记录
import logging
import sys
from datetime import datetime
import json
# 配置日志记录
def setup_logging():
"""设置日志配置"""
# 创建根日志记录器
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# 清除现有的处理器
logger.handlers.clear()
# 创建格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 文件处理器
file_handler = logging.FileHandler('app.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 错误文件处理器
error_handler = logging.FileHandler('error.log', encoding='utf-8')
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
logger.addHandler(error_handler)
return logger
# 自定义日志格式化器
class JSONFormatter(logging.Formatter):
"""JSON格式的日志格式化器"""
def format(self, record):
log_entry = {
'timestamp': datetime.fromtimestamp(record.created).isoformat(),
'level': record.levelname,
'logger': record.name,
'module': record.module,
'function': record.funcName,
'line': record.lineno,
'message': record.getMessage(),
}
# 添加异常信息
if record.exc_info:
log_entry['exception'] = self.formatException(record.exc_info)
# 添加额外字段
if hasattr(record, 'user_id'):
log_entry['user_id'] = record.user_id
if hasattr(record, 'request_id'):
log_entry['request_id'] = record.request_id
return json.dumps(log_entry, ensure_ascii=False)
# 日志记录示例类
class UserService:
"""用户服务(带日志记录)"""
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.users = {}
self.logger.info("用户服务初始化完成")
def create_user(self, user_data):
"""创建用户"""
user_id = user_data.get('id')
# 记录操作开始
self.logger.info(f"开始创建用户", extra={'user_id': user_id})
try:
# 验证用户数据
self._validate_user_data(user_data)
# 检查用户是否已存在
if user_id in self.users:
self.logger.warning(f"用户已存在", extra={'user_id': user_id})
raise ValueError(f"用户 {user_id} 已存在")
# 创建用户
self.users[user_id] = user_data.copy()
self.logger.info(f"用户创建成功", extra={'user_id': user_id})
return user_data
except Exception as e:
self.logger.error(f"用户创建失败: {e}", extra={'user_id': user_id}, exc_info=True)
raise
def get_user(self, user_id):
"""获取用户"""
self.logger.debug(f"查询用户", extra={'user_id': user_id})
if user_id not in self.users:
self.logger.warning(f"用户不存在", extra={'user_id': user_id})
return None
user = self.users[user_id]
self.logger.debug(f"用户查询成功", extra={'user_id': user_id})
return user
def update_user(self, user_id, update_data):
"""更新用户"""
self.logger.info(f"开始更新用户", extra={'user_id': user_id})
try:
if user_id not in self.users:
self.logger.error(f"尝试更新不存在的用户", extra={'user_id': user_id})
raise ValueError(f"用户 {user_id} 不存在")
# 记录更新前的数据
old_data = self.users[user_id].copy()
self.logger.debug(f"更新前数据: {old_data}", extra={'user_id': user_id})
# 更新数据
self.users[user_id].update(update_data)
# 记录更新后的数据
new_data = self.users[user_id]
self.logger.info(f"用户更新成功", extra={'user_id': user_id})
self.logger.debug(f"更新后数据: {new_data}", extra={'user_id': user_id})
return new_data
except Exception as e:
self.logger.error(f"用户更新失败: {e}", extra={'user_id': user_id}, exc_info=True)
raise
def delete_user(self, user_id):
"""删除用户"""
self.logger.info(f"开始删除用户", extra={'user_id': user_id})
try:
if user_id not in self.users:
self.logger.warning(f"尝试删除不存在的用户", extra={'user_id': user_id})
return False
# 记录被删除的用户数据
deleted_user = self.users[user_id]
self.logger.debug(f"删除的用户数据: {deleted_user}", extra={'user_id': user_id})
del self.users[user_id]
self.logger.info(f"用户删除成功", extra={'user_id': user_id})
return True
except Exception as e:
self.logger.error(f"用户删除失败: {e}", extra={'user_id': user_id}, exc_info=True)
raise
def _validate_user_data(self, user_data):
"""验证用户数据"""
required_fields = ['id', 'name', 'email']
for field in required_fields:
if field not in user_data:
self.logger.error(f"缺少必需字段: {field}")
raise ValueError(f"缺少必需字段: {field}")
# 验证邮箱格式
email = user_data['email']
if '@' not in email:
self.logger.error(f"邮箱格式无效: {email}")
raise ValueError(f"邮箱格式无效: {email}")
self.logger.debug("用户数据验证通过")
# 性能日志记录
class PerformanceLogger:
"""性能日志记录器"""
def __init__(self, logger_name="performance"):
self.logger = logging.getLogger(logger_name)
def log_function_performance(self, func):
"""记录函数性能的装饰器"""
import functools
import time
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
end_time = time.time()
duration = end_time - start_time
self.logger.info(
f"函数 {func.__name__} 执行完成",
extra={
'function': func.__name__,
'duration': duration,
'status': 'success',
'args_count': len(args),
'kwargs_count': len(kwargs)
}
)
return result
except Exception as e:
end_time = time.time()
duration = end_time - start_time
self.logger.error(
f"函数 {func.__name__} 执行失败",
extra={
'function': func.__name__,
'duration': duration,
'status': 'error',
'error': str(e),
'args_count': len(args),
'kwargs_count': len(kwargs)
},
exc_info=True
)
raise
return wrapper
# 设置日志并运行示例
print("=== 日志记录示例 ===")
# 设置基本日志
logger = setup_logging()
# 创建用户服务
user_service = UserService()
# 创建性能日志记录器
perf_logger = PerformanceLogger()
# 为用户服务方法添加性能日志
user_service.create_user = perf_logger.log_function_performance(user_service.create_user)
user_service.get_user = perf_logger.log_function_performance(user_service.get_user)
# 测试用户服务
test_users = [
{'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
{'id': 2, 'name': 'Bob', 'email': 'bob@example.com'},
{'id': 3, 'name': 'Charlie', 'email': 'invalid-email'}, # 无效邮箱
]
for user_data in test_users:
try:
user = user_service.create_user(user_data)
print(f"创建用户成功: {user['name']}")
except Exception as e:
print(f"创建用户失败: {e}")
# 查询用户
for user_id in [1, 2, 999]:
user = user_service.get_user(user_id)
if user:
print(f"找到用户: {user['name']}")
else:
print(f"用户 {user_id} 不存在")
# 更新用户
try:
updated_user = user_service.update_user(1, {'name': 'Alice Smith'})
print(f"更新用户成功: {updated_user['name']}")
except Exception as e:
print(f"更新用户失败: {e}")
# 删除用户
try:
success = user_service.delete_user(2)
print(f"删除用户: {'成功' if success else '失败'}")
except Exception as e:
print(f"删除用户失败: {e}")
print("\n日志文件已生成: app.log, error.log")
print("可以查看这些文件了解详细的日志记录")
运行调试和日志示例:
python debugging_logging.py
本章小结
本章我们深入学习了Python的异常处理和调试:
- 异常处理基础:异常类型、try/except语法、异常信息获取
- 自定义异常:异常类设计、异常层次结构、最佳实践
- 调试技巧:pdb调试器、调试装饰器、变量检查、性能分析
- 日志记录:日志配置、自定义格式化器、性能日志、最佳实践
下一章预告
下一章我们将学习《文件操作和I/O》,内容包括: - 文件的读写操作 - 文件路径处理 - CSV和JSON文件处理 - 二进制文件操作 - 网络I/O和异步I/O
练习题
基础练习
异常处理:
- 编写一个安全的文件读取函数
- 实现一个带重试机制的网络请求函数
- 创建一个数据验证器
自定义异常:
- 设计一个电商系统的异常体系
- 实现一个配置管理器的异常处理
- 创建一个API错误处理系统
进阶练习
调试工具:
- 实现一个代码执行时间分析器
- 创建一个内存使用监控器
- 编写一个函数调用跟踪器
日志系统:
- 实现一个分布式日志收集器
- 创建一个日志分析工具
- 设计一个实时日志监控系统
提示:良好的异常处理和调试技能是编写健壮Python程序的关键。学会使用各种调试工具和日志记录技术,能大大提高开发效率和代码质量。