8.1 章节概述
时间序列分析是Pandas的核心功能之一,广泛应用于金融、经济、气象、物联网等领域。本章将深入探讨Pandas中时间序列数据的处理方法,包括日期时间索引、时间序列操作、重采样、滚动窗口分析等高级技术。
graph TD
A[时间序列分析] --> B[日期时间基础]
A --> C[时间序列索引]
A --> D[时间序列操作]
A --> E[重采样与频率转换]
A --> F[滚动窗口分析]
A --> G[时间序列可视化]
B --> B1[datetime对象]
B --> B2[时间戳与周期]
B --> B3[时区处理]
C --> C1[DatetimeIndex]
C --> C2[PeriodIndex]
C --> C3[TimedeltaIndex]
D --> D1[时间选择与切片]
D --> D2[时间偏移]
D --> D3[时间序列运算]
E --> E1[上采样与下采样]
E --> E2[聚合函数]
E --> E3[插值方法]
F --> F1[移动平均]
F --> F2[滚动统计]
F --> F3[指数加权]
G --> G1[趋势分析]
G --> G2[季节性分析]
G --> G3[异常检测]
8.1.1 学习目标
- 掌握Pandas中日期时间数据的处理方法
- 理解时间序列索引的创建和使用
- 学会时间序列数据的选择、切片和操作
- 掌握重采样和频率转换技术
- 学习滚动窗口分析和移动统计
- 了解时间序列数据的可视化方法
8.1.2 应用场景
- 金融分析:股价分析、风险管理、投资组合优化
- 经济预测:GDP预测、通胀分析、经济指标监控
- 运营分析:销售趋势、用户行为、业务指标监控
- 物联网:传感器数据分析、设备监控、预测性维护
- 气象分析:天气预报、气候变化、环境监测
8.2 日期时间基础
8.2.1 Python datetime基础
import pandas as pd
import numpy as np
from datetime import datetime, date, time, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
# 设置中文字体和样式
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")
print("=== Python datetime基础 ===")
# 1. 创建datetime对象
print("1. 创建datetime对象:")
dt1 = datetime(2024, 1, 15, 10, 30, 45)
dt2 = datetime.now()
dt3 = datetime.today()
print(f"指定日期时间:{dt1}")
print(f"当前时间:{dt2}")
print(f"今天日期:{dt3}")
# 2. 日期时间组件
print("\n2. 日期时间组件:")
print(f"年份:{dt1.year}")
print(f"月份:{dt1.month}")
print(f"日期:{dt1.day}")
print(f"小时:{dt1.hour}")
print(f"分钟:{dt1.minute}")
print(f"秒数:{dt1.second}")
print(f"星期几:{dt1.weekday()} (0=周一)")
print(f"一年中第几天:{dt1.timetuple().tm_yday}")
# 3. 时间差计算
print("\n3. 时间差计算:")
td = timedelta(days=30, hours=12, minutes=30)
future_date = dt1 + td
past_date = dt1 - td
print(f"原始日期:{dt1}")
print(f"30天12小时30分钟后:{future_date}")
print(f"30天12小时30分钟前:{past_date}")
# 4. 字符串转换
print("\n4. 字符串转换:")
date_str = "2024-01-15 10:30:45"
parsed_date = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
formatted_str = dt1.strftime("%Y年%m月%d日 %H:%M:%S")
print(f"字符串转日期:{parsed_date}")
print(f"日期转字符串:{formatted_str}")
8.2.2 Pandas时间戳
print("\n=== Pandas时间戳 ===")
# 1. 创建Timestamp
print("1. 创建Timestamp:")
ts1 = pd.Timestamp('2024-01-15')
ts2 = pd.Timestamp('2024-01-15 10:30:45')
ts3 = pd.Timestamp(2024, 1, 15, 10, 30, 45)
ts4 = pd.Timestamp.now()
print(f"日期时间戳:{ts1}")
print(f"完整时间戳:{ts2}")
print(f"参数构造:{ts3}")
print(f"当前时间戳:{ts4}")
# 2. Timestamp属性
print("\n2. Timestamp属性:")
print(f"年份:{ts2.year}")
print(f"季度:{ts2.quarter}")
print(f"月份:{ts2.month}")
print(f"周数:{ts2.week}")
print(f"星期几:{ts2.dayofweek}")
print(f"是否月末:{ts2.is_month_end}")
print(f"是否年末:{ts2.is_year_end}")
# 3. 时间戳运算
print("\n3. 时间戳运算:")
ts_future = ts2 + pd.Timedelta(days=30)
ts_past = ts2 - pd.Timedelta(weeks=2)
print(f"原始时间:{ts2}")
print(f"30天后:{ts_future}")
print(f"2周前:{ts_past}")
# 4. 时间戳比较
print("\n4. 时间戳比较:")
ts_list = [
pd.Timestamp('2024-01-15'),
pd.Timestamp('2024-02-15'),
pd.Timestamp('2024-01-10')
]
print(f"时间戳列表:{ts_list}")
print(f"最早时间:{min(ts_list)}")
print(f"最晚时间:{max(ts_list)}")
print(f"排序结果:{sorted(ts_list)}")
8.2.3 时区处理
print("\n=== 时区处理 ===")
# 1. 创建带时区的时间戳
print("1. 创建带时区的时间戳:")
ts_utc = pd.Timestamp('2024-01-15 10:30:45', tz='UTC')
ts_beijing = pd.Timestamp('2024-01-15 10:30:45', tz='Asia/Shanghai')
ts_ny = pd.Timestamp('2024-01-15 10:30:45', tz='America/New_York')
print(f"UTC时间:{ts_utc}")
print(f"北京时间:{ts_beijing}")
print(f"纽约时间:{ts_ny}")
# 2. 时区转换
print("\n2. 时区转换:")
ts_local = pd.Timestamp('2024-01-15 10:30:45')
ts_with_tz = ts_local.tz_localize('Asia/Shanghai')
ts_converted = ts_with_tz.tz_convert('UTC')
print(f"本地时间:{ts_local}")
print(f"添加时区:{ts_with_tz}")
print(f"转换时区:{ts_converted}")
# 3. Common time zones
print("\n3. 常用时区:")
common_timezones = [
    'UTC',
    'Asia/Shanghai',
    'Asia/Tokyo',
    'Europe/London',
    'America/New_York',
    'America/Los_Angeles'
]
# A single tz-aware UTC instant, rendered below in each zone in turn.
base_time = pd.Timestamp('2024-01-15 12:00:00', tz='UTC')
for tz in common_timezones:
    # tz_convert is applied to the same base instant for every zone.
    converted = base_time.tz_convert(tz)
    print(f"{tz:20}: {converted}")
8.3 时间序列索引
8.3.1 DatetimeIndex
print("\n=== DatetimeIndex ===")
# 1. 创建DatetimeIndex
print("1. 创建DatetimeIndex:")
# 方法1:从字符串列表创建
date_strings = ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04']
dt_index1 = pd.DatetimeIndex(date_strings)
# 方法2:使用date_range
dt_index2 = pd.date_range('2024-01-01', periods=10, freq='D')
dt_index3 = pd.date_range('2024-01-01', '2024-01-31', freq='D')
print(f"从字符串创建:{dt_index1}")
print(f"指定周期:{dt_index2}")
print(f"指定范围:{dt_index3[:5]}...") # 只显示前5个
# 2. Time indexes at different frequencies
print("\n2. 不同频率的时间索引:")
# Frequency alias -> label printed for it.
# NOTE: pandas 2.2+ deprecates several of these aliases in favour of new
# spellings (e.g. 'M' -> 'ME', 'Q' -> 'QE', 'Y' -> 'YE', 'H' -> 'h',
# 'T' -> 'min', 'S' -> 's'); adjust them to the installed pandas version.
frequencies = {
    'D': '每日',
    'W': '每周',
    'M': '每月末',
    'MS': '每月初',
    'Q': '每季度末',
    'QS': '每季度初',
    'Y': '每年末',
    'YS': '每年初',
    'H': '每小时',
    'T': '每分钟',
    'S': '每秒'
}
for freq, desc in frequencies.items():
    # Only this subset of aliases is actually demonstrated.
    if freq in ['D', 'W', 'M', 'Q', 'Y']:
        idx = pd.date_range('2024-01-01', periods=5, freq=freq)
        print(f"{desc:8} ({freq:2}): {idx[0]} 到 {idx[-1]}")
# 3. Custom frequencies
print("\n3. 自定义频率:")
# Multiples ('2D'), anchored weeks ('W-MON') and business-period aliases.
custom_freqs = {
    '2D': '每2天',
    '3H': '每3小时',
    '15T': '每15分钟',
    'W-MON': '每周一',
    'BM': '每月最后一个工作日',
    'BQ': '每季度最后一个工作日'
}
for freq, desc in custom_freqs.items():
    idx = pd.date_range('2024-01-01', periods=3, freq=freq)
    print(f"{desc:15} ({freq:6}): {list(idx)}")
8.3.2 时间序列DataFrame
print("\n=== 时间序列DataFrame ===")
# 1. Create time series data
print("1. 创建时间序列数据:")
dates = pd.date_range('2024-01-01', periods=100, freq='D')
np.random.seed(42)
# Simulated stock prices: each price column is 100 plus a cumulative sum of
# scaled normal draws. The dict entries are evaluated in order, so the draw
# order (and therefore the seeded values) is fixed.
price_data = {
    'open': 100 + np.cumsum(np.random.randn(100) * 0.5),
    'high': 100 + np.cumsum(np.random.randn(100) * 0.5) + np.random.rand(100) * 2,
    'low': 100 + np.cumsum(np.random.randn(100) * 0.5) - np.random.rand(100) * 2,
    'close': 100 + np.cumsum(np.random.randn(100) * 0.5),
    'volume': np.random.randint(1000, 10000, 100)
}
# Enforce high >= max(open, close) and low <= min(open, close), element-wise
# across the three arrays instead of a per-row Python loop.
price_data['high'] = np.maximum.reduce(
    [price_data['high'], price_data['open'], price_data['close']]
)
price_data['low'] = np.minimum.reduce(
    [price_data['low'], price_data['open'], price_data['close']]
)
stock_df = pd.DataFrame(price_data, index=dates)
print("股价数据示例:")
print(stock_df.head())
# 2. 时间序列基本信息
print("\n2. 时间序列基本信息:")
print(f"数据形状:{stock_df.shape}")
print(f"时间范围:{stock_df.index.min()} 到 {stock_df.index.max()}")
print(f"频率:{stock_df.index.freq}")
print(f"时间跨度:{stock_df.index.max() - stock_df.index.min()}")
# 3. 时间索引属性
print("\n3. 时间索引属性:")
print(f"年份范围:{stock_df.index.year.unique()}")
print(f"月份范围:{stock_df.index.month.unique()}")
print(f"星期分布:{stock_df.index.dayofweek.value_counts().sort_index()}")
# 4. 添加时间特征
print("\n4. 添加时间特征:")
stock_df['year'] = stock_df.index.year
stock_df['month'] = stock_df.index.month
stock_df['quarter'] = stock_df.index.quarter
stock_df['dayofweek'] = stock_df.index.dayofweek
stock_df['is_month_end'] = stock_df.index.is_month_end
print("添加时间特征后:")
print(stock_df[['close', 'year', 'month', 'quarter', 'dayofweek', 'is_month_end']].head())
8.3.3 PeriodIndex
print("\n=== PeriodIndex ===")
# 1. 创建PeriodIndex
print("1. 创建PeriodIndex:")
period_index1 = pd.period_range('2024-01', periods=12, freq='M')
period_index2 = pd.period_range('2024Q1', periods=4, freq='Q')
period_index3 = pd.period_range('2024', periods=5, freq='Y')
print(f"月度周期:{period_index1}")
print(f"季度周期:{period_index2}")
print(f"年度周期:{period_index3}")
# 2. Period对象属性
print("\n2. Period对象属性:")
period = pd.Period('2024-01', freq='M')
print(f"周期:{period}")
print(f"开始时间:{period.start_time}")
print(f"结束时间:{period.end_time}")
print(f"天数:{period.days_in_month}")
# 3. Period与Timestamp转换
print("\n3. Period与Timestamp转换:")
timestamp_index = pd.date_range('2024-01-01', periods=12, freq='M')
period_from_timestamp = timestamp_index.to_period('M')
timestamp_from_period = period_index1.to_timestamp()
print(f"Timestamp索引:{timestamp_index[:3]}")
print(f"转换为Period:{period_from_timestamp[:3]}")
print(f"Period转Timestamp:{timestamp_from_period[:3]}")
# 4. 使用PeriodIndex的DataFrame
print("\n4. 使用PeriodIndex的DataFrame:")
monthly_data = pd.DataFrame({
'sales': np.random.randint(1000, 5000, 12),
'profit': np.random.randint(100, 800, 12)
}, index=period_index1)
print("月度数据:")
print(monthly_data.head())
8.4 时间序列操作
8.4.1 时间选择与切片
print("\n=== 时间选择与切片 ===")
# 使用之前创建的stock_df
print("1. 基本时间选择:")
# 选择特定日期
specific_date = stock_df.loc['2024-01-15']
print(f"特定日期数据:\n{specific_date}")
# 选择日期范围
date_range_data = stock_df.loc['2024-01-10':'2024-01-20']
print(f"\n日期范围数据:\n{date_range_data.head()}")
# 选择特定月份
january_data = stock_df.loc['2024-01']
print(f"\n1月份数据形状:{january_data.shape}")
# 2. 高级时间选择
print("\n2. 高级时间选择:")
# 选择特定年份
year_2024 = stock_df.loc['2024']
print(f"2024年数据形状:{year_2024.shape}")
# 选择多个不连续日期
specific_dates = ['2024-01-01', '2024-01-15', '2024-02-01']
multi_dates_data = stock_df.loc[specific_dates]
print(f"多个日期数据:\n{multi_dates_data}")
# 3. 条件选择
print("\n3. 条件选择:")
# 选择周末数据
weekend_data = stock_df[stock_df.index.dayofweek >= 5]
print(f"周末数据形状:{weekend_data.shape}")
# 选择月末数据
month_end_data = stock_df[stock_df.index.is_month_end]
print(f"月末数据:\n{month_end_data}")
# 选择特定时间段的高价股票
high_price_jan = stock_df.loc['2024-01'][stock_df.loc['2024-01', 'close'] > 102]
print(f"1月高价数据形状:{high_price_jan.shape}")
# 4. 时间切片技巧
print("\n4. 时间切片技巧:")
# 最近N天
last_10_days = stock_df.tail(10)
print(f"最近10天数据:\n{last_10_days.index}")
# 每周第一天
weekly_first = stock_df.groupby(stock_df.index.to_period('W')).first()
print(f"每周第一天数据形状:{weekly_first.shape}")
# 每月最后一个交易日
monthly_last = stock_df.groupby(stock_df.index.to_period('M')).last()
print(f"每月最后交易日:\n{monthly_last.index}")
8.4.2 时间偏移
print("\n=== 时间偏移 ===")
# 1. 基本偏移操作
print("1. 基本偏移操作:")
# 数据向前偏移
shifted_forward = stock_df['close'].shift(1) # 向前偏移1天
shifted_backward = stock_df['close'].shift(-1) # 向后偏移1天
comparison_df = pd.DataFrame({
'original': stock_df['close'],
'shift_1': shifted_forward,
'shift_-1': shifted_backward
})
print("偏移对比:")
print(comparison_df.head())
# 2. 计算收益率
print("\n2. 计算收益率:")
# 日收益率
stock_df['daily_return'] = stock_df['close'].pct_change()
# 对数收益率
stock_df['log_return'] = np.log(stock_df['close'] / stock_df['close'].shift(1))
# 累积收益率
stock_df['cumulative_return'] = (1 + stock_df['daily_return']).cumprod() - 1
print("收益率计算:")
print(stock_df[['close', 'daily_return', 'log_return', 'cumulative_return']].head(10))
# 3. Date offsets and frequencies
print("\n3. 时间偏移与频率:")
# Offset class name in pd.offsets -> label printed for it.
offsets = {
    'BDay': '工作日',
    'Week': '周',
    'MonthEnd': '月末',
    'QuarterEnd': '季度末'
}
base_date = pd.Timestamp('2024-01-15')
for offset_name, desc in offsets.items():
    # Look the class up by name and instantiate it with its default arguments.
    offset = getattr(pd.offsets, offset_name)()
    new_date = base_date + offset
    print(f"{desc:8}: {base_date} + {offset_name} = {new_date}")
# 4. 自定义偏移
print("\n4. 自定义偏移:")
# 创建自定义工作日偏移
custom_offset = pd.offsets.CustomBusinessDay(weekmask='Mon Tue Wed Thu Fri')
holidays = pd.to_datetime(['2024-01-01', '2024-02-14']) # 假设的节假日
holiday_offset = pd.offsets.CustomBusinessDay(holidays=holidays)
print(f"自定义工作日:{base_date + custom_offset}")
print(f"考虑节假日:{base_date + holiday_offset}")
# 5. Building lag features
print("\n5. 滞后特征创建:")
# Add lagged copies of close and volume; shift(lag) moves each value `lag`
# rows down, so the first `lag` rows of every new column are NaN.
for lag in [1, 3, 5, 10]:
    stock_df[f'close_lag_{lag}'] = stock_df['close'].shift(lag)
    stock_df[f'volume_lag_{lag}'] = stock_df['volume'].shift(lag)
# Collect every column whose name contains 'lag'.
lag_features = [col for col in stock_df.columns if 'lag' in col]
print(f"滞后特征:{lag_features}")
print(stock_df[['close'] + lag_features[:4]].head(12))
8.4.3 时间序列运算
print("\n=== 时间序列运算 ===")
# 1. 基本统计运算
print("1. 基本统计运算:")
# 移动平均
stock_df['ma_5'] = stock_df['close'].rolling(window=5).mean()
stock_df['ma_20'] = stock_df['close'].rolling(window=20).mean()
# 移动标准差
stock_df['std_5'] = stock_df['close'].rolling(window=5).std()
# 移动最大最小值
stock_df['max_5'] = stock_df['close'].rolling(window=5).max()
stock_df['min_5'] = stock_df['close'].rolling(window=5).min()
print("移动统计:")
print(stock_df[['close', 'ma_5', 'ma_20', 'std_5', 'max_5', 'min_5']].head(25))
# 2. 技术指标计算
print("\n2. 技术指标计算:")
# RSI (相对强弱指数)
def calculate_rsi(prices, window=14):
    """Return the Relative Strength Index (RSI) of a price series.

    Average gain and average loss are simple rolling means over ``window``
    price changes.

    Args:
        prices: pandas Series of prices.
        window: Number of periods in each rolling average.

    Returns:
        Series of RSI values; positions without a full window are NaN.
    """
    delta = prices.diff()
    # Keep only the upward moves (resp. downward moves, as positive
    # magnitudes); every other entry, including the leading NaN, becomes 0.
    gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return rsi
stock_df['rsi'] = calculate_rsi(stock_df['close'])
# MACD (移动平均收敛散度)
def calculate_macd(prices, fast=12, slow=26, signal=9):
    """Return the MACD line, signal line and histogram of a price series.

    Args:
        prices: pandas Series of prices.
        fast: Span of the fast exponential moving average.
        slow: Span of the slow exponential moving average.
        signal: Span of the exponential moving average applied to the MACD line.

    Returns:
        Tuple ``(macd, signal_line, histogram)`` of Series, where
        ``macd = ema_fast - ema_slow`` and ``histogram = macd - signal_line``.
    """
    ema_fast = prices.ewm(span=fast).mean()
    ema_slow = prices.ewm(span=slow).mean()
    macd = ema_fast - ema_slow
    signal_line = macd.ewm(span=signal).mean()
    histogram = macd - signal_line
    return macd, signal_line, histogram
stock_df['macd'], stock_df['macd_signal'], stock_df['macd_histogram'] = calculate_macd(stock_df['close'])
# 布林带
def calculate_bollinger_bands(prices, window=20, num_std=2):
    """Return the upper, middle and lower Bollinger Bands of a price series.

    Args:
        prices: pandas Series of prices.
        window: Number of periods in the rolling mean and rolling std.
        num_std: Number of rolling standard deviations between the middle
            band and each outer band.

    Returns:
        Tuple ``(upper_band, ma, lower_band)`` of Series; positions without
        a full window are NaN.
    """
    ma = prices.rolling(window=window).mean()
    std = prices.rolling(window=window).std()
    upper_band = ma + (std * num_std)
    lower_band = ma - (std * num_std)
    return upper_band, ma, lower_band
stock_df['bb_upper'], stock_df['bb_middle'], stock_df['bb_lower'] = calculate_bollinger_bands(stock_df['close'])
print("技术指标:")
print(stock_df[['close', 'rsi', 'macd', 'bb_upper', 'bb_lower']].tail(10))
# 3. 时间序列分解
print("\n3. 时间序列分解:")
# 创建带趋势和季节性的数据
dates_extended = pd.date_range('2023-01-01', periods=365, freq='D')
trend = np.linspace(100, 120, 365)
seasonal = 10 * np.sin(2 * np.pi * np.arange(365) / 365.25 * 4) # 季节性
noise = np.random.normal(0, 2, 365)
ts_data = trend + seasonal + noise
ts_df = pd.DataFrame({'value': ts_data}, index=dates_extended)
# 简单的趋势提取
ts_df['trend'] = ts_df['value'].rolling(window=30, center=True).mean()
ts_df['detrended'] = ts_df['value'] - ts_df['trend']
print("时间序列分解:")
print(ts_df.head(10))
# 4. 异常值检测
print("\n4. 异常值检测:")
# 基于标准差的异常值检测
def detect_outliers_std(series, threshold=3):
    """Flag values lying more than ``threshold`` standard deviations from the mean.

    Args:
        series: pandas Series of numeric values.
        threshold: Number of standard deviations beyond which a value is flagged.

    Returns:
        Boolean Series, True where the value is an outlier.
    """
    mean = series.mean()
    std = series.std()
    outliers = np.abs(series - mean) > threshold * std
    return outliers
# 基于IQR的异常值检测
def detect_outliers_iqr(series, factor=1.5):
    """Flag values outside ``[Q1 - factor * IQR, Q3 + factor * IQR]``.

    Args:
        series: pandas Series of numeric values.
        factor: Multiple of the interquartile range added beyond each quartile.

    Returns:
        Boolean Series, True where the value is an outlier.
    """
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    outliers = (series < Q1 - factor * IQR) | (series > Q3 + factor * IQR)
    return outliers
# Flag outliers in the closing price with both detectors.
stock_df['outlier_std'] = detect_outliers_std(stock_df['close'])
stock_df['outlier_iqr'] = detect_outliers_iqr(stock_df['close'])
# Summing a boolean column counts its True entries.
outlier_summary = pd.DataFrame({
    'std_method': stock_df['outlier_std'].sum(),
    'iqr_method': stock_df['outlier_iqr'].sum()
}, index=['outlier_count'])
print("异常值检测结果:")
print(outlier_summary)
# List the flagged rows only when the std-based detector found any.
if stock_df['outlier_std'].any():
    print("\n标准差方法检测到的异常值:")
    print(stock_df[stock_df['outlier_std']][['close', 'outlier_std']])
8.5 重采样与频率转换
8.5.1 下采样(降频)
print("\n=== 下采样(降频)===")
# 1. 基本下采样
print("1. 基本下采样:")
# 日数据转周数据
weekly_data = stock_df.resample('W').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
})
print("周数据:")
print(weekly_data.head())
# 日数据转月数据
monthly_data = stock_df.resample('M').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'mean',
'daily_return': 'mean'
})
print("\n月数据:")
print(monthly_data)
# 2. 不同聚合函数
print("\n2. 不同聚合函数:")
# 多种聚合方式
price_summary = stock_df['close'].resample('W').agg([
'mean', # 平均值
'median', # 中位数
'std', # 标准差
'min', # 最小值
'max', # 最大值
'count' # 计数
])
print("价格周汇总:")
print(price_summary.head())
# 3. 自定义聚合函数
print("\n3. 自定义聚合函数:")
def price_range(series):
    """Return the spread between the largest and smallest value of ``series``."""
    return series.max() - series.min()
def volatility(series):
    """Return the coefficient of variation (std / mean) of ``series``.

    Returns 0 when the mean is exactly zero, to avoid dividing by zero.
    """
    # Compute the mean once; it is needed for both the guard and the ratio.
    mean = series.mean()
    return series.std() / mean if mean != 0 else 0
custom_agg = stock_df['close'].resample('W').agg({
'range': price_range,
'volatility': volatility,
'first': 'first',
'last': 'last'
})
print("自定义聚合:")
print(custom_agg.head())
# 4. 分组下采样
print("\n4. 分组下采样:")
# 按月份和星期几分组
monthly_weekday = stock_df.groupby([
stock_df.index.month,
stock_df.index.dayofweek
])['close'].mean().unstack()
print("月份-星期几平均价格:")
print(monthly_weekday)
8.5.2 上采样(升频)
print("\n=== 上采样(升频)===")
# 1. 基本上采样
print("1. 基本上采样:")
# 创建低频数据
low_freq_dates = pd.date_range('2024-01-01', periods=10, freq='W')
low_freq_data = pd.DataFrame({
'value': np.random.randn(10).cumsum()
}, index=low_freq_dates)
print("原始周数据:")
print(low_freq_data)
# 上采样到日频率
daily_upsampled = low_freq_data.resample('D').asfreq()
print("\n上采样到日频率(不填充):")
print(daily_upsampled.head(10))
# 2. 插值方法
print("\n2. 插值方法:")
# 前向填充
forward_fill = low_freq_data.resample('D').ffill()
print("前向填充:")
print(forward_fill.head(10))
# 后向填充
backward_fill = low_freq_data.resample('D').bfill()
print("\n后向填充:")
print(backward_fill.head(10))
# 线性插值
linear_interp = low_freq_data.resample('D').interpolate(method='linear')
print("\n线性插值:")
print(linear_interp.head(10))
# 3. 高级插值方法
print("\n3. 高级插值方法:")
# 样条插值
spline_interp = low_freq_data.resample('D').interpolate(method='spline', order=2)
# 时间插值
time_interp = low_freq_data.resample('D').interpolate(method='time')
# 多项式插值
poly_interp = low_freq_data.resample('D').interpolate(method='polynomial', order=2)
interp_comparison = pd.DataFrame({
'original': low_freq_data.resample('D').asfreq()['value'],
'linear': linear_interp['value'],
'spline': spline_interp['value'],
'time': time_interp['value'],
'polynomial': poly_interp['value']
})
print("插值方法对比:")
print(interp_comparison.head(15))
# 4. 限制插值
print("\n4. 限制插值:")
# 限制插值的最大间隔
limited_interp = low_freq_data.resample('D').interpolate(method='linear', limit=3)
print("限制插值(最多3天):")
print(limited_interp.head(15))
# 只在特定方向插值
forward_limited = low_freq_data.resample('D').interpolate(method='linear', limit_direction='forward')
print("\n只向前插值:")
print(forward_limited.head(15))
8.5.3 重采样高级技巧
print("\n=== 重采样高级技巧 ===")
# 1. 标签和闭合规则
print("1. 标签和闭合规则:")
# 不同的标签位置
left_label = stock_df['close'].resample('W', label='left').mean()
right_label = stock_df['close'].resample('W', label='right').mean()
print("左标签(周开始):")
print(left_label.head())
print("\n右标签(周结束):")
print(right_label.head())
# 不同的闭合规则
left_closed = stock_df['close'].resample('W', closed='left').mean()
right_closed = stock_df['close'].resample('W', closed='right').mean()
print("\n左闭合:")
print(left_closed.head())
print("\n右闭合:")
print(right_closed.head())
# 2. Offset resampling
print("\n2. 偏移重采样:")
# resample() no longer accepts `loffset` (removed in pandas 2.0). Shifting the
# bin labels of the result gives the same output and works on every version.
offset_resample = stock_df['close'].resample('W').mean()
offset_resample.index = offset_resample.index + pd.Timedelta(days=2)
print("偏移2天的周重采样:")
print(offset_resample.head())
# 3. 分组重采样
print("\n3. 分组重采样:")
# 按月份分组,然后重采样
monthly_groups = stock_df.groupby(stock_df.index.month)
monthly_weekly_avg = monthly_groups.resample('W')['close'].mean()
print("按月分组的周平均:")
print(monthly_weekly_avg.head(10))
# 4. 多列重采样
print("\n4. 多列重采样:")
# 对不同列使用不同的聚合函数
multi_col_resample = stock_df.resample('W').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': ['sum', 'mean'],
'daily_return': ['mean', 'std']
})
print("多列重采样:")
print(multi_col_resample.head())
# 5. 条件重采样
print("\n5. 条件重采样:")
# 只对交易量大于平均值的数据重采样
high_volume_mask = stock_df['volume'] > stock_df['volume'].mean()
high_volume_weekly = stock_df[high_volume_mask].resample('W')['close'].mean()
print("高交易量周平均价格:")
print(high_volume_weekly.head())
# 6. 重采样后的数据处理
print("\n6. 重采样后的数据处理:")
# 重采样后计算变化率
weekly_prices = stock_df['close'].resample('W').last()
weekly_returns = weekly_prices.pct_change()
print("周收益率:")
print(weekly_returns.head())
# 重采样后的滚动统计
weekly_ma = weekly_prices.rolling(window=4).mean() # 4周移动平均
print("\n4周移动平均:")
print(weekly_ma.head(10))
8.6 滚动窗口分析
8.6.1 移动统计
print("\n=== 移动统计 ===")
# 1. 基本移动统计
print("1. 基本移动统计:")
# 移动平均
stock_df['sma_5'] = stock_df['close'].rolling(window=5).mean()
stock_df['sma_10'] = stock_df['close'].rolling(window=10).mean()
stock_df['sma_20'] = stock_df['close'].rolling(window=20).mean()
# 移动标准差
stock_df['rolling_std'] = stock_df['close'].rolling(window=10).std()
# 移动方差
stock_df['rolling_var'] = stock_df['close'].rolling(window=10).var()
# 移动最大最小值
stock_df['rolling_max'] = stock_df['close'].rolling(window=10).max()
stock_df['rolling_min'] = stock_df['close'].rolling(window=10).min()
print("移动统计指标:")
print(stock_df[['close', 'sma_5', 'sma_10', 'sma_20', 'rolling_std']].tail(10))
# 2. 移动分位数
print("\n2. 移动分位数:")
# 移动中位数
stock_df['rolling_median'] = stock_df['close'].rolling(window=10).median()
# 移动分位数
stock_df['rolling_q25'] = stock_df['close'].rolling(window=10).quantile(0.25)
stock_df['rolling_q75'] = stock_df['close'].rolling(window=10).quantile(0.75)
print("移动分位数:")
print(stock_df[['close', 'rolling_median', 'rolling_q25', 'rolling_q75']].tail(10))
# 3. 移动相关性
print("\n3. 移动相关性:")
# 创建另一个时间序列
stock_df['price_2'] = stock_df['close'] + np.random.randn(len(stock_df)) * 2
# 移动相关系数
stock_df['rolling_corr'] = stock_df['close'].rolling(window=20).corr(stock_df['price_2'])
print("移动相关性:")
print(stock_df[['close', 'price_2', 'rolling_corr']].tail(10))
# 4. 自定义移动函数
print("\n4. 自定义移动函数:")
def rolling_sharpe_ratio(returns, window=20, risk_free_rate=0.02):
    """Return the rolling annualized Sharpe ratio of a return series.

    Args:
        returns: pandas Series of per-period returns.
        window: Number of periods in each rolling window.
        risk_free_rate: Rate subtracted from the annualized mean return.

    Returns:
        Series of Sharpe ratios; positions without a full window are NaN.
    """
    rolling_mean = returns.rolling(window=window).mean()
    rolling_std = returns.rolling(window=window).std()
    # Annualize assuming 252 trading days per year.
    annual_return = rolling_mean * 252
    annual_std = rolling_std * np.sqrt(252)
    sharpe = (annual_return - risk_free_rate) / annual_std
    return sharpe
def rolling_max_drawdown(prices, window=20):
    """Return the rolling maximum drawdown of a price series.

    For each window, the drawdown of every point is measured against the
    running peak inside that window, and the deepest (most negative) one is
    returned. Measuring only the latest price against the window's peak
    would give the current drawdown, not the maximum.

    Args:
        prices: pandas Series of prices.
        window: Number of periods in each rolling window.

    Returns:
        Series of non-positive fractions (e.g. -0.25 for a 25% drawdown);
        positions without a full window are NaN.
    """
    def _window_max_drawdown(values):
        # Running peak within the window, then the worst drop below it.
        running_peak = np.maximum.accumulate(values)
        return ((values - running_peak) / running_peak).min()

    # raw=True hands each window to the helper as a NumPy array.
    return prices.rolling(window=window).apply(_window_max_drawdown, raw=True)
# 应用自定义函数
stock_df['rolling_sharpe'] = rolling_sharpe_ratio(stock_df['daily_return'])
stock_df['rolling_drawdown'] = rolling_max_drawdown(stock_df['close'])
print("自定义移动指标:")
print(stock_df[['close', 'daily_return', 'rolling_sharpe', 'rolling_drawdown']].tail(10))
8.6.2 指数加权移动平均
print("\n=== 指数加权移动平均 ===")
# 1. 基本指数加权统计
print("1. 基本指数加权统计:")
# 指数加权移动平均
stock_df['ewm_12'] = stock_df['close'].ewm(span=12).mean()
stock_df['ewm_26'] = stock_df['close'].ewm(span=26).mean()
# 指数加权标准差
stock_df['ewm_std'] = stock_df['close'].ewm(span=20).std()
# 指数加权方差
stock_df['ewm_var'] = stock_df['close'].ewm(span=20).var()
print("指数加权统计:")
print(stock_df[['close', 'ewm_12', 'ewm_26', 'ewm_std']].tail(10))
# 2. 不同的衰减参数
print("\n2. 不同的衰减参数:")
# 使用alpha参数
stock_df['ewm_alpha_01'] = stock_df['close'].ewm(alpha=0.1).mean()
stock_df['ewm_alpha_03'] = stock_df['close'].ewm(alpha=0.3).mean()
# 使用com参数(质心)
stock_df['ewm_com_10'] = stock_df['close'].ewm(com=10).mean()
# 使用halflife参数
stock_df['ewm_halflife_10'] = stock_df['close'].ewm(halflife=10).mean()
print("不同衰减参数的EWM:")
print(stock_df[['close', 'ewm_alpha_01', 'ewm_alpha_03', 'ewm_com_10', 'ewm_halflife_10']].tail(10))
# 3. 指数加权相关性
print("\n3. 指数加权相关性:")
# 指数加权相关系数
stock_df['ewm_corr'] = stock_df['close'].ewm(span=20).corr(stock_df['volume'])
print("指数加权相关性:")
print(stock_df[['close', 'volume', 'ewm_corr']].tail(10))
# 4. EWMA在风险管理中的应用
print("\n4. EWMA在风险管理中的应用:")
# 计算EWMA波动率
stock_df['ewm_volatility'] = stock_df['daily_return'].ewm(span=30).std() * np.sqrt(252)
# 计算VaR (Value at Risk)
def calculate_var(returns, confidence_level=0.05):
    """Return the Value at Risk as the ``confidence_level`` quantile of ``returns``.

    Args:
        returns: pandas Series of returns.
        confidence_level: Quantile to take, as a fraction between 0 and 1.
    """
    return returns.quantile(confidence_level)
stock_df['rolling_var_5'] = stock_df['daily_return'].rolling(window=30).apply(
lambda x: calculate_var(x, 0.05)
)
print("风险指标:")
print(stock_df[['daily_return', 'ewm_volatility', 'rolling_var_5']].tail(10))
# 5. 自适应指数加权
print("\n5. 自适应指数加权:")
def adaptive_ewm(series, lookback_period=20):
    """Return an exponentially weighted average whose alpha follows volatility.

    The smoothing factor is ``0.1 + 0.2 * normalized_vol`` clipped to
    ``[0.05, 0.5]``, where ``normalized_vol`` is the rolling std divided by
    its own rolling mean. While that factor is still NaN (warm-up), the
    previous output value is carried forward unchanged.

    Args:
        series: pandas Series of values to smooth.
        lookback_period: Window used for both the rolling std and the rolling
            mean that normalizes it.

    Returns:
        Float Series on the same index as ``series``; empty if ``series`` is.
    """
    # Scale volatility by its own recent average so alpha reacts to relative,
    # not absolute, volatility.
    volatility = series.rolling(window=lookback_period).std()
    normalized_vol = volatility / volatility.rolling(window=lookback_period).mean()
    # Higher relative volatility -> larger alpha -> faster decay.
    adaptive_alpha = 0.1 + 0.2 * normalized_vol
    adaptive_alpha = adaptive_alpha.clip(0.05, 0.5)  # keep alpha in range
    result = pd.Series(index=series.index, dtype=float)
    # Nothing to seed the recursion with; avoid IndexError on iloc[0].
    if series.empty:
        return result
    result.iloc[0] = series.iloc[0]
    for i in range(1, len(series)):
        if pd.notna(adaptive_alpha.iloc[i]):
            alpha = adaptive_alpha.iloc[i]
            result.iloc[i] = alpha * series.iloc[i] + (1 - alpha) * result.iloc[i-1]
        else:
            result.iloc[i] = result.iloc[i-1]
    return result
stock_df['adaptive_ewm'] = adaptive_ewm(stock_df['close'])
print("自适应EWM:")
print(stock_df[['close', 'ewm_12', 'adaptive_ewm']].tail(10))
8.6.3 滚动窗口高级应用
print("\n=== 滚动窗口高级应用 ===")
# 1. 多变量滚动分析
print("1. 多变量滚动分析:")
# 滚动协方差矩阵
def rolling_cov_matrix(df, window=20):
    """Return the rolling pairwise covariance of the numeric columns of ``df``.

    Args:
        df: pandas DataFrame; non-numeric columns are ignored.
        window: Number of rows in each rolling window.

    Returns:
        The result of ``DataFrame.rolling(window).cov()`` on the numeric
        columns: one covariance matrix per row of ``df``, stacked.
    """
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    rolling_cov = df[numeric_cols].rolling(window=window).cov()
    return rolling_cov
# 选择数值列进行分析
numeric_data = stock_df[['close', 'volume', 'daily_return']].dropna()
rolling_cov = rolling_cov_matrix(numeric_data, window=20)
print("滚动协方差矩阵示例:")
print(rolling_cov.tail(6)) # 显示最后两个时间点的协方差矩阵
# 2. 滚动回归分析
print("\n2. 滚动回归分析:")
def rolling_beta(y, x, window=20):
    """Return the rolling regression slope (beta) of ``y`` on ``x``.

    Each window's beta is ``cov(y, x) / var(x)``, with covariance and
    variance both using the sample estimator (ddof=1) so the ratio is the
    least-squares slope.

    Args:
        y: pandas Series of the dependent variable.
        x: pandas Series of the explanatory variable; paired with ``y`` by
            position, not by index label.
        window: Number of observations in each rolling window.

    Returns:
        Float Series on ``y``'s index; NaN where no full window is available
        or the variance of ``x`` in the window is zero.
    """
    def calc_beta(y_window, x_window):
        if len(y_window) < 2 or len(x_window) < 2:
            return np.nan
        covariance = np.cov(y_window, x_window)[0, 1]
        # np.cov defaults to ddof=1, so the variance must match it; np.var's
        # default ddof=0 would inflate beta by a factor of n / (n - 1).
        variance = np.var(x_window, ddof=1)
        return covariance / variance if variance != 0 else np.nan

    result = pd.Series(index=y.index, dtype=float)
    for i in range(window - 1, len(y)):
        # The trailing `window` observations ending at position i (inclusive).
        y_window = y.iloc[i - window + 1:i + 1]
        x_window = x.iloc[i - window + 1:i + 1]
        result.iloc[i] = calc_beta(y_window, x_window)
    return result
# 假设volume是市场因子,计算price相对于volume的beta
stock_df['rolling_beta'] = rolling_beta(stock_df['daily_return'],
stock_df['volume'].pct_change(),
window=20)
print("滚动Beta系数:")
print(stock_df[['daily_return', 'rolling_beta']].tail(10))
# 3. 滚动异常检测
print("\n3. 滚动异常检测:")
def rolling_z_score(series, window=20):
    """Return the z-score of each value relative to its trailing rolling window.

    Args:
        series: pandas Series of numeric values.
        window: Number of periods in the rolling mean and rolling std.

    Returns:
        Series of z-scores; positions without a full window are NaN.
    """
    rolling_mean = series.rolling(window=window).mean()
    rolling_std = series.rolling(window=window).std()
    z_score = (series - rolling_mean) / rolling_std
    return z_score
def rolling_anomaly_detection(series, window=20, threshold=2):
    """Flag values whose rolling z-score exceeds ``threshold`` in magnitude.

    Args:
        series: pandas Series of numeric values.
        window: Window passed to ``rolling_z_score``.
        threshold: Absolute z-score above which a value is flagged.

    Returns:
        Tuple ``(anomalies, z_scores)``: the boolean flags first, then the
        z-scores they were derived from.
    """
    z_scores = rolling_z_score(series, window)
    anomalies = np.abs(z_scores) > threshold
    return anomalies, z_scores
# Run the detector once; it returns (boolean flags, z-scores) in that order.
stock_df['is_anomaly'], stock_df['z_score_value'] = rolling_anomaly_detection(stock_df['close'], window=20, threshold=2)
# Keep the 'z_score' column too, holding the numeric z-scores (not the flags).
stock_df['z_score'] = stock_df['z_score_value']
print("异常检测结果:")
anomaly_data = stock_df[stock_df['is_anomaly']][['close', 'z_score_value']]
if not anomaly_data.empty:
    print(anomaly_data.head())
else:
    print("未检测到异常值")
# 4. 滚动性能指标
print("\n4. 滚动性能指标:")
def rolling_information_ratio(returns, benchmark_returns, window=20):
    """Return the rolling information ratio of ``returns`` against a benchmark.

    Args:
        returns: pandas Series of returns.
        benchmark_returns: pandas Series of benchmark returns, subtracted
            from ``returns`` to obtain the excess return.
        window: Number of periods in each rolling window.

    Returns:
        Series of rolling mean excess return divided by rolling std of
        excess return; positions without a full window are NaN.
    """
    excess_returns = returns - benchmark_returns
    rolling_mean = excess_returns.rolling(window=window).mean()
    rolling_std = excess_returns.rolling(window=window).std()
    ir = rolling_mean / rolling_std
    return ir
def rolling_sortino_ratio(returns, window=20, target_return=0):
    """Return the rolling Sortino ratio of a return series.

    The downside deviation used here is the rolling sample std of the excess
    returns with all non-negative values replaced by 0.

    Args:
        returns: pandas Series of returns.
        window: Number of periods in each rolling window.
        target_return: Return subtracted from ``returns`` before the split
            into upside and downside.

    Returns:
        Series of rolling mean excess return divided by the downside
        deviation; positions without a full window are NaN.
    """
    excess_returns = returns - target_return
    # Keep only the shortfalls below the target; everything else becomes 0.
    downside_returns = excess_returns.where(excess_returns < 0, 0)
    rolling_mean = excess_returns.rolling(window=window).mean()
    rolling_downside_std = downside_returns.rolling(window=window).std()
    sortino = rolling_mean / rolling_downside_std
    return sortino
# 创建基准收益(假设为0)
benchmark_returns = pd.Series(0, index=stock_df.index)
stock_df['rolling_ir'] = rolling_information_ratio(stock_df['daily_return'],
benchmark_returns,
window=20)
stock_df['rolling_sortino'] = rolling_sortino_ratio(stock_df['daily_return'],
window=20)
print("滚动性能指标:")
print(stock_df[['daily_return', 'rolling_ir', 'rolling_sortino']].tail(10))
# 5. 滚动预测
print("\n5. 滚动预测:")
def rolling_forecast(series, window=20, horizon=1):
    """Return a simple moving-average forecast of ``series``.

    The mean of the ``window`` observations strictly before position ``i``
    is stored at position ``i + horizon - 1``, so a forecast never uses the
    value it is predicting.

    Args:
        series: pandas Series to forecast.
        window: Number of past observations averaged for each forecast.
        horizon: Number of steps ahead being forecast.

    Returns:
        Float Series on the same index as ``series``; positions that receive
        no forecast are NaN.
    """
    forecasts = pd.Series(index=series.index, dtype=float)
    for i in range(window, len(series)):
        # Use the previous `window` observations, excluding position i itself.
        historical_data = series.iloc[i-window:i]
        forecast = historical_data.mean()
        # Drop forecasts that would land past the end of the series.
        if i + horizon - 1 < len(series):
            forecasts.iloc[i + horizon - 1] = forecast
    return forecasts
stock_df['price_forecast'] = rolling_forecast(stock_df['close'], window=10, horizon=1)
# 计算预测误差
stock_df['forecast_error'] = stock_df['close'] - stock_df['price_forecast']
stock_df['forecast_accuracy'] = 1 - np.abs(stock_df['forecast_error']) / stock_df['close']
print("滚动预测结果:")
forecast_results = stock_df[['close', 'price_forecast', 'forecast_error', 'forecast_accuracy']].dropna()
print(forecast_results.tail(10))
print(f"\n平均预测准确率: {stock_df['forecast_accuracy'].mean():.4f}")
8.7 本章小结
8.7.1 核心知识点
日期时间基础
- Python datetime对象的使用
- Pandas Timestamp和时区处理
- 时间格式转换和解析
时间序列索引
- DatetimeIndex的创建和操作
- PeriodIndex和TimedeltaIndex
- 时间频率和偏移量
时间序列操作
- 时间选择和切片技巧
- 时间偏移和滞后特征
- 时间序列运算和技术指标
重采样技术
- 下采样和上采样
- 插值方法和策略
- 重采样的高级参数
滚动窗口分析
- 移动统计和技术指标
- 指数加权移动平均
- 滚动相关性和回归分析
8.7.2 最佳实践
- 时区管理:始终明确时区设置,避免时区混乱
- 频率选择:根据数据特点选择合适的时间频率
- 插值策略:根据数据性质选择合适的插值方法
- 窗口大小:平衡响应速度和稳定性选择窗口大小
- 性能优化:对大数据集使用向量化操作
8.7.3 常见陷阱
- 时区不一致导致的时间错位
- 重采样时的边界处理问题
- 滚动窗口的前视偏差(look-ahead bias,即计算中无意使用了未来数据)
- 缺失值对统计计算的影响
- 频率转换时的数据丢失
8.7.4 下一步学习
在下一章中,我们将学习:
- 数据可视化的高级技巧
- 统计分析和假设检验
- 时间序列的可视化方法
- 交互式图表的创建
练习题
- 创建一个完整的股票技术分析系统
- 实现一个时间序列异常检测算法
- 构建一个多频率数据融合系统
- 设计一个滚动风险管理模型
- 开发一个时间序列预测评估框架
记住:时间序列分析是数据科学的重要分支,掌握这些技能将大大提升你的数据分析能力!