"""
未分类

Python 改进版动态数据源管理器

改进版动态数据源管理器
功能特性:
- 支持多数据源并行使用
- 全局单例管理器,每个数据源配置对应独立连接池
- 连接池按配置隔离管理
- 按固定间隔自动进行健康检查
"""
import threading
import time
import hashlib
import pymysql

from contextlib import contextmanager
from typing import Dict, Optional
from dbutils.pooled_db import PooledDB

from common.tools.logger import LogManager

logger = LogManager.get_logger()


class DatasourceManager(object):
    """
    Dynamic multi-datasource manager (process-wide singleton).

    Features:
    - several datasources can be used in parallel;
    - one connection pool per distinct (user, host, port, database);
    - pools are isolated from each other;
    - interval-based health check before a pool hands out connections.

    ``_lock`` (a class-level RLock) guards singleton creation and the pool
    registry only. It is released before a connection is handed to the
    caller, so threads can use pooled connections concurrently.
    """

    _instance = None
    _lock = threading.RLock()

    def __new__(cls):
        with cls._lock:
            if not cls._instance:
                cls._instance = super().__new__(cls)
                cls._instance.__init()
            return cls._instance

    def __init(self):
        """Initialise the pool registry; invoked exactly once from ``__new__``."""
        # Pool registry: {config_hash: {'pool': PooledDB, 'last_check': float, 'config': dict}}
        self._pool_store: Dict[str, Dict] = {}

        # Seconds between health checks of one pool (compared against time.time()).
        self.health_check_interval = 300

    def _get_pool_entry(self, config_hash: str) -> Optional[Dict]:
        """Return the registry entry for ``config_hash``, or None if absent."""
        return self._pool_store.get(config_hash)

    @staticmethod
    def _get_config_hash(config: Dict) -> str:
        """Build the registry key for a connection config.

        The user is part of the key: two configs for the same host/database
        but different accounts must not share a pool, because a pool
        authenticates with the credentials of whichever config created it.
        """
        unique_str = f"{config['user']}@{config['host']}:{config['port']}/{config['database']}"
        return hashlib.sha256(unique_str.encode()).hexdigest()

    def _validate_config(self, config: Dict):
        """Raise ValueError listing any required connection keys missing from ``config``."""
        required = ['host', 'port', 'user', 'password', 'database']
        if missing := [k for k in required if k not in config]:
            raise ValueError(f"缺少必要配置参数: {missing}")

    def _create_pool(self, config: Dict) -> PooledDB:
        """Create a PyMySQL-backed PooledDB for ``config`` (utf8mb4, autocommit off).

        Pool sizing comes from the optional keys ``mincached`` (default 2),
        ``maxcached`` (default 5) and ``maxconnections`` (default 20).
        Creation errors are logged and re-raised.
        """
        try:
            return PooledDB(
                creator=pymysql,
                host=config['host'],
                port=config['port'],
                user=config['user'],
                password=config['password'],
                database=config['database'],
                mincached=config.get('mincached', 2),
                maxcached=config.get('maxcached', 5),
                maxconnections=config.get('maxconnections', 20),
                charset='utf8mb4',
                autocommit=False
            )
        except Exception as e:
            logger.error(f"连接池创建失败:{str(e)}")
            raise

    def _register_pool(self, config: Dict) -> PooledDB:
        """Return the pool for ``config``, creating and registering it if absent."""
        config_hash = self._get_config_hash(config)
        with self._lock:
            # Re-check under the lock: another thread may have registered it first.
            if entry := self._get_pool_entry(config_hash):
                return entry['pool']

            new_pool = self._create_pool(config)
            self._pool_store[config_hash] = {
                'pool': new_pool,
                'last_check': 0,  # 0 forces a health check on first use
                'config': config.copy()
            }
            logger.info(f"注册新数据源: {config['host']}:{config['database']}")
            return new_pool

    def _checked_entry(self, config: Dict, config_hash: str) -> Dict:
        """Return a registered entry for ``config``, health-checking it when due.

        A failed health check evicts the stale pool; a fresh pool is then
        registered so the caller never receives an already-closed pool.
        """
        with self._lock:
            entry = self._get_pool_entry(config_hash)
            if entry is None:
                self._register_pool(config)
                entry = self._pool_store[config_hash]
            if self._needs_check(entry) and not self._health_check(entry):
                self._register_pool(config)
                entry = self._pool_store[config_hash]
            return entry

    @contextmanager
    def get_connection(self, config: Dict):
        """
        Yield a pooled connection for ``config``; it is closed (returned to the pool) on exit.

        :param config: dict with host/port/user/password/database, optionally
            mincached/maxcached/maxconnections
        :raises ValueError: if a required config key is missing

        Errors raised while *acquiring* the connection are logged, evict the
        pool, and are re-raised. Errors raised by the caller's own code inside
        the ``with`` block propagate unchanged and leave the pool intact.
        """
        self._validate_config(config)
        config_hash = self._get_config_hash(config)
        entry = self._checked_entry(config, config_hash)

        # The registry lock is NOT held from here on, so other threads can
        # borrow connections (from this or other pools) concurrently.
        try:
            conn = entry['pool'].connection()
        except Exception as e:
            logger.error(f"连接获取异常: {str(e)}")
            self._handle_failure(entry)
            raise

        try:
            yield conn
        finally:
            conn.close()

    def _needs_check(self, entry: Dict) -> bool:
        """Return True when more than ``health_check_interval`` seconds passed since the last successful check."""
        elapsed = time.time() - entry['last_check']
        return elapsed > self.health_check_interval

    def _health_check(self, entry: Dict) -> bool:
        """Run ``SELECT 1`` on a connection from the entry's pool.

        :return: True on success (and ``last_check`` is refreshed); False on
            failure, in which case the entry has been evicted and its pool closed.
        """
        logger.debug(f"执行健康检查: {entry['config']['host']}")
        try:
            conn = entry['pool'].connection()
            try:
                cursor = conn.cursor()
                try:
                    cursor.execute("SELECT 1")
                finally:
                    cursor.close()
            finally:
                # Always hand the probe connection back, even if the query failed.
                conn.close()
        except Exception as e:
            logger.error(f"健康检查失败: {str(e)}")
            self._handle_failure(entry)
            return False
        entry['last_check'] = time.time()
        return True

    def _handle_failure(self, entry: Dict):
        """Evict ``entry`` from the registry (if still registered) and close its pool."""
        logger.warning(f"清理失效连接池: {entry['config']['host']}")
        config_hash = self._get_config_hash(entry['config'])
        with self._lock:
            # Identity check: another thread may already have evicted this entry
            # and registered a fresh pool under the same key; don't drop that one.
            if self._pool_store.get(config_hash) is entry:
                del self._pool_store[config_hash]
            entry['pool'].close()

    def close_all(self):
        """Close every registered pool and empty the registry."""
        with self._lock:
            for config_hash, entry in list(self._pool_store.items()):
                entry['pool'].close()
                del self._pool_store[config_hash]
            logger.info("已关闭所有数据源连接池")
import pymysql

from contextlib import contextmanager
from typing import Dict, Optional, List, Generator

from common.data_tools.dsm import DatasourceManager
from common.tools.logger import LogManager

logger = LogManager.get_logger()


class DBC(object):
    """General-purpose database helper with multi-datasource support.

    Every public method accepts an optional ``config`` (falling back to
    ``default_config``). ``select_one``, ``select_all``, ``execute`` and
    ``batch_execute`` also accept an optional ``conn``. When ``conn`` is given
    (typically the connection yielded by :meth:`transaction`), the statement
    runs on that connection and commit/rollback is left to its owner.
    """

    def __init__(self, data_source: DatasourceManager, default_config: Dict = None):
        """
        :param data_source: datasource manager instance
        :param default_config: optional default connection config
        """
        self.ds = data_source
        self.default_config = default_config

    def _resolve_config(self, config: Dict = None) -> Dict:
        """Return ``config`` if given, else the default config; raise ValueError if neither exists."""
        if config is not None:
            return config
        if self.default_config is not None:
            return self.default_config
        raise ValueError("必须指定数据源配置或设置默认配置")

    @contextmanager
    def _borrow(self, config: Dict = None, conn=None):
        """Yield ``conn`` unchanged when supplied, else a pooled connection for the resolved config."""
        if conn is not None:
            yield conn
            return
        with self.ds.get_connection(self._resolve_config(config)) as pooled:
            yield pooled

    @contextmanager
    def _get_cursor(self, conn) -> Generator[pymysql.cursors.DictCursor, None, None]:
        """Yield a DictCursor on ``conn`` and always close it afterwards."""
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            yield cursor
        finally:
            cursor.close()

    @contextmanager
    def transaction(self, config: Dict = None):
        """Yield a connection; commit if the block succeeds, roll back and re-raise otherwise."""
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            try:
                yield conn
                conn.commit()
            except Exception as e:
                conn.rollback()
                logger.error(f"事务回滚: {str(e)}")
                raise

    def select_one(self, sql: str, params: tuple = None,
                   config: Dict = None, conn=None) -> Optional[Dict]:
        """Return the first row as a dict, or None when the query yields no rows.

        :param conn: optional existing connection to run on (e.g. a transaction's)
        """
        with self._borrow(config, conn) as active:
            with self._get_cursor(active) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchone()
                except Exception as e:
                    logger.error(f"查询失败:{sql} | {str(e)}")
                    raise

    def select_all(self, sql: str, params: tuple = None,
                   config: Dict = None, conn=None) -> List[Dict]:
        """Return all rows as dicts.

        :param conn: optional existing connection to run on (e.g. a transaction's)
        """
        with self._borrow(config, conn) as active:
            with self._get_cursor(active) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchall()
                except Exception as e:
                    logger.error(f"批查询失败:{sql} | {str(e)}")
                    raise

    def execute(self, sql: str, params: tuple = None,
                config: Dict = None, conn=None) -> int:
        """Run one write statement and return ``cursor.rowcount``.

        Without ``conn`` the statement is committed (rolled back on error).
        With ``conn`` commit/rollback is left to the connection's owner, so
        the statement joins that connection's transaction.
        """
        owns_conn = conn is None
        with self._borrow(config, conn) as active:
            with self._get_cursor(active) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    if owns_conn:
                        active.commit()
                    return cursor.rowcount
                except Exception as e:
                    if owns_conn:
                        active.rollback()
                    logger.error(f"执行失败:{sql} | {str(e)}")
                    raise

    def _run_batch(self, conn, sql: str, params_list: List[tuple]) -> int:
        """Execute ``sql`` once per params tuple on ``conn``; return the summed rowcount."""
        with self._get_cursor(conn) as cursor:
            total = 0
            try:
                for params in params_list or ():
                    cursor.execute(sql, params)
                    total += cursor.rowcount
                return total
            except Exception as e:
                logger.error(f"批执行失败:{sql} | {str(e)}")
                raise

    def batch_execute(self, sql: str, params_list: List[tuple],
                      config: Dict = None, conn=None) -> int:
        """Execute ``sql`` for each params tuple in ONE transaction; return total rowcount.

        With ``conn`` the statements run on that connection and
        commit/rollback is left to its owner.
        """
        if conn is not None:
            return self._run_batch(conn, sql, params_list)
        with self.transaction(config) as tx_conn:
            return self._run_batch(tx_conn, sql, params_list)

    def execute_in_transaction(self, operations: List[dict],
                               config: Dict = None) -> bool:
        """
        Execute several operations in one transaction.

        :param operations: list of dicts shaped like
            {'sql': str, 'params': tuple, 'type': 'write' | 'read'};
            'type' defaults to 'write'; 'read' results are fetched and discarded
        :return: True if every operation succeeded (and was committed);
            False if one failed (everything is rolled back)
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            try:
                with self._get_cursor(conn) as cursor:
                    for op in operations:
                        cursor.execute(op['sql'], op.get('params'))
                        if op.get('type', 'write') == 'read':
                            cursor.fetchall()  # consume the result set
            except Exception as e:
                # The error is swallowed here, so transaction() will reach its
                # commit(); roll back first so partial writes are discarded.
                conn.rollback()
                logger.error(f"复合操作失败: {str(e)}")
                return False
            return True

使用示例:

# Initialise with a default datasource config
db = DBC(DatasourceManager(), default_config=primary_config)

# Simple query (uses the default datasource)
user = db.select_one("SELECT * FROM users WHERE id=%s", (1001,))


# Cross-datasource operation
def cross_domain_update():
    """Copy per-user stats from the analytics DB into the primary DB, then log to the log DB."""
    # Read from the analytics database
    analytics_data = db.select_all(
        "SELECT * FROM user_behavior",
        config=analytics_config
    )

    # Write to the primary (default) database. batch_execute runs every
    # update inside one transaction: committed once, or rolled back on error.
    db.batch_execute(
        "UPDATE users SET stats=%s WHERE id=%s",
        [(data['stats'], data['user_id']) for data in analytics_data],
    )

    # Also write to the log database
    db.execute(
        "INSERT INTO sync_log (message) VALUES (%s)",
        ("数据同步完成",),
        config=log_db_config
    )

修改亮点解析:

灵活配置传递:

  • 方法级config参数覆盖
  • 支持初始化默认配置
    # 默认使用config_a,特定操作使用config_b
    db = DBC(manager, default_config=config_a)
    db.select_one("...")  # 使用config_a
    db.execute("...", config=config_b)  # 使用config_b

    高效事务控制:

    • 通过上下文管理器管理事务生命周期
    with db.transaction() as conn:  # 开启事务
        db.execute("...", conn=conn)
        db.execute("...", conn=conn)
    # 自动提交/回滚

    混合操作支持:

    • 单方法内处理多个数据源
    def process_multidb():
        # 从数据源A读取
        data = db.select_all("...", config=config_a)
        
        # 向数据源B写入
        db.batch_execute("...", data, config=config_b)
        
        # 在数据源C记录日志
        db.execute("...", config=config_c)

    连接复用优化:

    • 显式传递conn参数可复用同一连接
    with db.transaction(config=special_config) as conn:
        result = db.select_one("...", conn=conn)  # 使用事务连接查询
        db.execute("...", conn=conn)  # 同一连接写操作

    智能类型处理:

    • 按每个操作的 'type' 字段区分读/写:'read' 会读取并丢弃结果集,未指定时按写操作处理
    # 在事务中混合读写操作
    db.execute_in_transaction([
        {'sql': "SELECT ...", 'type': 'read'},
        {'sql': "UPDATE ...", 'params': (...)}
    ])