大数据

动态数据库(数据源)连接 + 多环境/多项目动态配置管理

需求:根据用户指定的环境,以及该环境下的具体项目,读取对应的配置信息。

1、项目结构

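multi_db_tool/
├── config/
│   ├── __init__.py
│   ├── settings.py        # 新:INI + Pydantic 配置加载
│   └── models.py          # 新:各模块配置模型
├── env/
│   ├── defaults.ini       # 默认的 env / project / datasource_id
│   ├── dev_projectA.ini   # ← 改用 .ini 格式,命名规则:{env}_{project}.ini
│   ├── dev_projectB.ini
│   └── prod_projectA.ini
├── db.py
└── main.py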

2、文件代码

config/models.py

from pydantic import BaseModel


class MainDBConfig(BaseModel):
    """Connection settings for the main (metadata) MySQL database ([main_db] INI section)."""

    host: str
    port: int
    username: str
    password: str
    database: str

    @property
    def url(self) -> str:
        """Build the SQLAlchemy ``mysql+pymysql`` URL, escaping the credentials with quote_plus."""
        from urllib.parse import quote_plus as escape

        # Escape so characters such as '@', ':' or '/' in credentials don't break the URL.
        auth = f"{escape(self.username)}:{escape(self.password)}"
        return f"mysql+pymysql://{auth}@{self.host}:{self.port}/{self.database}"


class DolphinSchedulerConfig(BaseModel):
    """DolphinScheduler settings, populated from the [dolphinscheduler] section of {env}_{project}.ini."""
    web_url: str  # DolphinScheduler web endpoint, e.g. http://host:12345/dolphinscheduler
    username: str
    password: str
    project_code: int  # project-specific
    datasource_id: int  # project-specific; NOTE(review): not the ds_id main.py uses to pick the target DB
    retry_times: int = 3  # presumably retry count for DolphinScheduler calls — not used in visible code

config/settings.py

import configparser
from pathlib import Path
from typing import Optional, Literal
from .models import MainDBConfig, DolphinSchedulerConfig


class Settings:
    """
    Configuration for one (env, project) combination, loaded from INI files.

    Files live in the ``env/`` directory that sits next to the ``config`` package:

    * ``defaults.ini`` -- optional; its ``[app_defaults]`` section supplies the
      default ``env``, ``project`` and ``datasource_id`` (hard-coded fallbacks:
      ``dev`` / ``projectA`` / ``1001``).
    * ``{env}_{project}.ini`` -- required; must contain the ``[main_db]`` and
      ``[dolphinscheduler]`` sections, validated into ``MainDBConfig`` and
      ``DolphinSchedulerConfig``.

    INI values are read raw (no ``%`` interpolation), so passwords that contain
    ``%`` are used exactly as written.

    Attributes:
        env: resolved environment name.
        project: resolved project name.
        default_ds_id: resolved datasource id (string).
        main_db: MainDBConfig for the metadata database.
        dolphinscheduler: DolphinSchedulerConfig for this project.
    """

    def __init__(
            self,
            env: Optional[Literal["dev", "prod"]] = None,
            project: Optional[str] = None,
            datasource_id: Optional[str] = None
    ):
        """
        Resolve env/project/datasource id, then load and validate the project file.

        Each truthy argument overrides the corresponding ``defaults.ini`` value.

        Raises:
            FileNotFoundError: ``env/{env}_{project}.ini`` does not exist.
            KeyError: a required section is missing from that file.
            pydantic.ValidationError: a section has missing or invalid fields.
        """
        env_dir = Path(__file__).parent.parent / "env"

        # 1. Defaults. ConfigParser.read() silently skips a missing file, in
        #    which case the hard-coded fallbacks below apply.
        parser_defaults = configparser.ConfigParser(interpolation=None)
        parser_defaults.read(env_dir / "defaults.ini", encoding="utf-8")

        default_env = parser_defaults.get("app_defaults", "env", fallback="dev")
        default_project = parser_defaults.get("app_defaults", "project", fallback="projectA")
        default_ds_id = parser_defaults.get("app_defaults", "datasource_id", fallback="1001")

        self.env = env or default_env
        self.project = project or default_project
        self.default_ds_id = datasource_id or default_ds_id

        # 2. Per-(env, project) file name: {env}_{project}.ini
        config_filename = f"{self.env}_{self.project}.ini"
        config_path = env_dir / config_filename

        if not config_path.exists():
            raise FileNotFoundError(
                f"配置文件不存在: {config_path}\n"
                f"请检查是否已创建 {config_filename}"
            )

        # 3. Load. interpolation=None: with the default BasicInterpolation a value
        #    like "password = abc%123" raises InterpolationSyntaxError on access.
        parser = configparser.ConfigParser(interpolation=None)
        parser.read(config_path, encoding="utf-8")

        missing = [name for name in ("main_db", "dolphinscheduler") if not parser.has_section(name)]
        if missing:
            # KeyError keeps the exception type the plain parser[...] lookup raised,
            # but names the file and every missing section.
            raise KeyError(f"{config_path} 缺少配置段: {', '.join(missing)}")

        self.main_db = MainDBConfig(**parser["main_db"])
        self.dolphinscheduler = DolphinSchedulerConfig(**parser["dolphinscheduler"])

    def __repr__(self):
        return f"<Settings env={self.env} project={self.project}>"

env/defaults.ini

[app_defaults]
env = dev
project = projectA
datasource_id = 1001

env/dev_projectA.ini

[main_db]
host = 192.168.0.xxx
port = 3306
username = xxx
password = xxx
database = xxx

[dolphinscheduler]
web_url = http://192.168.0.xxx:12345/dolphinscheduler
username = xxx
password = xxx
project_code = 1234567
datasource_id = 3
retry_times = 3

env/dev_projectB.ini

[main_db]
host = 192.168.0.xxx
port = 3306
username = xxx
password = xxx
database = xxx

[dolphinscheduler]
web_url = http://192.168.0.xxx:12345/dolphinscheduler
username = xxx
password = xxx
project_code = 7654321
datasource_id = 3
retry_times = 3

db.py

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
from contextlib import contextmanager
from urllib.parse import quote_plus
from typing import Dict, Any

# Global main (metadata) DB engine; assigned by init_main_engine(), which main.py calls.
main_engine = None


def init_main_engine(settings) -> None:
    """
    Create the module-level engine for the main database, the one holding the
    ``datasource`` table that get_datasource_config queries.

    Calling it again simply rebinds ``main_engine``; the previous engine is not disposed.
    """
    global main_engine
    pool_options = dict(
        poolclass=QueuePool,
        pool_size=5,
        max_overflow=10,
        pool_pre_ping=True,  # test each connection on checkout and replace dead ones
        echo=False,
    )
    main_engine = create_engine(settings.main_db.url, **pool_options)


def get_datasource_config(ds_id: str) -> Dict[str, Any]:
    """
    Look up the connection details of datasource ``ds_id`` in the main DB.

    Returns a dict with keys ``host``, ``port`` (int), ``username``,
    ``password`` and ``database`` (taken from the ``default_database`` column).
    Username and password are already URL-encoded with ``quote_plus`` so they
    can be embedded directly in a connection URL (handles '@', '#', ...).

    Raises:
        ValueError: no ``datasource`` row has this id.
    """
    with main_engine.connect() as conn:
        row = conn.execute(
            text("""
                SELECT host, port, username, password, default_database 
                FROM datasource 
                WHERE id = :id
            """),
            {"id": ds_id}
        ).mappings().fetchone()

    if not row:
        raise ValueError(f"Datasource ID {ds_id} not found in metadata DB")

    return {
        "host": row["host"],
        "port": int(row["port"]),
        "username": quote_plus(row["username"]),
        "password": quote_plus(row["password"]),
        "database": row["default_database"],  # renamed for the URL builders
    }


def get_engine_by_id(ds_id: str):
    """
    Build a brand-new SQLAlchemy Engine for datasource ``ds_id``.

    The datasource row is re-read on every call, so edits in the metadata DB
    take effect immediately; nothing is cached here. The returned engine owns
    its own connection pool, which the caller should ``dispose()`` when done.
    """
    cfg = get_datasource_config(ds_id)
    credentials = f"{cfg['username']}:{cfg['password']}"  # already URL-encoded
    address = f"{cfg['host']}:{cfg['port']}/{cfg['database']}"
    return create_engine(
        f"mysql+pymysql://{credentials}@{address}",
        poolclass=QueuePool,
        pool_size=5,
        max_overflow=10,
        pool_pre_ping=True,
        echo=False,
    )


@contextmanager
def get_connection(ds_id: str):
    """
    Context manager yielding a connection to datasource ``ds_id``.

    get_engine_by_id builds a fresh engine for every call. On exit the
    connection is closed AND that engine is disposed. Without the dispose, the
    closed connection would sit open in the abandoned engine's pool until
    garbage collection.

    Raises:
        RuntimeError: the engine could not be created or the connection could
            not be opened (original exception chained as ``__cause__``).
            Exceptions raised inside the caller's ``with`` body propagate
            unchanged instead of being reported as connection failures.
    """
    engine = None
    try:
        engine = get_engine_by_id(ds_id)
        conn = engine.connect()
    except Exception as e:
        if engine is not None:
            engine.dispose()
        raise RuntimeError(f"Failed to connect to datasource {ds_id}: {e}") from e

    # The yield is outside the try/except above so body errors are not re-labelled.
    try:
        yield conn
    finally:
        try:
            conn.close()
        finally:
            engine.dispose()

main.py

from config.settings import Settings
from db import init_main_engine, get_connection
from sqlalchemy import text


def get_user_input_with_default(prompt: str, default: str) -> str:
    """Prompt on stdin showing *default*; return the stripped answer, or *default* if it is blank."""
    answer = input(f"{prompt} (默认: {default}): ").strip()
    return answer or default


def main():
    """
    Interactive entry point.

    Prompts for env, project and datasource ID, with defaults resolved by
    Settings from ``defaults.ini``. It then loads the matching configuration,
    opens a connection to the chosen datasource and prints its MySQL version.
    Each failure is printed as a one-line ``❌`` message and ends the run.
    """
    # 1. Settings() with no arguments resolves the defaults. It also loads the
    #    default project's config file, which may not exist, so it is guarded
    #    like the explicit load below instead of crashing with a traceback.
    try:
        temp_settings = Settings()
    except Exception as e:
        print(f"❌ 配置加载失败: {e}")
        return
    default_env = temp_settings.env
    default_project = temp_settings.project
    default_ds_id = temp_settings.default_ds_id

    print("🔧 多项目多环境配置工具")
    print("=" * 50)

    # 2. Interactive input; an empty answer keeps the default.
    env = get_user_input_with_default("请输入环境 (dev/prod/test)", default_env)
    project = get_user_input_with_default("请输入项目名", default_project)
    ds_id = get_user_input_with_default("请输入 datasource ID", default_ds_id)

    # 3. Load the chosen configuration.
    try:
        settings = Settings(env=env, project=project, datasource_id=ds_id)
        print(f"\n✅ 已加载配置: {env} / {project}")
        print(f"   🐬 DS 项目码: {settings.dolphinscheduler.project_code}")
        print(f"   🗄️  主库: {settings.main_db.database}")
    except Exception as e:
        print(f"❌ 配置加载失败: {e}")
        return

    # 4. Resolve the target datasource through the main DB and query its version.
    try:
        init_main_engine(settings)
        with get_connection(ds_id) as conn:
            version = conn.execute(text("SELECT VERSION()")).fetchone()[0]
            print(f"✅ 目标库连接成功!MySQL 版本: {version}")
    except Exception as e:
        print(f"❌ 连接失败: {e}")
        return

    print("\n🎉 完成!")


if __name__ == "__main__":
    # Ctrl+C (e.g. at one of the input() prompts) exits with a short message instead of a traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n👋 退出。")

对于 Web 服务等长时间运行的场景,db.py 需要缓存 engine(版本如下):

"""
Web 服务专用:带缓存的多数据源连接管理。
适用于 FastAPI/Django 等长期运行服务。
"""

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
from contextlib import contextmanager
from urllib.parse import quote_plus
from typing import Dict, Any
import hashlib
import threading
from sqlalchemy.exc import OperationalError, DatabaseError

# === Module-level state ===
# Main (metadata) DB engine; assigned by init_main_engine().
main_engine = None
_target_engines = {}  # engine cache: {(ds_id, config_fingerprint): Engine}
_engine_lock = threading.Lock()  # held whenever _target_engines is read or mutated


def init_main_engine(settings) -> None:
    """
    Create the module-level engine for the main DB (the one holding the
    ``datasource`` table). In this module it is used only for datasource
    lookups, so its pool is kept small.
    """
    global main_engine
    engine_kwargs = {
        "poolclass": QueuePool,
        "pool_size": 2,  # lookups only: a small pool is enough
        "max_overflow": 5,
        "pool_pre_ping": True,
        "echo": False,
    }
    main_engine = create_engine(settings.main_db.url, **engine_kwargs)


def _config_fingerprint(config: Dict[str, Any]) -> str:
    """
    Return an MD5 hex digest over the connection-identifying fields of *config*.

    Used in the engine cache key. Any change to host, port, username, password
    or database yields a different key; other keys in *config* are ignored.
    """
    key_fields = ("host", "port", "username", "password", "database")
    snapshot = tuple(config[name] for name in key_fields)
    return hashlib.md5(repr(snapshot).encode()).hexdigest()


def get_datasource_config(ds_id: str) -> Dict[str, Any]:
    """
    Read the connection settings of datasource ``ds_id`` from the main DB.

    Returns host, port (as int), username and password (both URL-encoded with
    ``quote_plus``) and database (from the ``default_database`` column).

    Raises:
        ValueError: no ``datasource`` row has this id.
    """
    with main_engine.connect() as conn:
        cursor = conn.execute(
            text("""
                SELECT host, port, username, password, default_database 
                FROM datasource 
                WHERE id = :id
            """),
            {"id": ds_id}
        )
        record = cursor.mappings().fetchone()

    if not record:
        raise ValueError(f"Datasource ID {ds_id} not found in metadata DB")

    port = int(record["port"])
    user = quote_plus(record["username"])
    secret = quote_plus(record["password"])
    return {
        "host": record["host"],
        "port": port,
        "username": user,
        "password": secret,
        "database": record["default_database"],
    }


def _get_or_create_engine(ds_id: str):
    """
    Return the cached Engine for datasource ``ds_id``, creating one if needed.

    The datasource row is re-read from the main DB on every call and hashed
    with _config_fingerprint. The cache key is ``(ds_id, fingerprint)``, so
    changed host/port/credentials/database produce a new engine. Engines cached
    for the same ds_id under an older fingerprint are evicted and disposed,
    closing their pooled connections instead of leaking them.
    """
    # 1. Read the latest config.
    config = get_datasource_config(ds_id)
    cache_key = (ds_id, _config_fingerprint(config))

    with _engine_lock:
        # 2. Cache hit.
        cached = _target_engines.get(cache_key)
        if cached is not None:
            return cached

        # 3. Evict engines built from an older config of this ds_id.
        stale_keys = [k for k in _target_engines if k[0] == ds_id]
        stale_engines = [_target_engines.pop(k) for k in stale_keys]

        # 4. New engine; keep per-datasource pools small in a long-running service.
        url = f"mysql+pymysql://{config['username']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"
        engine = create_engine(
            url,
            poolclass=QueuePool,
            pool_size=2,  # 2 persistent connections per datasource
            max_overflow=5,  # bursts up to 7 connections
            pool_pre_ping=True,
            pool_recycle=3600,  # recycle after 1h, below MySQL's default 8h wait_timeout
            echo=False,
        )
        _target_engines[cache_key] = engine

    # Dispose outside the lock: closing pooled connections is network I/O.
    for stale in stale_engines:
        stale.dispose()
    return engine


@contextmanager
def get_connection(ds_id: str):
    """
    Yield a pooled connection to datasource ``ds_id``, retrying acquisition once.

    Only acquisition is retried, i.e. the engine lookup plus ``engine.connect()``.
    It is retried when it fails with an OperationalError/DatabaseError whose
    message mentions "access denied", "password" or "connection". Before the
    retry, every cached engine for ``ds_id`` is evicted and disposed, because
    its config may be stale.

    The retry loop must finish before the ``yield``. A @contextmanager
    generator may yield only once, and yielding again after an error thrown
    into it fails with "generator didn't stop after throw()". Exceptions raised
    inside the caller's ``with`` body therefore propagate unchanged.

    Raises:
        RuntimeError: connection acquisition failed (cause chained).
    """
    max_retries = 1  # retry acquisition at most once
    conn = None
    for attempt in range(max_retries + 1):
        try:
            conn = _get_or_create_engine(ds_id).connect()
            break
        except (OperationalError, DatabaseError) as e:
            # Possibly bad credentials or a dropped/unreachable server.
            error_msg = str(e).lower()
            if "access denied" in error_msg or "password" in error_msg or "connection" in error_msg:
                # Evict every cached engine for this ds_id; its config may be stale.
                with _engine_lock:
                    stale_keys = [k for k in _target_engines if k[0] == ds_id]
                    stale_engines = [_target_engines.pop(k) for k in stale_keys]
                # Dispose outside the lock so their pooled connections are closed.
                for stale in stale_engines:
                    stale.dispose()
                if attempt < max_retries:
                    continue
            raise RuntimeError(f"DB connection failed for {ds_id}: {e}") from e
        except Exception as e:
            raise RuntimeError(f"Unexpected error for {ds_id}: {e}") from e

    # Every loop iteration either breaks with conn set or raises, so conn is valid here.
    try:
        yield conn
    finally:
        conn.close()