9.1 概述
Redis事务和Lua脚本是Redis提供的两种重要机制,用于保证一组操作作为整体执行、执行期间不被其他客户端的命令穿插。虽然Redis的命令执行是单线程的,单条命令天然具有原子性,但当业务逻辑由多条命令组成时,仍需要借助事务或脚本来保证这组操作的完整性和一致性。
9.1.1 Redis事务特点
- 原子性:EXEC之后,队列中的命令会按顺序一次性执行;若命令在入队阶段出错(如语法错误),整个事务都不会执行。注意Redis不支持回滚:某条命令在运行时失败(如对错误类型的键操作),其余命令仍会继续执行
- 隔离性:事务执行期间不会被其他客户端的命令打断
- 一致性:事务执行前后,数据库状态保持一致
- 无持久性保证:Redis事务不提供持久性保证,依赖于Redis的持久化机制
9.1.2 Lua脚本特点
- 原子性执行:整个脚本作为一个原子操作执行
- 服务器端执行:减少网络往返次数
- 可复用性:脚本可以被缓存和重复使用
- 丰富的逻辑控制:支持条件判断、循环等复杂逻辑
9.1.3 应用场景
- 银行转账:确保扣款和入账操作的原子性
- 库存管理:防止超卖问题
- 计数器操作:确保计数的准确性
- 缓存更新:保证缓存和数据的一致性
- 分布式锁:实现复杂的锁机制
9.2 Redis事务
9.2.1 基本命令
MULTI - 开始事务
MULTI
EXEC - 执行事务
EXEC
DISCARD - 取消事务
DISCARD
WATCH - 监视键
WATCH key [key ...]
UNWATCH - 取消监视
UNWATCH
9.2.2 Python实现示例
import redis
import time
import threading
from typing import List, Dict, Any, Optional
from decimal import Decimal
import logging
class RedisTransaction:
    """Run lists of Redis commands inside MULTI/EXEC transactions.

    An operation is a tuple ``(command, *args)`` where ``command`` is the
    name of a redis-py pipeline method, e.g. ``("set", "key", "value")``.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _queue_operations(pipe, operations: List[tuple]) -> None:
        """Queue every ``(command, *args)`` tuple on ``pipe``."""
        for operation in operations:
            command, args = operation[0], operation[1:]
            getattr(pipe, command)(*args)

    def simple_transaction(self, operations: List[tuple]) -> Optional[List[Any]]:
        """Execute ``operations`` atomically in one MULTI/EXEC block.

        Args:
            operations: Tuples of ``(command, *args)``.

        Returns:
            The list of per-command results, or ``None`` if the transaction
            could not be executed (the error is logged).
        """
        try:
            # The context manager resets the pipeline on exit, so the
            # connection is released even when queuing or EXEC fails.
            with self.redis_client.pipeline() as pipe:
                pipe.multi()
                self._queue_operations(pipe, operations)
                return pipe.execute()
        except redis.WatchError:
            self.logger.warning("Transaction failed due to watched key modification")
            return None
        except Exception as e:
            self.logger.error(f"Transaction failed: {e}")
            return None

    def watched_transaction(self, watch_keys: List[str], operations: List[tuple],
                            max_retries: int = 3) -> Optional[List[Any]]:
        """Execute ``operations`` with optimistic locking on ``watch_keys``.

        The transaction is retried with exponential backoff whenever one of
        the watched keys is modified by another client before EXEC.

        Args:
            watch_keys: Keys to WATCH; an empty list runs a plain transaction.
            operations: Tuples of ``(command, *args)``.
            max_retries: Maximum number of attempts.

        Returns:
            The list of per-command results, or ``None`` when every attempt
            was aborted or an unexpected error occurred (both are logged).
        """
        for attempt in range(max_retries):
            try:
                with self.redis_client.pipeline() as pipe:
                    # WATCH with no keys is rejected by the server, so skip it.
                    if watch_keys:
                        pipe.watch(*watch_keys)
                    pipe.multi()
                    self._queue_operations(pipe, operations)
                    return pipe.execute()
            except redis.WatchError:
                self.logger.warning(
                    f"Transaction attempt {attempt + 1} failed due to watched key modification"
                )
                if attempt == max_retries - 1:
                    self.logger.error("Transaction failed after maximum retries")
                    return None
                time.sleep(0.01 * (2 ** attempt))  # exponential backoff
            except Exception as e:
                self.logger.error(f"Transaction failed: {e}")
                return None
        return None
### 9.2.3 银行转账系统
```python
class BankTransferSystem:
    """Account balances and transfers stored in Redis.

    Balances live in ``account:<id>:balance`` as decimal strings; transfers
    use WATCH/MULTI/EXEC so both balances change together or not at all.
    """

    # Number of optimistic-locking attempts before a transfer gives up.
    MAX_TRANSFER_RETRIES = 5

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.transaction_handler = RedisTransaction(redis_client)
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _to_decimal(raw: Any) -> Decimal:
        """Convert a raw Redis reply (``str`` or ``bytes``) to ``Decimal``."""
        if isinstance(raw, bytes):
            raw = raw.decode()
        return Decimal(raw)

    @staticmethod
    def _failure(error: str, **extra: Any) -> Dict[str, Any]:
        """Build the standard failed-transfer result dict."""
        return {'success': False, 'error': error, 'transaction_id': None, **extra}

    def create_account(self, account_id: str, initial_balance: Decimal = Decimal('0')):
        """Create (or overwrite) an account with the given opening balance."""
        account_key = f"account:{account_id}:balance"
        info_key = f"account:{account_id}:info"
        with self.redis_client.pipeline() as pipe:
            pipe.set(account_key, str(initial_balance))
            pipe.hset(info_key, mapping={
                'created_at': time.time(),
                'status': 'active',
                'currency': 'USD'
            })
            pipe.execute()
        self.logger.info(f"Account {account_id} created with balance {initial_balance}")

    def get_balance(self, account_id: str) -> Optional[Decimal]:
        """Return the account balance, or ``None`` if the account is missing."""
        raw = self.redis_client.get(f"account:{account_id}:balance")
        if raw is None:
            return None
        return self._to_decimal(raw)

    def transfer(self, from_account: str, to_account: str, amount: Decimal,
                 description: str = "") -> Dict[str, Any]:
        """Move ``amount`` from one account to another atomically.

        Args:
            from_account: Account to debit.
            to_account: Account to credit; must differ from ``from_account``.
            amount: Positive amount to move.
            description: Free-text note stored with the transaction record.

        Returns:
            A dict with ``success``; on success also ``transaction_id``,
            ``from_balance``, ``to_balance`` and ``timestamp``; on failure
            ``error`` (and ``current_balance`` for insufficient funds).
        """
        if amount <= 0:
            return self._failure('Transfer amount must be positive')
        # With identical accounts both SETs would hit one key and the last
        # write (balance + amount) would win, creating money from nothing.
        if from_account == to_account:
            return self._failure('Cannot transfer to the same account')

        from_key = f"account:{from_account}:balance"
        to_key = f"account:{to_account}:balance"
        transaction_id = f"tx:{int(time.time() * 1000000)}"

        for attempt in range(self.MAX_TRANSFER_RETRIES):
            try:
                with self.redis_client.pipeline() as pipe:
                    pipe.watch(from_key, to_key)
                    # After WATCH the pipeline runs commands immediately, so
                    # these reads happen inside the watched window.
                    raw_from = pipe.get(from_key)
                    raw_to = pipe.get(to_key)
                    if raw_from is None or raw_to is None:
                        return self._failure('One or both accounts do not exist')

                    from_balance = self._to_decimal(raw_from)
                    to_balance = self._to_decimal(raw_to)
                    if from_balance < amount:
                        return self._failure('Insufficient balance',
                                             current_balance=from_balance)

                    new_from_balance = from_balance - amount
                    new_to_balance = to_balance + amount
                    transaction_data = {
                        'transaction_id': transaction_id,
                        'from_account': from_account,
                        'to_account': to_account,
                        'amount': str(amount),
                        'description': description,
                        'timestamp': time.time(),
                        'status': 'completed'
                    }
                    from_history = f"account:{from_account}:transactions"
                    to_history = f"account:{to_account}:transactions"

                    pipe.multi()
                    pipe.set(from_key, str(new_from_balance))
                    pipe.set(to_key, str(new_to_balance))
                    pipe.hset(f"transaction:{transaction_id}", mapping=transaction_data)
                    pipe.lpush(from_history, transaction_id)
                    pipe.lpush(to_history, transaction_id)
                    # Keep only the 1000 most recent transaction ids per account.
                    pipe.ltrim(from_history, 0, 999)
                    pipe.ltrim(to_history, 0, 999)
                    pipe.execute()

                self.logger.info(
                    f"Transfer completed: {from_account} -> {to_account}, amount: {amount}"
                )
                return {
                    'success': True,
                    'transaction_id': transaction_id,
                    'from_balance': new_from_balance,
                    'to_balance': new_to_balance,
                    'timestamp': transaction_data['timestamp']
                }
            except redis.WatchError:
                self.logger.warning(
                    f"Transfer attempt {attempt + 1} failed due to concurrent modification"
                )
                if attempt < self.MAX_TRANSFER_RETRIES - 1:
                    time.sleep(0.01 * (2 ** attempt))  # exponential backoff
            except Exception as e:
                self.logger.error(f"Transfer failed: {e}")
                return self._failure(str(e))

        return self._failure('Transfer failed due to concurrent operations')

    def get_transaction_history(self, account_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to ``limit`` most recent transaction records.

        A non-positive ``limit`` yields an empty list (``LRANGE 0 -1`` would
        otherwise return the entire history).
        """
        if limit <= 0:
            return []
        raw_ids = self.redis_client.lrange(
            f"account:{account_id}:transactions", 0, limit - 1
        )
        if not raw_ids:
            return []
        # Fetch all records in one round trip instead of one HGETALL per id.
        with self.redis_client.pipeline(transaction=False) as pipe:
            for tx_id in raw_ids:
                if isinstance(tx_id, bytes):
                    tx_id = tx_id.decode()
                pipe.hgetall(f"transaction:{tx_id}")
            records = pipe.execute()
        return [record for record in records if record]

    def get_account_info(self, account_id: str) -> Dict[str, Any]:
        """Return balance, metadata and the five latest transactions."""
        balance = self.get_balance(account_id)
        if balance is None:
            return {'error': 'Account not found'}
        return {
            'account_id': account_id,
            'balance': balance,
            'info': self.redis_client.hgetall(f"account:{account_id}:info"),
            'recent_transactions': self.get_transaction_history(account_id, 5)
        }
### 9.2.4 库存管理系统
```python
class InventoryManager:
    """Product stock with time-limited reservations, guarded by WATCH/MULTI/EXEC.

    NOTE(review): status comparisons use ``str`` field names and values, so
    this presumably expects a client created with ``decode_responses=True``
    (as the usage example does) — confirm for other callers.
    """

    # Optimistic-locking attempts for reserving stock.
    MAX_RESERVE_RETRIES = 5
    # Optimistic-locking attempts for confirming/cancelling a reservation.
    MAX_UPDATE_RETRIES = 3

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)

    def add_product(self, product_id: str, initial_stock: int, price: Decimal,
                    name: str = "", description: str = ""):
        """Store product metadata and its initial stock level."""
        with self.redis_client.pipeline() as pipe:
            pipe.hset(f"product:{product_id}", mapping={
                'name': name,
                'description': description,
                'price': str(price),
                'created_at': time.time()
            })
            pipe.set(f"stock:{product_id}", initial_stock)
            pipe.execute()
        self.logger.info(f"Product {product_id} added with stock {initial_stock}")

    def get_stock(self, product_id: str) -> Optional[int]:
        """Return the current stock, or ``None`` if the product has no stock key."""
        stock = self.redis_client.get(f"stock:{product_id}")
        return int(stock) if stock is not None else None

    def reserve_stock(self, product_id: str, quantity: int,
                      customer_id: str, ttl: int = 300) -> Dict[str, Any]:
        """Reserve ``quantity`` units for ``customer_id`` for ``ttl`` seconds.

        Returns:
            A dict with ``success``; on success also ``reservation_id``,
            ``remaining_stock`` and ``expires_at``; on failure ``error``
            (and ``available_stock`` when stock is insufficient).
        """
        if quantity <= 0:
            return {'success': False, 'error': 'Quantity must be positive'}

        stock_key = f"stock:{product_id}"
        reservation_id = f"reservation:{int(time.time() * 1000000)}"
        customer_list = f"customer:{customer_id}:reservations"

        for attempt in range(self.MAX_RESERVE_RETRIES):
            try:
                with self.redis_client.pipeline() as pipe:
                    pipe.watch(stock_key)
                    raw_stock = pipe.get(stock_key)
                    if raw_stock is None:
                        return {'success': False, 'error': 'Product not found'}
                    current_stock = int(raw_stock)
                    if current_stock < quantity:
                        return {
                            'success': False,
                            'error': 'Insufficient stock',
                            'available_stock': current_stock
                        }

                    pipe.multi()
                    pipe.decrby(stock_key, quantity)
                    pipe.hset(reservation_id, mapping={
                        'product_id': product_id,
                        'customer_id': customer_id,
                        'quantity': quantity,
                        'created_at': time.time(),
                        'status': 'reserved'
                    })
                    pipe.expire(reservation_id, ttl)
                    pipe.lpush(customer_list, reservation_id)
                    # The list outlives the reservation by a minute.
                    pipe.expire(customer_list, ttl + 60)
                    pipe.execute()

                self.logger.info(
                    f"Stock reserved: {product_id}, quantity: {quantity}, customer: {customer_id}"
                )
                return {
                    'success': True,
                    'reservation_id': reservation_id,
                    'remaining_stock': current_stock - quantity,
                    'expires_at': time.time() + ttl
                }
            except redis.WatchError:
                self.logger.warning(
                    f"Stock reservation attempt {attempt + 1} failed due to concurrent modification"
                )
                if attempt < self.MAX_RESERVE_RETRIES - 1:
                    time.sleep(0.01 * (2 ** attempt))  # exponential backoff
            except Exception as e:
                self.logger.error(f"Stock reservation failed: {e}")
                return {'success': False, 'error': str(e)}

        return {'success': False, 'error': 'Reservation failed due to concurrent operations'}

    def confirm_reservation(self, reservation_id: str) -> Dict[str, Any]:
        """Confirm a reservation (complete the purchase) and make it permanent.

        The status check and the update run under WATCH, so a concurrent
        cancel or confirm cannot also be applied to the same reservation.
        """
        for attempt in range(self.MAX_UPDATE_RETRIES):
            try:
                with self.redis_client.pipeline() as pipe:
                    pipe.watch(reservation_id)
                    reservation_data = pipe.hgetall(reservation_id)
                    if not reservation_data:
                        return {'success': False,
                                'error': 'Reservation not found or expired'}
                    if reservation_data.get('status') != 'reserved':
                        return {'success': False,
                                'error': 'Reservation already processed'}

                    pipe.multi()
                    pipe.hset(reservation_id, 'status', 'confirmed')
                    pipe.hset(reservation_id, 'confirmed_at', time.time())
                    pipe.persist(reservation_id)  # drop the expiry
                    pipe.execute()

                self.logger.info(f"Reservation confirmed: {reservation_id}")
                return {
                    'success': True,
                    'reservation_id': reservation_id,
                    'product_id': reservation_data['product_id'],
                    'quantity': int(reservation_data['quantity'])
                }
            except redis.WatchError:
                self.logger.warning(f"Confirm reservation attempt {attempt + 1} failed")
                if attempt < self.MAX_UPDATE_RETRIES - 1:
                    time.sleep(0.01)

        return {
            'success': False,
            'error': 'Failed to confirm reservation due to concurrent operations'
        }

    def cancel_reservation(self, reservation_id: str) -> Dict[str, Any]:
        """Cancel a reservation and return its quantity to stock.

        The reservation is re-read under WATCH on every attempt; otherwise a
        retry after a concurrent cancel would restore the stock a second time.
        """
        for attempt in range(self.MAX_UPDATE_RETRIES):
            try:
                with self.redis_client.pipeline() as pipe:
                    pipe.watch(reservation_id)
                    reservation_data = pipe.hgetall(reservation_id)
                    if not reservation_data:
                        return {'success': False,
                                'error': 'Reservation not found or expired'}
                    if reservation_data.get('status') != 'reserved':
                        return {'success': False,
                                'error': 'Reservation already processed'}

                    quantity = int(reservation_data['quantity'])
                    stock_key = f"stock:{reservation_data['product_id']}"

                    pipe.multi()
                    # INCRBY needs no prior read, so stock itself is not watched.
                    pipe.incrby(stock_key, quantity)
                    pipe.hset(reservation_id, 'status', 'cancelled')
                    pipe.hset(reservation_id, 'cancelled_at', time.time())
                    new_stock = pipe.execute()[0]

                self.logger.info(
                    f"Reservation cancelled: {reservation_id}, stock restored: {quantity}"
                )
                return {
                    'success': True,
                    'reservation_id': reservation_id,
                    'restored_quantity': quantity,
                    'new_stock': new_stock
                }
            except redis.WatchError:
                self.logger.warning(f"Cancel reservation attempt {attempt + 1} failed")
                if attempt < self.MAX_UPDATE_RETRIES - 1:
                    time.sleep(0.01)
            except Exception as e:
                self.logger.error(f"Cancel reservation failed: {e}")
                return {'success': False, 'error': str(e)}

        return {
            'success': False,
            'error': 'Failed to cancel reservation due to concurrent operations'
        }

    def get_product_info(self, product_id: str) -> Dict[str, Any]:
        """Return product metadata merged with its current stock."""
        product_data = self.redis_client.hgetall(f"product:{product_id}")
        if not product_data:
            return {'error': 'Product not found'}
        return {
            'product_id': product_id,
            'stock': self.get_stock(product_id),
            **product_data
        }
# Usage example
def _run_transaction_demo() -> None:
    """Exercise the bank-transfer and inventory classes against a local Redis."""
    client = redis.Redis(decode_responses=True)

    # --- Bank transfer system ---
    bank = BankTransferSystem(client)
    for owner, opening in (("alice", Decimal('1000.00')), ("bob", Decimal('500.00'))):
        bank.create_account(owner, opening)

    transfer_outcome = bank.transfer(
        "alice", "bob", Decimal('100.00'), "Payment for services"
    )
    print(f"Transfer result: {transfer_outcome}")

    alice_info = bank.get_account_info("alice")
    bob_info = bank.get_account_info("bob")
    print(f"Alice balance: {alice_info['balance']}")
    print(f"Bob balance: {bob_info['balance']}")

    # --- Inventory management system ---
    inventory = InventoryManager(client)
    inventory.add_product(
        "laptop001", 50, Decimal('999.99'),
        "Gaming Laptop", "High-performance gaming laptop",
    )

    reservation = inventory.reserve_stock("laptop001", 2, "customer123")
    print(f"Reservation result: {reservation}")

    # Only a successful reservation can be confirmed.
    if reservation['success']:
        confirm_result = inventory.confirm_reservation(reservation['reservation_id'])
        print(f"Confirmation result: {confirm_result}")

    product_info = inventory.get_product_info("laptop001")
    print(f"Product info: {product_info}")


if __name__ == "__main__":
    _run_transaction_demo()
9.3 Lua脚本
9.3.1 基本命令
EVAL - 执行脚本
EVAL script numkeys key [key ...] arg [arg ...]
EVALSHA - 执行缓存的脚本
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
SCRIPT LOAD - 加载脚本
SCRIPT LOAD script
SCRIPT EXISTS - 检查脚本是否存在
SCRIPT EXISTS sha1 [sha1 ...]
SCRIPT FLUSH - 清除脚本缓存
SCRIPT FLUSH
SCRIPT KILL - 终止脚本
SCRIPT KILL
9.3.2 Python实现示例
import redis
import hashlib
import json
from typing import List, Dict, Any, Optional, Union
import time
class LuaScriptManager:
    """Thin wrapper around EVAL/EVALSHA that remembers loaded script sources."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        # Maps SHA1 digest -> script source, used to reload after a cache flush.
        self.script_cache = {}
        self.logger = logging.getLogger(__name__)

    def load_script(self, script: str) -> str:
        """Load ``script`` into Redis, remember its source, and return its SHA1."""
        digest = self.redis_client.script_load(script)
        self.script_cache[digest] = script
        return digest

    def execute_script(self, script: str, keys: List[str] = None,
                       args: List[Any] = None) -> Any:
        """Run ``script`` with EVAL; server-side errors are logged and re-raised."""
        key_list = keys or []
        arg_list = args or []
        try:
            return self.redis_client.eval(script, len(key_list), *key_list, *arg_list)
        except redis.ResponseError as e:
            self.logger.error(f"Script execution failed: {e}")
            raise

    def execute_script_sha(self, sha1: str, keys: List[str] = None,
                           args: List[Any] = None) -> Any:
        """Run a previously loaded script with EVALSHA.

        If Redis no longer has the script, it is reloaded from the local
        cache and retried once; an unknown SHA1 raises ``ValueError``.
        """
        key_list = keys or []
        arg_list = args or []
        try:
            return self.redis_client.evalsha(sha1, len(key_list), *key_list, *arg_list)
        except redis.NoScriptError:
            if sha1 not in self.script_cache:
                raise ValueError(f"Script with SHA1 {sha1} not found in cache")
            # Redis lost the script: load it again and retry.
            self.load_script(self.script_cache[sha1])
            return self.redis_client.evalsha(sha1, len(key_list), *key_list, *arg_list)

    def script_exists(self, sha1: str) -> bool:
        """Return whether Redis currently has the script for ``sha1``."""
        existence_flags = self.redis_client.script_exists(sha1)
        return existence_flags[0]

    def flush_scripts(self):
        """Flush the Redis script cache and forget all locally cached sources."""
        self.redis_client.script_flush()
        self.script_cache.clear()
### 9.3.3 分布式锁实现
```python
class DistributedLock:
    """Owner-checked lock built from three Lua scripts (acquire/release/renew)."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.script_manager = LuaScriptManager(redis_client)
        # Acquire: SET NX EX, returns 1 when the lock was taken.
        self.acquire_script = """
local key = KEYS[1]
local identifier = ARGV[1]
local ttl = tonumber(ARGV[2])
-- 尝试获取锁
local result = redis.call('SET', key, identifier, 'NX', 'EX', ttl)
if result then
return 1
else
return 0
end
"""
        # Release: delete the key only when the caller still owns it.
        self.release_script = """
local key = KEYS[1]
local identifier = ARGV[1]
-- 检查锁的拥有者
local current = redis.call('GET', key)
if current == identifier then
redis.call('DEL', key)
return 1
else
return 0
end
"""
        # Renew: reset the expiry only when the caller still owns the lock.
        self.renew_script = """
local key = KEYS[1]
local identifier = ARGV[1]
local ttl = tonumber(ARGV[2])
-- 检查锁的拥有者并续期
local current = redis.call('GET', key)
if current == identifier then
redis.call('EXPIRE', key, ttl)
return 1
else
return 0
end
"""
        # Register the scripts so later calls can use EVALSHA.
        load = self.script_manager.load_script
        self.acquire_sha = load(self.acquire_script)
        self.release_sha = load(self.release_script)
        self.renew_sha = load(self.renew_script)

    def _invoke(self, sha: str, verb: str, lock_name: str, script_args: list) -> bool:
        """Run one lock script; any failure is logged and reported as ``False``."""
        try:
            outcome = self.script_manager.execute_script_sha(
                sha,
                keys=[f"lock:{lock_name}"],
                args=script_args,
            )
        except Exception as e:
            self.script_manager.logger.error(f"Failed to {verb} lock {lock_name}: {e}")
            return False
        return bool(outcome)

    def acquire(self, lock_name: str, identifier: str, ttl: int = 30) -> bool:
        """Try to take the lock for ``ttl`` seconds; ``True`` on success."""
        return self._invoke(self.acquire_sha, "acquire", lock_name, [identifier, ttl])

    def release(self, lock_name: str, identifier: str) -> bool:
        """Release the lock if ``identifier`` still owns it."""
        return self._invoke(self.release_sha, "release", lock_name, [identifier])

    def renew(self, lock_name: str, identifier: str, ttl: int = 30) -> bool:
        """Reset the lock expiry to ``ttl`` seconds if ``identifier`` owns it."""
        return self._invoke(self.renew_sha, "renew", lock_name, [identifier, ttl])
### 9.3.4 限流器实现
```python
class RateLimiter:
    """Sliding-window and token-bucket rate limiters implemented as Lua scripts."""

    # Sliding window over a sorted set scored by request time (milliseconds).
    # ARGV: window_ms, limit, now_ms, unique_member.
    SLIDING_WINDOW_SCRIPT = """
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local member = ARGV[4]
-- Drop entries that have fallen out of the window
redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window)
local current_requests = redis.call('ZCARD', key)
if current_requests < limit then
    -- A unique member keeps same-millisecond requests as separate entries
    redis.call('ZADD', key, current_time, member)
    -- The window is in milliseconds, so use PEXPIRE rather than EXPIRE
    redis.call('PEXPIRE', key, window)
    return {1, limit - current_requests - 1}
else
    return {0, 0}
end
"""

    # Token bucket stored in a hash with fields 'tokens' and 'last_refill'.
    # ARGV: capacity, tokens_per_interval, interval, now.
    TOKEN_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local tokens = tonumber(ARGV[2])
local interval = tonumber(ARGV[3])
local current_time = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local current_tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or current_time
-- Refill only for whole elapsed intervals and advance last_refill by exactly
-- that much, so partial progress toward the next refill is not discarded
local elapsed_intervals = math.floor((current_time - last_refill) / interval)
if elapsed_intervals > 0 then
    current_tokens = math.min(capacity, current_tokens + elapsed_intervals * tokens)
    last_refill = last_refill + elapsed_intervals * interval
end
local allowed = 0
if current_tokens >= 1 then
    current_tokens = current_tokens - 1
    allowed = 1
end
redis.call('HMSET', key, 'tokens', current_tokens, 'last_refill', last_refill)
redis.call('EXPIRE', key, interval * capacity)
return {allowed, current_tokens}
"""

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.script_manager = LuaScriptManager(redis_client)
        # Kept as instance attributes for callers that read them directly.
        self.sliding_window_script = self.SLIDING_WINDOW_SCRIPT
        self.token_bucket_script = self.TOKEN_BUCKET_SCRIPT
        self.sliding_window_sha = self.script_manager.load_script(self.sliding_window_script)
        self.token_bucket_sha = self.script_manager.load_script(self.token_bucket_script)

    def check_sliding_window(self, identifier: str, window_seconds: int,
                             limit: int) -> Dict[str, Any]:
        """Check and record one request against a sliding window.

        Args:
            identifier: Caller identity (user id, IP, ...).
            window_seconds: Window length in seconds.
            limit: Maximum requests allowed per window.

        Returns:
            ``allowed``, ``remaining`` and ``reset_time`` (ms timestamp one
            window after now). On script failure the request is denied.
        """
        import uuid  # local import: the file's import header does not include uuid

        key = f"rate_limit:sliding:{identifier}"
        current_time = int(time.time() * 1000)  # millisecond timestamp
        window_ms = window_seconds * 1000
        try:
            allowed, remaining = self.script_manager.execute_script_sha(
                self.sliding_window_sha,
                keys=[key],
                args=[window_ms, limit, current_time, f"{current_time}-{uuid.uuid4().hex}"]
            )
            return {
                'allowed': bool(allowed),
                'remaining': remaining,
                'reset_time': current_time + window_ms
            }
        except Exception as e:
            self.script_manager.logger.error(f"Sliding window rate limit check failed: {e}")
            return {'allowed': False, 'remaining': 0, 'reset_time': current_time}

    def check_token_bucket(self, identifier: str, capacity: int,
                           refill_tokens: int, refill_interval: int) -> Dict[str, Any]:
        """Try to consume one token from the caller's bucket.

        Args:
            identifier: Bucket identity.
            capacity: Maximum tokens the bucket holds.
            refill_tokens: Tokens added per interval.
            refill_interval: Interval length in seconds.

        Returns:
            ``allowed``, ``tokens_remaining`` and ``capacity``. On script
            failure the request is denied.
        """
        key = f"rate_limit:bucket:{identifier}"
        current_time = int(time.time())
        try:
            allowed, tokens_remaining = self.script_manager.execute_script_sha(
                self.token_bucket_sha,
                keys=[key],
                args=[capacity, refill_tokens, refill_interval, current_time]
            )
            return {
                'allowed': bool(allowed),
                'tokens_remaining': tokens_remaining,
                'capacity': capacity
            }
        except Exception as e:
            self.script_manager.logger.error(f"Token bucket rate limit check failed: {e}")
            return {'allowed': False, 'tokens_remaining': 0, 'capacity': capacity}
### 9.3.5 原子计数器
```python
class AtomicCounter:
    """Counters under ``counter:<name>`` with atomic Lua-script operations."""

    # Increment unless the result would exceed the limit.
    # ARGV: increment, max_value ('' = no limit), ttl seconds (0 = none).
    INCREMENT_WITH_LIMIT_SCRIPT = """
local key = KEYS[1]
local increment = tonumber(ARGV[1])
local max_value = tonumber(ARGV[2])
local ttl = tonumber(ARGV[3])
local current = tonumber(redis.call('GET', key)) or 0
local new_value = current + increment
-- max_value == nil means the caller requested no upper limit
if max_value == nil or new_value <= max_value then
    redis.call('SET', key, new_value)
    if ttl > 0 then
        redis.call('EXPIRE', key, ttl)
    end
    return {1, new_value}
else
    return {0, current}
end
"""

    # Decrement only when the counter would not go below zero.
    DECREMENT_IF_POSITIVE_SCRIPT = """
local key = KEYS[1]
local decrement = tonumber(ARGV[1])
local current = tonumber(redis.call('GET', key)) or 0
if current >= decrement then
    local new_value = current - decrement
    redis.call('SET', key, new_value)
    return {1, new_value}
else
    return {0, current}
end
"""

    # Apply a JSON list of operations; operation i works on KEYS[i].
    BATCH_OPERATIONS_SCRIPT = """
local operations = cjson.decode(ARGV[1])
local results = {}
for i, op in ipairs(operations) do
    local key = KEYS[i]
    local action = op.action
    local value = tonumber(op.value) or 0
    if action == 'incr' then
        local result = redis.call('INCRBY', key, value)
        table.insert(results, {key = key, action = action, result = result})
    elseif action == 'decr' then
        local current = tonumber(redis.call('GET', key)) or 0
        if current >= value then
            local result = redis.call('DECRBY', key, value)
            table.insert(results, {key = key, action = action, result = result, success = true})
        else
            table.insert(results, {key = key, action = action, result = current, success = false})
        end
    elseif action == 'set' then
        redis.call('SET', key, value)
        table.insert(results, {key = key, action = action, result = value})
    else
        -- Report unknown actions so results stay aligned with the input
        table.insert(results, {key = key, action = action, success = false, error = 'unsupported action'})
    end
end
return cjson.encode(results)
"""

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.script_manager = LuaScriptManager(redis_client)
        # Kept as instance attributes for callers that read them directly.
        self.increment_with_limit_script = self.INCREMENT_WITH_LIMIT_SCRIPT
        self.decrement_if_positive_script = self.DECREMENT_IF_POSITIVE_SCRIPT
        self.batch_operations_script = self.BATCH_OPERATIONS_SCRIPT
        load = self.script_manager.load_script
        self.increment_with_limit_sha = load(self.increment_with_limit_script)
        self.decrement_if_positive_sha = load(self.decrement_if_positive_script)
        self.batch_operations_sha = load(self.batch_operations_script)

    def increment_with_limit(self, counter_name: str, increment: int = 1,
                             max_value: Optional[float] = float('inf'),
                             ttl: int = 0) -> Dict[str, Any]:
        """Increment the counter unless the result would exceed ``max_value``.

        Args:
            counter_name: Counter name (stored under ``counter:<name>``).
            increment: Amount to add.
            max_value: Upper limit; ``None`` or infinity means no limit.
            ttl: Expiry in seconds applied after a successful increment
                (0 applies none).

        Returns:
            ``success``, the resulting (or unchanged) ``value`` and
            ``counter_name``. On script failure ``success`` is ``False``.
        """
        key = f"counter:{counter_name}"
        # Send '' for "no limit" instead of the string "inf", so the script
        # does not depend on how the server's Lua parses "inf".
        unlimited = max_value is None or max_value == float('inf')
        limit_arg = "" if unlimited else max_value
        try:
            success, value = self.script_manager.execute_script_sha(
                self.increment_with_limit_sha,
                keys=[key],
                args=[increment, limit_arg, ttl]
            )
            return {
                'success': bool(success),
                'value': value,
                'counter_name': counter_name
            }
        except Exception as e:
            self.script_manager.logger.error(f"Increment with limit failed: {e}")
            return {'success': False, 'value': 0, 'counter_name': counter_name}

    def decrement_if_positive(self, counter_name: str, decrement: int = 1) -> Dict[str, Any]:
        """Decrement the counter only if it would stay non-negative."""
        key = f"counter:{counter_name}"
        try:
            success, value = self.script_manager.execute_script_sha(
                self.decrement_if_positive_sha,
                keys=[key],
                args=[decrement]
            )
            return {
                'success': bool(success),
                'value': value,
                'counter_name': counter_name
            }
        except Exception as e:
            self.script_manager.logger.error(f"Conditional decrement failed: {e}")
            return {'success': False, 'value': 0, 'counter_name': counter_name}

    def batch_operations(self, operations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Apply several counter operations in one atomic script call.

        Each operation is a dict with ``counter_name``, ``action``
        (``incr``/``decr``/``set``) and an optional ``value`` (default 1).

        Returns:
            One result dict per operation, or ``[]`` for empty input or on
            failure (failures are logged).
        """
        # cjson encodes an empty Lua table as "{}", which would decode to a
        # dict; short-circuit so the return type is always a list.
        if not operations:
            return []
        try:
            formatted_ops = [
                {
                    'key': f"counter:{op['counter_name']}",
                    'action': op['action'],
                    'value': op.get('value', 1)
                }
                for op in operations
            ]
            result = self.script_manager.execute_script_sha(
                self.batch_operations_sha,
                # Pass keys explicitly, as Redis scripting guidance requires.
                keys=[op['key'] for op in formatted_ops],
                args=[json.dumps(formatted_ops)]
            )
            return json.loads(result)
        except Exception as e:
            self.script_manager.logger.error(f"Batch operations failed: {e}")
            return []

    def get_value(self, counter_name: str) -> int:
        """Return the counter value, or 0 when the counter does not exist."""
        value = self.redis_client.get(f"counter:{counter_name}")
        return int(value) if value is not None else 0

    def reset(self, counter_name: str, value: int = 0):
        """Overwrite the counter with ``value``."""
        self.redis_client.set(f"counter:{counter_name}", value)
# Usage example
def _run_script_demo() -> None:
    """Exercise the lock, rate limiters and counters against a local Redis."""
    import uuid

    client = redis.Redis(decode_responses=True)

    # --- Distributed lock ---
    lock = DistributedLock(client)
    identifier = str(uuid.uuid4())
    if not lock.acquire("resource1", identifier, 30):
        print("Failed to acquire lock")
    else:
        print("Lock acquired successfully")
        time.sleep(2)  # simulate work
        if lock.renew("resource1", identifier, 30):
            print("Lock renewed")
        if lock.release("resource1", identifier):
            print("Lock released successfully")

    # --- Rate limiters ---
    rate_limiter = RateLimiter(client)

    # Sliding window: at most 3 requests per minute.
    for request_no in range(1, 6):
        outcome = rate_limiter.check_sliding_window("user123", 60, 3)
        print(f"Request {request_no}: {outcome}")
        time.sleep(1)

    # Token bucket: capacity 10, refilled with 1 token per second.
    for call_no in range(1, 6):
        outcome = rate_limiter.check_token_bucket("api_endpoint", 10, 1, 1)
        print(f"API call {call_no}: {outcome}")
        time.sleep(0.5)

    # --- Atomic counters ---
    counter = AtomicCounter(client)

    # Increment capped at 1000 requests.
    outcome = counter.increment_with_limit("daily_requests", 1, 1000)
    print(f"Increment result: {outcome}")

    outcome = counter.decrement_if_positive("available_tokens", 5)
    print(f"Decrement result: {outcome}")

    operations = [
        {'counter_name': 'counter1', 'action': 'incr', 'value': 5},
        {'counter_name': 'counter2', 'action': 'set', 'value': 100},
        {'counter_name': 'counter3', 'action': 'decr', 'value': 3}
    ]
    results = counter.batch_operations(operations)
    print(f"Batch operations results: {results}")


if __name__ == "__main__":
    _run_script_demo()
9.4 性能优化
9.4.1 脚本优化策略
class OptimizedScriptManager:
    """Registers named Lua scripts, runs them via EVALSHA and tracks timings."""

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        # Maps script name -> dict with sha1, sources and execution counters.
        self.script_cache = {}
        self.performance_stats = {}
        self.logger = logging.getLogger(__name__)

    def register_script(self, name: str, script: str,
                        cache_locally: bool = True) -> str:
        """Minify ``script``, load it into Redis and return its SHA1.

        Args:
            name: Name used later with ``execute_registered_script``.
            script: Lua source.
            cache_locally: When ``False`` the script is loaded into Redis
                but not remembered, so it cannot be executed by name.
        """
        optimized_script = self._optimize_script(script)
        sha1 = self.redis_client.script_load(optimized_script)
        if cache_locally:
            self.script_cache[name] = {
                'sha1': sha1,
                'script': optimized_script,
                'original_script': script,
                'load_time': time.time(),
                'execution_count': 0,
                'total_execution_time': 0
            }
        self.logger.info(f"Script '{name}' registered with SHA1: {sha1}")
        return sha1

    @staticmethod
    def _strip_line_comment(line: str) -> str:
        """Remove a trailing ``--`` comment, ignoring ``--`` inside quoted strings."""
        quote = None
        i = 0
        while i < len(line):
            ch = line[i]
            if quote is not None:
                if ch == '\\':
                    i += 2  # skip the escaped character
                    continue
                if ch == quote:
                    quote = None
            elif ch in ('"', "'"):
                quote = ch
            elif line.startswith('--', i):
                return line[:i]
            i += 1
        return line

    def _optimize_script(self, script: str) -> str:
        """Strip line comments, surrounding whitespace and blank lines.

        Works line by line: ``--`` inside single- or double-quoted strings is
        preserved. Multi-line constructs (``--[[ ]]`` block comments and
        ``[[ ]]`` long strings) are not recognised and should be avoided in
        registered scripts.
        """
        stripped = (self._strip_line_comment(line).strip()
                    for line in script.split('\n'))
        return '\n'.join(line for line in stripped if line)

    @staticmethod
    def _record_execution(script_info: Dict[str, Any], elapsed: float) -> None:
        """Add one execution and its duration to the script's counters."""
        script_info['execution_count'] += 1
        script_info['total_execution_time'] += elapsed

    @staticmethod
    def _average_time(script_info: Dict[str, Any]) -> float:
        """Mean execution time in seconds, or 0 if the script never ran."""
        count = script_info['execution_count']
        return script_info['total_execution_time'] / count if count > 0 else 0

    def execute_registered_script(self, name: str, keys: List[str] = None,
                                  args: List[Any] = None) -> Any:
        """Run a registered script, reloading it once if Redis has dropped it.

        Raises:
            ValueError: If ``name`` was never registered (or not cached).
        """
        if name not in self.script_cache:
            raise ValueError(f"Script '{name}' not registered")
        script_info = self.script_cache[name]
        keys = keys or []
        args = args or []
        start_time = time.time()
        try:
            result = self.redis_client.evalsha(
                script_info['sha1'], len(keys), *keys, *args
            )
        except redis.NoScriptError:
            self.logger.warning(f"Script '{name}' not found in Redis, reloading...")
            sha1 = self.redis_client.script_load(script_info['script'])
            script_info['sha1'] = sha1
            result = self.redis_client.evalsha(sha1, len(keys), *keys, *args)
        # The measured time includes any reload round trip.
        self._record_execution(script_info, time.time() - start_time)
        return result

    def get_performance_stats(self, name: str = None) -> Dict[str, Any]:
        """Return timing stats for one script, or a summary for all scripts."""
        if name:
            if name not in self.script_cache:
                return {'error': f"Script '{name}' not found"}
            script_info = self.script_cache[name]
            return {
                'name': name,
                'sha1': script_info['sha1'],
                'execution_count': script_info['execution_count'],
                'total_execution_time': script_info['total_execution_time'],
                'average_execution_time': self._average_time(script_info),
                'load_time': script_info['load_time']
            }
        return {
            script_name: {
                'execution_count': script_info['execution_count'],
                'average_execution_time': self._average_time(script_info)
            }
            for script_name, script_info in self.script_cache.items()
        }

    def preload_scripts(self, script_names: List[str] = None):
        """Ensure the given (default: all cached) scripts are loaded in Redis."""
        scripts_to_load = script_names or list(self.script_cache.keys())
        for name in scripts_to_load:
            script_info = self.script_cache.get(name)
            if script_info is None:
                continue  # unknown names are ignored
            if not self.redis_client.script_exists(script_info['sha1'])[0]:
                script_info['sha1'] = self.redis_client.script_load(script_info['script'])
                self.logger.info(f"Preloaded script '{name}'")
### 9.4.2 批量操作优化
```python
class BatchProcessor:
    """Runs heterogeneous Redis operations in chunks through one Lua script.

    Each operation is a dict with ``type``, ``key`` and, depending on the
    type, ``value``, ``field`` or ``score``. Operations are atomic within a
    chunk of ``batch_size`` but not across chunks.
    """

    def __init__(self, redis_client: "redis.Redis", batch_size: int = 100):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.redis_client = redis_client
        self.batch_size = batch_size
        self.script_manager = OptimizedScriptManager(redis_client)
        # Dispatches on op.type and collects one result entry per operation.
        self.batch_process_script = """
local operations = cjson.decode(ARGV[1])
local results = {}
for i, op in ipairs(operations) do
    local op_type = op.type
    local key = op.key
    local value = op.value
    local result
    if op_type == 'set' then
        result = redis.call('SET', key, value)
    elseif op_type == 'get' then
        result = redis.call('GET', key)
    elseif op_type == 'incr' then
        result = redis.call('INCRBY', key, tonumber(value) or 1)
    elseif op_type == 'decr' then
        result = redis.call('DECRBY', key, tonumber(value) or 1)
    elseif op_type == 'del' then
        result = redis.call('DEL', key)
    elseif op_type == 'expire' then
        result = redis.call('EXPIRE', key, tonumber(value))
    elseif op_type == 'hset' then
        result = redis.call('HSET', key, op.field, value)
    elseif op_type == 'hget' then
        result = redis.call('HGET', key, op.field)
    elseif op_type == 'lpush' then
        result = redis.call('LPUSH', key, value)
    elseif op_type == 'rpush' then
        result = redis.call('RPUSH', key, value)
    elseif op_type == 'sadd' then
        result = redis.call('SADD', key, value)
    elseif op_type == 'zadd' then
        result = redis.call('ZADD', key, tonumber(op.score), value)
    else
        result = 'UNSUPPORTED_OPERATION'
    end
    table.insert(results, {
        index = i,
        type = op_type,
        key = key,
        result = result
    })
end
return cjson.encode(results)
"""
        self.script_manager.register_script('batch_process', self.batch_process_script)

    def process_batch(self, operations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Execute ``operations`` in chunks of ``batch_size``.

        Returns:
            One result dict per operation with ``index`` (1-based position in
            ``operations``), ``type``, ``key`` and ``result``. Operations in a
            failed chunk get ``result`` set to ``'ERROR: ...'``.
        """
        if not operations:
            return []
        all_results = []
        for offset in range(0, len(operations), self.batch_size):
            batch = operations[offset:offset + self.batch_size]
            try:
                raw = self.script_manager.execute_registered_script(
                    'batch_process',
                    keys=[],
                    args=[json.dumps(batch)]
                )
                batch_results = json.loads(raw)
                # The script numbers entries from 1 within each chunk; shift
                # them so indexes are global, matching the error path below.
                for entry in batch_results:
                    entry['index'] += offset
                all_results.extend(batch_results)
            except Exception as e:
                self.script_manager.logger.error(f"Batch processing failed: {e}")
                all_results.extend(
                    {
                        'index': offset + position + 1,
                        'type': op.get('type', 'unknown'),
                        'key': op.get('key', ''),
                        'result': f'ERROR: {str(e)}'
                    }
                    for position, op in enumerate(batch)
                )
        return all_results

    def bulk_set(self, key_value_pairs: Dict[str, Any], ttl: int = None) -> List[Dict[str, Any]]:
        """Set many keys (values are stringified), optionally with a TTL.

        With ``ttl`` each key produces a ``set`` followed by an ``expire``
        operation; a pair may straddle a chunk boundary.
        """
        operations = []
        for key, value in key_value_pairs.items():
            operations.append({'type': 'set', 'key': key, 'value': str(value)})
            if ttl:
                operations.append({'type': 'expire', 'key': key, 'value': ttl})
        return self.process_batch(operations)

    def bulk_get(self, keys: List[str]) -> Dict[str, Any]:
        """Fetch many keys; missing keys map to ``None``."""
        operations = [{'type': 'get', 'key': key, 'value': None} for key in keys]
        result_dict = {}
        for entry in self.process_batch(operations):
            value = entry['result']
            # A missing key arrives as JSON false (Lua's nil reply for GET).
            result_dict[entry['key']] = None if value is False else value
        return result_dict
### 9.4.3 连接池优化
```python
class OptimizedRedisManager:
    """Redis client wrapper with a shared connection pool and simple metrics.

    Owns a ``redis.ConnectionPool``, a client bound to that pool, a script
    manager and a batch processor built on the client, and an in-process
    ``connection_stats`` dict updated by ``execute_with_monitoring``.
    """

    def __init__(self, host: str = 'localhost', port: int = 6379,
                 db: int = 0, max_connections: int = 50):
        """Create the pool, the client, and the helper objects.

        Args:
            host: Redis server host.
            port: Redis server port.
            db: Database index.
            max_connections: Upper bound on pooled connections.
        """
        # decode_responses has to be configured on the pool: redis.Redis
        # ignores its own connection options when an explicit
        # connection_pool is supplied, so passing it to the client alone
        # would silently leave replies as bytes.
        self.pool = redis.ConnectionPool(
            host=host,
            port=port,
            db=db,
            max_connections=max_connections,
            retry_on_timeout=True,
            socket_keepalive=True,
            socket_keepalive_options={},
            health_check_interval=30,
            decode_responses=True,
        )
        self.redis_client = redis.Redis(connection_pool=self.pool)
        self.script_manager = OptimizedScriptManager(self.redis_client)
        self.batch_processor = BatchProcessor(self.redis_client)
        # Performance counters; 'start_time' is wall-clock and drives uptime.
        self.connection_stats = {
            'total_commands': 0,
            'total_time': 0,
            'error_count': 0,
            'start_time': time.time()
        }
        self.logger = logging.getLogger(__name__)

    def execute_with_monitoring(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` and record timing / error counters.

        Successful calls increment ``total_commands`` and add their elapsed
        time to ``total_time``. Failing calls increment ``error_count`` only,
        are logged, and the exception is re-raised unchanged.

        Returns:
            Whatever ``func`` returns.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments.
        started = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            self.connection_stats['error_count'] += 1
            self.logger.error(f"Redis operation failed: {e}")
            raise
        self.connection_stats['total_commands'] += 1
        self.connection_stats['total_time'] += time.perf_counter() - started
        return result

    def get_connection_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the command counters and pool occupancy.

        Returns:
            Dict with uptime, totals, average execution time, error count,
            commands per second, and a nested ``pool_info`` dict.
        """
        stats = self.connection_stats
        uptime = time.time() - stats['start_time']
        total_commands = stats['total_commands']
        avg_time = stats['total_time'] / total_commands if total_commands > 0 else 0

        # redis-py keeps these counters in underscore-prefixed attributes;
        # read them defensively so a different pool implementation degrades
        # to zeros instead of raising AttributeError.
        pool = self.pool
        created = getattr(pool, '_created_connections', None)
        if created is None:
            created = getattr(pool, 'created_connections', 0)
        pool_info = {
            'created_connections': created,
            'available_connections': len(getattr(pool, '_available_connections', ())),
            'in_use_connections': len(getattr(pool, '_in_use_connections', ()))
        }
        return {
            'uptime_seconds': uptime,
            'total_commands': total_commands,
            'total_execution_time': stats['total_time'],
            'average_execution_time': avg_time,
            'error_count': stats['error_count'],
            'commands_per_second': total_commands / uptime if uptime > 0 else 0,
            'pool_info': pool_info
        }

    def health_check(self) -> Dict[str, Any]:
        """Ping the server and summarise selected fields of ``INFO``.

        Returns:
            A status dict. Any exception is converted into
            ``{'status': 'unhealthy', 'error': <message>}`` rather than
            propagated, so callers can always inspect ``status``.
        """
        try:
            started = time.perf_counter()
            # Basic connectivity test.
            ping_result = self.redis_client.ping()
            ping_time = time.perf_counter() - started
            info = self.redis_client.info()
            memory_usage = info.get('used_memory', 0)
            max_memory = info.get('maxmemory', 0)
            # maxmemory == 0 is reported as 0 percent to avoid dividing by zero.
            memory_usage_percent = (memory_usage / max_memory * 100) if max_memory > 0 else 0
            return {
                'status': 'healthy' if ping_result else 'unhealthy',
                'ping_time_ms': ping_time * 1000,
                'redis_version': info.get('redis_version', 'unknown'),
                'connected_clients': info.get('connected_clients', 0),
                'memory_usage_bytes': memory_usage,
                'memory_usage_percent': memory_usage_percent,
                'uptime_seconds': info.get('uptime_in_seconds', 0),
                'total_commands_processed': info.get('total_commands_processed', 0)
            }
        except Exception as e:
            return {
                'status': 'unhealthy',
                'error': str(e)
            }

    def close(self):
        """Disconnect every connection held by the pool."""
        self.pool.disconnect()
```

## 9.5 监控和调试
### 9.5.1 事务监控
```python
class TransactionMonitor:
    """Record timing and outcome of transaction callables in memory.

    Every monitored call appends one dict to ``transaction_log``; the log is
    capped at ``MAX_LOG_ENTRIES`` entries, oldest dropped first.
    """

    # Maximum number of records kept in ``transaction_log``.
    MAX_LOG_ENTRIES = 1000

    def __init__(self, redis_client: redis.Redis):
        """Store the client and start with an empty transaction log."""
        self.redis_client = redis_client
        self.transaction_log = []
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _callable_name(func) -> str:
        """Return a printable name for any callable.

        ``functools.partial`` objects and callable instances have no
        ``__name__``; fall back to ``repr`` instead of raising.
        """
        return getattr(func, '__name__', None) or repr(func)

    @staticmethod
    def _close_record(record: Dict[str, Any], status: str, **extra) -> float:
        """Stamp end time, duration and status onto ``record``; return the duration."""
        end_time = time.time()
        duration = end_time - record['start_time']
        record.update({'end_time': end_time, 'duration': duration, 'status': status})
        record.update(extra)
        return duration

    def monitor_transaction(self, transaction_func, *args, **kwargs):
        """Run ``transaction_func(*args, **kwargs)`` and log how it went.

        Returns:
            The callable's return value.

        Raises:
            redis.WatchError: Re-raised after being recorded as ``watch_error``.
            Exception: Any other failure, re-raised after being recorded as
                ``error``.
        """
        transaction_id = f"tx_{int(time.time() * 1000000)}"
        start_time = time.time()
        transaction_info = {
            'transaction_id': transaction_id,
            'start_time': start_time,
            'function_name': self._callable_name(transaction_func),
            'args': str(args),
            'kwargs': str(kwargs),
            'status': 'started'
        }
        try:
            result = transaction_func(*args, **kwargs)
            duration = self._close_record(
                transaction_info, 'completed',
                result=str(result)[:1000]  # cap the stored result length
            )
            self.logger.info(f"Transaction {transaction_id} completed in {duration:.3f}s")
            return result
        except redis.WatchError as e:
            self._close_record(transaction_info, 'watch_error', error=str(e))
            self.logger.warning(f"Transaction {transaction_id} failed due to watch error: {e}")
            raise
        except Exception as e:
            self._close_record(transaction_info, 'error', error=str(e))
            self.logger.error(f"Transaction {transaction_id} failed: {e}")
            raise
        finally:
            # Runs on success and failure alike, so every call is recorded.
            self.transaction_log.append(transaction_info)
            if len(self.transaction_log) > self.MAX_LOG_ENTRIES:
                # Drop all surplus entries, not just one, in case the log
                # was extended from outside.
                del self.transaction_log[:-self.MAX_LOG_ENTRIES]

    def get_transaction_stats(self, hours: int = 24) -> Dict[str, Any]:
        """Aggregate the records whose ``start_time`` lies in the last ``hours``.

        Returns:
            Counts, success rate, duration statistics of completed
            transactions and per-function counters; or
            ``{'error': ...}`` when no record falls inside the window.
        """
        cutoff_time = time.time() - (hours * 3600)
        recent = [tx for tx in self.transaction_log if tx['start_time'] >= cutoff_time]
        if not recent:
            return {'error': 'No transactions in the specified time period'}

        total = len(recent)
        completed = sum(1 for tx in recent if tx['status'] == 'completed')
        watch_errors = sum(1 for tx in recent if tx['status'] == 'watch_error')
        failed = sum(1 for tx in recent if tx['status'] in ('error', 'watch_error'))

        # Test for None rather than truthiness: a duration of exactly 0.0
        # (coarse clocks) is a valid sample and must not be discarded.
        durations = [
            tx['duration'] for tx in recent
            if tx['status'] == 'completed' and tx.get('duration') is not None
        ]
        avg_duration = sum(durations) / len(durations) if durations else 0
        max_duration = max(durations) if durations else 0
        min_duration = min(durations) if durations else 0

        function_stats = {}
        for tx in recent:
            per_func = function_stats.setdefault(
                tx['function_name'], {'count': 0, 'success': 0, 'failed': 0}
            )
            per_func['count'] += 1
            # Anything that is not 'completed' counts as failed here.
            per_func['success' if tx['status'] == 'completed' else 'failed'] += 1

        return {
            'period_hours': hours,
            'total_transactions': total,
            'completed_transactions': completed,
            'failed_transactions': failed,
            'watch_errors': watch_errors,
            'success_rate': (completed / total * 100) if total > 0 else 0,
            'average_duration': avg_duration,
            'max_duration': max_duration,
            'min_duration': min_duration,
            'function_stats': function_stats
        }

    def get_recent_failures(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to ``limit`` failed records, most recent ``start_time`` first."""
        failures = [
            tx for tx in self.transaction_log
            if tx['status'] in ('error', 'watch_error')
        ]
        failures.sort(key=lambda tx: tx['start_time'], reverse=True)
        return failures[:limit]
```

### 9.5.2 脚本调试工具
```python
class ScriptDebugger:
    """Helpers for running, validating and timing Lua scripts on Redis.

    Each ``debug_script`` call appends one record to ``execution_log``,
    which is capped at ``MAX_LOG_ENTRIES`` entries (oldest dropped first).
    """

    # Maximum number of records kept in ``execution_log``.
    MAX_LOG_ENTRIES = 500

    def __init__(self, redis_client: redis.Redis):
        """Store the client and start with an empty execution log."""
        self.redis_client = redis_client
        self.execution_log = []
        self.logger = logging.getLogger(__name__)

    def debug_script(self, script: str, keys: List[str] = None,
                     args: List[Any] = None, enable_logging: bool = True) -> Dict[str, Any]:
        """Run ``script`` with EVAL and return a structured outcome.

        Args:
            script: Lua source to execute.
            keys: Values exposed to the script as ``KEYS``.
            args: Values exposed to the script as ``ARGV``.
            enable_logging: When true, the script is wrapped with
                ``redis.log`` tracing before being sent.

        Returns:
            ``{'success': True, 'result', 'execution_info'}`` on success, or
            ``{'success': False, 'error', 'execution_info', 'debug_info'}``
            when Redis answers with a ``ResponseError``.

        Raises:
            Exception: Anything other than ``redis.ResponseError`` propagates;
                the record is still logged with status ``'started'``.
        """
        keys = keys or []
        args = args or []
        debug_script = self._add_debug_logging(script) if enable_logging else script

        execution_id = f"script_{int(time.time() * 1000000)}"
        start_time = time.time()
        execution_info = {
            'execution_id': execution_id,
            'start_time': start_time,
            # Fingerprint of the un-instrumented source (MD5, i.e. not the
            # SHA1 digest Redis itself uses for EVALSHA).
            'script_hash': hashlib.md5(script.encode()).hexdigest(),
            'keys': keys,
            'args': args,
            'status': 'started'
        }
        try:
            # Unpack separately so keys/args need not be the same sequence type.
            result = self.redis_client.eval(debug_script, len(keys), *keys, *args)
            end_time = time.time()
            execution_info.update({
                'end_time': end_time,
                'duration': end_time - start_time,
                'status': 'completed',
                'result': result
            })
            self.logger.info(f"Script {execution_id} completed in {end_time - start_time:.3f}s")
            return {
                'success': True,
                'result': result,
                'execution_info': execution_info
            }
        except redis.ResponseError as e:
            end_time = time.time()
            execution_info.update({
                'end_time': end_time,
                'duration': end_time - start_time,
                'status': 'error',
                'error': str(e)
            })
            self.logger.error(f"Script {execution_id} failed: {e}")
            return {
                'success': False,
                'error': str(e),
                'execution_info': execution_info,
                'debug_info': self._parse_lua_error(str(e))
            }
        finally:
            self.execution_log.append(execution_info)
            if len(self.execution_log) > self.MAX_LOG_ENTRIES:
                del self.execution_log[:-self.MAX_LOG_ENTRIES]

    def _add_debug_logging(self, script: str) -> str:
        """Wrap ``script`` so start and completion are written to the Redis log.

        The user script is placed inside a local Lua function. Lua requires
        ``return`` to be the last statement of a block, so appending the
        completion log directly after a script that ends in ``return ...``
        would not compile; wrapping lets the completion message run after
        the script body and then hands back the script's (first) return value.
        """
        debug_prefix = (
            'local function debug_log(message)\n'
            '    redis.log(redis.LOG_NOTICE, "[DEBUG] " .. tostring(message))\n'
            'end\n'
            'debug_log("Script execution started")\n'
            'debug_log("Keys: " .. table.concat(KEYS, ", "))\n'
            'debug_log("Args: " .. table.concat(ARGV, ", "))\n'
            'local function __debug_main()\n'
        )
        # Leading newline guards against a script ending in a line comment.
        debug_suffix = (
            '\nend\n'
            'local __debug_result = __debug_main()\n'
            'debug_log("Script execution completed")\n'
            'return __debug_result\n'
        )
        return debug_prefix + script + debug_suffix

    def _parse_lua_error(self, error_message: str) -> Dict[str, Any]:
        """Classify a Redis/Lua error message and extract a line number.

        Returns:
            Dict with ``error_type`` (``'unknown'`` when unrecognised),
            ``line_number`` (``None`` when none is found) and the original
            message under ``description``.
        """
        import re

        debug_info = {
            'error_type': 'unknown',
            'line_number': None,
            'description': error_message
        }
        if 'attempt to call' in error_message:
            debug_info['error_type'] = 'function_call_error'
        elif 'attempt to index' in error_message:
            debug_info['error_type'] = 'index_error'
        elif 'attempt to perform arithmetic' in error_message:
            debug_info['error_type'] = 'arithmetic_error'
        elif 'syntax error' in error_message or 'Error compiling script' in error_message:
            # Redis reports Lua compile failures as "Error compiling script".
            debug_info['error_type'] = 'syntax_error'

        # Redis prefixes Lua errors with "user_script:<line>:"; the plain
        # "line <n>" form is kept for other message shapes.
        line_match = re.search(r'(?:user_script:|line )(\d+)', error_message)
        if line_match:
            debug_info['line_number'] = int(line_match.group(1))
        return debug_info

    def validate_script_syntax(self, script: str) -> Dict[str, Any]:
        """Check that ``script`` compiles by loading it with SCRIPT LOAD.

        Returns:
            ``{'valid': True, 'sha1', 'message'}`` on success, otherwise
            ``{'valid': False, 'error', 'debug_info'}``.
        """
        try:
            sha1 = self.redis_client.script_load(script)
        except redis.ResponseError as e:
            return {
                'valid': False,
                'error': str(e),
                'debug_info': self._parse_lua_error(str(e))
            }
        return {
            'valid': True,
            'sha1': sha1,
            'message': 'Script syntax is valid'
        }

    def profile_script(self, script: str, keys: List[str] = None,
                       args: List[Any] = None, iterations: int = 100) -> Dict[str, Any]:
        """Execute ``script`` repeatedly via EVALSHA and summarise the timings.

        Args:
            script: Lua source; loaded once before the timing loop.
            keys: Values exposed to the script as ``KEYS``.
            args: Values exposed to the script as ``ARGV``.
            iterations: Number of executions to attempt.

        Returns:
            Timing statistics over the successful executions, or a dict with
            ``'message': 'All executions failed'`` when none succeeded.
        """
        keys = keys or []
        args = args or []
        sha1 = self.redis_client.script_load(script)

        execution_times = []
        errors = 0
        for i in range(iterations):
            # perf_counter: monotonic and high resolution, which matters for
            # sub-millisecond script calls.
            started = time.perf_counter()
            try:
                self.redis_client.evalsha(sha1, len(keys), *keys, *args)
                execution_times.append(time.perf_counter() - started)
            except Exception as e:
                errors += 1
                self.logger.warning(f"Script execution {i+1} failed: {e}")

        if not execution_times:
            return {
                'iterations': iterations,
                'successful_executions': 0,
                'errors': errors,
                'message': 'All executions failed'
            }

        sample_count = len(execution_times)
        total_time = sum(execution_times)
        sorted_times = sorted(execution_times)
        return {
            'iterations': iterations,
            'successful_executions': sample_count,
            'errors': errors,
            'average_time': total_time / sample_count,
            'min_time': sorted_times[0],
            'max_time': sorted_times[-1],
            # Percentiles are picked by index into the sorted samples.
            'p50_time': sorted_times[sample_count // 2],
            'p95_time': sorted_times[int(sample_count * 0.95)],
            'p99_time': sorted_times[int(sample_count * 0.99)],
            'executions_per_second': sample_count / total_time if total_time > 0 else 0
        }
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    # Pool-backed manager with monitoring helpers.
    redis_manager = OptimizedRedisManager()
    try:
        # Health check
        health = redis_manager.health_check()
        print(f"Redis health: {health}")

        # Transaction monitoring example
        transaction_monitor = TransactionMonitor(redis_client)

        def sample_transaction():
            """Queue a SET and an INCR in one MULTI/EXEC transaction."""
            # The context manager resets the pipeline even if execute() raises.
            with redis_client.pipeline() as pipe:
                pipe.multi()
                pipe.set("test_key", "test_value")
                pipe.incr("counter")
                return pipe.execute()

        result = transaction_monitor.monitor_transaction(sample_transaction)
        print(f"Transaction result: {result}")

        stats = transaction_monitor.get_transaction_stats(hours=1)
        print(f"Transaction stats: {stats}")

        # Script debugging example
        script_debugger = ScriptDebugger(redis_client)
        test_script = """
        local key = KEYS[1]
        local value = ARGV[1]
        redis.call('SET', key, value)
        local result = redis.call('GET', key)
        return result
        """

        syntax_check = script_debugger.validate_script_syntax(test_script)
        print(f"Syntax check: {syntax_check}")

        debug_result = script_debugger.debug_script(
            test_script,
            keys=["debug_key"],
            args=["debug_value"]
        )
        print(f"Debug result: {debug_result}")

        profile_result = script_debugger.profile_script(
            test_script,
            keys=["profile_key"],
            args=["profile_value"],
            iterations=50
        )
        print(f"Profile result: {profile_result}")

        # Batch processing example
        batch_processor = BatchProcessor(redis_client)
        operations = [
            {'type': 'set', 'key': 'batch_key1', 'value': 'value1'},
            {'type': 'set', 'key': 'batch_key2', 'value': 'value2'},
            {'type': 'incr', 'key': 'batch_counter', 'value': 5},
            {'type': 'get', 'key': 'batch_key1', 'value': None}
        ]
        batch_results = batch_processor.process_batch(operations)
        print(f"Batch results: {batch_results}")

        connection_stats = redis_manager.get_connection_stats()
        print(f"Connection stats: {connection_stats}")
    finally:
        # Release sockets even when one of the example steps fails.
        redis_manager.close()
        redis_client.connection_pool.disconnect()
```

## 9.6 最佳实践
### 9.6.1 事务设计原则
保持事务简短
- 减少事务中的操作数量
- 避免长时间运行的操作
- 及时释放资源
合理使用WATCH
- 只监视必要的键
- 避免监视过多键导致冲突
- 实现重试机制
错误处理
- 处理WatchError异常
- 实现指数退避重试
- 记录详细的错误日志
### 9.6.2 脚本开发指南
脚本优化
- 避免复杂的循环和递归
- 使用局部变量减少内存使用
- 合理使用Redis命令
错误处理
- 验证输入参数
- 处理Redis命令错误
- 返回有意义的错误信息
性能考虑
- 缓存脚本SHA1值
- 使用EVALSHA而不是EVAL
- 监控脚本执行时间
### 9.6.3 安全考虑
输入验证
- 验证键名和参数
- 防止脚本注入
- 限制脚本权限
资源限制
- 设置脚本执行超时
- 限制脚本内存使用
- 监控脚本执行频率
## 9.7 总结
Redis事务和脚本是实现复杂业务逻辑的重要工具:
**事务特点**:
- 提供基本的原子性保证
- 支持乐观锁机制(WATCH)
- 适合简单的多命令操作
- 需要处理并发冲突

**Lua脚本优势**:
- 真正的原子性执行
- 支持复杂逻辑控制
- 减少网络往返次数
- 可缓存和重复使用

**适用场景**:
- 银行转账和支付系统
- 库存管理和预留
- 分布式锁实现
- 限流和计数器
- 缓存一致性维护

**性能优化要点**:
- 使用连接池管理连接
- 缓存脚本SHA1值
- 实现批量操作
- 监控执行性能

**最佳实践**:
- 保持操作简短高效
- 实现健壮的错误处理
- 做好性能监控和调试
- 考虑安全和资源限制
通过合理使用Redis事务和脚本,可以构建高性能、高可靠的分布式应用系统。