大数据

DeepSeek 股票趋势预测代码编写

import akshare as ak
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.exceptions import NotFittedError


def get_stock_close_price(symbol: str, start_date: str, end_date: str, csv_path: str = None):
    """
    Fetch daily closing prices for a stock and save them to a CSV file.

    Parameters:
        symbol (str): Stock code forwarded to ``ak.stock_zh_a_hist`` (e.g. "600519").
        start_date (str): Start date, "YYYY-MM-DD"; dashes are stripped before the request.
        end_date (str): End date, same format as ``start_date``.
        csv_path (str): Optional output path. When omitted, a file name is built
            from the symbol and the date range, and that name is printed.

    Returns:
        pd.DataFrame: Columns ``Date`` (datetime) and ``Close``, sorted by date
        ascending. Any failure is printed and an empty DataFrame is returned
        instead of raising, so callers should check ``.empty``.
    """
    try:
        # Forward-adjusted ("qfq") daily history
        stock_df = ak.stock_zh_a_hist(
            symbol=symbol,
            period="daily",
            start_date=start_date.replace("-", ""),
            end_date=end_date.replace("-", ""),
            adjust="qfq"
        )

        # Validate the response; the message keeps symbol and dates separated
        # so it stays readable.
        if stock_df is None or stock_df.empty:
            raise ValueError(f"未找到 {symbol} 在 {start_date} 至 {end_date} 之间的数据")

        # Keep only date/close, normalise the column names and ordering
        stock_df = stock_df[['日期', '收盘']].copy()
        stock_df.rename(columns={'日期': 'Date', '收盘': 'Close'}, inplace=True)
        stock_df['Date'] = pd.to_datetime(stock_df['Date'])
        stock_df.sort_values('Date', inplace=True)

        # Persist to CSV
        if csv_path:
            stock_df.to_csv(csv_path, index=False)
        else:
            filename = f"stock_{symbol}_close_{start_date}_to_{end_date}.csv"
            stock_df.to_csv(filename, index=False)
            print(f"数据已保存到: {filename}")

        return stock_df

    except Exception as e:
        # Deliberate best-effort boundary: report and hand back an empty frame.
        print(f"操作失败: {str(e)}")
        return pd.DataFrame()


def calculate_wmi(df, windows=(5, 10, 20), weights=(0.5, 0.3, 0.2)):
    """
    Compute the Weighted Momentum Indicator (WMI).

    For every window ``w`` the momentum is ``Close.pct_change(w - 1)``, i.e. the
    return across a span of ``w`` consecutive closes. The WMI is the weighted
    sum of those momenta.

    Parameters:
        df (DataFrame): History that must contain ``Date`` and ``Close`` columns.
        windows: Window lengths, in rows.
        weights: One weight per window, in the same order.

    Returns:
        pd.Series: WMI indexed like ``df``, restricted to rows where every
        window has enough history (rows with an incomplete window are dropped).

    Raises:
        ValueError: If required columns are missing, if ``windows`` and
            ``weights`` differ in length, or if ``df`` has fewer rows than the
            largest window.
    """
    # Input validation
    if not {'Date', 'Close'}.issubset(df.columns):
        raise ValueError("CSV文件必须包含Date和Close列")

    windows = list(windows)
    weights = list(weights)
    if len(windows) != len(weights):
        raise ValueError("windows和weights的长度必须一致")
    if len(df) < max(windows):
        raise ValueError(f"数据量不足,至少需要{max(windows)}个交易日数据")

    # Momentum per window
    close = df['Close']
    momentum = pd.DataFrame(
        {f'momentum_{window}': close.pct_change(window - 1) for window in windows},
        index=df.index,
    )

    # Weighted sum. skipna=False keeps NaN for rows where any window is still
    # incomplete; with the default skipna=True those rows would silently get a
    # partial sum and the dropna() below would never remove anything.
    weight_series = pd.Series(weights, index=momentum.columns)
    wmi = momentum.mul(weight_series, axis='columns').sum(axis=1, skipna=False)
    return wmi.dropna()


def prepare_features(df):
    """Build the feature matrix and the next-day labels from price history.

    A ``WMI`` column is written onto ``df`` in place, then three features are
    derived from it: the current value, its 3-row rolling mean, and a 0/1 flag
    for whether it rose over the last 3 rows. Rows with NaN in any feature are
    dropped.

    The label is 1 when the following row's close-to-close return is positive,
    otherwise 0. The final row has no following return, so its label is 0.

    Returns:
        tuple: ``(features, labels)`` restricted to their shared index.
    """
    # Indicator is stored on the caller's frame
    df['WMI'] = calculate_wmi(df)
    wmi = df['WMI']

    # Feature matrix
    feature_frame = pd.DataFrame({
        'WMI_current': wmi,
        'WMI_3d_avg': wmi.rolling(3).mean(),
        'WMI_trend': wmi.diff(3).gt(0).astype(int),
    })
    feature_frame = feature_frame.dropna()

    # Label: did the next row close higher? (1 = up, 0 = not up)
    next_day_return = df['Close'].pct_change().shift(-1)
    labels = next_day_return.gt(0).astype(int)

    # Keep only rows present in both
    shared_index = feature_frame.index.intersection(labels.index)
    return feature_frame.loc[shared_index], labels.loc[shared_index]


def predict_next_day(csv_path):
    """Train on recent history from a CSV and estimate next-day up/down odds.

    The CSV is read, sorted by ``Date``, and turned into features/labels via
    ``prepare_features``. Only the most recent 30 feature rows are used: the
    first 29 train a logistic regression, the last one is the row to predict.

    Parameters:
        csv_path: Path to a CSV with ``Date`` and ``Close`` columns.

    Returns:
        dict: ``last_date`` (str, YYYY-MM-DD), ``rise_prob`` and ``fall_prob``
        (floats), and ``accuracy`` (float) — the model's score on the same 29
        rows it was trained on, i.e. an in-sample figure.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        ValueError: If fewer than 30 feature rows are available, or if model
            fitting fails.
        RuntimeError: If the model reports it was not fitted.
    """
    window_size = 30  # most recent feature rows used (29 train + 1 predict)

    # Load data
    try:
        df = pd.read_csv(csv_path, parse_dates=['Date'])
    except FileNotFoundError as exc:
        raise FileNotFoundError("CSV文件未找到,请检查路径") from exc
    df = df.sort_values('Date').reset_index(drop=True)

    # Features and labels
    X, y = prepare_features(df)

    # The message now reports the threshold that is actually enforced
    if len(X) < window_size:
        raise ValueError(f"有效数据不足{window_size}个交易日")
    X_recent = X.iloc[-window_size:]
    y_recent = y.iloc[-window_size:]

    # The newest row is held out: its next-day label is not known yet
    X_train = X_recent.iloc[:-1]
    y_train = y_recent.iloc[:-1]

    model = LogisticRegression()
    try:
        model.fit(X_train, y_train)
    except ValueError as e:
        raise ValueError(f"模型训练失败: {str(e)}") from e

    # Newest feature row, kept 2-D for predict_proba
    latest_features = X_recent.iloc[[-1]]

    try:
        proba = model.predict_proba(latest_features)  # shape (1, 2)

        # Column order follows model.classes_; assumes [0, 1] (0 = down, 1 = up)
        rise_prob = float(proba[0, 1])
        fall_prob = float(proba[0, 0])

    except NotFittedError as exc:
        raise RuntimeError("模型未正确训练") from exc

    return {
        'last_date': df['Date'].iloc[-1].strftime('%Y-%m-%d'),
        'rise_prob': rise_prob,
        'fall_prob': fall_prob,
        'accuracy': float(model.score(X_train, y_train))
    }


if __name__ == "__main__":
    # Run parameters
    stock_code = "301078"
    start = "2024-01-01"
    end = "2025-03-20"

    csv_path = "stock_data.csv"  # change to the real path

    # Download the history and write it to csv_path
    df = get_stock_close_price(
        symbol=stock_code,
        start_date=start,
        end_date=end,
        csv_path=csv_path
    )

    # Show the tail of what was fetched
    if not df.empty:
        print("\n最新5个交易日收盘价:")
        print(df.tail(5))

    try:
        result = predict_next_day(csv_path)
        rise = result['rise_prob']
        fall = result['fall_prob']

        print(f"\n最新交易日:{result['last_date']}")
        print("=" * 40)
        print(f"↑ 明日上涨概率:{rise:.2%}")
        print(f"↓ 明日下跌概率:{fall:.2%}")
        print(f"模型近期准确率:{result['accuracy']:.2%}")

        # Trading signal: first matching rule wins, checked in this order
        signal_rules = (
            (rise > 0.65, "强烈买入信号 ★★★★★"),
            (rise > 0.55, "温和买入信号 ★★★☆"),
            (fall > 0.65, "强烈卖出信号 ▼▼▼▼▼"),
            (fall > 0.55, "温和卖出信号 ▼▼▼☆"),
        )
        signal = next(
            (text for matched, text in signal_rules if matched),
            "中性观望信号 ●●●",
        )

        print("\n操作建议:")
        print(f"【{signal}】")

    except Exception as e:
        print(f"预测失败:{str(e)}")
        print("请检查:1.文件路径 2.数据格式 3.数据量是否足够")
"""
股票涨跌预测系统(训练验证版)
功能:使用80%历史数据训练,20%验证,预测下一交易日涨跌
作者:智能助手
日期:2024年1月
"""
import akshare as ak
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix

# Display formatting: pandas floats with two decimals, numpy arrays with three
pd.set_option('display.float_format', '{:.2f}'.format)
np.set_printoptions(precision=3)


# ================== 数据获取模块 ==================
def get_stock_data(symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
    """
    Fetch forward-adjusted daily history for a stock.

    Parameters:
        symbol: Stock code, e.g. "600519".
        start_date: Start date, "YYYY-MM-DD"; dashes are stripped before the request.
        end_date: End date, "YYYY-MM-DD".

    Returns:
        DataFrame with columns ``date``, ``open``, ``high``, ``low``, ``close``
        and ``volume``, sorted by date with a fresh 0..n-1 index. On any
        failure (including an empty response or fewer than 60 rows) the reason
        is printed and an empty DataFrame is returned.
    """
    try:
        # Pull the raw history from akshare
        df = ak.stock_zh_a_hist(
            symbol=symbol,
            period="daily",
            start_date=start_date.replace("-", ""),
            end_date=end_date.replace("-", ""),
            adjust="qfq"
        )

        # Check for an empty response before touching columns: an empty frame
        # may carry no columns at all, in which case the selection below would
        # fail with a KeyError and mask the real reason.
        if df is None or df.empty:
            raise ValueError("获取到的数据为空,请检查参数")

        # Clean up: keep OHLCV, rename to English, sort chronologically
        df = df[['日期', '开盘', '最高', '最低', '收盘', '成交量']].copy()
        df.columns = ['date', 'open', 'high', 'low', 'close', 'volume']
        df['date'] = pd.to_datetime(df['date'])
        df.sort_values('date', inplace=True)
        df.reset_index(drop=True, inplace=True)

        if len(df) < 60:
            raise ValueError("至少需要60个交易日数据")

        return df

    except Exception as e:
        # Deliberate best-effort boundary: report and hand back an empty frame.
        print(f"数据获取失败: {str(e)}")
        return pd.DataFrame()


# ================== 特征工程模块 ==================
def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add technical-indicator columns to ``df`` in place and return it.

    Columns added, in this order:
    - ``wmi``: weighted sum (0.5 / 0.3 / 0.2) of the 5-, 10- and 20-row
      percentage changes of ``close``
    - ``ma5``, ``ma10``, ``ma20``: rolling means of ``close``
    - ``volume_change``: row-over-row percentage change of ``volume``
    - ``volatility``: 20-row rolling standard deviation of the daily
      percentage change of ``close``
    """
    close = df['close']

    # Weighted momentum indicator (5/10/20 rows); NaN until all windows are full
    momentum_windows = [5, 10, 20]
    momentum_weights = [0.5, 0.3, 0.2]
    momentum = pd.DataFrame({
        f'momentum_{span}': close.pct_change(span) for span in momentum_windows
    })
    df['wmi'] = momentum.dot(momentum_weights)

    # Moving averages (5/10/20 rows)
    for span in (5, 10, 20):
        df[f'ma{span}'] = close.rolling(span).mean()

    # Volume trend: rate of change of traded volume
    df['volume_change'] = df['volume'].pct_change()

    # Price volatility: 20-row standard deviation of daily returns
    daily_return = close.pct_change()
    df['volatility'] = daily_return.rolling(20).std()

    return df


def create_features_target(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build the feature matrix together with the target column.

    Target: whether the next row closes higher than the current one
    (1 = up, 0 = not up). Rows whose next close is unknown — notably the last
    row — get no target and are dropped, so they are never learned or
    evaluated as "down" samples.

    Parameters:
        df: Price history with ``open``, ``close`` and ``volume`` columns.
            Indicator columns are added to it by
            ``calculate_technical_indicators``.

    Returns:
        DataFrame of feature columns plus an integer ``target`` column,
        containing only rows where every value is finite and present.
    """
    # Compute technical indicators
    df = calculate_technical_indicators(df)
    close = df['close']
    volume = df['volume']

    features = pd.DataFrame({
        # Momentum
        'wmi': df['wmi'],
        'wmi_3d_change': df['wmi'].pct_change(3),

        # Moving-average crossovers
        'ma5_vs_ma10': (df['ma5'] > df['ma10']).astype(int),
        'ma10_vs_ma20': (df['ma10'] > df['ma20']).astype(int),

        # Volume / price relationship
        'volume_spike': (volume > 1.5 * volume.rolling(5).mean()).astype(int),
        'close_above_open': (close > df['open']).astype(int),

        # Volatility
        'volatility': df['volatility']
    })

    # Target from the next row's close. A comparison against NaN is False,
    # which would read as "down", so rows without both closes are masked to
    # NaN and removed by the dropna below.
    next_close = close.shift(-1)
    has_both_closes = next_close.notna() & close.notna()
    features['target'] = (next_close > close).astype(float).where(has_both_closes)

    # pct_change on wmi yields +/-inf when the earlier wmi is exactly zero;
    # treat those like missing values so they do not reach the scaler.
    features = features.replace([np.inf, -np.inf], np.nan).dropna()
    features['target'] = features['target'].astype(int)

    return features


# ================== 模型训练模块 ==================
def train_model(features: pd.DataFrame) -> tuple:
    """
    Fit and evaluate the classifier.

    Steps:
    1. Chronological split: first 80% of rows for training, the rest for validation
    2. Standardise features (scaler fitted on the training part only)
    3. Fit a class-balanced logistic regression
    4. Print validation accuracy and confusion matrix

    Parameters:
        features: Feature columns plus a ``target`` column, in time order.

    Returns:
        tuple: ``(model, scaler)``.

    Raises:
        ValueError: If the training part has fewer than 50 rows or the
            validation part fewer than 10.
    """
    # Chronological split
    n_train = int(len(features) * 0.8)
    train_part = features.iloc[:n_train]
    val_part = features.iloc[n_train:]

    # Both parts must be large enough
    if not (len(train_part) >= 50 and len(val_part) >= 10):
        raise ValueError("数据集过小,请扩大时间范围")

    # Standardise features
    scaler = StandardScaler()
    X_train = scaler.fit_transform(train_part.drop(columns='target'))
    X_val = scaler.transform(val_part.drop(columns='target'))
    y_train, y_val = train_part['target'], val_part['target']

    # Class-balanced logistic regression
    model = LogisticRegression(class_weight='balanced', max_iter=1000, random_state=42)
    model.fit(X_train, y_train)

    # Validation report
    val_pred = model.predict(X_val)
    print("\n=== 验证集表现 ===")
    print(f"准确率: {accuracy_score(y_val, val_pred):.2%}")
    print("混淆矩阵:")
    print(confusion_matrix(y_val, val_pred))

    return model, scaler


# ================== 预测模块 ==================
def prepare_prediction_data(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Build the single-row feature frame for the most recent trading day.

    Parameters:
        raw_df: Price history that already carries the indicator columns
            (``wmi``, ``ma5``, ``ma10``, ``ma20``, ``volatility``) alongside
            ``open``, ``close`` and ``volume``. At least 4 rows are needed for
            the 3-row WMI change.

    Returns:
        One-row float DataFrame with the same feature columns, in the same
        order, as the training features; boolean flags become 1.0 / 0.0.
    """
    latest = raw_df.iloc[-1]
    wmi = raw_df['wmi']
    wmi_now = wmi.iloc[-1]
    wmi_3d_ago = wmi.iloc[-4]

    return pd.DataFrame({
        'wmi': [wmi_now],
        # Relative change over 3 rows, matching the pct_change(3) used when
        # the training features are built (a plain difference would be on a
        # different scale than what the model was fitted on).
        'wmi_3d_change': [wmi_now / wmi_3d_ago - 1],
        'ma5_vs_ma10': [latest['ma5'] > latest['ma10']],
        'ma10_vs_ma20': [latest['ma10'] > latest['ma20']],
        # Mean of the last 5 rows, including the latest one
        'volume_spike': [latest['volume'] > 1.5 * raw_df['volume'].iloc[-5:].mean()],
        'close_above_open': [latest['close'] > latest['open']],
        'volatility': [latest['volatility']]
    }).astype(float)  # booleans become 1.0 / 0.0


def predict_next_day(model, scaler, raw_df: pd.DataFrame) -> dict:
    """Predict the next trading day and return a result dictionary.

    The latest feature row is built from ``raw_df``, scaled with ``scaler``
    and passed to ``model``. The ``prediction`` entry is '看涨' only when the
    predicted class is 1 and the class-1 probability exceeds 0.65; every other
    case is reported as '看跌'.

    Returns:
        dict: ``prediction``, the two probabilities formatted as percentages,
        the latest close, and the number of rows in ``raw_df``.
    """
    # Features for the newest row, scaled like the training data
    latest_features = prepare_prediction_data(raw_df)
    scaled_features = scaler.transform(latest_features)

    # Class and class probabilities
    predicted = model.predict(scaled_features)
    probabilities = model.predict_proba(scaled_features)
    down_prob = probabilities[0, 0]
    up_prob = probabilities[0, 1]

    if predicted == 1 and up_prob > 0.65:
        outlook = '看涨'
    else:
        outlook = '看跌'

    return {
        'prediction': outlook,
        '上涨概率': f"{up_prob:.1%}",
        '下跌概率': f"{down_prob:.1%}",
        '最新收盘价': raw_df['close'].iloc[-1],
        '训练天数': len(raw_df)
    }


# ================== 主流程 ==================
if __name__ == "__main__":
    # Run parameters
    STOCK_CODE = "301078"
    START_DATE = "2024-01-01"
    END_DATE = "2025-03-20"

    try:
        print("正在获取数据...")
        raw_df = get_stock_data(STOCK_CODE, START_DATE, END_DATE)

        # get_stock_data reports failures by returning an empty frame; stop
        # here with a clear message instead of failing later on a missing
        # 'close' column.
        if raw_df.empty:
            raise ValueError("未获取到有效行情数据,无法继续预测")

        print("近5条数据: ")
        print(raw_df.tail(5))

        print("\n生成特征矩阵...")
        features = create_features_target(raw_df)
        print(f"有效样本数: {len(features)}")
        print("特征样例:")
        print(features.iloc[-3:])

        print("\n训练模型中...")
        model, scaler = train_model(features)

        # raw_df carries the indicator columns added while building features
        print("\n生成预测...")
        prediction = predict_next_day(model, scaler, raw_df)
        print("\n=== 下一交易日预测 ===")
        for k, v in prediction.items():
            print(f"{k:10}: {v}")

    except Exception as e:
        # Top-level boundary: report any failure and exit normally
        print(f"\n程序运行出错: {str(e)}")