3.1 哈希类型概述

3.1.1 什么是Redis哈希

Redis哈希(Hash)是一个键值对集合,类似于编程语言中的字典或映射。哈希特别适合用于存储对象,因为它可以将对象的各个属性作为字段存储在一个哈希键中。

3.1.2 哈希的特点

  1. 结构化存储

    • 一个哈希可以存储多个字段
    • 每个字段都有对应的值
    • 字段名和值都是字符串类型
  2. 内存效率

    • 小哈希使用压缩编码(ziplist)
    • 节省内存空间
    • 访问速度快
  3. 原子操作

    • 所有哈希操作都是原子性的
    • 支持单个字段的原子更新
    • 支持批量操作

3.1.3 应用场景

  1. 对象存储

    • 用户信息存储
    • 商品信息管理
    • 配置信息存储
  2. 缓存优化

    • 减少序列化开销
    • 支持部分字段更新
    • 提高缓存效率
  3. 计数器组合

    • 多维度统计
    • 分类计数
    • 实时统计

3.2 基本哈希操作

3.2.1 设置和获取字段

# 设置单个字段
HSET key field value
HSET user:1 name "Alice"
HSET user:1 age 25
HSET user:1 email "alice@example.com"

# 获取单个字段
HGET key field
HGET user:1 name
# 返回:"Alice"

# 设置多个字段
HMSET key field1 value1 field2 value2 field3 value3
HMSET user:2 name "Bob" age 30 email "bob@example.com" city "Beijing"

# 获取多个字段
HMGET key field1 field2 field3
HMGET user:2 name age email
# 返回:1) "Bob" 2) "30" 3) "bob@example.com"

# 获取所有字段和值
HGETALL key
HGETALL user:1
# 返回:1) "name" 2) "Alice" 3) "age" 4) "25" 5) "email" 6) "alice@example.com"

3.2.2 条件设置

# 仅当字段不存在时设置
HSETNX key field value
HSETNX user:1 phone "13800138000"
# 返回:1(设置成功)或0(字段已存在)

# 检查字段是否存在
HEXISTS key field
HEXISTS user:1 name
# 返回:1(存在)或0(不存在)

# 删除字段
HDEL key field [field ...]
HDEL user:1 phone
HDEL user:2 city email
# 返回:删除的字段数量

3.2.3 获取哈希信息

# 获取字段数量
HLEN key
HLEN user:1
# 返回:3

# 获取所有字段名
HKEYS key
HKEYS user:1
# 返回:1) "name" 2) "age" 3) "email"

# 获取所有字段值
HVALS key
HVALS user:1
# 返回:1) "Alice" 2) "25" 3) "alice@example.com"

3.3 数值操作

3.3.1 整数递增递减

# 字段值递增
HINCRBY key field increment
HSET stats:user:1 login_count 10
HINCRBY stats:user:1 login_count 1
# 返回:11

HINCRBY stats:user:1 login_count 5
# 返回:16

# 字段值递减(使用负数)
HINCRBY stats:user:1 login_count -2
# 返回:14

3.3.2 浮点数递增

# 浮点数递增
HINCRBYFLOAT key field increment
HSET product:1 price 99.99
HINCRBYFLOAT product:1 price 10.01
# 返回:"110"

HINCRBYFLOAT product:1 price -5.50
# 返回:"104.5"

3.4 字符串长度操作

# 获取字段值的字符串长度
HSTRLEN key field
HSET user:1 description "This is a long description"
HSTRLEN user:1 description
# 返回:26

HSTRLEN user:1 name
# 返回:5("Alice"的长度)

3.5 扫描操作

# 扫描哈希中的字段
HSCAN key cursor [MATCH pattern] [COUNT count]

# 基本扫描
HSCAN user:1 0
# 返回:1) "0" 2) 1) "name" 2) "Alice" 3) "age" 4) "25" 5) "email" 6) "alice@example.com"

# 模式匹配扫描
HSET config app_name "MyApp" app_version "1.0" app_debug "true" db_host "localhost" db_port "3306"
HSCAN config 0 MATCH app_*
# 返回匹配app_开头的字段

# 限制返回数量
HSCAN config 0 COUNT 2
# 每次最多返回2个字段

3.6 实际应用示例

3.6.1 用户信息管理系统

import redis
import json
from typing import Dict, Any, Optional, List
from datetime import datetime

class RedisUserManager:
    def __init__(self, host='localhost', port=6379, db=0, password=None):
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            db=db, 
            password=password,
            decode_responses=True
        )
        self.user_prefix = "user"
        self.stats_prefix = "user_stats"
    
    def _get_user_key(self, user_id: str) -> str:
        """获取用户键名"""
        return f"{self.user_prefix}:{user_id}"
    
    def _get_stats_key(self, user_id: str) -> str:
        """获取用户统计键名"""
        return f"{self.stats_prefix}:{user_id}"
    
    def create_user(self, user_id: str, user_data: Dict[str, Any]) -> bool:
        """创建用户"""
        try:
            user_key = self._get_user_key(user_id)
            
            # 检查用户是否已存在
            if self.redis_client.exists(user_key):
                return False
            
            # 添加创建时间
            user_data['created_at'] = datetime.now().isoformat()
            user_data['updated_at'] = datetime.now().isoformat()
            
            # 存储用户信息
            self.redis_client.hmset(user_key, user_data)
            
            # 初始化用户统计
            stats_key = self._get_stats_key(user_id)
            self.redis_client.hmset(stats_key, {
                'login_count': 0,
                'last_login': '',
                'total_sessions': 0,
                'total_time_spent': 0
            })
            
            return True
        except Exception as e:
            print(f"创建用户失败: {e}")
            return False
    
    def get_user(self, user_id: str) -> Optional[Dict[str, str]]:
        """获取用户信息"""
        try:
            user_key = self._get_user_key(user_id)
            user_data = self.redis_client.hgetall(user_key)
            return user_data if user_data else None
        except Exception as e:
            print(f"获取用户失败: {e}")
            return None
    
    def update_user(self, user_id: str, update_data: Dict[str, Any]) -> bool:
        """更新用户信息"""
        try:
            user_key = self._get_user_key(user_id)
            
            # 检查用户是否存在
            if not self.redis_client.exists(user_key):
                return False
            
            # 添加更新时间
            update_data['updated_at'] = datetime.now().isoformat()
            
            # 更新用户信息
            self.redis_client.hmset(user_key, update_data)
            return True
        except Exception as e:
            print(f"更新用户失败: {e}")
            return False
    
    def delete_user(self, user_id: str) -> bool:
        """删除用户"""
        try:
            user_key = self._get_user_key(user_id)
            stats_key = self._get_stats_key(user_id)
            
            # 删除用户信息和统计信息
            deleted_count = self.redis_client.delete(user_key, stats_key)
            return deleted_count > 0
        except Exception as e:
            print(f"删除用户失败: {e}")
            return False
    
    def get_user_field(self, user_id: str, field: str) -> Optional[str]:
        """获取用户特定字段"""
        try:
            user_key = self._get_user_key(user_id)
            return self.redis_client.hget(user_key, field)
        except Exception as e:
            print(f"获取用户字段失败: {e}")
            return None
    
    def update_user_field(self, user_id: str, field: str, value: str) -> bool:
        """更新用户特定字段"""
        try:
            user_key = self._get_user_key(user_id)
            
            if not self.redis_client.exists(user_key):
                return False
            
            self.redis_client.hset(user_key, field, value)
            self.redis_client.hset(user_key, 'updated_at', datetime.now().isoformat())
            return True
        except Exception as e:
            print(f"更新用户字段失败: {e}")
            return False
    
    def increment_login_count(self, user_id: str) -> int:
        """增加登录次数"""
        try:
            stats_key = self._get_stats_key(user_id)
            count = self.redis_client.hincrby(stats_key, 'login_count', 1)
            self.redis_client.hset(stats_key, 'last_login', datetime.now().isoformat())
            return count
        except Exception as e:
            print(f"增加登录次数失败: {e}")
            return 0
    
    def get_user_stats(self, user_id: str) -> Optional[Dict[str, str]]:
        """获取用户统计信息"""
        try:
            stats_key = self._get_stats_key(user_id)
            return self.redis_client.hgetall(stats_key)
        except Exception as e:
            print(f"获取用户统计失败: {e}")
            return None
    
    def search_users_by_field(self, field: str, value: str) -> List[str]:
        """根据字段值搜索用户"""
        matching_users = []
        
        try:
            # 扫描所有用户键
            for key in self.redis_client.scan_iter(match=f"{self.user_prefix}:*"):
                field_value = self.redis_client.hget(key, field)
                if field_value == value:
                    user_id = key.split(':')[-1]
                    matching_users.append(user_id)
        except Exception as e:
            print(f"搜索用户失败: {e}")
        
        return matching_users
    
    def get_all_users(self) -> List[Dict[str, Any]]:
        """获取所有用户信息"""
        users = []
        
        try:
            for key in self.redis_client.scan_iter(match=f"{self.user_prefix}:*"):
                user_data = self.redis_client.hgetall(key)
                if user_data:
                    user_id = key.split(':')[-1]
                    user_data['user_id'] = user_id
                    users.append(user_data)
        except Exception as e:
            print(f"获取所有用户失败: {e}")
        
        return users

# 使用示例
user_manager = RedisUserManager()

# 创建用户
user_data = {
    'name': 'Alice Johnson',
    'email': 'alice@example.com',
    'age': '28',
    'city': 'Shanghai',
    'role': 'developer',
    'department': 'engineering'
}

if user_manager.create_user('user_001', user_data):
    print("用户创建成功")
else:
    print("用户创建失败")

# 获取用户信息
user_info = user_manager.get_user('user_001')
if user_info:
    print(f"用户信息: {user_info}")

# 更新用户信息
user_manager.update_user('user_001', {
    'age': '29',
    'city': 'Beijing',
    'last_project': 'Redis Tutorial'
})

# 获取特定字段
user_name = user_manager.get_user_field('user_001', 'name')
print(f"用户姓名: {user_name}")

# 更新特定字段
user_manager.update_user_field('user_001', 'status', 'active')

# 增加登录次数
login_count = user_manager.increment_login_count('user_001')
print(f"登录次数: {login_count}")

# 获取用户统计
stats = user_manager.get_user_stats('user_001')
if stats:
    print(f"用户统计: {stats}")

# 搜索用户
developers = user_manager.search_users_by_field('role', 'developer')
print(f"开发者用户: {developers}")

3.6.2 商品信息管理系统

import redis
from typing import Dict, Any, Optional, List
from datetime import datetime
import json

class RedisProductManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            db=db,
            decode_responses=True
        )
        self.product_prefix = "product"
        self.category_prefix = "category"
        self.inventory_prefix = "inventory"
    
    def _get_product_key(self, product_id: str) -> str:
        return f"{self.product_prefix}:{product_id}"
    
    def _get_inventory_key(self, product_id: str) -> str:
        return f"{self.inventory_prefix}:{product_id}"
    
    def create_product(self, product_id: str, product_data: Dict[str, Any]) -> bool:
        """创建商品"""
        try:
            product_key = self._get_product_key(product_id)
            
            if self.redis_client.exists(product_key):
                return False
            
            # 添加时间戳
            product_data['created_at'] = datetime.now().isoformat()
            product_data['updated_at'] = datetime.now().isoformat()
            
            # 存储商品信息
            self.redis_client.hmset(product_key, product_data)
            
            # 初始化库存信息
            inventory_key = self._get_inventory_key(product_id)
            self.redis_client.hmset(inventory_key, {
                'stock': product_data.get('initial_stock', 0),
                'reserved': 0,
                'sold': 0,
                'last_updated': datetime.now().isoformat()
            })
            
            return True
        except Exception as e:
            print(f"创建商品失败: {e}")
            return False
    
    def get_product(self, product_id: str) -> Optional[Dict[str, str]]:
        """获取商品信息"""
        try:
            product_key = self._get_product_key(product_id)
            return self.redis_client.hgetall(product_key)
        except Exception as e:
            print(f"获取商品失败: {e}")
            return None
    
    def update_product(self, product_id: str, update_data: Dict[str, Any]) -> bool:
        """更新商品信息"""
        try:
            product_key = self._get_product_key(product_id)
            
            if not self.redis_client.exists(product_key):
                return False
            
            update_data['updated_at'] = datetime.now().isoformat()
            self.redis_client.hmset(product_key, update_data)
            return True
        except Exception as e:
            print(f"更新商品失败: {e}")
            return False
    
    def update_price(self, product_id: str, new_price: float) -> bool:
        """更新商品价格"""
        try:
            product_key = self._get_product_key(product_id)
            
            if not self.redis_client.exists(product_key):
                return False
            
            # 保存价格历史
            old_price = self.redis_client.hget(product_key, 'price')
            if old_price:
                self.redis_client.hset(product_key, 'previous_price', old_price)
            
            # 更新新价格
            self.redis_client.hset(product_key, 'price', str(new_price))
            self.redis_client.hset(product_key, 'price_updated_at', datetime.now().isoformat())
            
            return True
        except Exception as e:
            print(f"更新价格失败: {e}")
            return False
    
    def adjust_stock(self, product_id: str, quantity: int) -> Optional[int]:
        """调整库存数量"""
        try:
            inventory_key = self._get_inventory_key(product_id)
            
            if not self.redis_client.exists(inventory_key):
                return None
            
            # 更新库存
            new_stock = self.redis_client.hincrby(inventory_key, 'stock', quantity)
            self.redis_client.hset(inventory_key, 'last_updated', datetime.now().isoformat())
            
            return new_stock
        except Exception as e:
            print(f"调整库存失败: {e}")
            return None
    
    def reserve_stock(self, product_id: str, quantity: int) -> bool:
        """预留库存"""
        try:
            inventory_key = self._get_inventory_key(product_id)
            
            # 检查可用库存
            current_stock = int(self.redis_client.hget(inventory_key, 'stock') or 0)
            current_reserved = int(self.redis_client.hget(inventory_key, 'reserved') or 0)
            
            available = current_stock - current_reserved
            if available < quantity:
                return False
            
            # 预留库存
            self.redis_client.hincrby(inventory_key, 'reserved', quantity)
            self.redis_client.hset(inventory_key, 'last_updated', datetime.now().isoformat())
            
            return True
        except Exception as e:
            print(f"预留库存失败: {e}")
            return False
    
    def confirm_sale(self, product_id: str, quantity: int) -> bool:
        """确认销售"""
        try:
            inventory_key = self._get_inventory_key(product_id)
            
            # 减少库存和预留数量,增加销售数量
            pipe = self.redis_client.pipeline()
            pipe.hincrby(inventory_key, 'stock', -quantity)
            pipe.hincrby(inventory_key, 'reserved', -quantity)
            pipe.hincrby(inventory_key, 'sold', quantity)
            pipe.hset(inventory_key, 'last_updated', datetime.now().isoformat())
            pipe.execute()
            
            return True
        except Exception as e:
            print(f"确认销售失败: {e}")
            return False
    
    def get_inventory(self, product_id: str) -> Optional[Dict[str, str]]:
        """获取库存信息"""
        try:
            inventory_key = self._get_inventory_key(product_id)
            return self.redis_client.hgetall(inventory_key)
        except Exception as e:
            print(f"获取库存失败: {e}")
            return None
    
    def get_products_by_category(self, category: str) -> List[Dict[str, Any]]:
        """根据分类获取商品"""
        products = []
        
        try:
            for key in self.redis_client.scan_iter(match=f"{self.product_prefix}:*"):
                product_category = self.redis_client.hget(key, 'category')
                if product_category == category:
                    product_data = self.redis_client.hgetall(key)
                    product_id = key.split(':')[-1]
                    product_data['product_id'] = product_id
                    products.append(product_data)
        except Exception as e:
            print(f"获取分类商品失败: {e}")
        
        return products
    
    def search_products(self, **criteria) -> List[Dict[str, Any]]:
        """搜索商品"""
        matching_products = []
        
        try:
            for key in self.redis_client.scan_iter(match=f"{self.product_prefix}:*"):
                product_data = self.redis_client.hgetall(key)
                
                # 检查是否匹配所有条件
                match = True
                for field, value in criteria.items():
                    if product_data.get(field) != str(value):
                        match = False
                        break
                
                if match:
                    product_id = key.split(':')[-1]
                    product_data['product_id'] = product_id
                    matching_products.append(product_data)
        except Exception as e:
            print(f"搜索商品失败: {e}")
        
        return matching_products
    
    def get_low_stock_products(self, threshold: int = 10) -> List[Dict[str, Any]]:
        """获取低库存商品"""
        low_stock_products = []
        
        try:
            for key in self.redis_client.scan_iter(match=f"{self.inventory_prefix}:*"):
                stock = int(self.redis_client.hget(key, 'stock') or 0)
                reserved = int(self.redis_client.hget(key, 'reserved') or 0)
                available = stock - reserved
                
                if available <= threshold:
                    product_id = key.split(':')[-1]
                    product_data = self.get_product(product_id)
                    if product_data:
                        product_data['product_id'] = product_id
                        product_data['available_stock'] = available
                        low_stock_products.append(product_data)
        except Exception as e:
            print(f"获取低库存商品失败: {e}")
        
        return low_stock_products

# 使用示例
product_manager = RedisProductManager()

# 创建商品
product_data = {
    'name': 'iPhone 15 Pro',
    'category': 'electronics',
    'brand': 'Apple',
    'price': '7999.00',
    'description': 'Latest iPhone with advanced features',
    'sku': 'IPH15PRO001',
    'initial_stock': '100'
}

if product_manager.create_product('prod_001', product_data):
    print("商品创建成功")

# 获取商品信息
product = product_manager.get_product('prod_001')
if product:
    print(f"商品信息: {product}")

# 更新商品价格
product_manager.update_price('prod_001', 7599.00)
print("价格已更新")

# 调整库存
new_stock = product_manager.adjust_stock('prod_001', -10)
print(f"调整后库存: {new_stock}")

# 预留库存
if product_manager.reserve_stock('prod_001', 5):
    print("库存预留成功")

# 确认销售
if product_manager.confirm_sale('prod_001', 3):
    print("销售确认成功")

# 获取库存信息
inventory = product_manager.get_inventory('prod_001')
if inventory:
    print(f"库存信息: {inventory}")

# 搜索商品
electronics = product_manager.get_products_by_category('electronics')
print(f"电子产品数量: {len(electronics)}")

# 获取低库存商品
low_stock = product_manager.get_low_stock_products(20)
print(f"低库存商品: {len(low_stock)}")

3.6.3 配置管理系统

import redis
from typing import Dict, Any, Optional, List
import json
from datetime import datetime

class RedisConfigManager:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            db=db,
            decode_responses=True
        )
        self.config_prefix = "config"
        self.history_prefix = "config_history"
    
    def _get_config_key(self, app_name: str, env: str = 'default') -> str:
        return f"{self.config_prefix}:{app_name}:{env}"
    
    def _get_history_key(self, app_name: str, env: str = 'default') -> str:
        return f"{self.history_prefix}:{app_name}:{env}"
    
    def set_config(self, app_name: str, config_data: Dict[str, Any], 
                   env: str = 'default', user: str = 'system') -> bool:
        """设置配置"""
        try:
            config_key = self._get_config_key(app_name, env)
            
            # 保存历史配置
            if self.redis_client.exists(config_key):
                self._save_config_history(app_name, env, user)
            
            # 添加元数据
            config_data['_updated_at'] = datetime.now().isoformat()
            config_data['_updated_by'] = user
            config_data['_version'] = str(int(datetime.now().timestamp()))
            
            # 存储配置
            self.redis_client.hmset(config_key, config_data)
            return True
        except Exception as e:
            print(f"设置配置失败: {e}")
            return False
    
    def get_config(self, app_name: str, env: str = 'default') -> Optional[Dict[str, str]]:
        """获取配置"""
        try:
            config_key = self._get_config_key(app_name, env)
            return self.redis_client.hgetall(config_key)
        except Exception as e:
            print(f"获取配置失败: {e}")
            return None
    
    def get_config_value(self, app_name: str, key: str, 
                        env: str = 'default', default: Any = None) -> Any:
        """获取配置值"""
        try:
            config_key = self._get_config_key(app_name, env)
            value = self.redis_client.hget(config_key, key)
            return value if value is not None else default
        except Exception as e:
            print(f"获取配置值失败: {e}")
            return default
    
    def update_config_value(self, app_name: str, key: str, value: Any,
                           env: str = 'default', user: str = 'system') -> bool:
        """更新单个配置值"""
        try:
            config_key = self._get_config_key(app_name, env)
            
            if not self.redis_client.exists(config_key):
                return False
            
            # 保存历史
            self._save_config_history(app_name, env, user)
            
            # 更新配置
            self.redis_client.hset(config_key, key, str(value))
            self.redis_client.hset(config_key, '_updated_at', datetime.now().isoformat())
            self.redis_client.hset(config_key, '_updated_by', user)
            
            return True
        except Exception as e:
            print(f"更新配置值失败: {e}")
            return False
    
    def delete_config_key(self, app_name: str, key: str, 
                         env: str = 'default', user: str = 'system') -> bool:
        """删除配置键"""
        try:
            config_key = self._get_config_key(app_name, env)
            
            if not self.redis_client.hexists(config_key, key):
                return False
            
            # 保存历史
            self._save_config_history(app_name, env, user)
            
            # 删除配置键
            self.redis_client.hdel(config_key, key)
            self.redis_client.hset(config_key, '_updated_at', datetime.now().isoformat())
            self.redis_client.hset(config_key, '_updated_by', user)
            
            return True
        except Exception as e:
            print(f"删除配置键失败: {e}")
            return False
    
    def _save_config_history(self, app_name: str, env: str, user: str) -> None:
        """保存配置历史"""
        try:
            config_key = self._get_config_key(app_name, env)
            current_config = self.redis_client.hgetall(config_key)
            
            if current_config:
                history_key = self._get_history_key(app_name, env)
                timestamp = datetime.now().isoformat()
                
                # 保存历史记录
                history_record = {
                    'timestamp': timestamp,
                    'user': user,
                    'config': json.dumps(current_config)
                }
                
                self.redis_client.lpush(history_key, json.dumps(history_record))
                # 只保留最近50个历史记录
                self.redis_client.ltrim(history_key, 0, 49)
        except Exception as e:
            print(f"保存配置历史失败: {e}")
    
    def get_config_history(self, app_name: str, env: str = 'default', 
                          limit: int = 10) -> List[Dict[str, Any]]:
        """获取配置历史"""
        try:
            history_key = self._get_history_key(app_name, env)
            history_records = self.redis_client.lrange(history_key, 0, limit - 1)
            
            history = []
            for record in history_records:
                history_data = json.loads(record)
                history_data['config'] = json.loads(history_data['config'])
                history.append(history_data)
            
            return history
        except Exception as e:
            print(f"获取配置历史失败: {e}")
            return []
    
    def rollback_config(self, app_name: str, env: str = 'default', 
                       version_index: int = 0, user: str = 'system') -> bool:
        """回滚配置"""
        try:
            history = self.get_config_history(app_name, env, version_index + 1)
            
            if len(history) <= version_index:
                return False
            
            # 获取要回滚的配置
            target_config = history[version_index]['config']
            
            # 移除元数据字段
            rollback_config = {k: v for k, v in target_config.items() 
                             if not k.startswith('_')}
            
            # 设置回滚配置
            return self.set_config(app_name, rollback_config, env, f"{user}(rollback)")
        except Exception as e:
            print(f"回滚配置失败: {e}")
            return False
    
    def list_apps(self) -> List[str]:
        """列出所有应用"""
        apps = set()
        
        try:
            for key in self.redis_client.scan_iter(match=f"{self.config_prefix}:*"):
                parts = key.split(':')
                if len(parts) >= 3:
                    apps.add(parts[1])
        except Exception as e:
            print(f"列出应用失败: {e}")
        
        return list(apps)
    
    def list_environments(self, app_name: str) -> List[str]:
        """列出应用的所有环境"""
        environments = set()
        
        try:
            pattern = f"{self.config_prefix}:{app_name}:*"
            for key in self.redis_client.scan_iter(match=pattern):
                parts = key.split(':')
                if len(parts) >= 3:
                    environments.add(parts[2])
        except Exception as e:
            print(f"列出环境失败: {e}")
        
        return list(environments)
    
    def export_config(self, app_name: str, env: str = 'default') -> Optional[str]:
        """导出配置为JSON"""
        try:
            config = self.get_config(app_name, env)
            if config:
                # 移除元数据
                export_config = {k: v for k, v in config.items() 
                               if not k.startswith('_')}
                return json.dumps(export_config, indent=2)
            return None
        except Exception as e:
            print(f"导出配置失败: {e}")
            return None
    
    def import_config(self, app_name: str, config_json: str, 
                     env: str = 'default', user: str = 'system') -> bool:
        """从JSON导入配置"""
        try:
            config_data = json.loads(config_json)
            return self.set_config(app_name, config_data, env, user)
        except Exception as e:
            print(f"导入配置失败: {e}")
            return False

# 使用示例
config_manager = RedisConfigManager()

# 设置应用配置
app_config = {
    'database_host': 'localhost',
    'database_port': '5432',
    'database_name': 'myapp',
    'redis_host': 'localhost',
    'redis_port': '6379',
    'log_level': 'INFO',
    'debug_mode': 'false',
    'max_connections': '100'
}

config_manager.set_config('myapp', app_config, 'production', 'admin')
print("生产环境配置已设置")

# 设置开发环境配置
dev_config = app_config.copy()
dev_config.update({
    'database_host': 'dev-db.local',
    'debug_mode': 'true',
    'log_level': 'DEBUG'
})

config_manager.set_config('myapp', dev_config, 'development', 'admin')
print("开发环境配置已设置")

# 获取配置
prod_config = config_manager.get_config('myapp', 'production')
print(f"生产环境配置: {prod_config}")

# 获取单个配置值
db_host = config_manager.get_config_value('myapp', 'database_host', 'production')
print(f"数据库主机: {db_host}")

# 更新配置值
config_manager.update_config_value('myapp', 'max_connections', '200', 'production', 'admin')
print("最大连接数已更新")

# 获取配置历史
history = config_manager.get_config_history('myapp', 'production', 5)
print(f"配置历史记录数: {len(history)}")

# 列出所有应用
apps = config_manager.list_apps()
print(f"应用列表: {apps}")

# 列出应用环境
envs = config_manager.list_environments('myapp')
print(f"myapp环境列表: {envs}")

# 导出配置
exported_config = config_manager.export_config('myapp', 'production')
if exported_config:
    print(f"导出的配置:\n{exported_config}")

3.7 性能优化

3.7.1 哈希编码优化

# 查看哈希编码
OBJECT ENCODING hash_key

# 小哈希使用ziplist编码(节省内存)
HSET small_hash field1 value1 field2 value2
OBJECT ENCODING small_hash
# 返回:"ziplist"

# 大哈希使用hashtable编码
# 当字段数量或值大小超过阈值时自动转换

3.7.2 内存优化配置

# 哈希优化配置参数
# redis.conf

# ziplist编码的最大字段数
hash-max-ziplist-entries 512

# ziplist编码的最大值大小
hash-max-ziplist-value 64

# 查看当前配置
CONFIG GET hash-max-ziplist-*

3.7.3 批量操作优化

import redis
from typing import Dict, List

class RedisHashOptimizer:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    def batch_set_hash_fields(self, key: str, field_values: Dict[str, str]) -> bool:
        """批量设置哈希字段"""
        try:
            # 使用HMSET批量设置
            self.redis_client.hmset(key, field_values)
            return True
        except Exception as e:
            print(f"批量设置失败: {e}")
            return False
    
    def batch_get_hash_fields(self, key: str, fields: List[str]) -> Dict[str, str]:
        """批量获取哈希字段"""
        try:
            values = self.redis_client.hmget(key, fields)
            return dict(zip(fields, values))
        except Exception as e:
            print(f"批量获取失败: {e}")
            return {}
    
    def pipeline_hash_operations(self, operations: List[Dict]) -> List:
        """管道化哈希操作"""
        try:
            pipe = self.redis_client.pipeline()
            
            for op in operations:
                if op['type'] == 'hset':
                    pipe.hset(op['key'], op['field'], op['value'])
                elif op['type'] == 'hget':
                    pipe.hget(op['key'], op['field'])
                elif op['type'] == 'hmset':
                    pipe.hmset(op['key'], op['data'])
                elif op['type'] == 'hgetall':
                    pipe.hgetall(op['key'])
            
            return pipe.execute()
        except Exception as e:
            print(f"管道操作失败: {e}")
            return []
    
    def analyze_hash_memory(self, key: str) -> Dict[str, Any]:
        """分析哈希内存使用"""
        try:
            if not self.redis_client.exists(key):
                return {'error': 'Key does not exist'}
            
            encoding = self.redis_client.object('encoding', key)
            memory_usage = self.redis_client.memory_usage(key)
            field_count = self.redis_client.hlen(key)
            
            return {
                'key': key,
                'encoding': encoding,
                'memory_usage': memory_usage,
                'field_count': field_count,
                'avg_memory_per_field': memory_usage / field_count if field_count > 0 else 0
            }
        except Exception as e:
            print(f"分析内存失败: {e}")
            return {'error': str(e)}

# 使用示例
redis_client = redis.Redis(decode_responses=True)
optimizer = RedisHashOptimizer(redis_client)

# 批量设置
user_data = {
    'name': 'Alice',
    'age': '25',
    'email': 'alice@example.com',
    'city': 'Shanghai'
}
optimizer.batch_set_hash_fields('user:1', user_data)

# 批量获取
fields = ['name', 'age', 'email']
result = optimizer.batch_get_hash_fields('user:1', fields)
print(f"批量获取结果: {result}")

# 管道操作
operations = [
    {'type': 'hset', 'key': 'user:2', 'field': 'name', 'value': 'Bob'},
    {'type': 'hset', 'key': 'user:2', 'field': 'age', 'value': '30'},
    {'type': 'hget', 'key': 'user:1', 'field': 'name'},
    {'type': 'hgetall', 'key': 'user:1'}
]
results = optimizer.pipeline_hash_operations(operations)
print(f"管道操作结果: {results}")

# 内存分析
memory_analysis = optimizer.analyze_hash_memory('user:1')
print(f"内存分析: {memory_analysis}")

3.8 总结

本章详细介绍了Redis哈希类型的使用方法,包括:

  1. 基本操作:HSET、HGET、HMSET、HMGET、HGETALL等命令
  2. 高级功能:条件设置、数值操作、字符串长度、扫描操作
  3. 实际应用:用户信息管理、商品管理、配置管理系统
  4. 性能优化:编码优化、批量操作、管道化处理

哈希类型特别适合存储结构化数据,如用户信息、商品信息等对象。相比于将整个对象序列化为字符串,使用哈希可以:

  • 节省内存空间(特别是小哈希)
  • 支持部分字段更新
  • 提供更好的可读性和操作性
  • 避免序列化/反序列化开销

在实际应用中,应根据数据特点和访问模式选择合适的数据结构和操作方法。


下一章将介绍Redis的列表类型及其应用场景。