8.1 章节概述

时间序列分析是Pandas的核心功能之一,广泛应用于金融、经济、气象、物联网等领域。本章将深入探讨Pandas中时间序列数据的处理方法,包括日期时间索引、时间序列操作、重采样、滚动窗口分析等高级技术。

graph TD
    A[时间序列分析] --> B[日期时间基础]
    A --> C[时间序列索引]
    A --> D[时间序列操作]
    A --> E[重采样与频率转换]
    A --> F[滚动窗口分析]
    A --> G[时间序列可视化]
    
    B --> B1[datetime对象]
    B --> B2[时间戳与周期]
    B --> B3[时区处理]
    
    C --> C1[DatetimeIndex]
    C --> C2[PeriodIndex]
    C --> C3[TimedeltaIndex]
    
    D --> D1[时间选择与切片]
    D --> D2[时间偏移]
    D --> D3[时间序列运算]
    
    E --> E1[上采样与下采样]
    E --> E2[聚合函数]
    E --> E3[插值方法]
    
    F --> F1[移动平均]
    F --> F2[滚动统计]
    F --> F3[指数加权]
    
    G --> G1[趋势分析]
    G --> G2[季节性分析]
    G --> G3[异常检测]

8.1.1 学习目标

  • 掌握Pandas中日期时间数据的处理方法
  • 理解时间序列索引的创建和使用
  • 学会时间序列数据的选择、切片和操作
  • 掌握重采样和频率转换技术
  • 学习滚动窗口分析和移动统计
  • 了解时间序列数据的可视化方法

8.1.2 应用场景

  • 金融分析:股价分析、风险管理、投资组合优化
  • 经济预测:GDP预测、通胀分析、经济指标监控
  • 运营分析:销售趋势、用户行为、业务指标监控
  • 物联网:传感器数据分析、设备监控、预测性维护
  • 气象分析:天气预报、气候变化、环境监测

8.2 日期时间基础

8.2.1 Python datetime基础

import pandas as pd
import numpy as np
from datetime import datetime, date, time, timedelta
import matplotlib.pyplot as plt
import seaborn as sns

# 设置中文字体和样式
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")

print("=== Python datetime基础 ===")

# 1. 创建datetime对象
print("1. 创建datetime对象:")
dt1 = datetime(2024, 1, 15, 10, 30, 45)
dt2 = datetime.now()
dt3 = datetime.today()

print(f"指定日期时间:{dt1}")
print(f"当前时间:{dt2}")
print(f"今天日期:{dt3}")

# 2. 日期时间组件
print("\n2. 日期时间组件:")
print(f"年份:{dt1.year}")
print(f"月份:{dt1.month}")
print(f"日期:{dt1.day}")
print(f"小时:{dt1.hour}")
print(f"分钟:{dt1.minute}")
print(f"秒数:{dt1.second}")
print(f"星期几:{dt1.weekday()} (0=周一)")
print(f"一年中第几天:{dt1.timetuple().tm_yday}")

# 3. 时间差计算
print("\n3. 时间差计算:")
td = timedelta(days=30, hours=12, minutes=30)
future_date = dt1 + td
past_date = dt1 - td

print(f"原始日期:{dt1}")
print(f"30天12小时30分钟后:{future_date}")
print(f"30天12小时30分钟前:{past_date}")

# 4. 字符串转换
print("\n4. 字符串转换:")
date_str = "2024-01-15 10:30:45"
parsed_date = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
formatted_str = dt1.strftime("%Y年%m月%d日 %H:%M:%S")

print(f"字符串转日期:{parsed_date}")
print(f"日期转字符串:{formatted_str}")

8.2.2 Pandas时间戳

print("\n=== Pandas时间戳 ===")

# 1. 创建Timestamp
print("1. 创建Timestamp:")
ts1 = pd.Timestamp('2024-01-15')
ts2 = pd.Timestamp('2024-01-15 10:30:45')
ts3 = pd.Timestamp(2024, 1, 15, 10, 30, 45)
ts4 = pd.Timestamp.now()

print(f"日期时间戳:{ts1}")
print(f"完整时间戳:{ts2}")
print(f"参数构造:{ts3}")
print(f"当前时间戳:{ts4}")

# 2. Timestamp属性
print("\n2. Timestamp属性:")
print(f"年份:{ts2.year}")
print(f"季度:{ts2.quarter}")
print(f"月份:{ts2.month}")
print(f"周数:{ts2.week}")
print(f"星期几:{ts2.dayofweek}")
print(f"是否月末:{ts2.is_month_end}")
print(f"是否年末:{ts2.is_year_end}")

# 3. 时间戳运算
print("\n3. 时间戳运算:")
ts_future = ts2 + pd.Timedelta(days=30)
ts_past = ts2 - pd.Timedelta(weeks=2)

print(f"原始时间:{ts2}")
print(f"30天后:{ts_future}")
print(f"2周前:{ts_past}")

# 4. 时间戳比较
print("\n4. 时间戳比较:")
ts_list = [
    pd.Timestamp('2024-01-15'),
    pd.Timestamp('2024-02-15'),
    pd.Timestamp('2024-01-10')
]

print(f"时间戳列表:{ts_list}")
print(f"最早时间:{min(ts_list)}")
print(f"最晚时间:{max(ts_list)}")
print(f"排序结果:{sorted(ts_list)}")

8.2.3 时区处理

print("\n=== 时区处理 ===")

# 1. 创建带时区的时间戳
print("1. 创建带时区的时间戳:")
ts_utc = pd.Timestamp('2024-01-15 10:30:45', tz='UTC')
ts_beijing = pd.Timestamp('2024-01-15 10:30:45', tz='Asia/Shanghai')
ts_ny = pd.Timestamp('2024-01-15 10:30:45', tz='America/New_York')

print(f"UTC时间:{ts_utc}")
print(f"北京时间:{ts_beijing}")
print(f"纽约时间:{ts_ny}")

# 2. 时区转换
print("\n2. 时区转换:")
ts_local = pd.Timestamp('2024-01-15 10:30:45')
ts_with_tz = ts_local.tz_localize('Asia/Shanghai')
ts_converted = ts_with_tz.tz_convert('UTC')

print(f"本地时间:{ts_local}")
print(f"添加时区:{ts_with_tz}")
print(f"转换时区:{ts_converted}")

# 3. 常用时区
print("\n3. 常用时区:")
common_timezones = [
    'UTC',
    'Asia/Shanghai',
    'Asia/Tokyo',
    'Europe/London',
    'America/New_York',
    'America/Los_Angeles'
]

base_time = pd.Timestamp('2024-01-15 12:00:00', tz='UTC')
for tz in common_timezones:
    converted = base_time.tz_convert(tz)
    print(f"{tz:20}: {converted}")

8.3 时间序列索引

8.3.1 DatetimeIndex

print("\n=== DatetimeIndex ===")

# 1. 创建DatetimeIndex
print("1. 创建DatetimeIndex:")

# 方法1:从字符串列表创建
date_strings = ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04']
dt_index1 = pd.DatetimeIndex(date_strings)

# 方法2:使用date_range
dt_index2 = pd.date_range('2024-01-01', periods=10, freq='D')
dt_index3 = pd.date_range('2024-01-01', '2024-01-31', freq='D')

print(f"从字符串创建:{dt_index1}")
print(f"指定周期:{dt_index2}")
print(f"指定范围:{dt_index3[:5]}...")  # 只显示前5个

# 3. 不同频率的时间索引
print("\n2. 不同频率的时间索引:")
frequencies = {
    'D': '每日',
    'W': '每周',
    'M': '每月末',
    'MS': '每月初',
    'Q': '每季度末',
    'QS': '每季度初',
    'Y': '每年末',
    'YS': '每年初',
    'H': '每小时',
    'T': '每分钟',
    'S': '每秒'
}

for freq, desc in frequencies.items():
    if freq in ['D', 'W', 'M', 'Q', 'Y']:
        idx = pd.date_range('2024-01-01', periods=5, freq=freq)
        print(f"{desc:8} ({freq:2}): {idx[0]} 到 {idx[-1]}")

# 4. 自定义频率
print("\n3. 自定义频率:")
custom_freqs = {
    '2D': '每2天',
    '3H': '每3小时',
    '15T': '每15分钟',
    'W-MON': '每周一',
    'BM': '每月最后一个工作日',
    'BQ': '每季度最后一个工作日'
}

for freq, desc in custom_freqs.items():
    idx = pd.date_range('2024-01-01', periods=3, freq=freq)
    print(f"{desc:15} ({freq:6}): {list(idx)}")

8.3.2 时间序列DataFrame

print("\n=== 时间序列DataFrame ===")

# 1. 创建时间序列数据
print("1. 创建时间序列数据:")
dates = pd.date_range('2024-01-01', periods=100, freq='D')
np.random.seed(42)

# 模拟股价数据
price_data = {
    'open': 100 + np.cumsum(np.random.randn(100) * 0.5),
    'high': 100 + np.cumsum(np.random.randn(100) * 0.5) + np.random.rand(100) * 2,
    'low': 100 + np.cumsum(np.random.randn(100) * 0.5) - np.random.rand(100) * 2,
    'close': 100 + np.cumsum(np.random.randn(100) * 0.5),
    'volume': np.random.randint(1000, 10000, 100)
}

# 确保high >= max(open, close), low <= min(open, close)
for i in range(100):
    price_data['high'][i] = max(price_data['high'][i], 
                               price_data['open'][i], 
                               price_data['close'][i])
    price_data['low'][i] = min(price_data['low'][i], 
                              price_data['open'][i], 
                              price_data['close'][i])

stock_df = pd.DataFrame(price_data, index=dates)
print("股价数据示例:")
print(stock_df.head())

# 2. 时间序列基本信息
print("\n2. 时间序列基本信息:")
print(f"数据形状:{stock_df.shape}")
print(f"时间范围:{stock_df.index.min()} 到 {stock_df.index.max()}")
print(f"频率:{stock_df.index.freq}")
print(f"时间跨度:{stock_df.index.max() - stock_df.index.min()}")

# 3. 时间索引属性
print("\n3. 时间索引属性:")
print(f"年份范围:{stock_df.index.year.unique()}")
print(f"月份范围:{stock_df.index.month.unique()}")
print(f"星期分布:{stock_df.index.dayofweek.value_counts().sort_index()}")

# 4. 添加时间特征
print("\n4. 添加时间特征:")
stock_df['year'] = stock_df.index.year
stock_df['month'] = stock_df.index.month
stock_df['quarter'] = stock_df.index.quarter
stock_df['dayofweek'] = stock_df.index.dayofweek
stock_df['is_month_end'] = stock_df.index.is_month_end

print("添加时间特征后:")
print(stock_df[['close', 'year', 'month', 'quarter', 'dayofweek', 'is_month_end']].head())

8.3.3 PeriodIndex

print("\n=== PeriodIndex ===")

# 1. 创建PeriodIndex
print("1. 创建PeriodIndex:")
period_index1 = pd.period_range('2024-01', periods=12, freq='M')
period_index2 = pd.period_range('2024Q1', periods=4, freq='Q')
period_index3 = pd.period_range('2024', periods=5, freq='Y')

print(f"月度周期:{period_index1}")
print(f"季度周期:{period_index2}")
print(f"年度周期:{period_index3}")

# 2. Period对象属性
print("\n2. Period对象属性:")
period = pd.Period('2024-01', freq='M')
print(f"周期:{period}")
print(f"开始时间:{period.start_time}")
print(f"结束时间:{period.end_time}")
print(f"天数:{period.days_in_month}")

# 3. Period与Timestamp转换
print("\n3. Period与Timestamp转换:")
timestamp_index = pd.date_range('2024-01-01', periods=12, freq='M')
period_from_timestamp = timestamp_index.to_period('M')
timestamp_from_period = period_index1.to_timestamp()

print(f"Timestamp索引:{timestamp_index[:3]}")
print(f"转换为Period:{period_from_timestamp[:3]}")
print(f"Period转Timestamp:{timestamp_from_period[:3]}")

# 4. 使用PeriodIndex的DataFrame
print("\n4. 使用PeriodIndex的DataFrame:")
monthly_data = pd.DataFrame({
    'sales': np.random.randint(1000, 5000, 12),
    'profit': np.random.randint(100, 800, 12)
}, index=period_index1)

print("月度数据:")
print(monthly_data.head())

8.4 时间序列操作

8.4.1 时间选择与切片

print("\n=== 时间选择与切片 ===")

# 使用之前创建的stock_df
print("1. 基本时间选择:")

# 选择特定日期
specific_date = stock_df.loc['2024-01-15']
print(f"特定日期数据:\n{specific_date}")

# 选择日期范围
date_range_data = stock_df.loc['2024-01-10':'2024-01-20']
print(f"\n日期范围数据:\n{date_range_data.head()}")

# 选择特定月份
january_data = stock_df.loc['2024-01']
print(f"\n1月份数据形状:{january_data.shape}")

# 2. 高级时间选择
print("\n2. 高级时间选择:")

# 选择特定年份
year_2024 = stock_df.loc['2024']
print(f"2024年数据形状:{year_2024.shape}")

# 选择多个不连续日期
specific_dates = ['2024-01-01', '2024-01-15', '2024-02-01']
multi_dates_data = stock_df.loc[specific_dates]
print(f"多个日期数据:\n{multi_dates_data}")

# 3. 条件选择
print("\n3. 条件选择:")

# 选择周末数据
weekend_data = stock_df[stock_df.index.dayofweek >= 5]
print(f"周末数据形状:{weekend_data.shape}")

# 选择月末数据
month_end_data = stock_df[stock_df.index.is_month_end]
print(f"月末数据:\n{month_end_data}")

# 选择特定时间段的高价股票
high_price_jan = stock_df.loc['2024-01'][stock_df.loc['2024-01', 'close'] > 102]
print(f"1月高价数据形状:{high_price_jan.shape}")

# 4. 时间切片技巧
print("\n4. 时间切片技巧:")

# 最近N天
last_10_days = stock_df.tail(10)
print(f"最近10天数据:\n{last_10_days.index}")

# 每周第一天
weekly_first = stock_df.groupby(stock_df.index.to_period('W')).first()
print(f"每周第一天数据形状:{weekly_first.shape}")

# 每月最后一个交易日
monthly_last = stock_df.groupby(stock_df.index.to_period('M')).last()
print(f"每月最后交易日:\n{monthly_last.index}")

8.4.2 时间偏移

print("\n=== 时间偏移 ===")

# 1. 基本偏移操作
print("1. 基本偏移操作:")

# 数据向前偏移
shifted_forward = stock_df['close'].shift(1)  # 向前偏移1天
shifted_backward = stock_df['close'].shift(-1)  # 向后偏移1天

comparison_df = pd.DataFrame({
    'original': stock_df['close'],
    'shift_1': shifted_forward,
    'shift_-1': shifted_backward
})

print("偏移对比:")
print(comparison_df.head())

# 2. 计算收益率
print("\n2. 计算收益率:")

# 日收益率
stock_df['daily_return'] = stock_df['close'].pct_change()

# 对数收益率
stock_df['log_return'] = np.log(stock_df['close'] / stock_df['close'].shift(1))

# 累积收益率
stock_df['cumulative_return'] = (1 + stock_df['daily_return']).cumprod() - 1

print("收益率计算:")
print(stock_df[['close', 'daily_return', 'log_return', 'cumulative_return']].head(10))

# 3. 时间偏移与频率
print("\n3. 时间偏移与频率:")

# 使用不同频率的偏移
offsets = {
    'BDay': '工作日',
    'Week': '周',
    'MonthEnd': '月末',
    'QuarterEnd': '季度末'
}

base_date = pd.Timestamp('2024-01-15')
for offset_name, desc in offsets.items():
    offset = getattr(pd.offsets, offset_name)()
    new_date = base_date + offset
    print(f"{desc:8}: {base_date} + {offset_name} = {new_date}")

# 4. 自定义偏移
print("\n4. 自定义偏移:")

# 创建自定义工作日偏移
custom_offset = pd.offsets.CustomBusinessDay(weekmask='Mon Tue Wed Thu Fri')
holidays = pd.to_datetime(['2024-01-01', '2024-02-14'])  # 假设的节假日
holiday_offset = pd.offsets.CustomBusinessDay(holidays=holidays)

print(f"自定义工作日:{base_date + custom_offset}")
print(f"考虑节假日:{base_date + holiday_offset}")

# 5. 滞后特征创建
print("\n5. 滞后特征创建:")

# 创建多个滞后特征
for lag in [1, 3, 5, 10]:
    stock_df[f'close_lag_{lag}'] = stock_df['close'].shift(lag)
    stock_df[f'volume_lag_{lag}'] = stock_df['volume'].shift(lag)

lag_features = [col for col in stock_df.columns if 'lag' in col]
print(f"滞后特征:{lag_features}")
print(stock_df[['close'] + lag_features[:4]].head(12))

8.4.3 时间序列运算

print("\n=== 时间序列运算 ===")

# 1. 基本统计运算
print("1. 基本统计运算:")

# 移动平均
stock_df['ma_5'] = stock_df['close'].rolling(window=5).mean()
stock_df['ma_20'] = stock_df['close'].rolling(window=20).mean()

# 移动标准差
stock_df['std_5'] = stock_df['close'].rolling(window=5).std()

# 移动最大最小值
stock_df['max_5'] = stock_df['close'].rolling(window=5).max()
stock_df['min_5'] = stock_df['close'].rolling(window=5).min()

print("移动统计:")
print(stock_df[['close', 'ma_5', 'ma_20', 'std_5', 'max_5', 'min_5']].head(25))

# 2. 技术指标计算
print("\n2. 技术指标计算:")

# RSI (相对强弱指数)
def calculate_rsi(prices, window=14):
    delta = prices.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

stock_df['rsi'] = calculate_rsi(stock_df['close'])

# MACD (移动平均收敛散度)
def calculate_macd(prices, fast=12, slow=26, signal=9):
    ema_fast = prices.ewm(span=fast).mean()
    ema_slow = prices.ewm(span=slow).mean()
    macd = ema_fast - ema_slow
    signal_line = macd.ewm(span=signal).mean()
    histogram = macd - signal_line
    return macd, signal_line, histogram

stock_df['macd'], stock_df['macd_signal'], stock_df['macd_histogram'] = calculate_macd(stock_df['close'])

# 布林带
def calculate_bollinger_bands(prices, window=20, num_std=2):
    ma = prices.rolling(window=window).mean()
    std = prices.rolling(window=window).std()
    upper_band = ma + (std * num_std)
    lower_band = ma - (std * num_std)
    return upper_band, ma, lower_band

stock_df['bb_upper'], stock_df['bb_middle'], stock_df['bb_lower'] = calculate_bollinger_bands(stock_df['close'])

print("技术指标:")
print(stock_df[['close', 'rsi', 'macd', 'bb_upper', 'bb_lower']].tail(10))

# 3. 时间序列分解
print("\n3. 时间序列分解:")

# 创建带趋势和季节性的数据
dates_extended = pd.date_range('2023-01-01', periods=365, freq='D')
trend = np.linspace(100, 120, 365)
seasonal = 10 * np.sin(2 * np.pi * np.arange(365) / 365.25 * 4)  # 季节性
noise = np.random.normal(0, 2, 365)
ts_data = trend + seasonal + noise

ts_df = pd.DataFrame({'value': ts_data}, index=dates_extended)

# 简单的趋势提取
ts_df['trend'] = ts_df['value'].rolling(window=30, center=True).mean()
ts_df['detrended'] = ts_df['value'] - ts_df['trend']

print("时间序列分解:")
print(ts_df.head(10))

# 4. 异常值检测
print("\n4. 异常值检测:")

# 基于标准差的异常值检测
def detect_outliers_std(series, threshold=3):
    mean = series.mean()
    std = series.std()
    outliers = np.abs(series - mean) > threshold * std
    return outliers

# 基于IQR的异常值检测
def detect_outliers_iqr(series, factor=1.5):
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    outliers = (series < Q1 - factor * IQR) | (series > Q3 + factor * IQR)
    return outliers

stock_df['outlier_std'] = detect_outliers_std(stock_df['close'])
stock_df['outlier_iqr'] = detect_outliers_iqr(stock_df['close'])

outlier_summary = pd.DataFrame({
    'std_method': stock_df['outlier_std'].sum(),
    'iqr_method': stock_df['outlier_iqr'].sum()
}, index=['outlier_count'])

print("异常值检测结果:")
print(outlier_summary)

if stock_df['outlier_std'].any():
    print("\n标准差方法检测到的异常值:")
    print(stock_df[stock_df['outlier_std']][['close', 'outlier_std']])

8.5 重采样与频率转换

8.5.1 下采样(降频)

print("\n=== 下采样(降频)===")

# 1. 基本下采样
print("1. 基本下采样:")

# 日数据转周数据
weekly_data = stock_df.resample('W').agg({
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': 'sum'
})

print("周数据:")
print(weekly_data.head())

# 日数据转月数据
monthly_data = stock_df.resample('M').agg({
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': 'mean',
    'daily_return': 'mean'
})

print("\n月数据:")
print(monthly_data)

# 2. 不同聚合函数
print("\n2. 不同聚合函数:")

# 多种聚合方式
price_summary = stock_df['close'].resample('W').agg([
    'mean',      # 平均值
    'median',    # 中位数
    'std',       # 标准差
    'min',       # 最小值
    'max',       # 最大值
    'count'      # 计数
])

print("价格周汇总:")
print(price_summary.head())

# 3. 自定义聚合函数
print("\n3. 自定义聚合函数:")

def price_range(series):
    """计算价格区间"""
    return series.max() - series.min()

def volatility(series):
    """计算波动率"""
    return series.std() / series.mean() if series.mean() != 0 else 0

custom_agg = stock_df['close'].resample('W').agg({
    'range': price_range,
    'volatility': volatility,
    'first': 'first',
    'last': 'last'
})

print("自定义聚合:")
print(custom_agg.head())

# 4. 分组下采样
print("\n4. 分组下采样:")

# 按月份和星期几分组
monthly_weekday = stock_df.groupby([
    stock_df.index.month,
    stock_df.index.dayofweek
])['close'].mean().unstack()

print("月份-星期几平均价格:")
print(monthly_weekday)

8.5.2 上采样(升频)

print("\n=== 上采样(升频)===")

# 1. 基本上采样
print("1. 基本上采样:")

# 创建低频数据
low_freq_dates = pd.date_range('2024-01-01', periods=10, freq='W')
low_freq_data = pd.DataFrame({
    'value': np.random.randn(10).cumsum()
}, index=low_freq_dates)

print("原始周数据:")
print(low_freq_data)

# 上采样到日频率
daily_upsampled = low_freq_data.resample('D').asfreq()
print("\n上采样到日频率(不填充):")
print(daily_upsampled.head(10))

# 2. 插值方法
print("\n2. 插值方法:")

# 前向填充
forward_fill = low_freq_data.resample('D').ffill()
print("前向填充:")
print(forward_fill.head(10))

# 后向填充
backward_fill = low_freq_data.resample('D').bfill()
print("\n后向填充:")
print(backward_fill.head(10))

# 线性插值
linear_interp = low_freq_data.resample('D').interpolate(method='linear')
print("\n线性插值:")
print(linear_interp.head(10))

# 3. 高级插值方法
print("\n3. 高级插值方法:")

# 样条插值
spline_interp = low_freq_data.resample('D').interpolate(method='spline', order=2)

# 时间插值
time_interp = low_freq_data.resample('D').interpolate(method='time')

# 多项式插值
poly_interp = low_freq_data.resample('D').interpolate(method='polynomial', order=2)

interp_comparison = pd.DataFrame({
    'original': low_freq_data.resample('D').asfreq()['value'],
    'linear': linear_interp['value'],
    'spline': spline_interp['value'],
    'time': time_interp['value'],
    'polynomial': poly_interp['value']
})

print("插值方法对比:")
print(interp_comparison.head(15))

# 4. 限制插值
print("\n4. 限制插值:")

# 限制插值的最大间隔
limited_interp = low_freq_data.resample('D').interpolate(method='linear', limit=3)
print("限制插值(最多3天):")
print(limited_interp.head(15))

# 只在特定方向插值
forward_limited = low_freq_data.resample('D').interpolate(method='linear', limit_direction='forward')
print("\n只向前插值:")
print(forward_limited.head(15))

8.5.3 重采样高级技巧

print("\n=== 重采样高级技巧 ===")

# 1. 标签和闭合规则
print("1. 标签和闭合规则:")

# 不同的标签位置
left_label = stock_df['close'].resample('W', label='left').mean()
right_label = stock_df['close'].resample('W', label='right').mean()

print("左标签(周开始):")
print(left_label.head())
print("\n右标签(周结束):")
print(right_label.head())

# 不同的闭合规则
left_closed = stock_df['close'].resample('W', closed='left').mean()
right_closed = stock_df['close'].resample('W', closed='right').mean()

print("\n左闭合:")
print(left_closed.head())
print("\n右闭合:")
print(right_closed.head())

# 2. 偏移重采样
print("\n2. 偏移重采样:")

# 使用偏移量
offset_resample = stock_df['close'].resample('W', loffset=pd.Timedelta(days=2)).mean()
print("偏移2天的周重采样:")
print(offset_resample.head())

# 3. 分组重采样
print("\n3. 分组重采样:")

# 按月份分组,然后重采样
monthly_groups = stock_df.groupby(stock_df.index.month)
monthly_weekly_avg = monthly_groups.resample('W')['close'].mean()

print("按月分组的周平均:")
print(monthly_weekly_avg.head(10))

# 4. 多列重采样
print("\n4. 多列重采样:")

# 对不同列使用不同的聚合函数
multi_col_resample = stock_df.resample('W').agg({
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': ['sum', 'mean'],
    'daily_return': ['mean', 'std']
})

print("多列重采样:")
print(multi_col_resample.head())

# 5. 条件重采样
print("\n5. 条件重采样:")

# 只对交易量大于平均值的数据重采样
high_volume_mask = stock_df['volume'] > stock_df['volume'].mean()
high_volume_weekly = stock_df[high_volume_mask].resample('W')['close'].mean()

print("高交易量周平均价格:")
print(high_volume_weekly.head())

# 6. 重采样后的数据处理
print("\n6. 重采样后的数据处理:")

# 重采样后计算变化率
weekly_prices = stock_df['close'].resample('W').last()
weekly_returns = weekly_prices.pct_change()

print("周收益率:")
print(weekly_returns.head())

# 重采样后的滚动统计
weekly_ma = weekly_prices.rolling(window=4).mean()  # 4周移动平均

print("\n4周移动平均:")
print(weekly_ma.head(10))

8.6 滚动窗口分析

8.6.1 移动统计

print("\n=== 移动统计 ===")

# 1. 基本移动统计
print("1. 基本移动统计:")

# 移动平均
stock_df['sma_5'] = stock_df['close'].rolling(window=5).mean()
stock_df['sma_10'] = stock_df['close'].rolling(window=10).mean()
stock_df['sma_20'] = stock_df['close'].rolling(window=20).mean()

# 移动标准差
stock_df['rolling_std'] = stock_df['close'].rolling(window=10).std()

# 移动方差
stock_df['rolling_var'] = stock_df['close'].rolling(window=10).var()

# 移动最大最小值
stock_df['rolling_max'] = stock_df['close'].rolling(window=10).max()
stock_df['rolling_min'] = stock_df['close'].rolling(window=10).min()

print("移动统计指标:")
print(stock_df[['close', 'sma_5', 'sma_10', 'sma_20', 'rolling_std']].tail(10))

# 2. 移动分位数
print("\n2. 移动分位数:")

# 移动中位数
stock_df['rolling_median'] = stock_df['close'].rolling(window=10).median()

# 移动分位数
stock_df['rolling_q25'] = stock_df['close'].rolling(window=10).quantile(0.25)
stock_df['rolling_q75'] = stock_df['close'].rolling(window=10).quantile(0.75)

print("移动分位数:")
print(stock_df[['close', 'rolling_median', 'rolling_q25', 'rolling_q75']].tail(10))

# 3. 移动相关性
print("\n3. 移动相关性:")

# 创建另一个时间序列
stock_df['price_2'] = stock_df['close'] + np.random.randn(len(stock_df)) * 2

# 移动相关系数
stock_df['rolling_corr'] = stock_df['close'].rolling(window=20).corr(stock_df['price_2'])

print("移动相关性:")
print(stock_df[['close', 'price_2', 'rolling_corr']].tail(10))

# 4. 自定义移动函数
print("\n4. 自定义移动函数:")

def rolling_sharpe_ratio(returns, window=20, risk_free_rate=0.02):
    """计算滚动夏普比率"""
    rolling_mean = returns.rolling(window=window).mean()
    rolling_std = returns.rolling(window=window).std()
    annual_return = rolling_mean * 252  # 假设252个交易日
    annual_std = rolling_std * np.sqrt(252)
    sharpe = (annual_return - risk_free_rate) / annual_std
    return sharpe

def rolling_max_drawdown(prices, window=20):
    """计算滚动最大回撤"""
    rolling_max = prices.rolling(window=window).max()
    drawdown = (prices - rolling_max) / rolling_max
    return drawdown

# 应用自定义函数
stock_df['rolling_sharpe'] = rolling_sharpe_ratio(stock_df['daily_return'])
stock_df['rolling_drawdown'] = rolling_max_drawdown(stock_df['close'])

print("自定义移动指标:")
print(stock_df[['close', 'daily_return', 'rolling_sharpe', 'rolling_drawdown']].tail(10))

8.6.2 指数加权移动平均

print("\n=== 指数加权移动平均 ===")

# 1. 基本指数加权统计
print("1. 基本指数加权统计:")

# 指数加权移动平均
stock_df['ewm_12'] = stock_df['close'].ewm(span=12).mean()
stock_df['ewm_26'] = stock_df['close'].ewm(span=26).mean()

# 指数加权标准差
stock_df['ewm_std'] = stock_df['close'].ewm(span=20).std()

# 指数加权方差
stock_df['ewm_var'] = stock_df['close'].ewm(span=20).var()

print("指数加权统计:")
print(stock_df[['close', 'ewm_12', 'ewm_26', 'ewm_std']].tail(10))

# 2. 不同的衰减参数
print("\n2. 不同的衰减参数:")

# 使用alpha参数
stock_df['ewm_alpha_01'] = stock_df['close'].ewm(alpha=0.1).mean()
stock_df['ewm_alpha_03'] = stock_df['close'].ewm(alpha=0.3).mean()

# 使用com参数(质心)
stock_df['ewm_com_10'] = stock_df['close'].ewm(com=10).mean()

# 使用halflife参数
stock_df['ewm_halflife_10'] = stock_df['close'].ewm(halflife=10).mean()

print("不同衰减参数的EWM:")
print(stock_df[['close', 'ewm_alpha_01', 'ewm_alpha_03', 'ewm_com_10', 'ewm_halflife_10']].tail(10))

# 3. 指数加权相关性
print("\n3. 指数加权相关性:")

# 指数加权相关系数
stock_df['ewm_corr'] = stock_df['close'].ewm(span=20).corr(stock_df['volume'])

print("指数加权相关性:")
print(stock_df[['close', 'volume', 'ewm_corr']].tail(10))

# 4. EWMA在风险管理中的应用
print("\n4. EWMA在风险管理中的应用:")

# 计算EWMA波动率
stock_df['ewm_volatility'] = stock_df['daily_return'].ewm(span=30).std() * np.sqrt(252)

# 计算VaR (Value at Risk)
def calculate_var(returns, confidence_level=0.05):
    """计算VaR"""
    return returns.quantile(confidence_level)

stock_df['rolling_var_5'] = stock_df['daily_return'].rolling(window=30).apply(
    lambda x: calculate_var(x, 0.05)
)

print("风险指标:")
print(stock_df[['daily_return', 'ewm_volatility', 'rolling_var_5']].tail(10))

# 5. 自适应指数加权
print("\n5. 自适应指数加权:")

def adaptive_ewm(series, lookback_period=20):
    """自适应指数加权移动平均"""
    # 根据波动率调整衰减因子
    volatility = series.rolling(window=lookback_period).std()
    normalized_vol = volatility / volatility.rolling(window=lookback_period).mean()
    
    # 高波动率时使用更快的衰减
    adaptive_alpha = 0.1 + 0.2 * normalized_vol
    adaptive_alpha = adaptive_alpha.clip(0.05, 0.5)  # 限制范围
    
    result = pd.Series(index=series.index, dtype=float)
    result.iloc[0] = series.iloc[0]
    
    for i in range(1, len(series)):
        if pd.notna(adaptive_alpha.iloc[i]):
            alpha = adaptive_alpha.iloc[i]
            result.iloc[i] = alpha * series.iloc[i] + (1 - alpha) * result.iloc[i-1]
        else:
            result.iloc[i] = result.iloc[i-1]
    
    return result

stock_df['adaptive_ewm'] = adaptive_ewm(stock_df['close'])

print("自适应EWM:")
print(stock_df[['close', 'ewm_12', 'adaptive_ewm']].tail(10))

8.6.3 滚动窗口高级应用

print("\n=== 滚动窗口高级应用 ===")

# 1. 多变量滚动分析
print("1. 多变量滚动分析:")

# 滚动协方差矩阵
def rolling_cov_matrix(df, window=20):
    """计算滚动协方差矩阵"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    rolling_cov = df[numeric_cols].rolling(window=window).cov()
    return rolling_cov

# 选择数值列进行分析
numeric_data = stock_df[['close', 'volume', 'daily_return']].dropna()
rolling_cov = rolling_cov_matrix(numeric_data, window=20)

print("滚动协方差矩阵示例:")
print(rolling_cov.tail(6))  # 显示最后两个时间点的协方差矩阵

# 2. 滚动回归分析
print("\n2. 滚动回归分析:")

def rolling_beta(y, x, window=20):
    """计算滚动beta系数"""
    def calc_beta(y_window, x_window):
        if len(y_window) < 2 or len(x_window) < 2:
            return np.nan
        covariance = np.cov(y_window, x_window)[0, 1]
        variance = np.var(x_window)
        return covariance / variance if variance != 0 else np.nan
    
    result = pd.Series(index=y.index, dtype=float)
    for i in range(window-1, len(y)):
        y_window = y.iloc[i-window+1:i+1]
        x_window = x.iloc[i-window+1:i+1]
        result.iloc[i] = calc_beta(y_window, x_window)
    
    return result

# 假设volume是市场因子,计算price相对于volume的beta
stock_df['rolling_beta'] = rolling_beta(stock_df['daily_return'], 
                                       stock_df['volume'].pct_change(), 
                                       window=20)

print("滚动Beta系数:")
print(stock_df[['daily_return', 'rolling_beta']].tail(10))

# 3. 滚动异常检测
print("\n3. 滚动异常检测:")

def rolling_z_score(series, window=20):
    """计算滚动Z分数"""
    rolling_mean = series.rolling(window=window).mean()
    rolling_std = series.rolling(window=window).std()
    z_score = (series - rolling_mean) / rolling_std
    return z_score

def rolling_anomaly_detection(series, window=20, threshold=2):
    """滚动异常检测"""
    z_scores = rolling_z_score(series, window)
    anomalies = np.abs(z_scores) > threshold
    return anomalies, z_scores

stock_df['z_score'], _ = rolling_anomaly_detection(stock_df['close'], window=20, threshold=2)
stock_df['is_anomaly'], stock_df['z_score_value'] = rolling_anomaly_detection(stock_df['close'], window=20, threshold=2)

print("异常检测结果:")
anomaly_data = stock_df[stock_df['is_anomaly']][['close', 'z_score_value']]
if not anomaly_data.empty:
    print(anomaly_data.head())
else:
    print("未检测到异常值")

# 4. 滚动性能指标
print("\n4. 滚动性能指标:")

def rolling_information_ratio(returns, benchmark_returns, window=20):
    """计算滚动信息比率"""
    excess_returns = returns - benchmark_returns
    rolling_mean = excess_returns.rolling(window=window).mean()
    rolling_std = excess_returns.rolling(window=window).std()
    ir = rolling_mean / rolling_std
    return ir

def rolling_sortino_ratio(returns, window=20, target_return=0):
    """计算滚动索提诺比率"""
    excess_returns = returns - target_return
    downside_returns = excess_returns.where(excess_returns < 0, 0)
    rolling_mean = excess_returns.rolling(window=window).mean()
    rolling_downside_std = downside_returns.rolling(window=window).std()
    sortino = rolling_mean / rolling_downside_std
    return sortino

# 创建基准收益(假设为0)
benchmark_returns = pd.Series(0, index=stock_df.index)

stock_df['rolling_ir'] = rolling_information_ratio(stock_df['daily_return'], 
                                                  benchmark_returns, 
                                                  window=20)
stock_df['rolling_sortino'] = rolling_sortino_ratio(stock_df['daily_return'], 
                                                   window=20)

print("滚动性能指标:")
print(stock_df[['daily_return', 'rolling_ir', 'rolling_sortino']].tail(10))

# 5. 滚动预测
print("\n5. 滚动预测:")

def rolling_forecast(series, window=20, horizon=1):
    """简单的滚动预测(移动平均)"""
    forecasts = pd.Series(index=series.index, dtype=float)
    
    for i in range(window, len(series)):
        # 使用过去window期的数据预测下一期
        historical_data = series.iloc[i-window:i]
        forecast = historical_data.mean()  # 简单移动平均预测
        
        if i + horizon - 1 < len(series):
            forecasts.iloc[i + horizon - 1] = forecast
    
    return forecasts

stock_df['price_forecast'] = rolling_forecast(stock_df['close'], window=10, horizon=1)

# 计算预测误差
stock_df['forecast_error'] = stock_df['close'] - stock_df['price_forecast']
stock_df['forecast_accuracy'] = 1 - np.abs(stock_df['forecast_error']) / stock_df['close']

print("滚动预测结果:")
forecast_results = stock_df[['close', 'price_forecast', 'forecast_error', 'forecast_accuracy']].dropna()
print(forecast_results.tail(10))

print(f"\n平均预测准确率: {stock_df['forecast_accuracy'].mean():.4f}")

8.7 本章小结

8.7.1 核心知识点

  1. 日期时间基础

    • Python datetime对象的使用
    • Pandas Timestamp和时区处理
    • 时间格式转换和解析
  2. 时间序列索引

    • DatetimeIndex的创建和操作
    • PeriodIndex和TimedeltaIndex
    • 时间频率和偏移量
  3. 时间序列操作

    • 时间选择和切片技巧
    • 时间偏移和滞后特征
    • 时间序列运算和技术指标
  4. 重采样技术

    • 下采样和上采样
    • 插值方法和策略
    • 重采样的高级参数
  5. 滚动窗口分析

    • 移动统计和技术指标
    • 指数加权移动平均
    • 滚动相关性和回归分析

8.7.2 最佳实践

  • 时区管理:始终明确时区设置,避免时区混乱
  • 频率选择:根据数据特点选择合适的时间频率
  • 插值策略:根据数据性质选择合适的插值方法
  • 窗口大小:平衡响应速度和稳定性选择窗口大小
  • 性能优化:对大数据集使用向量化操作

8.7.3 常见陷阱

  • 时区不一致导致的时间错位
  • 重采样时的边界处理问题
  • 滚动窗口的前向偏差
  • 缺失值对统计计算的影响
  • 频率转换时的数据丢失

8.7.4 下一步学习

在下一章中,我们将学习: - 数据可视化的高级技巧 - 统计分析和假设检验 - 时间序列的可视化方法 - 交互式图表的创建


练习题

  1. 创建一个完整的股票技术分析系统
  2. 实现一个时间序列异常检测算法
  3. 构建一个多频率数据融合系统
  4. 设计一个滚动风险管理模型
  5. 开发一个时间序列预测评估框架

记住:时间序列分析是数据科学的重要分支,掌握这些技能将大大提升你的数据分析能力!