学习目标

通过本章学习,你将掌握: - 异常的概念和类型 - try/except/finally语句的使用 - 自定义异常的创建 - 调试技巧和工具 - 日志记录的最佳实践 - 单元测试的编写

8.1 异常处理基础

异常的概念

# 常见异常示例
print("=== 常见异常类型 ===")

# 1. ZeroDivisionError - 除零错误
try:
    result = 10 / 0
except ZeroDivisionError as e:
    print(f"除零错误: {e}")

# 2. TypeError - 类型错误
try:
    result = "hello" + 5
except TypeError as e:
    print(f"类型错误: {e}")

# 3. ValueError - 值错误
try:
    number = int("hello")
except ValueError as e:
    print(f"值错误: {e}")

# 4. IndexError - 索引错误
try:
    my_list = [1, 2, 3]
    item = my_list[10]
except IndexError as e:
    print(f"索引错误: {e}")

# 5. KeyError - 键错误
try:
    my_dict = {'a': 1, 'b': 2}
    value = my_dict['c']
except KeyError as e:
    print(f"键错误: {e}")

# 6. FileNotFoundError - 文件未找到错误
try:
    with open('nonexistent_file.txt', 'r') as f:
        content = f.read()
except FileNotFoundError as e:
    print(f"文件未找到: {e}")

# 7. AttributeError - 属性错误
try:
    my_string = "hello"
    my_string.nonexistent_method()
except AttributeError as e:
    print(f"属性错误: {e}")

# 8. ImportError - 导入错误
try:
    import nonexistent_module
except ImportError as e:
    print(f"导入错误: {e}")

基本异常处理语法

# try/except 基本语法
def safe_divide(a, b):
    """安全除法函数"""
    try:
        result = a / b
        return result
    except ZeroDivisionError:
        print("错误:不能除以零")
        return None
    except TypeError:
        print("错误:参数必须是数字")
        return None

print("\n=== 基本异常处理 ===")
print(f"10 / 2 = {safe_divide(10, 2)}")
print(f"10 / 0 = {safe_divide(10, 0)}")
print(f"'10' / 2 = {safe_divide('10', 2)}")

# 捕获多种异常
def safe_operation(operation, a, b):
    """安全运算函数"""
    try:
        if operation == 'add':
            return a + b
        elif operation == 'subtract':
            return a - b
        elif operation == 'multiply':
            return a * b
        elif operation == 'divide':
            return a / b
        else:
            raise ValueError(f"不支持的运算: {operation}")
    
    except (TypeError, ValueError) as e:
        print(f"运算错误: {e}")
        return None
    except ZeroDivisionError:
        print("错误:除数不能为零")
        return None
    except Exception as e:
        print(f"未知错误: {e}")
        return None

print("\n=== 多种异常处理 ===")
print(f"加法: {safe_operation('add', 5, 3)}")
print(f"除法: {safe_operation('divide', 10, 2)}")
print(f"除零: {safe_operation('divide', 10, 0)}")
print(f"类型错误: {safe_operation('add', '5', 3)}")
print(f"不支持的运算: {safe_operation('power', 2, 3)}")

# try/except/else/finally 完整语法
def read_file_safely(filename):
    """安全读取文件"""
    file_handle = None
    try:
        print(f"尝试打开文件: {filename}")
        file_handle = open(filename, 'r', encoding='utf-8')
        content = file_handle.read()
        print(f"文件读取成功,内容长度: {len(content)}")
        return content
    
    except FileNotFoundError:
        print(f"文件不存在: {filename}")
        return None
    
    except PermissionError:
        print(f"没有权限访问文件: {filename}")
        return None
    
    except UnicodeDecodeError:
        print(f"文件编码错误: {filename}")
        return None
    
    else:
        # 只有在没有异常时才执行
        print("文件读取操作成功完成")
    
    finally:
        # 无论是否有异常都会执行
        if file_handle and not file_handle.closed:
            file_handle.close()
            print("文件已关闭")
        print("清理操作完成")

print("\n=== 完整异常处理语法 ===")

# 创建一个测试文件
with open('test_file.txt', 'w', encoding='utf-8') as f:
    f.write("这是一个测试文件\n包含一些中文内容")

# 测试正常情况
content = read_file_safely('test_file.txt')

print()
# 测试文件不存在
content = read_file_safely('nonexistent.txt')

# 清理测试文件
import os
if os.path.exists('test_file.txt'):
    os.remove('test_file.txt')

异常信息获取

import traceback
import sys

def detailed_exception_handling():
    """详细异常信息处理"""
    
    def problematic_function(x, y):
        """有问题的函数"""
        if x == 0:
            raise ValueError("x 不能为零")
        if y == 0:
            raise ZeroDivisionError("y 不能为零")
        return x / y
    
    def another_function(a, b):
        """另一个函数"""
        return problematic_function(a, b) * 2
    
    try:
        result = another_function(10, 0)
    except Exception as e:
        print("=== 异常信息详解 ===")
        
        # 1. 异常类型和消息
        print(f"异常类型: {type(e).__name__}")
        print(f"异常消息: {str(e)}")
        print(f"异常参数: {e.args}")
        
        # 2. 异常发生的位置
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print(f"\n异常发生在:")
        print(f"  文件: {exc_traceback.tb_frame.f_code.co_filename}")
        print(f"  函数: {exc_traceback.tb_frame.f_code.co_name}")
        print(f"  行号: {exc_traceback.tb_lineno}")
        
        # 3. 完整的调用栈
        print("\n完整调用栈:")
        traceback.print_exc()
        
        # 4. 格式化的调用栈
        print("\n格式化调用栈:")
        tb_lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
        for line in tb_lines:
            print(line.rstrip())
        
        # 5. 只获取调用栈字符串
        print("\n调用栈字符串:")
        tb_str = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
        print(tb_str)

detailed_exception_handling()

# 异常链
def exception_chaining():
    """异常链示例"""
    
    def database_operation():
        """模拟数据库操作"""
        raise ConnectionError("数据库连接失败")
    
    def business_logic():
        """业务逻辑"""
        try:
            database_operation()
        except ConnectionError as e:
            # 抛出新异常,保留原异常信息
            raise RuntimeError("业务操作失败") from e
    
    def api_handler():
        """API处理器"""
        try:
            business_logic()
        except RuntimeError as e:
            print("=== 异常链 ===")
            print(f"当前异常: {e}")
            print(f"原始异常: {e.__cause__}")
            
            # 打印完整的异常链
            print("\n完整异常链:")
            traceback.print_exc()
    
    api_handler()

exception_chaining()

# 异常重新抛出
def exception_reraising():
    """异常重新抛出示例"""
    
    def risky_operation(value):
        """有风险的操作"""
        if value < 0:
            raise ValueError("值不能为负数")
        return value ** 0.5
    
    def logged_operation(value):
        """带日志的操作"""
        try:
            result = risky_operation(value)
            print(f"操作成功: sqrt({value}) = {result}")
            return result
        except ValueError as e:
            print(f"记录错误: {e}")
            # 重新抛出异常
            raise
    
    try:
        logged_operation(16)  # 正常情况
        logged_operation(-4)  # 异常情况
    except ValueError as e:
        print(f"最终捕获异常: {e}")

print("\n=== 异常重新抛出 ===")
exception_reraising()

运行异常处理基础示例:

python exception_basics.py

8.2 自定义异常

创建自定义异常类

# 基本自定义异常
class CustomError(Exception):
    """自定义异常基类"""
    pass

class ValidationError(CustomError):
    """验证错误"""
    
    def __init__(self, message, field=None, value=None):
        super().__init__(message)
        self.field = field
        self.value = value
        self.message = message
    
    def __str__(self):
        if self.field:
            return f"验证错误 [{self.field}]: {self.message} (值: {self.value})"
        return f"验证错误: {self.message}"

class BusinessLogicError(CustomError):
    """业务逻辑错误"""
    
    def __init__(self, message, error_code=None):
        super().__init__(message)
        self.error_code = error_code
        self.message = message
    
    def __str__(self):
        if self.error_code:
            return f"业务错误 [{self.error_code}]: {self.message}"
        return f"业务错误: {self.message}"

class ConfigurationError(CustomError):
    """配置错误"""
    
    def __init__(self, message, config_key=None, config_file=None):
        super().__init__(message)
        self.config_key = config_key
        self.config_file = config_file
        self.message = message
    
    def __str__(self):
        parts = ["配置错误"]
        if self.config_file:
            parts.append(f"文件: {self.config_file}")
        if self.config_key:
            parts.append(f"键: {self.config_key}")
        parts.append(self.message)
        return " - ".join(parts)

print("=== 自定义异常示例 ===")

# 使用自定义异常
def validate_user_data(user_data):
    """验证用户数据"""
    
    # 检查必需字段
    required_fields = ['name', 'email', 'age']
    for field in required_fields:
        if field not in user_data:
            raise ValidationError(f"缺少必需字段", field=field)
    
    # 验证姓名
    name = user_data['name']
    if not isinstance(name, str) or len(name.strip()) == 0:
        raise ValidationError("姓名不能为空", field='name', value=name)
    
    if len(name) > 50:
        raise ValidationError("姓名长度不能超过50个字符", field='name', value=name)
    
    # 验证邮箱
    email = user_data['email']
    if not isinstance(email, str) or '@' not in email:
        raise ValidationError("邮箱格式无效", field='email', value=email)
    
    # 验证年龄
    age = user_data['age']
    if not isinstance(age, int):
        raise ValidationError("年龄必须是整数", field='age', value=age)
    
    if age < 0 or age > 150:
        raise ValidationError("年龄必须在0-150之间", field='age', value=age)
    
    return True

# 测试验证函数
test_users = [
    {'name': 'Alice', 'email': 'alice@example.com', 'age': 25},  # 正常
    {'name': '', 'email': 'bob@example.com', 'age': 30},        # 姓名为空
    {'name': 'Charlie', 'email': 'invalid-email', 'age': 35},   # 邮箱无效
    {'name': 'David', 'email': 'david@example.com', 'age': -5}, # 年龄无效
    {'email': 'eve@example.com', 'age': 28},                    # 缺少姓名
]

for i, user in enumerate(test_users):
    try:
        validate_user_data(user)
        print(f"用户 {i+1}: 验证通过")
    except ValidationError as e:
        print(f"用户 {i+1}: {e}")

# 业务逻辑异常示例
class BankAccount:
    """银行账户类"""
    
    def __init__(self, account_number, initial_balance=0):
        self.account_number = account_number
        self.balance = initial_balance
        self.is_frozen = False
    
    def withdraw(self, amount):
        """取款"""
        if self.is_frozen:
            raise BusinessLogicError(
                "账户已冻结,无法进行取款操作", 
                error_code="ACCOUNT_FROZEN"
            )
        
        if amount <= 0:
            raise ValidationError(
                "取款金额必须大于零", 
                field="amount", 
                value=amount
            )
        
        if amount > self.balance:
            raise BusinessLogicError(
                f"余额不足,当前余额: {self.balance},尝试取款: {amount}",
                error_code="INSUFFICIENT_FUNDS"
            )
        
        self.balance -= amount
        return self.balance
    
    def freeze_account(self):
        """冻结账户"""
        self.is_frozen = True

print("\n=== 业务逻辑异常 ===")

account = BankAccount("123456789", 1000)

# 正常取款
try:
    new_balance = account.withdraw(200)
    print(f"取款成功,余额: {new_balance}")
except (ValidationError, BusinessLogicError) as e:
    print(f"取款失败: {e}")

# 余额不足
try:
    account.withdraw(2000)
except BusinessLogicError as e:
    print(f"取款失败: {e}")
    print(f"错误代码: {e.error_code}")

# 账户冻结
account.freeze_account()
try:
    account.withdraw(100)
except BusinessLogicError as e:
    print(f"取款失败: {e}")
    print(f"错误代码: {e.error_code}")

# 配置错误示例
class ConfigManager:
    """配置管理器"""
    
    def __init__(self, config_file):
        self.config_file = config_file
        self.config = {}
    
    def load_config(self):
        """加载配置"""
        try:
            # 模拟配置文件不存在
            if self.config_file == "missing.conf":
                raise FileNotFoundError(f"配置文件不存在: {self.config_file}")
            
            # 模拟配置内容
            self.config = {
                'database_url': 'localhost:5432',
                'debug': True,
                'max_connections': 100
            }
            
        except FileNotFoundError as e:
            raise ConfigurationError(
                "无法加载配置文件",
                config_file=self.config_file
            ) from e
    
    def get_config(self, key, required=True):
        """获取配置值"""
        if key not in self.config:
            if required:
                raise ConfigurationError(
                    "配置项不存在",
                    config_key=key,
                    config_file=self.config_file
                )
            return None
        
        return self.config[key]
    
    def validate_config(self):
        """验证配置"""
        required_keys = ['database_url', 'debug']
        
        for key in required_keys:
            if key not in self.config:
                raise ConfigurationError(
                    f"缺少必需的配置项",
                    config_key=key,
                    config_file=self.config_file
                )
        
        # 验证数据库URL格式
        db_url = self.config['database_url']
        if ':' not in db_url:
            raise ConfigurationError(
                "数据库URL格式无效,应为 host:port",
                config_key='database_url',
                config_file=self.config_file
            )

print("\n=== 配置错误示例 ===")

# 正常配置
try:
    config_mgr = ConfigManager("app.conf")
    config_mgr.load_config()
    config_mgr.validate_config()
    print(f"数据库URL: {config_mgr.get_config('database_url')}")
except ConfigurationError as e:
    print(f"配置错误: {e}")

# 配置文件不存在
try:
    config_mgr = ConfigManager("missing.conf")
    config_mgr.load_config()
except ConfigurationError as e:
    print(f"配置错误: {e}")
    print(f"原始异常: {e.__cause__}")

异常层次结构

# 设计良好的异常层次结构
class ApplicationError(Exception):
    """应用程序异常基类"""
    
    def __init__(self, message, error_code=None, details=None):
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        self.details = details or {}
        self.timestamp = self._get_timestamp()
    
    def _get_timestamp(self):
        """获取时间戳"""
        from datetime import datetime
        return datetime.now().isoformat()
    
    def to_dict(self):
        """转换为字典"""
        return {
            'error_type': self.__class__.__name__,
            'message': self.message,
            'error_code': self.error_code,
            'details': self.details,
            'timestamp': self.timestamp
        }
    
    def __str__(self):
        parts = [self.message]
        if self.error_code:
            parts.append(f"(代码: {self.error_code})")
        return " ".join(parts)

# 数据相关异常
class DataError(ApplicationError):
    """数据异常基类"""
    pass

class ValidationError(DataError):
    """数据验证异常"""
    
    def __init__(self, message, field=None, value=None, **kwargs):
        super().__init__(message, **kwargs)
        self.field = field
        self.value = value
        if field:
            self.details.update({'field': field, 'value': str(value)})

class DataNotFoundError(DataError):
    """数据未找到异常"""
    
    def __init__(self, message, resource_type=None, resource_id=None, **kwargs):
        super().__init__(message, **kwargs)
        self.resource_type = resource_type
        self.resource_id = resource_id
        if resource_type:
            self.details.update({
                'resource_type': resource_type,
                'resource_id': str(resource_id)
            })

class DataIntegrityError(DataError):
    """数据完整性异常"""
    pass

# 业务逻辑异常
class BusinessError(ApplicationError):
    """业务逻辑异常基类"""
    pass

class AuthenticationError(BusinessError):
    """认证异常"""
    pass

class AuthorizationError(BusinessError):
    """授权异常"""
    
    def __init__(self, message, required_permission=None, **kwargs):
        super().__init__(message, **kwargs)
        self.required_permission = required_permission
        if required_permission:
            self.details['required_permission'] = required_permission

class BusinessRuleError(BusinessError):
    """业务规则异常"""
    
    def __init__(self, message, rule_name=None, **kwargs):
        super().__init__(message, **kwargs)
        self.rule_name = rule_name
        if rule_name:
            self.details['rule_name'] = rule_name

# 系统异常
class SystemError(ApplicationError):
    """系统异常基类"""
    pass

class ExternalServiceError(SystemError):
    """外部服务异常"""
    
    def __init__(self, message, service_name=None, status_code=None, **kwargs):
        super().__init__(message, **kwargs)
        self.service_name = service_name
        self.status_code = status_code
        if service_name:
            self.details['service_name'] = service_name
        if status_code:
            self.details['status_code'] = status_code

class ConfigurationError(SystemError):
    """配置异常"""
    
    def __init__(self, message, config_key=None, config_file=None, **kwargs):
        super().__init__(message, **kwargs)
        self.config_key = config_key
        self.config_file = config_file
        if config_key:
            self.details['config_key'] = config_key
        if config_file:
            self.details['config_file'] = config_file

# 使用异常层次结构的示例
class UserService:
    """用户服务"""
    
    def __init__(self):
        self.users = {
            1: {'name': 'Alice', 'email': 'alice@example.com', 'role': 'admin'},
            2: {'name': 'Bob', 'email': 'bob@example.com', 'role': 'user'},
        }
        self.current_user = None
    
    def authenticate(self, user_id, password):
        """用户认证"""
        if user_id not in self.users:
            raise AuthenticationError(
                "用户不存在",
                error_code="USER_NOT_FOUND",
                details={'user_id': user_id}
            )
        
        # 模拟密码验证
        if password != "correct_password":
            raise AuthenticationError(
                "密码错误",
                error_code="INVALID_PASSWORD"
            )
        
        self.current_user = self.users[user_id]
        return True
    
    def get_user(self, user_id):
        """获取用户信息"""
        if not self.current_user:
            raise AuthenticationError(
                "用户未登录",
                error_code="NOT_AUTHENTICATED"
            )
        
        if user_id not in self.users:
            raise DataNotFoundError(
                "用户不存在",
                resource_type="User",
                resource_id=user_id,
                error_code="USER_NOT_FOUND"
            )
        
        return self.users[user_id]
    
    def delete_user(self, user_id):
        """删除用户"""
        if not self.current_user:
            raise AuthenticationError(
                "用户未登录",
                error_code="NOT_AUTHENTICATED"
            )
        
        if self.current_user['role'] != 'admin':
            raise AuthorizationError(
                "权限不足,需要管理员权限",
                required_permission="admin",
                error_code="INSUFFICIENT_PRIVILEGES"
            )
        
        if user_id not in self.users:
            raise DataNotFoundError(
                "用户不存在",
                resource_type="User",
                resource_id=user_id,
                error_code="USER_NOT_FOUND"
            )
        
        if user_id == 1:  # 假设用户1是系统管理员
            raise BusinessRuleError(
                "不能删除系统管理员",
                rule_name="PROTECT_SYSTEM_ADMIN",
                error_code="CANNOT_DELETE_ADMIN"
            )
        
        del self.users[user_id]
        return True
    
    def update_user(self, user_id, user_data):
        """更新用户信息"""
        if not self.current_user:
            raise AuthenticationError(
                "用户未登录",
                error_code="NOT_AUTHENTICATED"
            )
        
        if user_id not in self.users:
            raise DataNotFoundError(
                "用户不存在",
                resource_type="User",
                resource_id=user_id,
                error_code="USER_NOT_FOUND"
            )
        
        # 验证数据
        if 'email' in user_data:
            email = user_data['email']
            if not isinstance(email, str) or '@' not in email:
                raise ValidationError(
                    "邮箱格式无效",
                    field="email",
                    value=email,
                    error_code="INVALID_EMAIL_FORMAT"
                )
            
            # 检查邮箱是否已存在
            for uid, user in self.users.items():
                if uid != user_id and user['email'] == email:
                    raise DataIntegrityError(
                        "邮箱已存在",
                        error_code="EMAIL_ALREADY_EXISTS",
                        details={'email': email, 'existing_user_id': uid}
                    )
        
        self.users[user_id].update(user_data)
        return self.users[user_id]

print("\n=== 异常层次结构示例 ===")

user_service = UserService()

# 测试各种异常情况
test_cases = [
    # 认证测试
    lambda: user_service.authenticate(999, "wrong_password"),  # 用户不存在
    lambda: user_service.authenticate(1, "wrong_password"),    # 密码错误
    lambda: user_service.get_user(1),                          # 未登录
    
    # 正常登录
    lambda: user_service.authenticate(1, "correct_password"),  # 正常登录
    
    # 数据操作测试
    lambda: user_service.get_user(999),                        # 用户不存在
    lambda: user_service.update_user(2, {'email': 'invalid'}), # 邮箱格式错误
    lambda: user_service.update_user(2, {'email': 'alice@example.com'}), # 邮箱已存在
    lambda: user_service.delete_user(1),                       # 不能删除管理员
]

for i, test_case in enumerate(test_cases):
    try:
        result = test_case()
        print(f"测试 {i+1}: 成功 - {result}")
    except ApplicationError as e:
        print(f"测试 {i+1}: {e.__class__.__name__} - {e}")
        print(f"  详细信息: {e.to_dict()}")
    except Exception as e:
        print(f"测试 {i+1}: 未知错误 - {e}")

运行自定义异常示例:

python custom_exceptions.py

8.3 调试技巧和工具

使用pdb调试器

import pdb

def debugging_example():
    """调试示例函数"""
    
    def calculate_average(numbers):
        """计算平均值"""
        total = 0
        count = 0
        
        for num in numbers:
            # 设置断点
            # pdb.set_trace()  # 取消注释以启用调试
            
            if isinstance(num, (int, float)):
                total += num
                count += 1
            else:
                print(f"跳过非数字值: {num}")
        
        if count == 0:
            raise ValueError("没有有效的数字")
        
        average = total / count
        return average
    
    def process_data(data_list):
        """处理数据列表"""
        results = []
        
        for i, data in enumerate(data_list):
            try:
                # 在这里可以设置条件断点
                if i == 2:  # 在处理第3个元素时调试
                    # pdb.set_trace()  # 取消注释以启用调试
                    pass
                
                avg = calculate_average(data)
                results.append({
                    'index': i,
                    'data': data,
                    'average': avg,
                    'status': 'success'
                })
            except Exception as e:
                results.append({
                    'index': i,
                    'data': data,
                    'error': str(e),
                    'status': 'error'
                })
        
        return results
    
    # 测试数据
    test_data = [
        [1, 2, 3, 4, 5],           # 正常数据
        [10, 20, 30],              # 正常数据
        [1, 'invalid', 3],         # 包含无效数据
        [],                        # 空列表
        [100, 200, 300, 400]       # 正常数据
    ]
    
    print("=== 调试示例 ===")
    results = process_data(test_data)
    
    for result in results:
        if result['status'] == 'success':
            print(f"数据 {result['index']}: 平均值 = {result['average']:.2f}")
        else:
            print(f"数据 {result['index']}: 错误 - {result['error']}")

# 运行调试示例
debugging_example()

# 调试技巧和最佳实践
class DebugHelper:
    """调试辅助类"""
    
    @staticmethod
    def print_debug_info(func):
        """打印调试信息的装饰器"""
        import functools
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            print(f"\n=== 调试信息: {func.__name__} ===")
            print(f"参数: args={args}, kwargs={kwargs}")
            
            try:
                result = func(*args, **kwargs)
                print(f"返回值: {result}")
                print(f"返回值类型: {type(result)}")
                return result
            except Exception as e:
                print(f"异常: {type(e).__name__}: {e}")
                import traceback
                traceback.print_exc()
                raise
            finally:
                print(f"=== 结束: {func.__name__} ===")
        
        return wrapper
    
    @staticmethod
    def trace_calls(func):
        """跟踪函数调用的装饰器"""
        import functools
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            indent = getattr(wrapper, 'call_depth', 0) * '  '
            print(f"{indent}-> 调用 {func.__name__}")
            
            wrapper.call_depth = getattr(wrapper, 'call_depth', 0) + 1
            
            try:
                result = func(*args, **kwargs)
                print(f"{indent}<- 返回 {func.__name__}: {result}")
                return result
            except Exception as e:
                print(f"{indent}<- 异常 {func.__name__}: {e}")
                raise
            finally:
                wrapper.call_depth -= 1
        
        return wrapper
    
    @staticmethod
    def profile_performance(func):
        """性能分析装饰器"""
        import functools
        import time
        import psutil
        import os
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 获取进程信息
            process = psutil.Process(os.getpid())
            
            # 记录开始状态
            start_time = time.time()
            start_memory = process.memory_info().rss / 1024 / 1024  # MB
            
            try:
                result = func(*args, **kwargs)
                
                # 记录结束状态
                end_time = time.time()
                end_memory = process.memory_info().rss / 1024 / 1024  # MB
                
                # 计算性能指标
                duration = end_time - start_time
                memory_delta = end_memory - start_memory
                
                print(f"\n=== 性能分析: {func.__name__} ===")
                print(f"执行时间: {duration:.4f} 秒")
                print(f"内存使用: {start_memory:.2f} MB -> {end_memory:.2f} MB (变化: {memory_delta:+.2f} MB)")
                print(f"CPU使用率: {process.cpu_percent()}%")
                
                return result
            
            except Exception as e:
                end_time = time.time()
                duration = end_time - start_time
                print(f"\n=== 性能分析 (异常): {func.__name__} ===")
                print(f"执行时间: {duration:.4f} 秒")
                print(f"异常: {e}")
                raise
        
        return wrapper

# 使用调试装饰器的示例
class Calculator:
    """计算器类(用于演示调试)"""
    
    @DebugHelper.print_debug_info
    def add(self, a, b):
        """加法"""
        return a + b
    
    @DebugHelper.trace_calls
    def factorial(self, n):
        """阶乘(递归实现)"""
        if n <= 1:
            return 1
        return n * self.factorial(n - 1)
    
    @DebugHelper.profile_performance
    def heavy_calculation(self, n):
        """重计算(用于性能分析)"""
        result = 0
        for i in range(n):
            result += i ** 2
        return result

print("\n=== 调试装饰器示例 ===")

calc = Calculator()

# 测试调试信息装饰器
result = calc.add(5, 3)

# 测试调用跟踪装饰器
result = calc.factorial(5)

# 测试性能分析装饰器(需要安装psutil: pip install psutil)
try:
    result = calc.heavy_calculation(100000)
except ImportError:
    print("跳过性能分析(需要安装psutil)")

# 变量检查工具
def inspect_variables():
    """检查变量的工具函数"""
    
    def analyze_variable(var, name="variable"):
        """分析变量"""
        print(f"\n=== 变量分析: {name} ===")
        print(f"值: {var}")
        print(f"类型: {type(var)}")
        print(f"大小: {len(var) if hasattr(var, '__len__') else 'N/A'}")
        print(f"内存地址: {id(var)}")
        
        if hasattr(var, '__dict__'):
            print(f"属性: {list(var.__dict__.keys())}")
        
        if hasattr(var, '__class__'):
            print(f"类: {var.__class__.__name__}")
            print(f"MRO: {[cls.__name__ for cls in var.__class__.__mro__]}")
        
        # 对于容器类型,显示内容摘要
        if isinstance(var, (list, tuple)):
            print(f"前5个元素: {var[:5]}")
        elif isinstance(var, dict):
            keys = list(var.keys())[:5]
            print(f"前5个键: {keys}")
        elif isinstance(var, str):
            print(f"前50个字符: {repr(var[:50])}")
    
    # 测试不同类型的变量
    test_vars = {
        'integer': 42,
        'string': "Hello, World!" * 10,
        'list': list(range(100)),
        'dict': {f'key_{i}': f'value_{i}' for i in range(20)},
        'calculator': Calculator()
    }
    
    for name, var in test_vars.items():
        analyze_variable(var, name)

print("\n=== 变量检查工具 ===")
inspect_variables()

日志记录

import logging
import sys
from datetime import datetime
import json

# 配置日志记录
def setup_logging():
    """设置日志配置"""
    
    # 创建根日志记录器
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    
    # 清除现有的处理器
    logger.handlers.clear()
    
    # 创建格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
    )
    
    # 控制台处理器
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    
    # 文件处理器
    file_handler = logging.FileHandler('app.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    
    # 错误文件处理器
    error_handler = logging.FileHandler('error.log', encoding='utf-8')
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(formatter)
    logger.addHandler(error_handler)
    
    return logger

# 自定义日志格式化器
class JSONFormatter(logging.Formatter):
    """JSON格式的日志格式化器"""
    
    def format(self, record):
        log_entry = {
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
            'message': record.getMessage(),
        }
        
        # 添加异常信息
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        
        # 添加额外字段
        if hasattr(record, 'user_id'):
            log_entry['user_id'] = record.user_id
        if hasattr(record, 'request_id'):
            log_entry['request_id'] = record.request_id
        
        return json.dumps(log_entry, ensure_ascii=False)

# 日志记录示例类
class UserService:
    """用户服务(带日志记录)"""
    
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.users = {}
        self.logger.info("用户服务初始化完成")
    
    def create_user(self, user_data):
        """创建用户"""
        user_id = user_data.get('id')
        
        # 记录操作开始
        self.logger.info(f"开始创建用户", extra={'user_id': user_id})
        
        try:
            # 验证用户数据
            self._validate_user_data(user_data)
            
            # 检查用户是否已存在
            if user_id in self.users:
                self.logger.warning(f"用户已存在", extra={'user_id': user_id})
                raise ValueError(f"用户 {user_id} 已存在")
            
            # 创建用户
            self.users[user_id] = user_data.copy()
            
            self.logger.info(f"用户创建成功", extra={'user_id': user_id})
            return user_data
        
        except Exception as e:
            self.logger.error(f"用户创建失败: {e}", extra={'user_id': user_id}, exc_info=True)
            raise
    
    def get_user(self, user_id):
        """获取用户"""
        self.logger.debug(f"查询用户", extra={'user_id': user_id})
        
        if user_id not in self.users:
            self.logger.warning(f"用户不存在", extra={'user_id': user_id})
            return None
        
        user = self.users[user_id]
        self.logger.debug(f"用户查询成功", extra={'user_id': user_id})
        return user
    
    def update_user(self, user_id, update_data):
        """更新用户"""
        self.logger.info(f"开始更新用户", extra={'user_id': user_id})
        
        try:
            if user_id not in self.users:
                self.logger.error(f"尝试更新不存在的用户", extra={'user_id': user_id})
                raise ValueError(f"用户 {user_id} 不存在")
            
            # 记录更新前的数据
            old_data = self.users[user_id].copy()
            self.logger.debug(f"更新前数据: {old_data}", extra={'user_id': user_id})
            
            # 更新数据
            self.users[user_id].update(update_data)
            
            # 记录更新后的数据
            new_data = self.users[user_id]
            self.logger.info(f"用户更新成功", extra={'user_id': user_id})
            self.logger.debug(f"更新后数据: {new_data}", extra={'user_id': user_id})
            
            return new_data
        
        except Exception as e:
            self.logger.error(f"用户更新失败: {e}", extra={'user_id': user_id}, exc_info=True)
            raise
    
    def delete_user(self, user_id):
        """删除用户"""
        self.logger.info(f"开始删除用户", extra={'user_id': user_id})
        
        try:
            if user_id not in self.users:
                self.logger.warning(f"尝试删除不存在的用户", extra={'user_id': user_id})
                return False
            
            # 记录被删除的用户数据
            deleted_user = self.users[user_id]
            self.logger.debug(f"删除的用户数据: {deleted_user}", extra={'user_id': user_id})
            
            del self.users[user_id]
            
            self.logger.info(f"用户删除成功", extra={'user_id': user_id})
            return True
        
        except Exception as e:
            self.logger.error(f"用户删除失败: {e}", extra={'user_id': user_id}, exc_info=True)
            raise
    
    def _validate_user_data(self, user_data):
        """验证用户数据"""
        required_fields = ['id', 'name', 'email']
        
        for field in required_fields:
            if field not in user_data:
                self.logger.error(f"缺少必需字段: {field}")
                raise ValueError(f"缺少必需字段: {field}")
        
        # 验证邮箱格式
        email = user_data['email']
        if '@' not in email:
            self.logger.error(f"邮箱格式无效: {email}")
            raise ValueError(f"邮箱格式无效: {email}")
        
        self.logger.debug("用户数据验证通过")

# 性能日志记录
class PerformanceLogger:
    """性能日志记录器"""
    
    def __init__(self, logger_name="performance"):
        self.logger = logging.getLogger(logger_name)
    
    def log_function_performance(self, func):
        """记录函数性能的装饰器"""
        import functools
        import time
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            
            try:
                result = func(*args, **kwargs)
                end_time = time.time()
                duration = end_time - start_time
                
                self.logger.info(
                    f"函数 {func.__name__} 执行完成",
                    extra={
                        'function': func.__name__,
                        'duration': duration,
                        'status': 'success',
                        'args_count': len(args),
                        'kwargs_count': len(kwargs)
                    }
                )
                
                return result
            
            except Exception as e:
                end_time = time.time()
                duration = end_time - start_time
                
                self.logger.error(
                    f"函数 {func.__name__} 执行失败",
                    extra={
                        'function': func.__name__,
                        'duration': duration,
                        'status': 'error',
                        'error': str(e),
                        'args_count': len(args),
                        'kwargs_count': len(kwargs)
                    },
                    exc_info=True
                )
                
                raise
        
        return wrapper

# 设置日志并运行示例
print("=== 日志记录示例 ===")

# 设置基本日志
logger = setup_logging()

# 创建用户服务
user_service = UserService()

# 创建性能日志记录器
perf_logger = PerformanceLogger()

# 为用户服务方法添加性能日志
user_service.create_user = perf_logger.log_function_performance(user_service.create_user)
user_service.get_user = perf_logger.log_function_performance(user_service.get_user)

# 测试用户服务
test_users = [
    {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
    {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'},
    {'id': 3, 'name': 'Charlie', 'email': 'invalid-email'},  # 无效邮箱
]

for user_data in test_users:
    try:
        user = user_service.create_user(user_data)
        print(f"创建用户成功: {user['name']}")
    except Exception as e:
        print(f"创建用户失败: {e}")

# 查询用户
for user_id in [1, 2, 999]:
    user = user_service.get_user(user_id)
    if user:
        print(f"找到用户: {user['name']}")
    else:
        print(f"用户 {user_id} 不存在")

# 更新用户
try:
    updated_user = user_service.update_user(1, {'name': 'Alice Smith'})
    print(f"更新用户成功: {updated_user['name']}")
except Exception as e:
    print(f"更新用户失败: {e}")

# 删除用户
try:
    success = user_service.delete_user(2)
    print(f"删除用户: {'成功' if success else '失败'}")
except Exception as e:
    print(f"删除用户失败: {e}")

print("\n日志文件已生成: app.log, error.log")
print("可以查看这些文件了解详细的日志记录")

运行调试和日志示例:

python debugging_logging.py

本章小结

本章我们深入学习了Python的异常处理和调试:

  1. 异常处理基础:异常类型、try/except语法、异常信息获取
  2. 自定义异常:异常类设计、异常层次结构、最佳实践
  3. 调试技巧:pdb调试器、调试装饰器、变量检查、性能分析
  4. 日志记录:日志配置、自定义格式化器、性能日志、最佳实践

下一章预告

下一章我们将学习《文件操作和I/O》,内容包括: - 文件的读写操作 - 文件路径处理 - CSV和JSON文件处理 - 二进制文件操作 - 网络I/O和异步I/O

练习题

基础练习

  1. 异常处理

    • 编写一个安全的文件读取函数
    • 实现一个带重试机制的网络请求函数
    • 创建一个数据验证器
  2. 自定义异常

    • 设计一个电商系统的异常体系
    • 实现一个配置管理器的异常处理
    • 创建一个API错误处理系统

进阶练习

  1. 调试工具

    • 实现一个代码执行时间分析器
    • 创建一个内存使用监控器
    • 编写一个函数调用跟踪器
  2. 日志系统

    • 实现一个分布式日志收集器
    • 创建一个日志分析工具
    • 设计一个实时日志监控系统

提示:良好的异常处理和调试技能是编写健壮Python程序的关键。学会使用各种调试工具和日志记录技术,能大大提高开发效率和代码质量。