2.1 字符串类型概述
2.1.1 什么是Redis字符串
Redis字符串是Redis中最基本的数据类型,它是二进制安全的,这意味着Redis字符串可以包含任何数据,比如JPEG图片或者序列化的对象。一个Redis字符串值最大可以是512MB。
2.1.2 字符串的特点
二进制安全
- 可以存储任何二进制数据
- 不会因为特殊字符而截断
- 支持图片、序列化对象等
高效存储
- 内部使用SDS(Simple Dynamic String)
- 自动扩容和缩容
- O(1)时间复杂度获取长度
多种编码
- int:整数编码
- embstr:短字符串编码
- raw:长字符串编码
2.1.3 应用场景
缓存
- 数据库查询结果缓存
- 页面缓存
- 对象序列化缓存
计数器
- 网站访问量
- 点赞数、评论数
- 库存数量
会话存储
- 用户登录状态
- 购物车信息
- 临时数据
分布式锁
- 资源互斥访问
- 防止重复操作
2.2 基本字符串操作
2.2.1 设置和获取
# 设置键值
SET key value
SET name "Redis"
SET age 5
# 获取值
GET key
GET name
# 返回:"Redis"
# 设置多个键值
MSET key1 value1 key2 value2 key3 value3
MSET user:1:name "Alice" user:1:age 25 user:1:email "alice@example.com"
# 获取多个值
MGET key1 key2 key3
MGET user:1:name user:1:age user:1:email
# 返回:1) "Alice" 2) "25" 3) "alice@example.com"
# 获取并设置新值
GETSET key new_value
GETSET counter 100
# 返回旧值,设置新值为100
2.2.2 条件设置
# 仅当键不存在时设置
SETNX key value
SETNX lock "locked"
# 返回:1(设置成功)或0(键已存在)
# 仅当键存在时设置
SET key value XX
SET existing_key "new_value" XX
# 仅当键不存在时设置
SET key value NX
SET new_key "value" NX
# 设置多个键(仅当所有键都不存在)
MSETNX key1 value1 key2 value2
MSETNX user:2:name "Bob" user:2:age 30
# 返回:1(全部设置成功)或0(至少一个键已存在)
2.2.3 带过期时间的设置
# 设置键值并指定过期时间(秒)
SETEX key seconds value
SETEX session:123 3600 "user_data"
# 设置键值并指定过期时间(毫秒)
PSETEX key milliseconds value
PSETEX temp_key 5000 "temporary_data"
# 设置键值和过期时间(通用格式)
SET key value EX seconds
SET key value PX milliseconds
SET cache_key "data" EX 300
SET temp_key "data" PX 30000
2.3 字符串操作命令
2.3.1 字符串长度和范围
# 获取字符串长度
STRLEN key
SET message "Hello Redis"
STRLEN message
# 返回:11
# 获取子字符串
GETRANGE key start end
GETRANGE message 0 4
# 返回:"Hello"
GETRANGE message -5 -1
# 返回:"Redis"
# 设置子字符串
SETRANGE key offset value
SETRANGE message 6 "World"
GET message
# 返回:"Hello World"
2.3.2 字符串追加
# 追加字符串
APPEND key value
SET greeting "Hello"
APPEND greeting " World"
GET greeting
# 返回:"Hello World"
# 如果键不存在,APPEND等同于SET
APPEND new_key "First"
GET new_key
# 返回:"First"
2.3.3 数值操作
# 递增操作
INCR key
SET counter 10
INCR counter
# 返回:11
# 递减操作
DECR key
DECR counter
# 返回:10
# 按指定值递增
INCRBY key increment
INCRBY counter 5
# 返回:15
# 按指定值递减
DECRBY key decrement
DECRBY counter 3
# 返回:12
# 浮点数递增
INCRBYFLOAT key increment
SET price 10.50
INCRBYFLOAT price 2.25
# 返回:"12.75"
2.4 位操作
2.4.1 位操作基础
# 设置位值
SETBIT key offset value
SETBIT bitmap 0 1
SETBIT bitmap 2 1
SETBIT bitmap 5 1
# 获取位值
GETBIT key offset
GETBIT bitmap 0
# 返回:1
GETBIT bitmap 1
# 返回:0
# 统计位为1的数量
BITCOUNT key [start end]
BITCOUNT bitmap
# 返回:3
BITCOUNT bitmap 0 0
# 返回:统计第一个字节中位为1的数量
2.4.2 位操作运算
# 位运算操作
BITOP operation destkey key [key ...]
# 设置测试数据
SET key1 "\x0f" # 二进制:00001111
SET key2 "\xf0" # 二进制:11110000
# 位与运算
BITOP AND result key1 key2
GET result
# 结果:00000000
# 位或运算
BITOP OR result key1 key2
GET result
# 结果:11111111
# 位异或运算
BITOP XOR result key1 key2
GET result
# 结果:11111111
# 位非运算
BITOP NOT result key1
GET result
# 结果:11110000
2.4.3 位查找
# 查找第一个指定位值的位置
BITPOS key bit [start] [end]
# 设置测试数据
SET mykey "\xff\x00\x00"
# 查找第一个0位
BITPOS mykey 0
# 返回:8
# 查找第一个1位
BITPOS mykey 1
# 返回:0
# 在指定范围内查找
BITPOS mykey 0 1 2
# 在第1到第2字节范围内查找第一个0位
2.5 实际应用示例
2.5.1 缓存系统实现
import redis
import json
import time
from typing import Any, Optional
class RedisCache:
def __init__(self, host='localhost', port=6379, db=0, password=None):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=True
)
def set(self, key: str, value: Any, expire: Optional[int] = None) -> bool:
"""设置缓存"""
try:
# 序列化复杂对象
if isinstance(value, (dict, list)):
value = json.dumps(value)
if expire:
return self.redis_client.setex(key, expire, value)
else:
return self.redis_client.set(key, value)
except Exception as e:
print(f"设置缓存失败: {e}")
return False
def get(self, key: str) -> Any:
"""获取缓存"""
try:
value = self.redis_client.get(key)
if value is None:
return None
# 尝试反序列化JSON
try:
return json.loads(value)
except (json.JSONDecodeError, TypeError):
return value
except Exception as e:
print(f"获取缓存失败: {e}")
return None
def delete(self, key: str) -> bool:
"""删除缓存"""
try:
return bool(self.redis_client.delete(key))
except Exception as e:
print(f"删除缓存失败: {e}")
return False
def exists(self, key: str) -> bool:
"""检查键是否存在"""
try:
return bool(self.redis_client.exists(key))
except Exception as e:
print(f"检查键存在失败: {e}")
return False
def expire(self, key: str, seconds: int) -> bool:
"""设置过期时间"""
try:
return bool(self.redis_client.expire(key, seconds))
except Exception as e:
print(f"设置过期时间失败: {e}")
return False
def ttl(self, key: str) -> int:
"""获取剩余生存时间"""
try:
return self.redis_client.ttl(key)
except Exception as e:
print(f"获取TTL失败: {e}")
return -1
# 使用示例
cache = RedisCache()
# 缓存字符串
cache.set("user:name", "Alice", expire=3600)
print(cache.get("user:name")) # 输出: Alice
# 缓存对象
user_data = {
"id": 1,
"name": "Alice",
"email": "alice@example.com",
"age": 25
}
cache.set("user:1", user_data, expire=1800)
print(cache.get("user:1")) # 输出: {'id': 1, 'name': 'Alice', ...}
# 检查缓存
if cache.exists("user:1"):
print(f"缓存剩余时间: {cache.ttl('user:1')} 秒")
2.5.2 计数器系统
import redis
from datetime import datetime, timedelta
from typing import Dict, List
class RedisCounter:
def __init__(self, host='localhost', port=6379, db=0):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True
)
def increment(self, key: str, amount: int = 1) -> int:
"""递增计数器"""
return self.redis_client.incrby(key, amount)
def decrement(self, key: str, amount: int = 1) -> int:
"""递减计数器"""
return self.redis_client.decrby(key, amount)
def get_count(self, key: str) -> int:
"""获取计数值"""
value = self.redis_client.get(key)
return int(value) if value else 0
def reset_counter(self, key: str) -> bool:
"""重置计数器"""
return bool(self.redis_client.delete(key))
def set_counter(self, key: str, value: int) -> bool:
"""设置计数器值"""
return self.redis_client.set(key, value)
def increment_with_expire(self, key: str, amount: int = 1, expire: int = 3600) -> int:
"""递增计数器并设置过期时间"""
pipe = self.redis_client.pipeline()
pipe.incrby(key, amount)
pipe.expire(key, expire)
results = pipe.execute()
return results[0]
def get_daily_counter(self, prefix: str) -> int:
"""获取今日计数"""
today = datetime.now().strftime("%Y-%m-%d")
key = f"{prefix}:{today}"
return self.get_count(key)
def increment_daily_counter(self, prefix: str, amount: int = 1) -> int:
"""递增今日计数"""
today = datetime.now().strftime("%Y-%m-%d")
key = f"{prefix}:{today}"
# 设置过期时间为明天凌晨
tomorrow = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
expire_seconds = int((tomorrow - datetime.now()).total_seconds())
return self.increment_with_expire(key, amount, expire_seconds)
def get_hourly_stats(self, prefix: str, hours: int = 24) -> Dict[str, int]:
"""获取小时级统计"""
stats = {}
now = datetime.now()
for i in range(hours):
hour_time = now - timedelta(hours=i)
hour_key = hour_time.strftime("%Y-%m-%d:%H")
key = f"{prefix}:{hour_key}"
stats[hour_key] = self.get_count(key)
return stats
def increment_hourly_counter(self, prefix: str, amount: int = 1) -> int:
"""递增小时计数"""
hour = datetime.now().strftime("%Y-%m-%d:%H")
key = f"{prefix}:{hour}"
return self.increment_with_expire(key, amount, 3600 * 25) # 保留25小时
# 使用示例
counter = RedisCounter()
# 网站访问量统计
counter.increment("website:visits")
counter.increment("website:visits", 5)
print(f"总访问量: {counter.get_count('website:visits')}")
# 今日访问量统计
counter.increment_daily_counter("website:daily_visits")
print(f"今日访问量: {counter.get_daily_counter('website:daily_visits')}")
# 小时级访问量统计
counter.increment_hourly_counter("website:hourly_visits")
hourly_stats = counter.get_hourly_stats("website:hourly_visits", 12)
print("过去12小时访问量:")
for hour, count in hourly_stats.items():
print(f" {hour}: {count}")
# 用户点赞统计
user_id = 123
post_id = 456
counter.increment(f"user:{user_id}:likes")
counter.increment(f"post:{post_id}:likes")
print(f"用户{user_id}获得点赞数: {counter.get_count(f'user:{user_id}:likes')}")
print(f"帖子{post_id}点赞数: {counter.get_count(f'post:{post_id}:likes')}")
2.5.3 分布式锁实现
import redis
import time
import uuid
from typing import Optional
class RedisDistributedLock:
def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
self.redis_client = redis_client
self.key = key
self.timeout = timeout
self.identifier = str(uuid.uuid4())
self.acquired = False
def acquire(self, blocking: bool = True, blocking_timeout: Optional[int] = None) -> bool:
"""获取锁"""
end_time = None
if blocking_timeout is not None:
end_time = time.time() + blocking_timeout
while True:
# 尝试获取锁
if self.redis_client.set(self.key, self.identifier, nx=True, ex=self.timeout):
self.acquired = True
return True
if not blocking:
return False
if end_time is not None and time.time() > end_time:
return False
time.sleep(0.001) # 短暂等待
def release(self) -> bool:
"""释放锁"""
if not self.acquired:
return False
# 使用Lua脚本确保原子性
lua_script = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
"""
result = self.redis_client.eval(lua_script, 1, self.key, self.identifier)
if result:
self.acquired = False
return True
return False
def extend(self, additional_time: int) -> bool:
"""延长锁的过期时间"""
if not self.acquired:
return False
lua_script = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('EXPIRE', KEYS[1], ARGV[2])
else
return 0
end
"""
result = self.redis_client.eval(
lua_script, 1, self.key, self.identifier, self.timeout + additional_time
)
return bool(result)
def __enter__(self):
if not self.acquire():
raise RuntimeError(f"无法获取锁: {self.key}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
class RedisLockManager:
def __init__(self, host='localhost', port=6379, db=0):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True
)
def get_lock(self, key: str, timeout: int = 10) -> RedisDistributedLock:
"""获取分布式锁实例"""
return RedisDistributedLock(self.redis_client, f"lock:{key}", timeout)
def is_locked(self, key: str) -> bool:
"""检查锁是否存在"""
return bool(self.redis_client.exists(f"lock:{key}"))
def force_release(self, key: str) -> bool:
"""强制释放锁(谨慎使用)"""
return bool(self.redis_client.delete(f"lock:{key}"))
# 使用示例
lock_manager = RedisLockManager()
# 基本锁使用
lock = lock_manager.get_lock("resource:123", timeout=30)
if lock.acquire(blocking=True, blocking_timeout=5):
try:
print("获取锁成功,执行业务逻辑")
time.sleep(2) # 模拟业务处理
print("业务逻辑执行完成")
finally:
lock.release()
print("锁已释放")
else:
print("获取锁失败")
# 使用上下文管理器
try:
with lock_manager.get_lock("resource:456", timeout=20):
print("在锁保护下执行业务逻辑")
time.sleep(1)
print("业务逻辑执行完成")
except RuntimeError as e:
print(f"锁获取失败: {e}")
# 防止重复操作的示例
def process_order(order_id: str):
"""处理订单(防止重复处理)"""
lock_key = f"order_process:{order_id}"
lock = lock_manager.get_lock(lock_key, timeout=60)
if not lock.acquire(blocking=False):
print(f"订单 {order_id} 正在处理中,跳过")
return False
try:
print(f"开始处理订单 {order_id}")
# 模拟订单处理逻辑
time.sleep(3)
print(f"订单 {order_id} 处理完成")
return True
finally:
lock.release()
# 测试防重复处理
process_order("ORD001")
2.5.4 会话存储系统
import redis
import json
import hashlib
import time
from typing import Dict, Any, Optional
class RedisSessionManager:
def __init__(self, host='localhost', port=6379, db=0, prefix='session'):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True
)
self.prefix = prefix
self.default_expire = 3600 # 默认1小时过期
def _get_session_key(self, session_id: str) -> str:
"""获取会话键名"""
return f"{self.prefix}:{session_id}"
def create_session(self, user_id: str, user_data: Dict[str, Any],
expire: Optional[int] = None) -> str:
"""创建新会话"""
# 生成会话ID
timestamp = str(time.time())
session_id = hashlib.md5(f"{user_id}:{timestamp}".encode()).hexdigest()
# 会话数据
session_data = {
'user_id': user_id,
'created_at': timestamp,
'last_access': timestamp,
'data': user_data
}
# 存储会话
session_key = self._get_session_key(session_id)
expire_time = expire or self.default_expire
self.redis_client.setex(
session_key,
expire_time,
json.dumps(session_data)
)
return session_id
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""获取会话数据"""
session_key = self._get_session_key(session_id)
session_data = self.redis_client.get(session_key)
if not session_data:
return None
try:
data = json.loads(session_data)
# 更新最后访问时间
data['last_access'] = str(time.time())
self.redis_client.setex(
session_key,
self.redis_client.ttl(session_key),
json.dumps(data)
)
return data
except json.JSONDecodeError:
return None
def update_session(self, session_id: str, user_data: Dict[str, Any]) -> bool:
"""更新会话数据"""
session_data = self.get_session(session_id)
if not session_data:
return False
session_data['data'].update(user_data)
session_data['last_access'] = str(time.time())
session_key = self._get_session_key(session_id)
ttl = self.redis_client.ttl(session_key)
if ttl > 0:
self.redis_client.setex(
session_key,
ttl,
json.dumps(session_data)
)
return True
return False
def delete_session(self, session_id: str) -> bool:
"""删除会话"""
session_key = self._get_session_key(session_id)
return bool(self.redis_client.delete(session_key))
def extend_session(self, session_id: str, additional_time: int = None) -> bool:
"""延长会话过期时间"""
session_key = self._get_session_key(session_id)
if not self.redis_client.exists(session_key):
return False
extend_time = additional_time or self.default_expire
return bool(self.redis_client.expire(session_key, extend_time))
def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
"""获取会话信息(不更新访问时间)"""
session_key = self._get_session_key(session_id)
session_data = self.redis_client.get(session_key)
if not session_data:
return None
try:
data = json.loads(session_data)
ttl = self.redis_client.ttl(session_key)
return {
'session_id': session_id,
'user_id': data.get('user_id'),
'created_at': data.get('created_at'),
'last_access': data.get('last_access'),
'ttl': ttl,
'data': data.get('data', {})
}
except json.JSONDecodeError:
return None
def get_user_sessions(self, user_id: str) -> List[str]:
"""获取用户的所有会话ID"""
pattern = f"{self.prefix}:*"
sessions = []
for key in self.redis_client.scan_iter(match=pattern):
session_data = self.redis_client.get(key)
if session_data:
try:
data = json.loads(session_data)
if data.get('user_id') == user_id:
session_id = key.split(':')[-1]
sessions.append(session_id)
except json.JSONDecodeError:
continue
return sessions
def cleanup_expired_sessions(self) -> int:
"""清理过期会话(Redis会自动清理,这里用于统计)"""
pattern = f"{self.prefix}:*"
expired_count = 0
for key in self.redis_client.scan_iter(match=pattern):
if self.redis_client.ttl(key) == -2: # 键已过期
expired_count += 1
return expired_count
# 使用示例
session_manager = RedisSessionManager()
# 创建用户会话
user_data = {
'username': 'alice',
'email': 'alice@example.com',
'role': 'user',
'preferences': {
'theme': 'dark',
'language': 'zh-CN'
}
}
session_id = session_manager.create_session('user_123', user_data, expire=7200)
print(f"创建会话: {session_id}")
# 获取会话数据
session_data = session_manager.get_session(session_id)
if session_data:
print(f"用户ID: {session_data['user_id']}")
print(f"用户名: {session_data['data']['username']}")
print(f"最后访问: {session_data['last_access']}")
# 更新会话数据
session_manager.update_session(session_id, {
'last_login_ip': '192.168.1.100',
'preferences': {
'theme': 'light',
'language': 'en-US'
}
})
# 获取会话信息
session_info = session_manager.get_session_info(session_id)
if session_info:
print(f"会话TTL: {session_info['ttl']} 秒")
print(f"用户偏好: {session_info['data']['preferences']}")
# 延长会话
session_manager.extend_session(session_id, 3600)
print("会话已延长1小时")
# 获取用户所有会话
user_sessions = session_manager.get_user_sessions('user_123')
print(f"用户会话列表: {user_sessions}")
# 删除会话
# session_manager.delete_session(session_id)
# print("会话已删除")
2.6 性能优化
2.6.1 字符串编码优化
# 查看字符串编码
OBJECT ENCODING key
# 整数编码(最节省内存)
SET counter 123
OBJECT ENCODING counter
# 返回:"int"
# 短字符串编码(embstr)
SET short_string "hello"
OBJECT ENCODING short_string
# 返回:"embstr"
# 长字符串编码(raw)
SET long_string "very long string..."
OBJECT ENCODING long_string
# 返回:"raw"
2.6.2 内存使用优化
import redis
class RedisStringOptimizer:
def __init__(self, redis_client: redis.Redis):
self.redis_client = redis_client
def analyze_string_memory(self, key: str) -> Dict[str, Any]:
"""分析字符串内存使用"""
if not self.redis_client.exists(key):
return {'error': 'Key does not exist'}
# 获取基本信息
value = self.redis_client.get(key)
encoding = self.redis_client.object('encoding', key)
memory_usage = self.redis_client.memory_usage(key)
ttl = self.redis_client.ttl(key)
return {
'key': key,
'value_length': len(value) if value else 0,
'encoding': encoding,
'memory_usage': memory_usage,
'ttl': ttl,
'type': self.redis_client.type(key)
}
def optimize_integer_storage(self, key: str, value: int) -> bool:
"""优化整数存储"""
# 直接设置整数,Redis会自动使用int编码
return self.redis_client.set(key, value)
def compress_json_string(self, key: str, data: dict, expire: int = None) -> bool:
"""压缩JSON字符串存储"""
import json
import gzip
# 序列化并压缩
json_str = json.dumps(data, separators=(',', ':')) # 紧凑格式
compressed_data = gzip.compress(json_str.encode('utf-8'))
# 存储压缩数据
if expire:
return self.redis_client.setex(f"{key}:compressed", expire, compressed_data)
else:
return self.redis_client.set(f"{key}:compressed", compressed_data)
def decompress_json_string(self, key: str) -> dict:
"""解压缩JSON字符串"""
import json
import gzip
compressed_data = self.redis_client.get(f"{key}:compressed")
if not compressed_data:
return None
# 解压缩并反序列化
json_str = gzip.decompress(compressed_data).decode('utf-8')
return json.loads(json_str)
def batch_analyze_keys(self, pattern: str = '*') -> List[Dict[str, Any]]:
"""批量分析键的内存使用"""
results = []
for key in self.redis_client.scan_iter(match=pattern):
if self.redis_client.type(key) == 'string':
analysis = self.analyze_string_memory(key)
results.append(analysis)
# 按内存使用排序
results.sort(key=lambda x: x.get('memory_usage', 0), reverse=True)
return results
# 使用示例
redis_client = redis.Redis(decode_responses=True)
optimizer = RedisStringOptimizer(redis_client)
# 分析字符串内存使用
analysis = optimizer.analyze_string_memory('user:1')
print(f"内存分析: {analysis}")
# 优化整数存储
optimizer.optimize_integer_storage('counter', 12345)
print(f"整数编码: {redis_client.object('encoding', 'counter')}")
# 压缩存储大型JSON
large_data = {
'users': [{'id': i, 'name': f'user_{i}', 'data': 'x' * 100} for i in range(1000)]
}
optimizer.compress_json_string('large_data', large_data, expire=3600)
# 解压缩数据
decompressed_data = optimizer.decompress_json_string('large_data')
print(f"解压缩后数据长度: {len(decompressed_data['users'])}")
# 批量分析内存使用
memory_analysis = optimizer.batch_analyze_keys('user:*')
print("内存使用最多的前5个键:")
for item in memory_analysis[:5]:
print(f" {item['key']}: {item['memory_usage']} bytes")
2.7 总结
本章详细介绍了Redis字符串类型的使用方法,包括:
- 基本操作:SET、GET、MSET、MGET等基础命令
- 高级功能:条件设置、过期时间、字符串操作、数值操作
- 位操作:SETBIT、GETBIT、BITCOUNT、BITOP等位级操作
- 实际应用:缓存系统、计数器、分布式锁、会话存储
- 性能优化:编码优化、内存使用优化、压缩存储
字符串类型是Redis中最基础也是最重要的数据类型,掌握其使用方法对于高效使用Redis至关重要。在实际应用中,应根据具体场景选择合适的操作方法和优化策略。
下一章将介绍Redis的哈希类型及其应用场景。