9.1 Overview

Redis transactions and Lua scripts are two important mechanisms that Redis provides for executing a group of operations atomically. Although Redis executes commands on a single thread, many scenarios still require that a group of operations runs as an indivisible unit, with no other clients' commands interleaved.

9.1.1 Characteristics of Redis Transactions

  • Atomicity (with a caveat): once EXEC runs, every queued command executes without interruption; however, Redis does not roll back, so a command that fails at runtime does not undo the commands that succeeded before it
  • Isolation: no commands from other clients are interleaved while a transaction executes
  • Consistency: the database moves from one consistent state to another
  • No durability guarantee: transactions add no durability of their own; they rely on Redis's persistence mechanisms (RDB/AOF)

9.1.2 Characteristics of Lua Scripts

  • Atomic execution: the entire script runs as a single atomic operation
  • Server-side execution: fewer network round trips
  • Reusability: scripts can be cached on the server and re-invoked by SHA1
  • Rich control flow: supports conditionals, loops, and other complex logic

9.1.3 Use Cases

  • Bank transfers: make the debit and the credit atomic
  • Inventory management: prevent overselling
  • Counters: keep counts accurate under concurrency
  • Cache updates: keep the cache consistent with the source data
  • Distributed locks: implement sophisticated locking schemes

9.2 Redis Transactions

9.2.1 Basic Commands

MULTI - start a transaction

MULTI

EXEC - execute all queued commands

EXEC

DISCARD - abort the transaction and clear the queue

DISCARD

WATCH - watch keys for changes (optimistic locking)

WATCH key [key ...]

UNWATCH - stop watching all keys

UNWATCH

9.2.2 Python Implementation Example

```python
import redis
import time
import logging
from typing import List, Dict, Any, Optional
from decimal import Decimal

class RedisTransaction:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)
    
    def simple_transaction(self, operations: List[tuple]) -> Optional[List[Any]]:
        """Execute a simple transaction"""
        pipe = self.redis_client.pipeline()
        
        try:
            # Start the transaction
            pipe.multi()
            
            # Queue the operations
            for operation in operations:
                command = operation[0]
                args = operation[1:]
                getattr(pipe, command)(*args)
            
            # Execute the transaction
            results = pipe.execute()
            return results
            
        except Exception as e:
            self.logger.error(f"Transaction failed: {e}")
            pipe.reset()  # clear any buffered commands
            return None
    
    def watched_transaction(self, watch_keys: List[str], operations: List[tuple], 
                          max_retries: int = 3) -> Optional[List[Any]]:
        """Execute a transaction with WATCH-based optimistic locking"""
        for attempt in range(max_retries):
            try:
                pipe = self.redis_client.pipeline()
                
                # Watch the keys
                pipe.watch(*watch_keys)
                
                # Read current values on the watched connection (optional check)
                current_values = {}
                for key in watch_keys:
                    current_values[key] = pipe.get(key)
                
                # Start the transaction
                pipe.multi()
                
                # Queue the operations
                for operation in operations:
                    command = operation[0]
                    args = operation[1:]
                    getattr(pipe, command)(*args)
                
                # Execute the transaction
                results = pipe.execute()
                return results
                
            except redis.WatchError:
                self.logger.warning(f"Transaction attempt {attempt + 1} failed due to watched key modification")
                if attempt == max_retries - 1:
                    self.logger.error("Transaction failed after maximum retries")
                    return None
                time.sleep(0.01 * (2 ** attempt))  # exponential backoff
            except Exception as e:
                self.logger.error(f"Transaction failed: {e}")
                return None
        
        return None
```

### 9.2.3 Bank Transfer System

```python
class BankTransferSystem:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.transaction_handler = RedisTransaction(redis_client)
        self.logger = logging.getLogger(__name__)
    
    def create_account(self, account_id: str, initial_balance: Decimal = Decimal('0')):
        """Create an account"""
        account_key = f"account:{account_id}:balance"
        info_key = f"account:{account_id}:info"
        
        pipe = self.redis_client.pipeline()
        pipe.set(account_key, str(initial_balance))
        pipe.hset(info_key, mapping={
            'created_at': time.time(),
            'status': 'active',
            'currency': 'USD'
        })
        pipe.execute()
        
        self.logger.info(f"Account {account_id} created with balance {initial_balance}")
    
    def get_balance(self, account_id: str) -> Optional[Decimal]:
        """Get the account balance"""
        account_key = f"account:{account_id}:balance"
        balance_str = self.redis_client.get(account_key)
        
        if balance_str is None:
            return None
        
        return Decimal(balance_str)
    
    def transfer(self, from_account: str, to_account: str, amount: Decimal, 
                description: str = "") -> Dict[str, Any]:
        """Transfer funds between accounts"""
        if amount <= 0:
            return {
                'success': False,
                'error': 'Transfer amount must be positive',
                'transaction_id': None
            }
        
        from_key = f"account:{from_account}:balance"
        to_key = f"account:{to_account}:balance"
        transaction_id = f"tx:{int(time.time() * 1000000)}"
        
        # Check that both accounts exist
        if not self.redis_client.exists(from_key) or not self.redis_client.exists(to_key):
            return {
                'success': False,
                'error': 'One or both accounts do not exist',
                'transaction_id': None
            }
        
        # Use a watched transaction to ensure atomicity
        for attempt in range(5):
            try:
                pipe = self.redis_client.pipeline()
                
                # Watch both account keys
                pipe.watch(from_key, to_key)
                
                # Read current balances on the watched connection
                from_balance = Decimal(pipe.get(from_key) or '0')
                to_balance = Decimal(pipe.get(to_key) or '0')
                
                # Check for sufficient funds
                if from_balance < amount:
                    pipe.unwatch()
                    return {
                        'success': False,
                        'error': 'Insufficient balance',
                        'transaction_id': None,
                        'current_balance': from_balance
                    }
                
                # Compute the new balances
                new_from_balance = from_balance - amount
                new_to_balance = to_balance + amount
                
                # Start the transaction
                pipe.multi()
                
                # Update both balances
                pipe.set(from_key, str(new_from_balance))
                pipe.set(to_key, str(new_to_balance))
                
                # Record the transaction
                transaction_data = {
                    'transaction_id': transaction_id,
                    'from_account': from_account,
                    'to_account': to_account,
                    'amount': str(amount),
                    'description': description,
                    'timestamp': time.time(),
                    'status': 'completed'
                }
                
                pipe.hset(f"transaction:{transaction_id}", mapping=transaction_data)
                
                # Append to each account's transaction history
                pipe.lpush(f"account:{from_account}:transactions", transaction_id)
                pipe.lpush(f"account:{to_account}:transactions", transaction_id)
                
                # Cap the history length
                pipe.ltrim(f"account:{from_account}:transactions", 0, 999)
                pipe.ltrim(f"account:{to_account}:transactions", 0, 999)
                
                # Execute the transaction
                results = pipe.execute()
                
                self.logger.info(f"Transfer completed: {from_account} -> {to_account}, amount: {amount}")
                
                return {
                    'success': True,
                    'transaction_id': transaction_id,
                    'from_balance': new_from_balance,
                    'to_balance': new_to_balance,
                    'timestamp': transaction_data['timestamp']
                }
                
            except redis.WatchError:
                self.logger.warning(f"Transfer attempt {attempt + 1} failed due to concurrent modification")
                if attempt == 4:
                    return {
                        'success': False,
                        'error': 'Transfer failed due to concurrent operations',
                        'transaction_id': None
                    }
                time.sleep(0.01 * (2 ** attempt))
            except Exception as e:
                self.logger.error(f"Transfer failed: {e}")
                return {
                    'success': False,
                    'error': str(e),
                    'transaction_id': None
                }
        
        return {
            'success': False,
            'error': 'Transfer failed after maximum retries',
            'transaction_id': None
        }
    
    def get_transaction_history(self, account_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Get an account's transaction history"""
        transaction_ids = self.redis_client.lrange(f"account:{account_id}:transactions", 0, limit - 1)
        
        transactions = []
        for tx_id in transaction_ids:
            tx_data = self.redis_client.hgetall(f"transaction:{tx_id}")
            if tx_data:
                transactions.append(tx_data)
        
        return transactions
    
    def get_account_info(self, account_id: str) -> Dict[str, Any]:
        """Get account info"""
        balance = self.get_balance(account_id)
        if balance is None:
            return {'error': 'Account not found'}
        
        info = self.redis_client.hgetall(f"account:{account_id}:info")
        recent_transactions = self.get_transaction_history(account_id, 5)
        
        return {
            'account_id': account_id,
            'balance': balance,
            'info': info,
            'recent_transactions': recent_transactions
        }
```

### 9.2.4 Inventory Management System

```python
class InventoryManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)
    
    def add_product(self, product_id: str, initial_stock: int, price: Decimal, 
                   name: str = "", description: str = ""):
        """Add a product"""
        product_key = f"product:{product_id}"
        stock_key = f"stock:{product_id}"
        
        pipe = self.redis_client.pipeline()
        pipe.hset(product_key, mapping={
            'name': name,
            'description': description,
            'price': str(price),
            'created_at': time.time()
        })
        pipe.set(stock_key, initial_stock)
        pipe.execute()
        
        self.logger.info(f"Product {product_id} added with stock {initial_stock}")
    
    def get_stock(self, product_id: str) -> Optional[int]:
        """Get the current stock"""
        stock_key = f"stock:{product_id}"
        stock = self.redis_client.get(stock_key)
        return int(stock) if stock is not None else None
    
    def reserve_stock(self, product_id: str, quantity: int, 
                     customer_id: str, ttl: int = 300) -> Dict[str, Any]:
        """Reserve stock"""
        if quantity <= 0:
            return {
                'success': False,
                'error': 'Quantity must be positive'
            }
        
        stock_key = f"stock:{product_id}"
        reservation_id = f"reservation:{int(time.time() * 1000000)}"
        
        for attempt in range(5):
            try:
                pipe = self.redis_client.pipeline()
                
                # Watch the stock key
                pipe.watch(stock_key)
                
                # Read current stock on the watched connection
                current_stock = pipe.get(stock_key)
                if current_stock is None:
                    pipe.unwatch()
                    return {
                        'success': False,
                        'error': 'Product not found'
                    }
                
                current_stock = int(current_stock)
                
                # Check that stock is sufficient
                if current_stock < quantity:
                    pipe.unwatch()
                    return {
                        'success': False,
                        'error': 'Insufficient stock',
                        'available_stock': current_stock
                    }
                
                # Start the transaction
                pipe.multi()
                
                # Decrement the stock
                pipe.set(stock_key, current_stock - quantity)
                
                # Create the reservation record
                reservation_data = {
                    'product_id': product_id,
                    'customer_id': customer_id,
                    'quantity': quantity,
                    'created_at': time.time(),
                    'status': 'reserved'
                }
                
                pipe.hset(reservation_id, mapping=reservation_data)
                pipe.expire(reservation_id, ttl)
                
                # Add to the customer's reservation list
                pipe.lpush(f"customer:{customer_id}:reservations", reservation_id)
                pipe.expire(f"customer:{customer_id}:reservations", ttl + 60)
                
                # Execute the transaction
                results = pipe.execute()
                
                self.logger.info(f"Stock reserved: {product_id}, quantity: {quantity}, customer: {customer_id}")
                
                return {
                    'success': True,
                    'reservation_id': reservation_id,
                    'remaining_stock': current_stock - quantity,
                    'expires_at': time.time() + ttl
                }
                
            except redis.WatchError:
                self.logger.warning(f"Stock reservation attempt {attempt + 1} failed due to concurrent modification")
                if attempt == 4:
                    return {
                        'success': False,
                        'error': 'Reservation failed due to concurrent operations'
                    }
                time.sleep(0.01 * (2 ** attempt))
            except Exception as e:
                self.logger.error(f"Stock reservation failed: {e}")
                return {
                    'success': False,
                    'error': str(e)
                }
        
        return {
            'success': False,
            'error': 'Reservation failed after maximum retries'
        }
    
    def confirm_reservation(self, reservation_id: str) -> Dict[str, Any]:
        """Confirm a reservation (complete the purchase)"""
        reservation_data = self.redis_client.hgetall(reservation_id)
        
        if not reservation_data:
            return {
                'success': False,
                'error': 'Reservation not found or expired'
            }
        
        if reservation_data.get('status') != 'reserved':
            return {
                'success': False,
                'error': 'Reservation already processed'
            }
        
        # Update the reservation status
        pipe = self.redis_client.pipeline()
        pipe.hset(reservation_id, 'status', 'confirmed')
        pipe.hset(reservation_id, 'confirmed_at', time.time())
        pipe.persist(reservation_id)  # remove the TTL
        pipe.execute()
        
        self.logger.info(f"Reservation confirmed: {reservation_id}")
        
        return {
            'success': True,
            'reservation_id': reservation_id,
            'product_id': reservation_data['product_id'],
            'quantity': int(reservation_data['quantity'])
        }
    
    def cancel_reservation(self, reservation_id: str) -> Dict[str, Any]:
        """Cancel a reservation (restore the stock)"""
        reservation_data = self.redis_client.hgetall(reservation_id)
        
        if not reservation_data:
            return {
                'success': False,
                'error': 'Reservation not found or expired'
            }
        
        if reservation_data.get('status') != 'reserved':
            return {
                'success': False,
                'error': 'Reservation already processed'
            }
        
        product_id = reservation_data['product_id']
        quantity = int(reservation_data['quantity'])
        stock_key = f"stock:{product_id}"
        
        for attempt in range(3):
            try:
                pipe = self.redis_client.pipeline()
                
                # Watch the stock key
                pipe.watch(stock_key)
                
                # Read current stock on the watched connection
                current_stock = int(pipe.get(stock_key) or 0)
                
                # Start the transaction
                pipe.multi()
                
                # Restore the stock
                pipe.set(stock_key, current_stock + quantity)
                
                # Update the reservation status
                pipe.hset(reservation_id, 'status', 'cancelled')
                pipe.hset(reservation_id, 'cancelled_at', time.time())
                
                # Execute the transaction
                results = pipe.execute()
                
                self.logger.info(f"Reservation cancelled: {reservation_id}, stock restored: {quantity}")
                
                return {
                    'success': True,
                    'reservation_id': reservation_id,
                    'restored_quantity': quantity,
                    'new_stock': current_stock + quantity
                }
                
            except redis.WatchError:
                self.logger.warning(f"Cancel reservation attempt {attempt + 1} failed")
                if attempt == 2:
                    return {
                        'success': False,
                        'error': 'Failed to cancel reservation due to concurrent operations'
                    }
                time.sleep(0.01)
            except Exception as e:
                self.logger.error(f"Cancel reservation failed: {e}")
                return {
                    'success': False,
                    'error': str(e)
                }
        
        return {
            'success': False,
            'error': 'Cancel reservation failed after retries'
        }
    
    def get_product_info(self, product_id: str) -> Dict[str, Any]:
        """Get product info"""
        product_data = self.redis_client.hgetall(f"product:{product_id}")
        stock = self.get_stock(product_id)
        
        if not product_data:
            return {'error': 'Product not found'}
        
        return {
            'product_id': product_id,
            'stock': stock,
            **product_data
        }

# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    
    # Bank transfer system demo
    bank = BankTransferSystem(redis_client)
    
    # Create accounts
    bank.create_account("alice", Decimal('1000.00'))
    bank.create_account("bob", Decimal('500.00'))
    
    # Perform a transfer
    result = bank.transfer("alice", "bob", Decimal('100.00'), "Payment for services")
    print(f"Transfer result: {result}")
    
    # Inspect the accounts
    alice_info = bank.get_account_info("alice")
    bob_info = bank.get_account_info("bob")
    print(f"Alice balance: {alice_info['balance']}")
    print(f"Bob balance: {bob_info['balance']}")
    
    # Inventory management demo
    inventory = InventoryManager(redis_client)
    
    # Add a product
    inventory.add_product("laptop001", 50, Decimal('999.99'), "Gaming Laptop", "High-performance gaming laptop")
    
    # Reserve stock
    reservation = inventory.reserve_stock("laptop001", 2, "customer123")
    print(f"Reservation result: {reservation}")
    
    # Confirm the reservation
    if reservation['success']:
        confirm_result = inventory.confirm_reservation(reservation['reservation_id'])
        print(f"Confirmation result: {confirm_result}")
    
    # Inspect the product
    product_info = inventory.get_product_info("laptop001")
    print(f"Product info: {product_info}")
```

9.3 Lua Scripts

9.3.1 Basic Commands

EVAL - execute a script

EVAL script numkeys key [key ...] arg [arg ...]

EVALSHA - execute a cached script by its SHA1

EVALSHA sha1 numkeys key [key ...] arg [arg ...]

SCRIPT LOAD - load a script into the script cache

SCRIPT LOAD script

SCRIPT EXISTS - check whether scripts are in the cache

SCRIPT EXISTS sha1 [sha1 ...]

SCRIPT FLUSH - clear the script cache

SCRIPT FLUSH

SCRIPT KILL - kill the currently running script

SCRIPT KILL
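
The SHA1 that EVALSHA expects is simply the SHA1 hex digest of the script text, which is also what SCRIPT LOAD returns, so a client can compute it locally without a round trip. A minimal sketch using only the standard library (no Redis server involved; the helper name `script_sha1` is ours):

```python
import hashlib

def script_sha1(script: str) -> str:
    """Compute the SHA1 hex digest that SCRIPT LOAD / EVALSHA use for a script body."""
    return hashlib.sha1(script.encode("utf-8")).hexdigest()

# The digest is deterministic: identical script text always yields the same
# 40-character hex SHA1, which is what makes EVALSHA caching safe.
sha = script_sha1("return 1")
```

This is also why client libraries can fall back from EVALSHA to EVAL transparently: they already know the SHA1 the server will assign to the script.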

9.3.2 Python Implementation Example

```python
import redis
import hashlib
import json
import logging
import time
from typing import List, Dict, Any, Optional, Union

class LuaScriptManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.script_cache = {}
        self.logger = logging.getLogger(__name__)
    
    def load_script(self, script: str) -> str:
        """Load a script and return its SHA1"""
        sha1 = self.redis_client.script_load(script)
        self.script_cache[sha1] = script
        return sha1
    
    def execute_script(self, script: str, keys: List[str] = None, 
                      args: List[Any] = None) -> Any:
        """Execute a script with EVAL"""
        keys = keys or []
        args = args or []
        
        try:
            return self.redis_client.eval(script, len(keys), *(keys + args))
        except redis.ResponseError as e:
            self.logger.error(f"Script execution failed: {e}")
            raise
    
    def execute_script_sha(self, sha1: str, keys: List[str] = None, 
                          args: List[Any] = None) -> Any:
        """Execute a cached script with EVALSHA"""
        keys = keys or []
        args = args or []
        
        try:
            return self.redis_client.evalsha(sha1, len(keys), *(keys + args))
        except redis.NoScriptError:
            # Script not in the server cache; reload and retry
            if sha1 in self.script_cache:
                self.load_script(self.script_cache[sha1])
                return self.redis_client.evalsha(sha1, len(keys), *(keys + args))
            else:
                raise ValueError(f"Script with SHA1 {sha1} not found in cache")
    
    def script_exists(self, sha1: str) -> bool:
        """Check whether a script exists in the server cache"""
        return self.redis_client.script_exists(sha1)[0]
    
    def flush_scripts(self):
        """Clear the server-side script cache"""
        self.redis_client.script_flush()
        self.script_cache.clear()
```

### 9.3.3 Distributed Lock Implementation

```python
class DistributedLock:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.script_manager = LuaScriptManager(redis_client)
        
        # Script to acquire the lock
        self.acquire_script = """
        local key = KEYS[1]
        local identifier = ARGV[1]
        local ttl = tonumber(ARGV[2])
        
        -- Try to acquire the lock (set only if not already held)
        local result = redis.call('SET', key, identifier, 'NX', 'EX', ttl)
        if result then
            return 1
        else
            return 0
        end
        """
        
        # Script to release the lock
        self.release_script = """
        local key = KEYS[1]
        local identifier = ARGV[1]
        
        -- Delete the lock only if we still own it
        local current = redis.call('GET', key)
        if current == identifier then
            redis.call('DEL', key)
            return 1
        else
            return 0
        end
        """
        
        # Script to renew (extend) the lock
        self.renew_script = """
        local key = KEYS[1]
        local identifier = ARGV[1]
        local ttl = tonumber(ARGV[2])
        
        -- Extend the TTL only if we still own the lock
        local current = redis.call('GET', key)
        if current == identifier then
            redis.call('EXPIRE', key, ttl)
            return 1
        else
            return 0
        end
        """
        
        # Load the scripts
        self.acquire_sha = self.script_manager.load_script(self.acquire_script)
        self.release_sha = self.script_manager.load_script(self.release_script)
        self.renew_sha = self.script_manager.load_script(self.renew_script)
    
    def acquire(self, lock_name: str, identifier: str, ttl: int = 30) -> bool:
        """Acquire the lock"""
        try:
            result = self.script_manager.execute_script_sha(
                self.acquire_sha, 
                keys=[f"lock:{lock_name}"], 
                args=[identifier, ttl]
            )
            return bool(result)
        except Exception as e:
            self.script_manager.logger.error(f"Failed to acquire lock {lock_name}: {e}")
            return False
    
    def release(self, lock_name: str, identifier: str) -> bool:
        """Release the lock"""
        try:
            result = self.script_manager.execute_script_sha(
                self.release_sha,
                keys=[f"lock:{lock_name}"],
                args=[identifier]
            )
            return bool(result)
        except Exception as e:
            self.script_manager.logger.error(f"Failed to release lock {lock_name}: {e}")
            return False
    
    def renew(self, lock_name: str, identifier: str, ttl: int = 30) -> bool:
        """Renew the lock's TTL"""
        try:
            result = self.script_manager.execute_script_sha(
                self.renew_sha,
                keys=[f"lock:{lock_name}"],
                args=[identifier, ttl]
            )
            return bool(result)
        except Exception as e:
            self.script_manager.logger.error(f"Failed to renew lock {lock_name}: {e}")
            return False
```

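In practice the lock class is easiest to use through a context manager that generates a unique identifier per holder and always releases on exit. A sketch under the assumption that the lock object exposes the `acquire`/`release` methods defined above (the helper name `held_lock` is ours, not part of any library):

```python
import uuid
from contextlib import contextmanager

@contextmanager
def held_lock(lock, name: str, ttl: int = 30):
    """Acquire `lock` under a fresh identifier, yield it, and always release.

    `lock` is any object with acquire(name, identifier, ttl) and
    release(name, identifier) methods, like the DistributedLock above.
    """
    identifier = str(uuid.uuid4())  # unique per holder, so release is safe
    if not lock.acquire(name, identifier, ttl):
        raise TimeoutError(f"could not acquire lock {name!r}")
    try:
        yield identifier
    finally:
        lock.release(name, identifier)
```

The per-holder UUID is what makes the Lua release script safe: a client whose lock already expired cannot delete a lock now held by someone else, because the stored identifier no longer matches.
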
### 9.3.4 Rate Limiter Implementation

```python
class RateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.script_manager = LuaScriptManager(redis_client)
        
        # Sliding-window rate limiting script
        self.sliding_window_script = """
        local key = KEYS[1]
        local window = tonumber(ARGV[1])
        local limit = tonumber(ARGV[2])
        local current_time = tonumber(ARGV[3])
        
        -- Drop entries that fell outside the window
        redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window)
        
        -- Count requests in the current window
        local current_requests = redis.call('ZCARD', key)
        
        if current_requests < limit then
            -- Record this request; window is in milliseconds, so use PEXPIRE
            redis.call('ZADD', key, current_time, current_time)
            redis.call('PEXPIRE', key, window)
            return {1, limit - current_requests - 1}
        else
            return {0, 0}
        end
        """
        
        # Token bucket rate limiting script
        self.token_bucket_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local tokens = tonumber(ARGV[2])
        local interval = tonumber(ARGV[3])
        local current_time = tonumber(ARGV[4])
        
        -- Fetch the bucket state
        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local current_tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or current_time
        
        -- Refill for whole intervals elapsed; advance last_refill by whole
        -- intervals only, so fractional intervals are not lost on every call
        local intervals = math.floor((current_time - last_refill) / interval)
        current_tokens = math.min(capacity, current_tokens + intervals * tokens)
        last_refill = last_refill + intervals * interval
        
        if current_tokens >= 1 then
            -- Consume one token
            current_tokens = current_tokens - 1
            redis.call('HMSET', key, 'tokens', current_tokens, 'last_refill', last_refill)
            redis.call('EXPIRE', key, interval * capacity)
            return {1, current_tokens}
        else
            -- No token available; persist the refreshed state
            redis.call('HMSET', key, 'tokens', current_tokens, 'last_refill', last_refill)
            redis.call('EXPIRE', key, interval * capacity)
            return {0, current_tokens}
        end
        """
        
        # Load the scripts
        self.sliding_window_sha = self.script_manager.load_script(self.sliding_window_script)
        self.token_bucket_sha = self.script_manager.load_script(self.token_bucket_script)
    
    def check_sliding_window(self, identifier: str, window_seconds: int, 
                           limit: int) -> Dict[str, Any]:
        """Sliding-window rate limit check"""
        key = f"rate_limit:sliding:{identifier}"
        current_time = int(time.time() * 1000)  # millisecond timestamp
        
        try:
            result = self.script_manager.execute_script_sha(
                self.sliding_window_sha,
                keys=[key],
                args=[window_seconds * 1000, limit, current_time]
            )
            
            allowed, remaining = result
            return {
                'allowed': bool(allowed),
                'remaining': remaining,
                'reset_time': current_time + (window_seconds * 1000)
            }
        except Exception as e:
            self.script_manager.logger.error(f"Sliding window rate limit check failed: {e}")
            return {'allowed': False, 'remaining': 0, 'reset_time': current_time}
    
    def check_token_bucket(self, identifier: str, capacity: int, 
                          refill_tokens: int, refill_interval: int) -> Dict[str, Any]:
        """Token bucket rate limit check"""
        key = f"rate_limit:bucket:{identifier}"
        current_time = int(time.time())
        
        try:
            result = self.script_manager.execute_script_sha(
                self.token_bucket_sha,
                keys=[key],
                args=[capacity, refill_tokens, refill_interval, current_time]
            )
            
            allowed, tokens_remaining = result
            return {
                'allowed': bool(allowed),
                'tokens_remaining': tokens_remaining,
                'capacity': capacity
            }
        except Exception as e:
            self.script_manager.logger.error(f"Token bucket rate limit check failed: {e}")
            return {'allowed': False, 'tokens_remaining': 0, 'capacity': capacity}
```

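The sliding-window logic above is easier to verify in plain Python before trusting the Lua version. This in-memory reference mirrors the same algorithm (prune expired timestamps, count, admit if under the limit); the class `SlidingWindowCounter` is our illustration, not part of redis-py:

```python
from collections import deque

class SlidingWindowCounter:
    """In-memory reference for the sliding-window Lua script above."""

    def __init__(self, window_ms: int, limit: int):
        self.window_ms = window_ms
        self.limit = limit
        self.hits = deque()  # timestamps (ms) of admitted requests

    def allow(self, now_ms: int) -> bool:
        # Prune entries that fell out of the window (the ZREMRANGEBYSCORE step).
        while self.hits and self.hits[0] <= now_ms - self.window_ms:
            self.hits.popleft()
        # Admit only if the window still has capacity (the ZCARD < limit check).
        if len(self.hits) < self.limit:
            self.hits.append(now_ms)  # the ZADD step
            return True
        return False
```

One caveat the reference makes visible: the Lua script uses the timestamp as both score and member, so two requests in the same millisecond collapse into one sorted-set entry; the in-memory deque does not have that problem.
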
### 9.3.5 Atomic Counters

```python
class AtomicCounter:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.script_manager = LuaScriptManager(redis_client)
        
        # Increment-with-limit script
        self.increment_with_limit_script = """
        local key = KEYS[1]
        local increment = tonumber(ARGV[1])
        local max_value = tonumber(ARGV[2])
        local ttl = tonumber(ARGV[3])
        
        local current = tonumber(redis.call('GET', key)) or 0
        local new_value = current + increment
        
        if new_value <= max_value then
            redis.call('SET', key, new_value)
            if ttl > 0 then
                redis.call('EXPIRE', key, ttl)
            end
            return {1, new_value}
        else
            return {0, current}
        end
        """
        
        # Conditional decrement script
        self.decrement_if_positive_script = """
        local key = KEYS[1]
        local decrement = tonumber(ARGV[1])
        
        local current = tonumber(redis.call('GET', key)) or 0
        
        if current >= decrement then
            local new_value = current - decrement
            redis.call('SET', key, new_value)
            return {1, new_value}
        else
            return {0, current}
        end
        """
        
        # Batch operations script
        self.batch_operations_script = """
        local operations = cjson.decode(ARGV[1])
        local results = {}
        
        for i, op in ipairs(operations) do
            local key = op.key
            local action = op.action
            local value = tonumber(op.value) or 0
            
            if action == 'incr' then
                local result = redis.call('INCRBY', key, value)
                table.insert(results, {key = key, action = action, result = result})
            elseif action == 'decr' then
                local current = tonumber(redis.call('GET', key)) or 0
                if current >= value then
                    local result = redis.call('DECRBY', key, value)
                    table.insert(results, {key = key, action = action, result = result, success = true})
                else
                    table.insert(results, {key = key, action = action, result = current, success = false})
                end
            elseif action == 'set' then
                redis.call('SET', key, value)
                table.insert(results, {key = key, action = action, result = value})
            end
        end
        
        return cjson.encode(results)
        """
        
        # Load the scripts
        self.increment_with_limit_sha = self.script_manager.load_script(self.increment_with_limit_script)
        self.decrement_if_positive_sha = self.script_manager.load_script(self.decrement_if_positive_script)
        self.batch_operations_sha = self.script_manager.load_script(self.batch_operations_script)
    
    def increment_with_limit(self, counter_name: str, increment: int = 1, 
                           max_value: float = float('inf'), ttl: int = 0) -> Dict[str, Any]:
        """带限制的递增"""
        key = f"counter:{counter_name}"
        
        try:
            result = self.script_manager.execute_script_sha(
                self.increment_with_limit_sha,
                keys=[key],
                args=[increment, max_value, ttl]
            )
            
            success, value = result
            return {
                'success': bool(success),
                'value': value,
                'counter_name': counter_name
            }
        except Exception as e:
            self.script_manager.logger.error(f"Increment with limit failed: {e}")
            return {'success': False, 'value': 0, 'counter_name': counter_name}
    
    def decrement_if_positive(self, counter_name: str, decrement: int = 1) -> Dict[str, Any]:
        """条件递减(仅当值为正时)"""
        key = f"counter:{counter_name}"
        
        try:
            result = self.script_manager.execute_script_sha(
                self.decrement_if_positive_sha,
                keys=[key],
                args=[decrement]
            )
            
            success, value = result
            return {
                'success': bool(success),
                'value': value,
                'counter_name': counter_name
            }
        except Exception as e:
            self.script_manager.logger.error(f"Conditional decrement failed: {e}")
            return {'success': False, 'value': 0, 'counter_name': counter_name}
    
    def batch_operations(self, operations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """批量操作"""
        import json  # 本方法依赖json模块(文件顶部未导入时在此导入)
        try:
            # 转换操作格式(注意:键名经ARGV传入脚本,Redis集群下应改用KEYS声明)
            formatted_ops = []
            for op in operations:
                formatted_ops.append({
                    'key': f"counter:{op['counter_name']}",
                    'action': op['action'],
                    'value': op.get('value', 1)
                })
            
            result = self.script_manager.execute_script_sha(
                self.batch_operations_sha,
                keys=[],
                args=[json.dumps(formatted_ops)]
            )
            
            return json.loads(result)
        except Exception as e:
            self.script_manager.logger.error(f"Batch operations failed: {e}")
            return []
    
    def get_value(self, counter_name: str) -> int:
        """获取计数器值"""
        key = f"counter:{counter_name}"
        value = self.redis_client.get(key)
        return int(value) if value is not None else 0
    
    def reset(self, counter_name: str, value: int = 0):
        """重置计数器"""
        key = f"counter:{counter_name}"
        self.redis_client.set(key, value)

# 使用示例
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    
    # 分布式锁示例
    lock = DistributedLock(redis_client)
    
    import uuid
    identifier = str(uuid.uuid4())
    
    if lock.acquire("resource1", identifier, 30):
        print("Lock acquired successfully")
        
        # 模拟工作
        time.sleep(2)
        
        # 续期锁
        if lock.renew("resource1", identifier, 30):
            print("Lock renewed")
        
        # 释放锁
        if lock.release("resource1", identifier):
            print("Lock released successfully")
    else:
        print("Failed to acquire lock")
    
    # 限流器示例
    rate_limiter = RateLimiter(redis_client)
    
    # 滑动窗口限流测试
    for i in range(5):
        result = rate_limiter.check_sliding_window("user123", 60, 3)  # 每分钟最多3次
        print(f"Request {i+1}: {result}")
        time.sleep(1)
    
    # 令牌桶限流测试
    for i in range(5):
        result = rate_limiter.check_token_bucket("api_endpoint", 10, 1, 1)  # 容量10,每秒补充1个
        print(f"API call {i+1}: {result}")
        time.sleep(0.5)
    
    # 原子计数器示例
    counter = AtomicCounter(redis_client)
    
    # 带限制的递增
    result = counter.increment_with_limit("daily_requests", 1, 1000)  # 每日最多1000次
    print(f"Increment result: {result}")
    
    # 条件递减
    result = counter.decrement_if_positive("available_tokens", 5)
    print(f"Decrement result: {result}")
    
    # 批量操作
    operations = [
        {'counter_name': 'counter1', 'action': 'incr', 'value': 5},
        {'counter_name': 'counter2', 'action': 'set', 'value': 100},
        {'counter_name': 'counter3', 'action': 'decr', 'value': 3}
    ]
    
    results = counter.batch_operations(operations)
    print(f"Batch operations results: {results}")

9.4 性能优化

9.4.1 脚本优化策略

class OptimizedScriptManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.script_cache = {}
        self.performance_stats = {}
        self.logger = logging.getLogger(__name__)
    
    def register_script(self, name: str, script: str, 
                       cache_locally: bool = True) -> str:
        """注册并优化脚本"""
        # 脚本优化:移除注释和多余空格
        optimized_script = self._optimize_script(script)
        
        # 加载到Redis
        sha1 = self.redis_client.script_load(optimized_script)
        
        # 本地缓存
        if cache_locally:
            self.script_cache[name] = {
                'sha1': sha1,
                'script': optimized_script,
                'original_script': script,
                'load_time': time.time(),
                'execution_count': 0,
                'total_execution_time': 0
            }
        
        self.logger.info(f"Script '{name}' registered with SHA1: {sha1}")
        return sha1
    
    def _optimize_script(self, script: str) -> str:
        """优化Lua脚本"""
        lines = script.split('\n')
        optimized_lines = []
        
        for line in lines:
            # 移除行注释(简化实现:字符串字面量中的'--'也会被误删,
            # 含此类字面量的脚本不应启用该优化)
            if '--' in line:
                line = line[:line.index('--')]
            
            # 移除前后空格
            line = line.strip()
            
            # 跳过空行
            if line:
                optimized_lines.append(line)
        
        return '\n'.join(optimized_lines)
    
    def execute_registered_script(self, name: str, keys: List[str] = None, 
                                args: List[Any] = None) -> Any:
        """执行已注册的脚本"""
        if name not in self.script_cache:
            raise ValueError(f"Script '{name}' not registered")
        
        script_info = self.script_cache[name]
        keys = keys or []
        args = args or []
        
        start_time = time.time()
        
        try:
            result = self.redis_client.evalsha(
                script_info['sha1'], 
                len(keys), 
                *(keys + args)
            )
            
            # 更新性能统计
            execution_time = time.time() - start_time
            script_info['execution_count'] += 1
            script_info['total_execution_time'] += execution_time
            
            return result
            
        except redis.exceptions.NoScriptError:
            # 脚本不存在,重新加载
            self.logger.warning(f"Script '{name}' not found in Redis, reloading...")
            sha1 = self.redis_client.script_load(script_info['script'])
            script_info['sha1'] = sha1
            
            result = self.redis_client.evalsha(sha1, len(keys), *(keys + args))
            
            # 更新性能统计
            execution_time = time.time() - start_time
            script_info['execution_count'] += 1
            script_info['total_execution_time'] += execution_time
            
            return result
    
    def get_performance_stats(self, name: str = None) -> Dict[str, Any]:
        """获取性能统计"""
        if name:
            if name not in self.script_cache:
                return {'error': f"Script '{name}' not found"}
            
            script_info = self.script_cache[name]
            avg_time = (script_info['total_execution_time'] / 
                       script_info['execution_count'] 
                       if script_info['execution_count'] > 0 else 0)
            
            return {
                'name': name,
                'sha1': script_info['sha1'],
                'execution_count': script_info['execution_count'],
                'total_execution_time': script_info['total_execution_time'],
                'average_execution_time': avg_time,
                'load_time': script_info['load_time']
            }
        else:
            # 返回所有脚本的统计
            stats = {}
            for script_name, script_info in self.script_cache.items():
                avg_time = (script_info['total_execution_time'] / 
                           script_info['execution_count'] 
                           if script_info['execution_count'] > 0 else 0)
                
                stats[script_name] = {
                    'execution_count': script_info['execution_count'],
                    'average_execution_time': avg_time
                }
            
            return stats
    
    def preload_scripts(self, script_names: List[str] = None):
        """预加载脚本"""
        scripts_to_load = script_names or list(self.script_cache.keys())
        
        for name in scripts_to_load:
            if name in self.script_cache:
                script_info = self.script_cache[name]
                if not self.redis_client.script_exists(script_info['sha1'])[0]:
                    sha1 = self.redis_client.script_load(script_info['script'])
                    script_info['sha1'] = sha1
                    self.logger.info(f"Preloaded script '{name}'")

9.4.2 批量操作优化

class BatchProcessor:
    def __init__(self, redis_client: redis.Redis, batch_size: int = 100):
        self.redis_client = redis_client
        self.batch_size = batch_size
        self.script_manager = OptimizedScriptManager(redis_client)
        
        # 批量处理脚本
        self.batch_process_script = """
        local operations = cjson.decode(ARGV[1])
        local results = {}
        
        for i, op in ipairs(operations) do
            local op_type = op.type
            local key = op.key
            local value = op.value
            local result
            
            if op_type == 'set' then
                result = redis.call('SET', key, value)
            elseif op_type == 'get' then
                result = redis.call('GET', key)
            elseif op_type == 'incr' then
                result = redis.call('INCRBY', key, tonumber(value) or 1)
            elseif op_type == 'decr' then
                result = redis.call('DECRBY', key, tonumber(value) or 1)
            elseif op_type == 'del' then
                result = redis.call('DEL', key)
            elseif op_type == 'expire' then
                result = redis.call('EXPIRE', key, tonumber(value))
            elseif op_type == 'hset' then
                result = redis.call('HSET', key, op.field, value)
            elseif op_type == 'hget' then
                result = redis.call('HGET', key, op.field)
            elseif op_type == 'lpush' then
                result = redis.call('LPUSH', key, value)
            elseif op_type == 'rpush' then
                result = redis.call('RPUSH', key, value)
            elseif op_type == 'sadd' then
                result = redis.call('SADD', key, value)
            elseif op_type == 'zadd' then
                result = redis.call('ZADD', key, tonumber(op.score), value)
            else
                result = 'UNSUPPORTED_OPERATION'
            end
            
            table.insert(results, {
                index = i,
                type = op_type,
                key = key,
                result = result
            })
        end
        
        return cjson.encode(results)
        """
        
        self.script_manager.register_script('batch_process', self.batch_process_script)
    
    def process_batch(self, operations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """批量处理操作"""
        import json  # 依赖json模块

        if not operations:
            return []
        
        # 分批处理
        all_results = []
        
        for i in range(0, len(operations), self.batch_size):
            batch = operations[i:i + self.batch_size]
            
            try:
                result = self.script_manager.execute_registered_script(
                    'batch_process',
                    keys=[],
                    args=[json.dumps(batch)]
                )
                
                batch_results = json.loads(result)
                # 脚本返回的index是批内索引(1起),换算为全局索引
                for r in batch_results:
                    r['index'] += i
                all_results.extend(batch_results)
                
            except Exception as e:
                self.script_manager.logger.error(f"Batch processing failed: {e}")
                # 为失败的批次添加错误结果
                for j, op in enumerate(batch):
                    all_results.append({
                        'index': i + j + 1,
                        'type': op.get('type', 'unknown'),
                        'key': op.get('key', ''),
                        'result': f'ERROR: {str(e)}'
                    })
        
        return all_results
    
    def bulk_set(self, key_value_pairs: Dict[str, Any], ttl: int = None) -> List[Dict[str, Any]]:
        """批量设置键值"""
        operations = []
        
        for key, value in key_value_pairs.items():
            operations.append({
                'type': 'set',
                'key': key,
                'value': str(value)
            })
            
            if ttl:
                operations.append({
                    'type': 'expire',
                    'key': key,
                    'value': ttl
                })
        
        return self.process_batch(operations)
    
    def bulk_get(self, keys: List[str]) -> Dict[str, Any]:
        """批量获取值"""
        operations = []
        
        for key in keys:
            operations.append({
                'type': 'get',
                'key': key,
                'value': None
            })
        
        results = self.process_batch(operations)
        
        # 转换为字典格式
        result_dict = {}
        for result in results:
            result_dict[result['key']] = result['result']
        
        return result_dict
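
需要说明的是,对于bulk_get这类纯读取场景,用非事务pipeline往往比Lua脚本更合适:实现更简单,也不会在脚本执行期间阻塞其他客户端。下面是一个最小示意(bulk_get_pipeline为示例性命名,client为任意redis.Redis实例):

```python
def bulk_get_pipeline(client, keys, chunk_size: int = 100):
    """用非事务pipeline分批执行GET,返回{key: value}字典。

    client可以是任意redis.Redis实例;纯读取无需原子性,
    transaction=False避免不必要的MULTI/EXEC开销。
    """
    result = {}
    for i in range(0, len(keys), chunk_size):
        chunk = keys[i:i + chunk_size]
        pipe = client.pipeline(transaction=False)
        for key in chunk:
            pipe.get(key)  # 命令被缓冲,整批只需一次网络往返
        result.update(zip(chunk, pipe.execute()))
    return result
```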

9.4.3 连接池优化

class OptimizedRedisManager:
    def __init__(self, host: str = 'localhost', port: int = 6379, 
                 db: int = 0, max_connections: int = 50):
        # 创建连接池(decode_responses必须配置在连接池上才会生效)
        self.pool = redis.ConnectionPool(
            host=host,
            port=port,
            db=db,
            max_connections=max_connections,
            decode_responses=True,
            retry_on_timeout=True,
            socket_keepalive=True,
            health_check_interval=30
        )
        
        self.redis_client = redis.Redis(connection_pool=self.pool)
        self.script_manager = OptimizedScriptManager(self.redis_client)
        self.batch_processor = BatchProcessor(self.redis_client)
        
        # 性能监控
        self.connection_stats = {
            'total_commands': 0,
            'total_time': 0,
            'error_count': 0,
            'start_time': time.time()
        }
        
        self.logger = logging.getLogger(__name__)
    
    def execute_with_monitoring(self, func, *args, **kwargs):
        """带监控的执行"""
        start_time = time.time()
        
        try:
            result = func(*args, **kwargs)
            
            # 更新统计
            execution_time = time.time() - start_time
            self.connection_stats['total_commands'] += 1
            self.connection_stats['total_time'] += execution_time
            
            return result
            
        except Exception as e:
            self.connection_stats['error_count'] += 1
            self.logger.error(f"Redis operation failed: {e}")
            raise
    
    def get_connection_stats(self) -> Dict[str, Any]:
        """获取连接统计"""
        uptime = time.time() - self.connection_stats['start_time']
        avg_time = (self.connection_stats['total_time'] / 
                   self.connection_stats['total_commands'] 
                   if self.connection_stats['total_commands'] > 0 else 0)
        
        # 注意:以下是redis-py连接池的内部属性,不同版本可能有差异
        pool_info = {
            'created_connections': self.pool._created_connections,
            'available_connections': len(self.pool._available_connections),
            'in_use_connections': len(self.pool._in_use_connections)
        }
        
        return {
            'uptime_seconds': uptime,
            'total_commands': self.connection_stats['total_commands'],
            'total_execution_time': self.connection_stats['total_time'],
            'average_execution_time': avg_time,
            'error_count': self.connection_stats['error_count'],
            'commands_per_second': self.connection_stats['total_commands'] / uptime if uptime > 0 else 0,
            'pool_info': pool_info
        }
    
    def health_check(self) -> Dict[str, Any]:
        """健康检查"""
        try:
            start_time = time.time()
            
            # 基本连接测试
            ping_result = self.redis_client.ping()
            ping_time = time.time() - start_time
            
            # 获取Redis信息
            info = self.redis_client.info()
            
            # 检查内存使用
            memory_usage = info.get('used_memory', 0)
            max_memory = info.get('maxmemory', 0)
            memory_usage_percent = (memory_usage / max_memory * 100) if max_memory > 0 else 0
            
            return {
                'status': 'healthy' if ping_result else 'unhealthy',
                'ping_time_ms': ping_time * 1000,
                'redis_version': info.get('redis_version', 'unknown'),
                'connected_clients': info.get('connected_clients', 0),
                'memory_usage_bytes': memory_usage,
                'memory_usage_percent': memory_usage_percent,
                'uptime_seconds': info.get('uptime_in_seconds', 0),
                'total_commands_processed': info.get('total_commands_processed', 0)
            }
            
        except Exception as e:
            return {
                'status': 'unhealthy',
                'error': str(e)
            }
    
    def close(self):
        """关闭连接池"""
        self.pool.disconnect()

9.5 监控和调试

9.5.1 事务监控

class TransactionMonitor:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.transaction_log = []
        self.logger = logging.getLogger(__name__)
    
    def monitor_transaction(self, transaction_func, *args, **kwargs):
        """监控事务执行"""
        transaction_id = f"tx_{int(time.time() * 1000000)}"
        start_time = time.time()
        
        transaction_info = {
            'transaction_id': transaction_id,
            'start_time': start_time,
            'function_name': transaction_func.__name__,
            'args': str(args),
            'kwargs': str(kwargs),
            'status': 'started'
        }
        
        try:
            result = transaction_func(*args, **kwargs)
            
            end_time = time.time()
            transaction_info.update({
                'end_time': end_time,
                'duration': end_time - start_time,
                'status': 'completed',
                'result': str(result)[:1000]  # 限制结果长度
            })
            
            self.logger.info(f"Transaction {transaction_id} completed in {end_time - start_time:.3f}s")
            return result
            
        except redis.WatchError as e:
            end_time = time.time()
            transaction_info.update({
                'end_time': end_time,
                'duration': end_time - start_time,
                'status': 'watch_error',
                'error': str(e)
            })
            
            self.logger.warning(f"Transaction {transaction_id} failed due to watch error: {e}")
            raise
            
        except Exception as e:
            end_time = time.time()
            transaction_info.update({
                'end_time': end_time,
                'duration': end_time - start_time,
                'status': 'error',
                'error': str(e)
            })
            
            self.logger.error(f"Transaction {transaction_id} failed: {e}")
            raise
            
        finally:
            self.transaction_log.append(transaction_info)
            
            # 保留最近1000个事务记录
            if len(self.transaction_log) > 1000:
                self.transaction_log.pop(0)
    
    def get_transaction_stats(self, hours: int = 24) -> Dict[str, Any]:
        """获取事务统计"""
        cutoff_time = time.time() - (hours * 3600)
        recent_transactions = [
            tx for tx in self.transaction_log 
            if tx['start_time'] >= cutoff_time
        ]
        
        if not recent_transactions:
            return {'error': 'No transactions in the specified time period'}
        
        # 统计信息
        total_transactions = len(recent_transactions)
        completed_transactions = len([tx for tx in recent_transactions if tx['status'] == 'completed'])
        failed_transactions = len([tx for tx in recent_transactions if tx['status'] in ['error', 'watch_error']])
        watch_errors = len([tx for tx in recent_transactions if tx['status'] == 'watch_error'])
        
        # 计算平均执行时间
        completed_durations = [tx['duration'] for tx in recent_transactions if tx.get('duration') and tx['status'] == 'completed']
        avg_duration = sum(completed_durations) / len(completed_durations) if completed_durations else 0
        max_duration = max(completed_durations) if completed_durations else 0
        min_duration = min(completed_durations) if completed_durations else 0
        
        # 按函数名统计
        function_stats = {}
        for tx in recent_transactions:
            func_name = tx['function_name']
            if func_name not in function_stats:
                function_stats[func_name] = {'count': 0, 'success': 0, 'failed': 0}
            
            function_stats[func_name]['count'] += 1
            if tx['status'] == 'completed':
                function_stats[func_name]['success'] += 1
            else:
                function_stats[func_name]['failed'] += 1
        
        return {
            'period_hours': hours,
            'total_transactions': total_transactions,
            'completed_transactions': completed_transactions,
            'failed_transactions': failed_transactions,
            'watch_errors': watch_errors,
            'success_rate': (completed_transactions / total_transactions * 100) if total_transactions > 0 else 0,
            'average_duration': avg_duration,
            'max_duration': max_duration,
            'min_duration': min_duration,
            'function_stats': function_stats
        }
    
    def get_recent_failures(self, limit: int = 10) -> List[Dict[str, Any]]:
        """获取最近的失败事务"""
        failed_transactions = [
            tx for tx in self.transaction_log 
            if tx['status'] in ['error', 'watch_error']
        ]
        
        # 按时间倒序排列
        failed_transactions.sort(key=lambda x: x['start_time'], reverse=True)
        
        return failed_transactions[:limit]

9.5.2 脚本调试工具

class ScriptDebugger:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.execution_log = []
        self.logger = logging.getLogger(__name__)
    
    def debug_script(self, script: str, keys: List[str] = None, 
                    args: List[Any] = None, enable_logging: bool = True) -> Dict[str, Any]:
        """调试脚本执行"""
        import hashlib  # 用于计算脚本指纹

        keys = keys or []
        args = args or []
        
        # 添加调试信息到脚本
        if enable_logging:
            debug_script = self._add_debug_logging(script)
        else:
            debug_script = script
        
        execution_id = f"script_{int(time.time() * 1000000)}"
        start_time = time.time()
        
        execution_info = {
            'execution_id': execution_id,
            'start_time': start_time,
            'script_hash': hashlib.md5(script.encode()).hexdigest(),
            'keys': keys,
            'args': args,
            'status': 'started'
        }
        
        try:
            result = self.redis_client.eval(debug_script, len(keys), *(keys + args))
            
            end_time = time.time()
            execution_info.update({
                'end_time': end_time,
                'duration': end_time - start_time,
                'status': 'completed',
                'result': result
            })
            
            self.logger.info(f"Script {execution_id} completed in {end_time - start_time:.3f}s")
            
            return {
                'success': True,
                'result': result,
                'execution_info': execution_info
            }
            
        except redis.ResponseError as e:
            end_time = time.time()
            execution_info.update({
                'end_time': end_time,
                'duration': end_time - start_time,
                'status': 'error',
                'error': str(e)
            })
            
            self.logger.error(f"Script {execution_id} failed: {e}")
            
            return {
                'success': False,
                'error': str(e),
                'execution_info': execution_info,
                'debug_info': self._parse_lua_error(str(e))
            }
            
        finally:
            self.execution_log.append(execution_info)
            
            # 保留最近500个执行记录
            if len(self.execution_log) > 500:
                self.execution_log.pop(0)
    
    def _add_debug_logging(self, script: str) -> str:
        """为脚本添加调试日志"""
        debug_prefix = """
        local function debug_log(message)
            redis.log(redis.LOG_NOTICE, "[DEBUG] " .. tostring(message))
        end
        
        debug_log("Script execution started")
        debug_log("Keys: " .. table.concat(KEYS, ", "))
        debug_log("Args: " .. table.concat(ARGV, ", "))
        
        """
        
        debug_suffix = """
        
        debug_log("Script execution completed")
        """
        
        return debug_prefix + script + debug_suffix
    
    def _parse_lua_error(self, error_message: str) -> Dict[str, Any]:
        """解析Lua错误信息"""
        debug_info = {
            'error_type': 'unknown',
            'line_number': None,
            'description': error_message
        }
        
        # 解析常见错误类型
        if 'attempt to call' in error_message:
            debug_info['error_type'] = 'function_call_error'
        elif 'attempt to index' in error_message:
            debug_info['error_type'] = 'index_error'
        elif 'attempt to perform arithmetic' in error_message:
            debug_info['error_type'] = 'arithmetic_error'
        elif 'syntax error' in error_message:
            debug_info['error_type'] = 'syntax_error'
        
        # 尝试提取行号
        import re
        line_match = re.search(r'line (\d+)', error_message)
        if line_match:
            debug_info['line_number'] = int(line_match.group(1))
        
        return debug_info
    
    def validate_script_syntax(self, script: str) -> Dict[str, Any]:
        """验证脚本语法"""
        try:
            # 尝试加载脚本
            sha1 = self.redis_client.script_load(script)
            
            return {
                'valid': True,
                'sha1': sha1,
                'message': 'Script syntax is valid'
            }
            
        except redis.ResponseError as e:
            return {
                'valid': False,
                'error': str(e),
                'debug_info': self._parse_lua_error(str(e))
            }
    
    def profile_script(self, script: str, keys: List[str] = None, 
                      args: List[Any] = None, iterations: int = 100) -> Dict[str, Any]:
        """性能分析脚本"""
        keys = keys or []
        args = args or []
        
        # 加载脚本
        sha1 = self.redis_client.script_load(script)
        
        execution_times = []
        errors = 0
        
        for i in range(iterations):
            start_time = time.time()
            
            try:
                self.redis_client.evalsha(sha1, len(keys), *(keys + args))
                execution_time = time.time() - start_time
                execution_times.append(execution_time)
                
            except Exception as e:
                errors += 1
                self.logger.warning(f"Script execution {i+1} failed: {e}")
        
        if execution_times:
            avg_time = sum(execution_times) / len(execution_times)
            min_time = min(execution_times)
            max_time = max(execution_times)
            
            # 计算百分位数
            sorted_times = sorted(execution_times)
            p50 = sorted_times[len(sorted_times) // 2]
            p95 = sorted_times[int(len(sorted_times) * 0.95)]
            p99 = sorted_times[int(len(sorted_times) * 0.99)]
            
            return {
                'iterations': iterations,
                'successful_executions': len(execution_times),
                'errors': errors,
                'average_time': avg_time,
                'min_time': min_time,
                'max_time': max_time,
                'p50_time': p50,
                'p95_time': p95,
                'p99_time': p99,
                'executions_per_second': len(execution_times) / sum(execution_times) if sum(execution_times) > 0 else 0
            }
        else:
            return {
                'iterations': iterations,
                'successful_executions': 0,
                'errors': errors,
                'message': 'All executions failed'
            }

# 使用示例
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    
    # 优化的Redis管理器
    redis_manager = OptimizedRedisManager()
    
    # 健康检查
    health = redis_manager.health_check()
    print(f"Redis health: {health}")
    
    # 事务监控示例
    transaction_monitor = TransactionMonitor(redis_client)
    
    def sample_transaction():
        # pipeline()默认即为事务模式;显式multi()主要用于WATCH之后开启事务
        pipe = redis_client.pipeline()
        pipe.multi()
        pipe.set("test_key", "test_value")
        pipe.incr("counter")
        return pipe.execute()
    
    # 监控事务执行
    result = transaction_monitor.monitor_transaction(sample_transaction)
    print(f"Transaction result: {result}")
    
    # 获取事务统计
    stats = transaction_monitor.get_transaction_stats(hours=1)
    print(f"Transaction stats: {stats}")
    
    # 脚本调试示例
    script_debugger = ScriptDebugger(redis_client)
    
    test_script = """
    local key = KEYS[1]
    local value = ARGV[1]
    
    redis.call('SET', key, value)
    local result = redis.call('GET', key)
    
    return result
    """
    
    # 验证脚本语法
    syntax_check = script_debugger.validate_script_syntax(test_script)
    print(f"Syntax check: {syntax_check}")
    
    # 调试脚本执行
    debug_result = script_debugger.debug_script(
        test_script, 
        keys=["debug_key"], 
        args=["debug_value"]
    )
    print(f"Debug result: {debug_result}")
    
    # 性能分析
    profile_result = script_debugger.profile_script(
        test_script,
        keys=["profile_key"],
        args=["profile_value"],
        iterations=50
    )
    print(f"Profile result: {profile_result}")
    
    # 批量处理示例
    batch_processor = BatchProcessor(redis_client)
    
    operations = [
        {'type': 'set', 'key': 'batch_key1', 'value': 'value1'},
        {'type': 'set', 'key': 'batch_key2', 'value': 'value2'},
        {'type': 'incr', 'key': 'batch_counter', 'value': 5},
        {'type': 'get', 'key': 'batch_key1', 'value': None}
    ]
    
    batch_results = batch_processor.process_batch(operations)
    print(f"Batch results: {batch_results}")
    
    # 获取连接统计
    connection_stats = redis_manager.get_connection_stats()
    print(f"Connection stats: {connection_stats}")
    
    # 清理
    redis_manager.close()

9.6 最佳实践

9.6.1 事务设计原则

  1. 保持事务简短

    • 减少事务中的操作数量
    • 避免长时间运行的操作
    • 及时释放资源
  2. 合理使用WATCH

    • 只监视必要的键
    • 避免监视过多键导致冲突
    • 实现重试机制
  3. 错误处理

    • 处理WatchError异常
    • 实现指数退避重试
    • 记录详细的错误日志
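
上述WATCH重试与指数退避原则可以用一个通用的重试辅助函数来落实。下面是一个极简示意(exponential_backoff、run_with_retry均为示例性命名,非本章前文代码):

```python
import random
import time


def exponential_backoff(attempt: int, base: float = 0.05, cap: float = 1.0) -> float:
    """计算第attempt次重试前的等待时间:指数增长、封顶、带抖动。"""
    delay = min(cap, base * (2 ** attempt))
    return delay * (0.5 + random.random() / 2)  # 抖动到[delay/2, delay],避免惊群


def run_with_retry(fn, retriable_exc, max_retries: int = 5):
    """执行fn;捕获retriable_exc(如redis.WatchError)时退避后重试。"""
    last_exc = None
    for attempt in range(max_retries):
        try:
            return fn()
        except retriable_exc as e:
            last_exc = e
            time.sleep(exponential_backoff(attempt))
    raise last_exc


# 典型用法(需要真实的redis连接,此处仅示意):
#   run_with_retry(do_transfer, redis.WatchError)
# 其中do_transfer内部使用WATCH/MULTI/EXEC,冲突时抛出WatchError。
```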

9.6.2 脚本开发指南

  1. 脚本优化

    • 避免复杂的循环和递归
    • 使用局部变量减少内存使用
    • 合理使用Redis命令
  2. 错误处理

    • 验证输入参数
    • 处理Redis命令错误
    • 返回有意义的错误信息
  3. 性能考虑

    • 缓存脚本SHA1值
    • 使用EVALSHA而不是EVAL
    • 监控脚本执行时间
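
结合上面的脚本开发要点,下面给出一个带输入校验的示意:Lua脚本用redis.error_reply返回有意义的错误信息,客户端在发送前做预校验(VALIDATED_INCR与safe_int_arg为示例性命名):

```python
# 脚本内校验:非法参数直接返回错误回复,而不是让脚本在运行中抛出异常
VALIDATED_INCR = """
local increment = tonumber(ARGV[1])
if increment == nil then
    return redis.error_reply("ERR increment must be a number")
end
if increment <= 0 then
    return redis.error_reply("ERR increment must be positive")
end
return redis.call('INCRBY', KEYS[1], increment)
"""


def safe_int_arg(value, name: str) -> int:
    """客户端预校验:在调用EVALSHA之前拦截明显非法的参数。"""
    try:
        return int(value)
    except (TypeError, ValueError):
        raise ValueError(f"{name} must be an integer, got {value!r}")

# 使用示意:sha = client.script_load(VALIDATED_INCR)
#          client.evalsha(sha, 1, "counter:x", safe_int_arg(n, "increment"))
```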

9.6.3 安全考虑

  1. 输入验证

    • 验证键名和参数
    • 防止脚本注入
    • 限制脚本权限
  2. 资源限制

    • 设置脚本执行超时
    • 限制脚本内存使用
    • 监控脚本执行频率
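
关于脚本执行超时,Redis本身提供了配置项。下面是一个redis.conf片段示意(Redis 7.0起该配置更名为busy-reply-threshold,旧名仍可用作别名):

```
# redis.conf片段:脚本执行时间阈值(毫秒),默认5000
# 超过阈值后脚本不会被自动终止,但Redis开始对其他客户端返回BUSY错误,
# 此时可在另一个连接上执行 SCRIPT KILL 终止尚未产生写操作的脚本;
# 已执行过写操作的脚本只能用 SHUTDOWN NOSAVE 处理
lua-time-limit 5000
```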

9.7 总结

Redis事务和脚本是实现复杂业务逻辑的重要工具:

事务特点:

  • 提供基本的原子性保证
  • 支持乐观锁机制(WATCH)
  • 适合简单的多命令操作
  • 需要处理并发冲突

Lua脚本优势:

  • 真正的原子性执行
  • 支持复杂逻辑控制
  • 减少网络往返次数
  • 可缓存和重复使用

适用场景:

  • 银行转账和支付系统
  • 库存管理和预留
  • 分布式锁实现
  • 限流和计数器
  • 缓存一致性维护

性能优化要点:

  • 使用连接池管理连接
  • 缓存脚本SHA1值
  • 实现批量操作
  • 监控执行性能

最佳实践:

  • 保持操作简短高效
  • 实现健壮的错误处理
  • 做好性能监控和调试
  • 考虑安全和资源限制

通过合理使用Redis事务和脚本,可以构建高性能、高可靠的分布式应用系统。
