文章
DeepSeek 股票趋势预测代码编写
import akshare as ak
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.exceptions import NotFittedError
def get_stock_close_price(symbol: str, start_date: str, end_date: str, csv_path: str = None):
    """Download daily close prices for one stock and save them to a CSV file.

    The history is requested from ``ak.stock_zh_a_hist`` with ``adjust="qfq"``
    (forward-adjusted prices), reduced to the two columns ``Date`` and
    ``Close``, sorted by date and written to disk.

    Args:
        symbol: Stock code, passed to akshare unchanged (e.g. "600519").
            NOTE(review): the previous docstring also listed "000001.SZ" as
            an example; confirm akshare accepts the suffixed form.
        start_date: First day of the range, formatted "YYYY-MM-DD".
        end_date: Last day of the range, same format.
        csv_path: Optional output path. When omitted (or empty) the file is
            written to ``stock_{symbol}_close_{start_date}_to_{end_date}.csv``
            in the current working directory.

    Returns:
        A DataFrame with the columns ``Date`` and ``Close`` in ascending date
        order. On any failure the error is printed and an empty DataFrame is
        returned instead of raising.
    """
    try:
        # akshare expects compact dates (YYYYMMDD), so strip the dashes.
        stock_df = ak.stock_zh_a_hist(
            symbol=symbol,
            period="daily",
            start_date=start_date.replace("-", ""),
            end_date=end_date.replace("-", ""),
            adjust="qfq",
        )
        if stock_df.empty:
            raise ValueError(f"未找到 {symbol} 在 {start_date} 至 {end_date} 之间的数据")

        # Keep only date and close, under the English names used downstream.
        stock_df = stock_df[['日期', '收盘']].copy()
        stock_df.rename(columns={'日期': 'Date', '收盘': 'Close'}, inplace=True)
        stock_df['Date'] = pd.to_datetime(stock_df['Date'])
        stock_df.sort_values('Date', inplace=True)

        # Resolve the destination once so the confirmation message can never
        # reference a name that exists on only one branch.
        target_path = csv_path or f"stock_{symbol}_close_{start_date}_to_{end_date}.csv"
        stock_df.to_csv(target_path, index=False)
        print(f"数据已保存到: {target_path}")
        return stock_df
    except Exception as e:
        # Deliberate best-effort: callers test ``df.empty`` instead of catching.
        print(f"操作失败: {str(e)}")
        return pd.DataFrame()
def calculate_wmi(df, windows=(5, 10, 20), weights=(0.5, 0.3, 0.2)):
    """Compute the Weighted Momentum Indicator (WMI) from closing prices.

    For every window ``w`` the momentum is ``Close.pct_change(w - 1)``, i.e.
    the relative change across a span of ``w`` rows including the current one.
    The WMI is the weighted sum of these momenta.

    Rows for which at least one momentum is still undefined (the warm-up
    period of the longest window) are left out of the result, so every
    returned value is built from all windows.

    Args:
        df: Price history; must contain the columns ``Date`` and ``Close``.
        windows: Window lengths in rows.
        weights: One weight per window, in the same order.

    Returns:
        A Series of WMI values indexed like ``df``, without the warm-up rows.

    Raises:
        ValueError: If a required column is missing, if ``windows`` and
            ``weights`` are empty or differ in length, or if ``df`` has fewer
            rows than the longest window.
    """
    windows = list(windows)
    weights = list(weights)

    if not {'Date', 'Close'}.issubset(df.columns):
        raise ValueError("CSV文件必须包含Date和Close列")
    if not windows or len(windows) != len(weights):
        raise ValueError("windows与weights必须非空且长度一致")
    if len(df) < max(windows):
        raise ValueError(f"数据量不足,至少需要{max(windows)}个交易日数据")

    close = df['Close']
    momentum = pd.DataFrame(
        {f'momentum_{window}': close.pct_change(window - 1) for window in windows},
        index=df.index,
    )

    # Label the weights with the momentum columns so they pair up by name.
    weight_by_column = pd.Series(weights, index=momentum.columns)

    # skipna=False keeps a row NaN while any window is still warming up; the
    # default NaN-skipping sum would emit partial sums (0.0 on the first rows)
    # and leave nothing for dropna() to remove.
    wmi = momentum.mul(weight_by_column, axis='columns').sum(axis=1, skipna=False)
    return wmi.dropna()
def prepare_features(df):
    """Build the feature matrix and next-day labels from a price history.

    Features (one row per day for which all three are defined):
        WMI_current: The WMI value from ``calculate_wmi``.
        WMI_3d_avg:  Mean of the WMI over the current and two previous rows.
        WMI_trend:   1 if the WMI is higher than three rows earlier, else 0
                     (also 0 while the three-row difference is undefined).

    Labels: 1 if the next row's close is higher than the current close, else
    0. The final row has no next close, so its label is always 0 and carries
    no information; ``predict_next_day`` excludes that row from training.

    The input frame is left untouched: the WMI is kept in a local Series
    rather than written back as a new column.

    Args:
        df: Price history with ``Date`` and ``Close`` columns, oldest first.

    Returns:
        ``(features, labels)`` sharing the same index.

    Raises:
        ValueError: Propagated from ``calculate_wmi`` on invalid input.
    """
    # Reindex so rows without a WMI value become NaN, exactly as a column
    # assignment on ``df`` would align them.
    wmi = calculate_wmi(df).reindex(df.index)

    features = pd.DataFrame({
        'WMI_current': wmi,
        'WMI_3d_avg': wmi.rolling(3).mean(),
        'WMI_trend': (wmi.diff(3) > 0).astype(int),
    }).dropna()

    # Comparing NaN with 0 is False, hence the placeholder 0 on the last row.
    labels = (df['Close'].pct_change().shift(-1) > 0).astype(int)

    # ``features`` is a row subset of ``df``, so its index selects the labels.
    return features, labels.loc[features.index]
def predict_next_day(csv_path, lookback=30):
    """Fit a logistic regression on recent WMI features and score the latest day.

    The CSV is loaded and sorted by date, features and labels come from
    ``prepare_features``, and the most recent ``lookback`` feature rows are
    used: all but the last for fitting, the last one as the prediction input.

    Args:
        csv_path: Path to a CSV with ``Date`` and ``Close`` columns.
        lookback: Number of most recent feature rows to use (default 30, of
            which 29 are used for fitting).

    Returns:
        A dict with
            last_date: Latest date in the file, "YYYY-MM-DD".
            rise_prob: Model probability of class 1 (next close higher).
            fall_prob: Model probability of class 0.
            accuracy:  Accuracy on the same rows the model was fitted on
                       (in-sample, not a hold-out estimate).

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If ``lookback`` is below 2, if fewer than ``lookback``
            feature rows are available, or if model fitting fails.
        RuntimeError: If the model reports that it was not fitted.
    """
    if lookback < 2:
        raise ValueError("lookback必须不小于2")

    try:
        df = pd.read_csv(csv_path, parse_dates=['Date'])
    except FileNotFoundError as exc:
        raise FileNotFoundError("CSV文件未找到,请检查路径") from exc
    df = df.sort_values('Date').reset_index(drop=True)

    X, y = prepare_features(df)
    if len(X) < lookback:
        # The message now states the threshold that is actually enforced.
        raise ValueError(f"有效数据不足{lookback}个交易日")

    X_recent = X.iloc[-lookback:]
    y_recent = y.iloc[-lookback:]

    # The newest row has no known next-day outcome, so it is held out of
    # fitting and used only as the prediction input.
    X_train = X_recent.iloc[:-1]
    y_train = y_recent.iloc[:-1]
    latest_features = X_recent.iloc[[-1]]

    model = LogisticRegression()
    try:
        model.fit(X_train, y_train)
    except ValueError as exc:
        raise ValueError(f"模型训练失败: {str(exc)}") from exc

    try:
        proba = model.predict_proba(latest_features)  # shape (1, n_classes)
    except NotFittedError as exc:
        raise RuntimeError("模型未正确训练") from exc

    # Look the columns up through classes_ instead of assuming their order.
    classes = list(model.classes_)
    rise_prob = float(proba[0, classes.index(1)])
    fall_prob = float(proba[0, classes.index(0)])

    return {
        'last_date': df['Date'].iloc[-1].strftime('%Y-%m-%d'),
        'rise_prob': rise_prob,
        'fall_prob': fall_prob,
        'accuracy': float(model.score(X_train, y_train))
    }
if __name__ == "__main__":
    # Demo run: download closes for one stock, save them, then predict.
    stock_code = "301078"
    start = "2024-01-01"
    end = "2025-03-20"
    csv_path = "stock_data.csv"  # change to the real output path

    # Step 1: fetch the history and write it to csv_path.
    df = get_stock_close_price(
        symbol=stock_code, start_date=start, end_date=end, csv_path=csv_path
    )

    # Step 2: show the tail of what was downloaded, if anything.
    if not df.empty:
        print("\n最新5个交易日收盘价:")
        print(df.tail(5))

    # Step 3: predict from the CSV on disk and translate it into a signal.
    try:
        result = predict_next_day(csv_path)

        print(f"\n最新交易日:{result['last_date']}")
        print("=" * 40)
        print(f"↑ 明日上涨概率:{result['rise_prob']:.2%}")
        print(f"↓ 明日下跌概率:{result['fall_prob']:.2%}")
        print(f"模型近期准确率:{result['accuracy']:.2%}")

        # Rules are checked top to bottom and the first hit wins, so the
        # bullish thresholds take precedence over the bearish ones.
        rise = result['rise_prob']
        fall = result['fall_prob']
        signal_rules = (
            (rise > 0.65, "强烈买入信号 ★★★★★"),
            (rise > 0.55, "温和买入信号 ★★★☆"),
            (fall > 0.65, "强烈卖出信号 ▼▼▼▼▼"),
            (fall > 0.55, "温和卖出信号 ▼▼▼☆"),
        )
        signal = next(
            (label for matched, label in signal_rules if matched),
            "中性观望信号 ●●●",
        )

        print("\n操作建议:")
        print(f"【{signal}】")
    except Exception as e:
        # Top-level boundary of the script: report and give a checklist.
        print(f"预测失败:{str(e)}")
        print("请检查:1.文件路径 2.数据格式 3.数据量是否足够")
"""
股票涨跌预测系统(训练验证版)
功能:使用80%历史数据训练,20%验证,预测下一交易日涨跌
作者:智能助手
日期:2024年1月
"""
import akshare as ak
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix
# Console display settings: numpy arrays print with 3 digits of precision,
# pandas floats are rendered with exactly two decimals.
np.set_printoptions(precision=3)
pd.set_option('display.float_format', '{:.2f}'.format)
# ================== 数据获取模块 ==================
def get_stock_data(symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Download daily OHLCV history for one stock (forward-adjusted).

    Data comes from ``ak.stock_zh_a_hist`` with ``adjust="qfq"``. The result
    is reduced to six columns, renamed to English, sorted by date and given a
    fresh 0..n-1 index.

    Args:
        symbol: Stock code, e.g. "600519".
        start_date: First day of the range, "YYYY-MM-DD".
        end_date: Last day of the range, "YYYY-MM-DD".

    Returns:
        A DataFrame with the columns ``date``, ``open``, ``high``, ``low``,
        ``close`` and ``volume``. On any failure (including an empty response
        or fewer than 60 rows) the reason is printed and an empty DataFrame
        is returned instead of raising.
    """
    try:
        # akshare expects compact dates (YYYYMMDD), so strip the dashes.
        raw = ak.stock_zh_a_hist(
            symbol=symbol,
            period="daily",
            start_date=start_date.replace("-", ""),
            end_date=end_date.replace("-", ""),
            adjust="qfq",
        )

        # Validate before touching columns: on an empty response the column
        # selection below would fail with a KeyError and hide the real cause.
        if raw.empty:
            raise ValueError("获取到的数据为空,请检查参数")
        if len(raw) < 60:
            raise ValueError("至少需要60个交易日数据")

        df = raw[['日期', '开盘', '最高', '最低', '收盘', '成交量']].copy()
        df.columns = ['date', 'open', 'high', 'low', 'close', 'volume']
        df['date'] = pd.to_datetime(df['date'])
        df.sort_values('date', inplace=True)
        df.reset_index(drop=True, inplace=True)
        return df
    except Exception as e:
        # Deliberate best-effort: callers test ``df.empty`` instead of catching.
        print(f"数据获取失败: {str(e)}")
        return pd.DataFrame()
# ================== 特征工程模块 ==================
def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add technical-indicator columns to ``df`` in place and return it.

    Columns written, in this order:
        wmi:            0.5/0.3/0.2-weighted sum of the 5-, 10- and 20-row
                        percentage change of ``close``.
        ma5/ma10/ma20:  Rolling means of ``close``.
        volume_change:  Row-over-row percentage change of ``volume``.
        volatility:     20-row rolling standard deviation of the daily
                        percentage change of ``close``.

    The frame passed in is modified and the very same object is returned;
    ``prepare_prediction_data`` later reads these columns from it.

    Args:
        df: Price history with at least ``close`` and ``volume`` columns.

    Returns:
        ``df`` itself, with the indicator columns added.
    """
    close = df['close']

    # Weighted momentum: one percentage-change column per horizon, collapsed
    # with a dot product so an undefined horizon leaves the whole row NaN.
    horizons = [5, 10, 20]
    horizon_weights = [0.5, 0.3, 0.2]
    momentum = pd.DataFrame(
        {f'momentum_{span}': close.pct_change(span) for span in horizons}
    )
    df['wmi'] = momentum.dot(horizon_weights)

    # Moving averages of the close.
    for span in (5, 10, 20):
        df[f'ma{span}'] = close.rolling(span).mean()

    # Volume momentum and price volatility.
    df['volume_change'] = df['volume'].pct_change()
    daily_return = close.pct_change()
    df['volatility'] = daily_return.rolling(20).std()

    return df
def create_features_target(df: pd.DataFrame) -> pd.DataFrame:
    """Build the training table: model features plus the next-day target.

    Indicator columns are first added to ``df`` via
    ``calculate_technical_indicators`` (which modifies ``df`` in place).

    Feature columns:
        wmi:              Weighted momentum indicator.
        wmi_3d_change:    ``wmi`` minus its value three rows earlier. This is
                          the same plain difference that
                          ``prepare_prediction_data`` computes for the row
                          being predicted, so training and prediction see the
                          feature on the same scale.
        ma5_vs_ma10:      1 if ma5 > ma10, else 0.
        ma10_vs_ma20:     1 if ma10 > ma20, else 0.
        volume_spike:     1 if volume exceeds 1.5x its 5-row rolling mean.
        close_above_open: 1 if close > open.
        volatility:       20-row rolling std of daily returns.

    Target: 1 if the next row's close is higher than this row's, else 0.
    The final row has no next close; it is dropped rather than labelled 0,
    so no fabricated "down" sample reaches the model.

    Args:
        df: OHLCV history with ``open``, ``close`` and ``volume`` columns,
            oldest first.

    Returns:
        A DataFrame holding the seven feature columns and an integer
        ``target`` column, restricted to rows where every value is defined.
    """
    df = calculate_technical_indicators(df)

    features = pd.DataFrame({
        # Momentum
        'wmi': df['wmi'],
        'wmi_3d_change': df['wmi'].diff(3),
        # Moving-average ordering
        'ma5_vs_ma10': (df['ma5'] > df['ma10']).astype(int),
        'ma10_vs_ma20': (df['ma10'] > df['ma20']).astype(int),
        # Volume / price relation
        'volume_spike': (df['volume'] > 1.5 * df['volume'].rolling(5).mean()).astype(int),
        'close_above_open': (df['close'] > df['open']).astype(int),
        # Volatility
        'volatility': df['volatility']
    })

    # Keep the target NaN wherever the next close is unknown so that the
    # dropna() below removes those rows together with the warm-up rows.
    next_close = df['close'].shift(-1)
    target = (next_close > df['close']).astype(float)
    features['target'] = target.where(next_close.notna())

    features.dropna(inplace=True)
    features['target'] = features['target'].astype(int)
    return features
# ================== 模型训练模块 ==================
def train_model(features: pd.DataFrame) -> tuple:
    """Fit a standardised logistic regression with a chronological hold-out.

    Steps:
        1. Split by position: the first 80% of rows train, the rest validate.
        2. Fit a ``StandardScaler`` on the training features only and apply
           it to both parts.
        3. Fit ``LogisticRegression(class_weight='balanced')``.
        4. Print validation accuracy and the confusion matrix.

    Args:
        features: Table with a ``target`` column; every other column is used
            as a model input. Rows are assumed to be in time order.

    Returns:
        ``(model, scaler)`` - the fitted classifier and the fitted scaler.

    Raises:
        ValueError: If the split leaves fewer than 50 training rows or fewer
            than 10 validation rows.
    """
    cutoff = int(len(features) * 0.8)
    train_part = features.iloc[:cutoff]
    val_part = features.iloc[cutoff:]

    if not (len(train_part) >= 50 and len(val_part) >= 10):
        raise ValueError("数据集过小,请扩大时间范围")

    # Scaling statistics come from the training part alone, so nothing about
    # the validation rows leaks into the fit.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(train_part.drop(columns='target'))
    X_val = scaler.transform(val_part.drop(columns='target'))
    y_train = train_part['target']
    y_val = val_part['target']

    model = LogisticRegression(class_weight='balanced', max_iter=1000, random_state=42)
    model.fit(X_train, y_train)

    # Report how the model does on the held-out tail.
    val_pred = model.predict(X_val)
    print("\n=== 验证集表现 ===")
    val_accuracy = accuracy_score(y_val, val_pred)
    print(f"准确率: {val_accuracy:.2%}")
    print("混淆矩阵:")
    print(confusion_matrix(y_val, val_pred))

    return model, scaler
# ================== 预测模块 ==================
def prepare_prediction_data(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Build the single-row feature frame for the most recent trading day.

    ``raw_df`` must already contain the indicator columns ``wmi``, ``ma5``,
    ``ma10``, ``ma20`` and ``volatility`` in addition to ``open``, ``close``
    and ``volume``, and needs at least four rows.

    Args:
        raw_df: Price history with indicator columns, oldest first.

    Returns:
        A one-row float DataFrame with the columns ``wmi``,
        ``wmi_3d_change`` (latest ``wmi`` minus the value three rows
        earlier), ``ma5_vs_ma10``, ``ma10_vs_ma20``, ``volume_spike``,
        ``close_above_open`` and ``volatility``. Boolean comparisons are
        encoded as 1.0 / 0.0.
    """
    today = raw_df.iloc[-1]

    # Values are evaluated in column order; several need earlier rows too.
    row = {
        'wmi': today['wmi'],
        'wmi_3d_change': today['wmi'] - raw_df.iloc[-4]['wmi'],
        'ma5_vs_ma10': today['ma5'] > today['ma10'],
        'ma10_vs_ma20': today['ma10'] > today['ma20'],
        'volume_spike': today['volume'] > 1.5 * raw_df['volume'].iloc[-5:].mean(),
        'close_above_open': today['close'] > today['open'],
        'volatility': today['volatility'],
    }

    single_row = pd.DataFrame({column: [value] for column, value in row.items()})
    return single_row.astype(float)  # booleans become 1.0 / 0.0
def predict_next_day(model, scaler, raw_df: pd.DataFrame, bullish_threshold: float = 0.65) -> dict:
    """Score the latest trading day and return a printable result dict.

    The feature row comes from ``prepare_prediction_data``, is scaled with
    the fitted ``scaler`` and passed to ``model``.

    Args:
        model: Fitted classifier exposing ``predict``, ``predict_proba`` and
            ``classes_`` (labels 0 = down, 1 = up).
        scaler: Fitted scaler exposing ``transform``.
        raw_df: Price history that already contains the indicator columns.
        bullish_threshold: Minimum class-1 probability required, on top of a
            class-1 prediction, to report '看涨'. Defaults to 0.65.

    Returns:
        A dict with
            prediction:   '看涨' only for a class-1 prediction whose
                          probability exceeds ``bullish_threshold``; every
                          other outcome is reported as '看跌'.
            上涨概率:      Class-1 probability, formatted as a percentage.
            下跌概率:      Class-0 probability, formatted as a percentage.
            最新收盘价:    Close of the last row of ``raw_df``.
            训练天数:      ``len(raw_df)``, i.e. the number of raw rows (not
                          the number of rows the model was fitted on).
    """
    X_new = prepare_prediction_data(raw_df)
    X_scaled = scaler.transform(X_new)

    # Exactly one row is scored, so unwrap to scalars instead of relying on
    # the truthiness of one-element arrays.
    predicted_class = model.predict(X_scaled)[0]
    proba = model.predict_proba(X_scaled)[0]

    # Look the columns up through classes_ instead of assuming their order.
    classes = list(model.classes_)
    rise_prob = float(proba[classes.index(1)])
    fall_prob = float(proba[classes.index(0)])

    is_bullish = bool(predicted_class == 1) and rise_prob > bullish_threshold
    return {
        'prediction': '看涨' if is_bullish else '看跌',
        '上涨概率': f"{rise_prob:.1%}",
        '下跌概率': f"{fall_prob:.1%}",
        '最新收盘价': raw_df['close'].iloc[-1],
        '训练天数': len(raw_df)
    }
# ================== 主流程 ==================
if __name__ == "__main__":
    # Demo run: download -> features -> train/validate -> predict next day.
    STOCK_CODE = "301078"
    START_DATE = "2024-01-01"
    END_DATE = "2025-03-20"

    try:
        print("正在获取数据...")
        raw_df = get_stock_data(STOCK_CODE, START_DATE, END_DATE)
        # get_stock_data reports failures by returning an empty frame; stop
        # here with a clear message rather than failing later on a missing
        # 'close' column.
        if raw_df.empty:
            raise ValueError("未获取到有效行情数据,无法继续")
        print("近5条数据: ")
        print(raw_df.tail(5))

        print("\n生成特征矩阵...")
        # Also adds the indicator columns to raw_df, which the prediction
        # step below reads.
        features = create_features_target(raw_df)
        print(f"有效样本数: {len(features)}")
        print("特征样例:")
        print(features.iloc[-3:])

        print("\n训练模型中...")
        model, scaler = train_model(features)

        print("\n生成预测...")
        prediction = predict_next_day(model, scaler, raw_df)

        print("\n=== 下一交易日预测 ===")
        for k, v in prediction.items():
            print(f"{k:10}: {v}")
    except Exception as e:
        # Top-level boundary of the script: report instead of a traceback.
        print(f"\n程序运行出错: {str(e)}")