基于LSTM的A股股票价格预测系统(torch) :从数据获取到模型训练的完整实现
1. 项目简介
本文介绍了一个使用LSTM(长短期记忆网络)进行股票价格预测的完整系统。该系统使用Python实现,集成了数据获取、预处理、模型训练和预测等功能。
这个代码使用的是 LSTM (Long Short-Term Memory) 模型,这是一种特殊的循环神经网络 (RNN)
2. 技术栈
- Python 3.x
- PyTorch (深度学习框架)
- AKShare (股票数据获取)
- Pandas (数据处理)
- NumPy (数值计算)
- Scikit-learn (数据预处理)
3. 系统架构
3.1 数据获取模块
def get_stock_data(stock_code, start_date, end_date, stock_name):
"""获取股票历史数据"""
print(f"正在获取 {stock_name}({stock_code})的数据...")
try:
df = ak.stock_zh_a_hist(symbol=stock_code,
period="daily",
start_date=start_date,
end_date=end_date,
adjust="qfq") # 使用前复权数据
# ... 数据处理代码
return df
except Exception as e:
print(f"获取{stock_name}数据时发生错误:{str(e)}")
return None
3.2 LSTM模型定义
class StockRNN(nn.Module):
"""股票预测的LSTM模型"""
def __init__(self, input_size, hidden_size, num_layers):
super(StockRNN, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, 1)
3.3 数据预处理
def prepare_data(df, sequence_length):
"""准备训练数据"""
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df[['close']].values)
X, y = [], []
for i in range(len(scaled_data) - sequence_length):
X.append(scaled_data[i:(i + sequence_length)])
y.append(scaled_data[i + sequence_length])
return np.array(X), np.array(y), scaler
4. 主要功能
- 股票数据获取和分析
- 市场状态评估
- 数据预处理和归一化
- LSTM模型训练
- 股价预测
5. 使用方法
- 运行程序
- 输入股票代码(或使用默认值)
- 设置日期范围
- 等待模型训练
- 获取预测结果
# 示例使用
stock_code = "002830" # 股票代码
start_date = "20230101" # 起始日期
end_date = "20240120" # 结束日期
predict_date = "20241209" # 预测日期
6. 模型参数
- 序列长度:10天
- LSTM隐藏层大小:64
- LSTM层数:2
- 训练轮数:100
- 学习率:0.001
7. 风险提示
- 预测结果仅供参考,不构成投资建议
- 长期预测的准确性会显著降低
- 股市受多种因素影响,模型无法预测突发事件
8. 可能的改进方向
- 增加更多特征(如交易量、技术指标等)
- 优化模型架构
- 添加更多市场分析指标
- 实现实时数据更新
- 添加可视化功能
9. 总结
本项目展示了如何使用深度学习技术进行股票价格预测。通过整合数据获取、预处理和模型训练等功能,为股票分析提供了一个完整的解决方案。虽然预测结果仅供参考,但项目的实现过程对理解金融数据分析和深度学习应用具有重要的学习价值。
10. 环境配置与安装
10.1 Python环境要求
- Python 3.8+
10.2 依赖包安装
# 创建虚拟环境(推荐)
python -m venv myvenv
source myvenv/bin/activate # Linux/Mac
# 或
myvenv\Scripts\activate # Windows
# 安装依赖包
pip install akshare
pip install torch
pip install pandas
pip install numpy
pip install scikit-learn
11. 完整代码实现
11.1 股票预测主程序 (stock_prediction_akshare.py)
import akshare as ak # 导入akshare库,用于获取股票数据
import pandas as pd # 导入pandas库,用于数据处理
import numpy as np # 导入numpy库,用于数值计算
import torch # 导入PyTorch库,用于深度学习
import torch.nn as nn # 导入神经网络模块
import torch.optim as optim # 导入优化器模块
from sklearn.preprocessing import MinMaxScaler # 导入数据归一化工具
import random # 导入随机数模块
[此处是完整的 stock_prediction_akshare.py 代码,与之前提供的相同]
11.2 下跌五日监控程序 (downfive5.py)
# 如果有 downfive5.py 的代码,请提供给我,我会添加到这里
11.3 Web应用接口 (app.py)
# 如果有 app.py 的代码,请提供给我,我会添加到这里
12. 运行说明
- 克隆或下载代码到本地
git clone [repository_url]
cd stock-prediction-system
- 安装依赖
pip install -r requirements.txt
- 运行股票预测程序
python stock_prediction_akshare.py
- 按提示输入:
- 股票代码(例如:002830)
- 起始日期(格式:YYYYMMDD)
- 结束日期(格式:YYYYMMDD)
- 预测日期(格式:YYYYMMDD)
13. 常见问题解答
-
数据获取失败
- 检查网络连接
- 确认股票代码是否正确
- 验证日期格式
-
模型训练时间过长
- 可以减少训练轮数(epochs)
- 缩短历史数据范围
- 使用GPU加速(如果可用)
-
预测结果异常
- 检查数据预处理步骤
- 调整模型参数
- 验证输入数据的质量
14. 维护与更新
本项目仍在持续改进中,计划添加的功能包括:
- 多股票同时预测
- 更多技术指标支持
- 预测结果可视化
- 实时数据更新
- Web界面支持
15. 贡献指南
感谢以下开源项目:
- AKShare
- PyTorch
- Pandas
- NumPy
- Scikit-learn
注意:
- 本项目仅供学习和研究使用
- 股市投资有风险,预测结果仅供参考
- 实际投资决策需要考虑多种因素
15. 代码:
import akshare as ak # 导入akshare库,用于获取股票数据
import pandas as pd # 导入pandas库,用于数据处理
import numpy as np # 导入numpy库,用于数值计算
import torch # 导入PyTorch库,用于深度学习
import torch.nn as nn # 导入神经网络模块
import torch.optim as optim # 导入优化器模块
from sklearn.preprocessing import MinMaxScaler # 导入数据归一化工具
import random # 导入随机数模块
def set_random_seed(seed=42):
"""设置随机种子,确保实验结果可重复"""
random.seed(seed) # 设置Python随机数种子
np.random.seed(seed) # 设置NumPy随机数种子
torch.manual_seed(seed) # 设置PyTorch CPU随机数种子
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed) # 设置PyTorch GPU随机数种子
class StockRNN(nn.Module):
"""定义股票预测的RNN模型类"""
def __init__(self, input_size, hidden_size, num_layers):
"""
初始化模型参数
input_size: 输入特征维度
hidden_size: LSTM隐藏层大小
num_layers: LSTM层数
"""
super(StockRNN, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
# 定义LSTM层
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
# 定义全连接层,用于输出预测值
self.fc = nn.Linear(hidden_size, 1)
def forward(self, x):
"""
定义前向传播过程
x: 输入数据,形状为(batch_size, sequence_length, input_size)
"""
# 初始化隐藏状态和细胞状态
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
# LSTM前向传播
out, _ = self.lstm(x, (h0, c0))
# 取最后一个时间步的输出进行预测
out = self.fc(out[:, -1, :])
return out
def get_stock_name(stock_code):
"""
获取股票名称
stock_code: 股票代码
返回: 股票名称
"""
try:
# 获取A股实时行情数据
df = ak.stock_zh_a_spot_em()
# 查找对应股票代码的信息
stock_info = df[df['代码'] == stock_code]
if not stock_info.empty:
return stock_info.iloc[0]['名称']
# 如果没找到,尝试添加市场前缀
if stock_code.startswith('6'):
full_code = f"sh{stock_code}"
else:
full_code = f"sz{stock_code}"
stock_info = df[df['代码'] == stock_code]
if not stock_info.empty:
return stock_info.iloc[0]['名称']
return f"股票{stock_code}"
except Exception as e:
print(f"获取股票名称时发生错误:{e}")
return f"股票{stock_code}"
def analyze_market_state(df, stock_name):
"""
分析股票市场状态
df: 股票数据DataFrame
stock_name: 股票名称
"""
# 计算日收益率
daily_returns = df['close'].pct_change()
# 计算年化波动率
volatility = daily_returns.std() * np.sqrt(252)
# 计算整体涨跌幅
trend = (df['close'].iloc[-1] - df['close'].iloc[0]) / df['close'].iloc[0]
# 计算最大回撤
cummax = df['close'].cummax() # 计算历史最高价
drawdown = (cummax - df['close']) / cummax # 计算回撤比例
max_drawdown = drawdown.max() # 计算最大回撤
# 打印分析结果
print(f"\n{stock_name}市场状态分析:")
print(f"样本数量: {len(df)} 天")
print(f"年化波动率: {volatility:.2%}")
print(f"整体趋势: {trend:.2%}")
print(f"最大回撤: {max_drawdown:.2%}")
print(f"起始价格: {df['close'].iloc[0]:.2f}")
print(f"结束价格: {df['close'].iloc[-1]:.2f}")
def get_stock_data(stock_code, start_date, end_date, stock_name):
"""
获取股票历史数据
stock_code: 股票代码
start_date: 起始日期
end_date: 结束日期
stock_name: 股票名称
"""
print(f"正在获取 {stock_name}({stock_code})的数据...")
try:
# 使用akshare获取股票历史数据
df = ak.stock_zh_a_hist(symbol=stock_code,
period="daily",
start_date=start_date,
end_date=end_date,
adjust="qfq") # 使用前复权数据
# 重命名列
df = df.rename(columns={'收盘': 'close', '日期': 'date'})
df['date'] = pd.to_datetime(df['date']) # 转换日期格式
df.set_index('date', inplace=True) # 设置日期为索引
# 分析市场状态
analyze_market_state(df, stock_name)
return df
except Exception as e:
print(f"获取{stock_name}数据时发生错误:{str(e)}")
return None
def prepare_data(df, sequence_length):
"""
准备模型训练数据
df: 股票数据DataFrame
sequence_length: 序列长度(用多少天数据预测下一天)
"""
# 数据归一化
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df[['close']].values)
# 创建序列数据
X, y = [], []
for i in range(len(scaled_data) - sequence_length):
X.append(scaled_data[i:(i + sequence_length)]) # 输入序列
y.append(scaled_data[i + sequence_length]) # 预测目标
return np.array(X), np.array(y), scaler
def predict_next_day(model, last_sequence, scaler):
"""
预测下一个交易日的价格
model: 训练好的模型
last_sequence: 最后一个序列数据
scaler: 归一化器
"""
with torch.no_grad(): # 不计算梯度
# 准备输入数据
last_sequence_tensor = torch.FloatTensor(last_sequence).unsqueeze(0)
# 进行预测
predicted_scaled = model(last_sequence_tensor)
# 将预测结果转换回原始价格范围
predicted_price = scaler.inverse_transform(predicted_scaled.numpy())
return predicted_price[0][0]
def get_input_with_default(prompt, default_value):
"""
获取用户输入,支持默认值
prompt: 提示信息
default_value: 默认值
"""
user_input = input(f"{prompt} [默认: {default_value}]: ").strip()
return user_input if user_input else default_value
def main():
"""主函数"""
# 设置随机种子
set_random_seed(42)
try:
# 设置默认参数
default_stock_code = "002830"
default_start_date = "20230101"
default_end_date = "20240120"
default_predict_date = "20241209"
# 获取用户输入
stock_code = get_input_with_default(
"请输入股票代码",
default_stock_code
)
start_date = get_input_with_default(
"请输入起始日期(格式:YYYYMMDD)",
default_start_date
)
end_date = get_input_with_default(
"请输入结束日期(格式:YYYYMMDD)",
default_end_date
)
predict_date = get_input_with_default(
"请输入预测日期(格式:YYYYMMDD)",
default_predict_date
)
# 获取股票名称
stock_name = get_stock_name(stock_code)
# 设置模型参数
sequence_length = 10 # 使用10天数据预测下一天
hidden_size = 64 # LSTM隐藏层大小
num_layers = 2 # LSTM层数
epochs = 100 # 训练轮数
learning_rate = 0.001 # 学习率
# 获取并分析数据
df = get_stock_data(stock_code, start_date, end_date, stock_name)
if df is None:
return
# 准备训练数据
X, y, scaler = prepare_data(df, sequence_length)
X_tensor = torch.FloatTensor(X)
y_tensor = torch.FloatTensor(y)
# 初始化模型
model = StockRNN(input_size=1, hidden_size=hidden_size, num_layers=num_layers)
criterion = nn.MSELoss() # 使用均方误差损失
optimizer = optim.Adam(model.parameters(), lr=learning_rate) # 使用Adam优化器
# 训练模型
for epoch in range(epochs):
optimizer.zero_grad() # 清空梯度
outputs = model(X_tensor) # 前向传播
loss = criterion(outputs, y_tensor) # 计算损失
loss.backward() # 反向传播
optimizer.step() # 更新参数
# 每10轮打印一次损失
if (epoch + 1) % 10 == 0:
print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')
# 预测未来价格
last_sequence = X[-1]
predicted_price = predict_next_day(model, last_sequence, scaler)
print(f"\n预测 {stock_name}({stock_code})在 {predict_date} 的收盘价: {predicted_price:.2f}")
# 打印风险提示
print("\n风险提示:")
print(f"1. {stock_name}的预测是基于历史数据的模型预测,不构成投资建议")
print("2. 长期预测(超过一周)的准确性会显著降低")
print("3. 股市受多种因素影响,模型无法预测突发事件的影响")
except Exception as e:
print(f"程序运行出错:{str(e)}")
if __name__ == "__main__":
main()
原文地址:https://blog.csdn.net/hzether/article/details/144323575
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!