3.1 哈希类型概述
3.1.1 什么是Redis哈希
Redis哈希(Hash)是一个键值对集合,类似于编程语言中的字典或映射。哈希特别适合用于存储对象,因为它可以将对象的各个属性作为字段存储在一个哈希键中。
3.1.2 哈希的特点
结构化存储
- 一个哈希可以存储多个字段
- 每个字段都有对应的值
- 字段名和值都是字符串类型
内存效率
- 小哈希使用压缩编码(ziplist)
- 节省内存空间
- 访问速度快
原子操作
- 所有哈希操作都是原子性的
- 支持单个字段的原子更新
- 支持批量操作
3.1.3 应用场景
对象存储
- 用户信息存储
- 商品信息管理
- 配置信息存储
缓存优化
- 减少序列化开销
- 支持部分字段更新
- 提高缓存效率
计数器组合
- 多维度统计
- 分类计数
- 实时统计
3.2 基本哈希操作
3.2.1 设置和获取字段
# 设置单个字段
HSET key field value
HSET user:1 name "Alice"
HSET user:1 age 25
HSET user:1 email "alice@example.com"
# 获取单个字段
HGET key field
HGET user:1 name
# 返回:"Alice"
# 设置多个字段
HMSET key field1 value1 field2 value2 field3 value3
HMSET user:2 name "Bob" age 30 email "bob@example.com" city "Beijing"
# 获取多个字段
HMGET key field1 field2 field3
HMGET user:2 name age email
# 返回:1) "Bob" 2) "30" 3) "bob@example.com"
# 获取所有字段和值
HGETALL key
HGETALL user:1
# 返回:1) "name" 2) "Alice" 3) "age" 4) "25" 5) "email" 6) "alice@example.com"
3.2.2 条件设置
# 仅当字段不存在时设置
HSETNX key field value
HSETNX user:1 phone "13800138000"
# 返回:1(设置成功)或0(字段已存在)
# 检查字段是否存在
HEXISTS key field
HEXISTS user:1 name
# 返回:1(存在)或0(不存在)
# 删除字段
HDEL key field [field ...]
HDEL user:1 phone
HDEL user:2 city email
# 返回:删除的字段数量
3.2.3 获取哈希信息
# 获取字段数量
HLEN key
HLEN user:1
# 返回:3
# 获取所有字段名
HKEYS key
HKEYS user:1
# 返回:1) "name" 2) "age" 3) "email"
# 获取所有字段值
HVALS key
HVALS user:1
# 返回:1) "Alice" 2) "25" 3) "alice@example.com"
3.3 数值操作
3.3.1 整数递增递减
# 字段值递增
HINCRBY key field increment
HSET stats:user:1 login_count 10
HINCRBY stats:user:1 login_count 1
# 返回:11
HINCRBY stats:user:1 login_count 5
# 返回:16
# 字段值递减(使用负数)
HINCRBY stats:user:1 login_count -2
# 返回:14
3.3.2 浮点数递增
# 浮点数递增
HINCRBYFLOAT key field increment
HSET product:1 price 99.99
HINCRBYFLOAT product:1 price 10.01
# 返回:"110"
HINCRBYFLOAT product:1 price -5.50
# 返回:"104.5"
3.4 字符串长度操作
# 获取字段值的字符串长度
HSTRLEN key field
HSET user:1 description "This is a long description"
HSTRLEN user:1 description
# 返回:26
HSTRLEN user:1 name
# 返回:5("Alice"的长度)
3.5 扫描操作
# 扫描哈希中的字段
HSCAN key cursor [MATCH pattern] [COUNT count]
# 基本扫描
HSCAN user:1 0
# 返回:1) "0" 2) 1) "name" 2) "Alice" 3) "age" 4) "25" 5) "email" 6) "alice@example.com"
# 模式匹配扫描
HSET config app_name "MyApp" app_version "1.0" app_debug "true" db_host "localhost" db_port "3306"
HSCAN config 0 MATCH app_*
# 返回匹配app_开头的字段
# 限制返回数量
HSCAN config 0 COUNT 2
# 每次最多返回2个字段
3.6 实际应用示例
3.6.1 用户信息管理系统
import redis
import json
from typing import Dict, Any, Optional, List
from datetime import datetime
class RedisUserManager:
def __init__(self, host='localhost', port=6379, db=0, password=None):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=True
)
self.user_prefix = "user"
self.stats_prefix = "user_stats"
def _get_user_key(self, user_id: str) -> str:
"""获取用户键名"""
return f"{self.user_prefix}:{user_id}"
def _get_stats_key(self, user_id: str) -> str:
"""获取用户统计键名"""
return f"{self.stats_prefix}:{user_id}"
def create_user(self, user_id: str, user_data: Dict[str, Any]) -> bool:
"""创建用户"""
try:
user_key = self._get_user_key(user_id)
# 检查用户是否已存在
if self.redis_client.exists(user_key):
return False
# 添加创建时间
user_data['created_at'] = datetime.now().isoformat()
user_data['updated_at'] = datetime.now().isoformat()
# 存储用户信息
self.redis_client.hmset(user_key, user_data)
# 初始化用户统计
stats_key = self._get_stats_key(user_id)
self.redis_client.hmset(stats_key, {
'login_count': 0,
'last_login': '',
'total_sessions': 0,
'total_time_spent': 0
})
return True
except Exception as e:
print(f"创建用户失败: {e}")
return False
def get_user(self, user_id: str) -> Optional[Dict[str, str]]:
"""获取用户信息"""
try:
user_key = self._get_user_key(user_id)
user_data = self.redis_client.hgetall(user_key)
return user_data if user_data else None
except Exception as e:
print(f"获取用户失败: {e}")
return None
def update_user(self, user_id: str, update_data: Dict[str, Any]) -> bool:
"""更新用户信息"""
try:
user_key = self._get_user_key(user_id)
# 检查用户是否存在
if not self.redis_client.exists(user_key):
return False
# 添加更新时间
update_data['updated_at'] = datetime.now().isoformat()
# 更新用户信息
self.redis_client.hmset(user_key, update_data)
return True
except Exception as e:
print(f"更新用户失败: {e}")
return False
def delete_user(self, user_id: str) -> bool:
"""删除用户"""
try:
user_key = self._get_user_key(user_id)
stats_key = self._get_stats_key(user_id)
# 删除用户信息和统计信息
deleted_count = self.redis_client.delete(user_key, stats_key)
return deleted_count > 0
except Exception as e:
print(f"删除用户失败: {e}")
return False
def get_user_field(self, user_id: str, field: str) -> Optional[str]:
"""获取用户特定字段"""
try:
user_key = self._get_user_key(user_id)
return self.redis_client.hget(user_key, field)
except Exception as e:
print(f"获取用户字段失败: {e}")
return None
def update_user_field(self, user_id: str, field: str, value: str) -> bool:
"""更新用户特定字段"""
try:
user_key = self._get_user_key(user_id)
if not self.redis_client.exists(user_key):
return False
self.redis_client.hset(user_key, field, value)
self.redis_client.hset(user_key, 'updated_at', datetime.now().isoformat())
return True
except Exception as e:
print(f"更新用户字段失败: {e}")
return False
def increment_login_count(self, user_id: str) -> int:
"""增加登录次数"""
try:
stats_key = self._get_stats_key(user_id)
count = self.redis_client.hincrby(stats_key, 'login_count', 1)
self.redis_client.hset(stats_key, 'last_login', datetime.now().isoformat())
return count
except Exception as e:
print(f"增加登录次数失败: {e}")
return 0
def get_user_stats(self, user_id: str) -> Optional[Dict[str, str]]:
"""获取用户统计信息"""
try:
stats_key = self._get_stats_key(user_id)
return self.redis_client.hgetall(stats_key)
except Exception as e:
print(f"获取用户统计失败: {e}")
return None
def search_users_by_field(self, field: str, value: str) -> List[str]:
"""根据字段值搜索用户"""
matching_users = []
try:
# 扫描所有用户键
for key in self.redis_client.scan_iter(match=f"{self.user_prefix}:*"):
field_value = self.redis_client.hget(key, field)
if field_value == value:
user_id = key.split(':')[-1]
matching_users.append(user_id)
except Exception as e:
print(f"搜索用户失败: {e}")
return matching_users
def get_all_users(self) -> List[Dict[str, Any]]:
"""获取所有用户信息"""
users = []
try:
for key in self.redis_client.scan_iter(match=f"{self.user_prefix}:*"):
user_data = self.redis_client.hgetall(key)
if user_data:
user_id = key.split(':')[-1]
user_data['user_id'] = user_id
users.append(user_data)
except Exception as e:
print(f"获取所有用户失败: {e}")
return users
# 使用示例
user_manager = RedisUserManager()
# 创建用户
user_data = {
'name': 'Alice Johnson',
'email': 'alice@example.com',
'age': '28',
'city': 'Shanghai',
'role': 'developer',
'department': 'engineering'
}
if user_manager.create_user('user_001', user_data):
print("用户创建成功")
else:
print("用户创建失败")
# 获取用户信息
user_info = user_manager.get_user('user_001')
if user_info:
print(f"用户信息: {user_info}")
# 更新用户信息
user_manager.update_user('user_001', {
'age': '29',
'city': 'Beijing',
'last_project': 'Redis Tutorial'
})
# 获取特定字段
user_name = user_manager.get_user_field('user_001', 'name')
print(f"用户姓名: {user_name}")
# 更新特定字段
user_manager.update_user_field('user_001', 'status', 'active')
# 增加登录次数
login_count = user_manager.increment_login_count('user_001')
print(f"登录次数: {login_count}")
# 获取用户统计
stats = user_manager.get_user_stats('user_001')
if stats:
print(f"用户统计: {stats}")
# 搜索用户
developers = user_manager.search_users_by_field('role', 'developer')
print(f"开发者用户: {developers}")
3.6.2 商品信息管理系统
import redis
from typing import Dict, Any, Optional, List
from datetime import datetime
import json
class RedisProductManager:
def __init__(self, host='localhost', port=6379, db=0):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True
)
self.product_prefix = "product"
self.category_prefix = "category"
self.inventory_prefix = "inventory"
def _get_product_key(self, product_id: str) -> str:
return f"{self.product_prefix}:{product_id}"
def _get_inventory_key(self, product_id: str) -> str:
return f"{self.inventory_prefix}:{product_id}"
def create_product(self, product_id: str, product_data: Dict[str, Any]) -> bool:
"""创建商品"""
try:
product_key = self._get_product_key(product_id)
if self.redis_client.exists(product_key):
return False
# 添加时间戳
product_data['created_at'] = datetime.now().isoformat()
product_data['updated_at'] = datetime.now().isoformat()
# 存储商品信息
self.redis_client.hmset(product_key, product_data)
# 初始化库存信息
inventory_key = self._get_inventory_key(product_id)
self.redis_client.hmset(inventory_key, {
'stock': product_data.get('initial_stock', 0),
'reserved': 0,
'sold': 0,
'last_updated': datetime.now().isoformat()
})
return True
except Exception as e:
print(f"创建商品失败: {e}")
return False
def get_product(self, product_id: str) -> Optional[Dict[str, str]]:
"""获取商品信息"""
try:
product_key = self._get_product_key(product_id)
return self.redis_client.hgetall(product_key)
except Exception as e:
print(f"获取商品失败: {e}")
return None
def update_product(self, product_id: str, update_data: Dict[str, Any]) -> bool:
"""更新商品信息"""
try:
product_key = self._get_product_key(product_id)
if not self.redis_client.exists(product_key):
return False
update_data['updated_at'] = datetime.now().isoformat()
self.redis_client.hmset(product_key, update_data)
return True
except Exception as e:
print(f"更新商品失败: {e}")
return False
def update_price(self, product_id: str, new_price: float) -> bool:
"""更新商品价格"""
try:
product_key = self._get_product_key(product_id)
if not self.redis_client.exists(product_key):
return False
# 保存价格历史
old_price = self.redis_client.hget(product_key, 'price')
if old_price:
self.redis_client.hset(product_key, 'previous_price', old_price)
# 更新新价格
self.redis_client.hset(product_key, 'price', str(new_price))
self.redis_client.hset(product_key, 'price_updated_at', datetime.now().isoformat())
return True
except Exception as e:
print(f"更新价格失败: {e}")
return False
def adjust_stock(self, product_id: str, quantity: int) -> Optional[int]:
"""调整库存数量"""
try:
inventory_key = self._get_inventory_key(product_id)
if not self.redis_client.exists(inventory_key):
return None
# 更新库存
new_stock = self.redis_client.hincrby(inventory_key, 'stock', quantity)
self.redis_client.hset(inventory_key, 'last_updated', datetime.now().isoformat())
return new_stock
except Exception as e:
print(f"调整库存失败: {e}")
return None
def reserve_stock(self, product_id: str, quantity: int) -> bool:
"""预留库存"""
try:
inventory_key = self._get_inventory_key(product_id)
# 检查可用库存
current_stock = int(self.redis_client.hget(inventory_key, 'stock') or 0)
current_reserved = int(self.redis_client.hget(inventory_key, 'reserved') or 0)
available = current_stock - current_reserved
if available < quantity:
return False
# 预留库存
self.redis_client.hincrby(inventory_key, 'reserved', quantity)
self.redis_client.hset(inventory_key, 'last_updated', datetime.now().isoformat())
return True
except Exception as e:
print(f"预留库存失败: {e}")
return False
def confirm_sale(self, product_id: str, quantity: int) -> bool:
"""确认销售"""
try:
inventory_key = self._get_inventory_key(product_id)
# 减少库存和预留数量,增加销售数量
pipe = self.redis_client.pipeline()
pipe.hincrby(inventory_key, 'stock', -quantity)
pipe.hincrby(inventory_key, 'reserved', -quantity)
pipe.hincrby(inventory_key, 'sold', quantity)
pipe.hset(inventory_key, 'last_updated', datetime.now().isoformat())
pipe.execute()
return True
except Exception as e:
print(f"确认销售失败: {e}")
return False
def get_inventory(self, product_id: str) -> Optional[Dict[str, str]]:
"""获取库存信息"""
try:
inventory_key = self._get_inventory_key(product_id)
return self.redis_client.hgetall(inventory_key)
except Exception as e:
print(f"获取库存失败: {e}")
return None
def get_products_by_category(self, category: str) -> List[Dict[str, Any]]:
"""根据分类获取商品"""
products = []
try:
for key in self.redis_client.scan_iter(match=f"{self.product_prefix}:*"):
product_category = self.redis_client.hget(key, 'category')
if product_category == category:
product_data = self.redis_client.hgetall(key)
product_id = key.split(':')[-1]
product_data['product_id'] = product_id
products.append(product_data)
except Exception as e:
print(f"获取分类商品失败: {e}")
return products
def search_products(self, **criteria) -> List[Dict[str, Any]]:
"""搜索商品"""
matching_products = []
try:
for key in self.redis_client.scan_iter(match=f"{self.product_prefix}:*"):
product_data = self.redis_client.hgetall(key)
# 检查是否匹配所有条件
match = True
for field, value in criteria.items():
if product_data.get(field) != str(value):
match = False
break
if match:
product_id = key.split(':')[-1]
product_data['product_id'] = product_id
matching_products.append(product_data)
except Exception as e:
print(f"搜索商品失败: {e}")
return matching_products
def get_low_stock_products(self, threshold: int = 10) -> List[Dict[str, Any]]:
"""获取低库存商品"""
low_stock_products = []
try:
for key in self.redis_client.scan_iter(match=f"{self.inventory_prefix}:*"):
stock = int(self.redis_client.hget(key, 'stock') or 0)
reserved = int(self.redis_client.hget(key, 'reserved') or 0)
available = stock - reserved
if available <= threshold:
product_id = key.split(':')[-1]
product_data = self.get_product(product_id)
if product_data:
product_data['product_id'] = product_id
product_data['available_stock'] = available
low_stock_products.append(product_data)
except Exception as e:
print(f"获取低库存商品失败: {e}")
return low_stock_products
# 使用示例
product_manager = RedisProductManager()
# 创建商品
product_data = {
'name': 'iPhone 15 Pro',
'category': 'electronics',
'brand': 'Apple',
'price': '7999.00',
'description': 'Latest iPhone with advanced features',
'sku': 'IPH15PRO001',
'initial_stock': '100'
}
if product_manager.create_product('prod_001', product_data):
print("商品创建成功")
# 获取商品信息
product = product_manager.get_product('prod_001')
if product:
print(f"商品信息: {product}")
# 更新商品价格
product_manager.update_price('prod_001', 7599.00)
print("价格已更新")
# 调整库存
new_stock = product_manager.adjust_stock('prod_001', -10)
print(f"调整后库存: {new_stock}")
# 预留库存
if product_manager.reserve_stock('prod_001', 5):
print("库存预留成功")
# 确认销售
if product_manager.confirm_sale('prod_001', 3):
print("销售确认成功")
# 获取库存信息
inventory = product_manager.get_inventory('prod_001')
if inventory:
print(f"库存信息: {inventory}")
# 搜索商品
electronics = product_manager.get_products_by_category('electronics')
print(f"电子产品数量: {len(electronics)}")
# 获取低库存商品
low_stock = product_manager.get_low_stock_products(20)
print(f"低库存商品: {len(low_stock)}")
3.6.3 配置管理系统
import redis
from typing import Dict, Any, Optional, List
import json
from datetime import datetime
class RedisConfigManager:
def __init__(self, host='localhost', port=6379, db=0):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True
)
self.config_prefix = "config"
self.history_prefix = "config_history"
def _get_config_key(self, app_name: str, env: str = 'default') -> str:
return f"{self.config_prefix}:{app_name}:{env}"
def _get_history_key(self, app_name: str, env: str = 'default') -> str:
return f"{self.history_prefix}:{app_name}:{env}"
def set_config(self, app_name: str, config_data: Dict[str, Any],
env: str = 'default', user: str = 'system') -> bool:
"""设置配置"""
try:
config_key = self._get_config_key(app_name, env)
# 保存历史配置
if self.redis_client.exists(config_key):
self._save_config_history(app_name, env, user)
# 添加元数据
config_data['_updated_at'] = datetime.now().isoformat()
config_data['_updated_by'] = user
config_data['_version'] = str(int(datetime.now().timestamp()))
# 存储配置
self.redis_client.hmset(config_key, config_data)
return True
except Exception as e:
print(f"设置配置失败: {e}")
return False
def get_config(self, app_name: str, env: str = 'default') -> Optional[Dict[str, str]]:
"""获取配置"""
try:
config_key = self._get_config_key(app_name, env)
return self.redis_client.hgetall(config_key)
except Exception as e:
print(f"获取配置失败: {e}")
return None
def get_config_value(self, app_name: str, key: str,
env: str = 'default', default: Any = None) -> Any:
"""获取配置值"""
try:
config_key = self._get_config_key(app_name, env)
value = self.redis_client.hget(config_key, key)
return value if value is not None else default
except Exception as e:
print(f"获取配置值失败: {e}")
return default
def update_config_value(self, app_name: str, key: str, value: Any,
env: str = 'default', user: str = 'system') -> bool:
"""更新单个配置值"""
try:
config_key = self._get_config_key(app_name, env)
if not self.redis_client.exists(config_key):
return False
# 保存历史
self._save_config_history(app_name, env, user)
# 更新配置
self.redis_client.hset(config_key, key, str(value))
self.redis_client.hset(config_key, '_updated_at', datetime.now().isoformat())
self.redis_client.hset(config_key, '_updated_by', user)
return True
except Exception as e:
print(f"更新配置值失败: {e}")
return False
def delete_config_key(self, app_name: str, key: str,
env: str = 'default', user: str = 'system') -> bool:
"""删除配置键"""
try:
config_key = self._get_config_key(app_name, env)
if not self.redis_client.hexists(config_key, key):
return False
# 保存历史
self._save_config_history(app_name, env, user)
# 删除配置键
self.redis_client.hdel(config_key, key)
self.redis_client.hset(config_key, '_updated_at', datetime.now().isoformat())
self.redis_client.hset(config_key, '_updated_by', user)
return True
except Exception as e:
print(f"删除配置键失败: {e}")
return False
def _save_config_history(self, app_name: str, env: str, user: str) -> None:
"""保存配置历史"""
try:
config_key = self._get_config_key(app_name, env)
current_config = self.redis_client.hgetall(config_key)
if current_config:
history_key = self._get_history_key(app_name, env)
timestamp = datetime.now().isoformat()
# 保存历史记录
history_record = {
'timestamp': timestamp,
'user': user,
'config': json.dumps(current_config)
}
self.redis_client.lpush(history_key, json.dumps(history_record))
# 只保留最近50个历史记录
self.redis_client.ltrim(history_key, 0, 49)
except Exception as e:
print(f"保存配置历史失败: {e}")
def get_config_history(self, app_name: str, env: str = 'default',
limit: int = 10) -> List[Dict[str, Any]]:
"""获取配置历史"""
try:
history_key = self._get_history_key(app_name, env)
history_records = self.redis_client.lrange(history_key, 0, limit - 1)
history = []
for record in history_records:
history_data = json.loads(record)
history_data['config'] = json.loads(history_data['config'])
history.append(history_data)
return history
except Exception as e:
print(f"获取配置历史失败: {e}")
return []
def rollback_config(self, app_name: str, env: str = 'default',
version_index: int = 0, user: str = 'system') -> bool:
"""回滚配置"""
try:
history = self.get_config_history(app_name, env, version_index + 1)
if len(history) <= version_index:
return False
# 获取要回滚的配置
target_config = history[version_index]['config']
# 移除元数据字段
rollback_config = {k: v for k, v in target_config.items()
if not k.startswith('_')}
# 设置回滚配置
return self.set_config(app_name, rollback_config, env, f"{user}(rollback)")
except Exception as e:
print(f"回滚配置失败: {e}")
return False
def list_apps(self) -> List[str]:
"""列出所有应用"""
apps = set()
try:
for key in self.redis_client.scan_iter(match=f"{self.config_prefix}:*"):
parts = key.split(':')
if len(parts) >= 3:
apps.add(parts[1])
except Exception as e:
print(f"列出应用失败: {e}")
return list(apps)
def list_environments(self, app_name: str) -> List[str]:
"""列出应用的所有环境"""
environments = set()
try:
pattern = f"{self.config_prefix}:{app_name}:*"
for key in self.redis_client.scan_iter(match=pattern):
parts = key.split(':')
if len(parts) >= 3:
environments.add(parts[2])
except Exception as e:
print(f"列出环境失败: {e}")
return list(environments)
def export_config(self, app_name: str, env: str = 'default') -> Optional[str]:
"""导出配置为JSON"""
try:
config = self.get_config(app_name, env)
if config:
# 移除元数据
export_config = {k: v for k, v in config.items()
if not k.startswith('_')}
return json.dumps(export_config, indent=2)
return None
except Exception as e:
print(f"导出配置失败: {e}")
return None
def import_config(self, app_name: str, config_json: str,
env: str = 'default', user: str = 'system') -> bool:
"""从JSON导入配置"""
try:
config_data = json.loads(config_json)
return self.set_config(app_name, config_data, env, user)
except Exception as e:
print(f"导入配置失败: {e}")
return False
# 使用示例
config_manager = RedisConfigManager()
# 设置应用配置
app_config = {
'database_host': 'localhost',
'database_port': '5432',
'database_name': 'myapp',
'redis_host': 'localhost',
'redis_port': '6379',
'log_level': 'INFO',
'debug_mode': 'false',
'max_connections': '100'
}
config_manager.set_config('myapp', app_config, 'production', 'admin')
print("生产环境配置已设置")
# 设置开发环境配置
dev_config = app_config.copy()
dev_config.update({
'database_host': 'dev-db.local',
'debug_mode': 'true',
'log_level': 'DEBUG'
})
config_manager.set_config('myapp', dev_config, 'development', 'admin')
print("开发环境配置已设置")
# 获取配置
prod_config = config_manager.get_config('myapp', 'production')
print(f"生产环境配置: {prod_config}")
# 获取单个配置值
db_host = config_manager.get_config_value('myapp', 'database_host', 'production')
print(f"数据库主机: {db_host}")
# 更新配置值
config_manager.update_config_value('myapp', 'max_connections', '200', 'production', 'admin')
print("最大连接数已更新")
# 获取配置历史
history = config_manager.get_config_history('myapp', 'production', 5)
print(f"配置历史记录数: {len(history)}")
# 列出所有应用
apps = config_manager.list_apps()
print(f"应用列表: {apps}")
# 列出应用环境
envs = config_manager.list_environments('myapp')
print(f"myapp环境列表: {envs}")
# 导出配置
exported_config = config_manager.export_config('myapp', 'production')
if exported_config:
print(f"导出的配置:\n{exported_config}")
3.7 性能优化
3.7.1 哈希编码优化
# 查看哈希编码
OBJECT ENCODING hash_key
# 小哈希使用ziplist编码(节省内存)
HSET small_hash field1 value1 field2 value2
OBJECT ENCODING small_hash
# 返回:"ziplist"
# 大哈希使用hashtable编码
# 当字段数量或值大小超过阈值时自动转换
3.7.2 内存优化配置
# 哈希优化配置参数
# redis.conf
# ziplist编码的最大字段数
hash-max-ziplist-entries 512
# ziplist编码的最大值大小
hash-max-ziplist-value 64
# 查看当前配置
CONFIG GET hash-max-ziplist-*
3.7.3 批量操作优化
import redis
from typing import Dict, List
class RedisHashOptimizer:
def __init__(self, redis_client: redis.Redis):
self.redis_client = redis_client
def batch_set_hash_fields(self, key: str, field_values: Dict[str, str]) -> bool:
"""批量设置哈希字段"""
try:
# 使用HMSET批量设置
self.redis_client.hmset(key, field_values)
return True
except Exception as e:
print(f"批量设置失败: {e}")
return False
def batch_get_hash_fields(self, key: str, fields: List[str]) -> Dict[str, str]:
"""批量获取哈希字段"""
try:
values = self.redis_client.hmget(key, fields)
return dict(zip(fields, values))
except Exception as e:
print(f"批量获取失败: {e}")
return {}
def pipeline_hash_operations(self, operations: List[Dict]) -> List:
"""管道化哈希操作"""
try:
pipe = self.redis_client.pipeline()
for op in operations:
if op['type'] == 'hset':
pipe.hset(op['key'], op['field'], op['value'])
elif op['type'] == 'hget':
pipe.hget(op['key'], op['field'])
elif op['type'] == 'hmset':
pipe.hmset(op['key'], op['data'])
elif op['type'] == 'hgetall':
pipe.hgetall(op['key'])
return pipe.execute()
except Exception as e:
print(f"管道操作失败: {e}")
return []
def analyze_hash_memory(self, key: str) -> Dict[str, Any]:
"""分析哈希内存使用"""
try:
if not self.redis_client.exists(key):
return {'error': 'Key does not exist'}
encoding = self.redis_client.object('encoding', key)
memory_usage = self.redis_client.memory_usage(key)
field_count = self.redis_client.hlen(key)
return {
'key': key,
'encoding': encoding,
'memory_usage': memory_usage,
'field_count': field_count,
'avg_memory_per_field': memory_usage / field_count if field_count > 0 else 0
}
except Exception as e:
print(f"分析内存失败: {e}")
return {'error': str(e)}
# 使用示例
redis_client = redis.Redis(decode_responses=True)
optimizer = RedisHashOptimizer(redis_client)
# 批量设置
user_data = {
'name': 'Alice',
'age': '25',
'email': 'alice@example.com',
'city': 'Shanghai'
}
optimizer.batch_set_hash_fields('user:1', user_data)
# 批量获取
fields = ['name', 'age', 'email']
result = optimizer.batch_get_hash_fields('user:1', fields)
print(f"批量获取结果: {result}")
# 管道操作
operations = [
{'type': 'hset', 'key': 'user:2', 'field': 'name', 'value': 'Bob'},
{'type': 'hset', 'key': 'user:2', 'field': 'age', 'value': '30'},
{'type': 'hget', 'key': 'user:1', 'field': 'name'},
{'type': 'hgetall', 'key': 'user:1'}
]
results = optimizer.pipeline_hash_operations(operations)
print(f"管道操作结果: {results}")
# 内存分析
memory_analysis = optimizer.analyze_hash_memory('user:1')
print(f"内存分析: {memory_analysis}")
3.8 总结
本章详细介绍了Redis哈希类型的使用方法,包括:
- 基本操作:HSET、HGET、HMSET、HMGET、HGETALL等命令
- 高级功能:条件设置、数值操作、字符串长度、扫描操作
- 实际应用:用户信息管理、商品管理、配置管理系统
- 性能优化:编码优化、批量操作、管道化处理
哈希类型特别适合存储结构化数据,如用户信息、商品信息等对象。相比于将整个对象序列化为字符串,使用哈希可以:
- 节省内存空间(特别是小哈希)
- 支持部分字段更新
- 提供更好的可读性和操作性
- 避免序列化/反序列化开销
在实际应用中,应根据数据特点和访问模式选择合适的数据结构和操作方法。
下一章将介绍Redis的列表类型及其应用场景。