未分类

python 改进版动态数据源管理器

改进版动态数据源管理器
功能特性:
- 支持多数据源并行使用
- 每个数据源配置独立单例
- 连接池按配置隔离管理
- 自适应健康检查
import threading
import time
import hashlib
import pymysql

from contextlib import contextmanager
from typing import Dict, Optional
from dbutils.pooled_db import PooledDB

from common.tools.logger import LogManager

logger = LogManager.get_logger()


class DatasourceManager(object):
    """
    Process-wide registry of database connection pools.

    Features:
    - several data sources can be used side by side
    - the manager is a singleton: ``DatasourceManager()`` always returns the
      same object
    - one pool per data source, keyed by a hash of user, host, port and
      database
    - a pool is probed with ``SELECT 1`` when its last successful probe is
      older than ``health_check_interval`` seconds
    """

    _instance = None
    # Re-entrant because helpers that take the lock call other helpers that
    # take it as well (e.g. _ensure_entry -> _register_pool).
    _lock = threading.RLock()

    def __new__(cls):
        """Return the shared instance, creating and initialising it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance.__init()
            return cls._instance

    def __init(self):
        """Create the pool registry and the global health-check settings."""
        # Pool registry: {config_hash: {'pool': ..., 'last_check': ..., 'config': ...}}
        self._pool_store: Dict[str, Dict] = {}

        # Seconds a pool may go without a successful probe before it is re-checked.
        self.health_check_interval = 300

    def _get_pool_entry(self, config_hash: str) -> Optional[Dict]:
        """Return the registry entry for ``config_hash``, or None if absent."""
        return self._pool_store.get(config_hash)

    @staticmethod
    def _get_config_hash(config: Dict) -> str:
        """
        Build the registry key for a configuration.

        The user is part of the key so that two accounts on the same
        host/port/database never share (and silently reuse) one pool.

        :param config: connection settings; must contain 'user', 'host',
            'port' and 'database'
        :return: hex SHA-256 digest identifying the data source
        :raises KeyError: if one of the keys above is missing
        """
        unique_str = (
            f"{config['user']}@{config['host']}:{config['port']}"
            f"/{config['database']}"
        )
        return hashlib.sha256(unique_str.encode()).hexdigest()

    def _validate_config(self, config: Dict):
        """
        Check that every mandatory connection setting is present.

        :raises ValueError: listing the missing keys
        """
        required = ['host', 'port', 'user', 'password', 'database']
        if missing := [k for k in required if k not in config]:
            raise ValueError(f"缺少必要配置参数: {missing}")

    def _create_pool(self, config: Dict) -> "PooledDB":
        """
        Create a new PooledDB for ``config``.

        Pool sizing can be tuned through the optional 'mincached',
        'maxcached' and 'maxconnections' keys. Any creation error is logged
        and re-raised.
        """
        try:
            return PooledDB(
                creator=pymysql,
                host=config['host'],
                port=config['port'],
                user=config['user'],
                password=config['password'],
                database=config['database'],
                mincached=config.get('mincached', 2),
                maxcached=config.get('maxcached', 5),
                maxconnections=config.get('maxconnections', 20),
                charset='utf8mb4',
                autocommit=False
            )
        except Exception as e:
            logger.error(f"连接池创建失败:{str(e)}")
            raise

    def _register_pool(self, config: Dict) -> "PooledDB":
        """
        Return the pool for ``config``, creating and registering it if needed.

        The check and the insertion happen under the lock, so concurrent
        callers end up with the same pool.
        """
        config_hash = self._get_config_hash(config)
        with self._lock:
            if entry := self._get_pool_entry(config_hash):
                return entry['pool']

            new_pool = self._create_pool(config)
            self._pool_store[config_hash] = {
                'pool': new_pool,
                'last_check': 0,
                # Copy so later mutation of the caller's dict cannot change
                # what this entry is keyed on.
                'config': config.copy()
            }
            logger.info(f"注册新数据源: {config['host']}:{config['database']}")
            return new_pool

    def _ensure_entry(self, config: Dict, config_hash: str) -> Dict:
        """Return the registry entry for ``config``, registering a pool if none exists."""
        with self._lock:
            if self._get_pool_entry(config_hash) is None:
                self._register_pool(config)
            return self._pool_store[config_hash]

    @contextmanager
    def get_connection(self, config: Dict):
        """
        Yield a pooled connection for the given configuration.

        The pool is registered on first use and health-checked when due. The
        connection is always handed back to the pool on exit. Exceptions
        raised by the caller's ``with`` body propagate unchanged and do not
        affect the pool; only a failure to obtain a connection evicts it.

        :param config: database connection settings
        :raises ValueError: if ``config`` lacks a mandatory key
        """
        self._validate_config(config)
        config_hash = self._get_config_hash(config)

        # The lock is only taken inside the helpers, for registry bookkeeping.
        # Holding it across the ``yield`` would serialise every caller of
        # every data source for the duration of their work.
        pool_entry = self._ensure_entry(config, config_hash)

        if self._needs_check(pool_entry) and not self._health_check(pool_entry):
            # The failed pool was evicted; continue with a freshly built one.
            pool_entry = self._ensure_entry(config, config_hash)

        try:
            conn = pool_entry['pool'].connection()
        except Exception as e:
            logger.error(f"连接获取异常: {str(e)}")
            self._handle_failure(pool_entry)
            raise

        try:
            yield conn
        finally:
            conn.close()

    def _needs_check(self, entry: Dict) -> bool:
        """Return True when the entry's last successful probe is older than the interval."""
        elapsed = time.time() - entry['last_check']
        return elapsed > self.health_check_interval

    def _health_check(self, entry: Dict) -> bool:
        """
        Probe the pool with ``SELECT 1``.

        On success the entry's 'last_check' timestamp is refreshed. On failure
        the pool is evicted through ``_handle_failure``.

        :return: True if the probe succeeded, False if the pool was evicted
        """
        logger.debug(f"执行健康检查: {entry['config']['host']}")
        try:
            conn = entry['pool'].connection()
            try:
                cursor = conn.cursor()
                try:
                    cursor.execute("SELECT 1")
                finally:
                    cursor.close()
            finally:
                # Hand the probe connection back even when the query fails.
                conn.close()
            entry['last_check'] = time.time()
            return True
        except Exception as e:
            logger.error(f"健康检查失败: {str(e)}")
            self._handle_failure(entry)
            return False

    def _handle_failure(self, entry: Dict):
        """
        Evict a failed pool from the registry and close it.

        :param entry: the registry entry (not the hash) of the failed pool
        """
        logger.warning(f"清理失效连接池: {entry['config']['host']}")
        with self._lock:
            config_hash = self._get_config_hash(entry['config'])
            # Identity check: if the entry was already evicted, or replaced by
            # a new pool in the meantime, leave the registry alone.
            if self._pool_store.get(config_hash) is entry:
                del self._pool_store[config_hash]
            entry['pool'].close()

    def close_all(self):
        """Close every registered pool and empty the registry."""
        with self._lock:
            # Iterate over a snapshot because entries are deleted in the loop.
            for config_hash, entry in list(self._pool_store.items()):
                entry['pool'].close()
                del self._pool_store[config_hash]
            logger.info("已关闭所有数据源连接池")
import pymysql

from contextlib import contextmanager
from typing import Dict, Optional, List, Generator

from common.data_tools.dsm import DatasourceManager
from common.tools.logger import LogManager

logger = LogManager.get_logger()


class DBC(object):
    """General-purpose database helper that can target several data sources."""

    def __init__(self, data_source: DatasourceManager, default_config: Dict = None):
        """
        :param data_source: data source manager that hands out connections
        :param default_config: optional configuration used when a call does
            not pass its own ``config``
        """
        self.ds = data_source
        self.default_config = default_config

    def _resolve_config(self, config: Dict = None) -> Dict:
        """
        Pick the configuration a call should use.

        The per-call ``config`` wins over the instance default.

        :raises ValueError: if neither is set
        """
        if config is not None:
            return config
        if self.default_config is not None:
            return self.default_config
        raise ValueError("必须指定数据源配置或设置默认配置")

    @contextmanager
    def _get_cursor(self, conn) -> Generator[pymysql.cursors.DictCursor, None, None]:
        """Yield a DictCursor on ``conn`` and close it on exit."""
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            yield cursor
        finally:
            cursor.close()

    @contextmanager
    def transaction(self, config: Dict = None):
        """
        Yield a connection whose work is committed on normal exit.

        If the ``with`` body raises, the connection is rolled back, the error
        is logged and the exception is re-raised.
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            try:
                yield conn
                conn.commit()
            except Exception as e:
                conn.rollback()
                logger.error(f"事务回滚: {str(e)}")
                raise

    def select_one(self, sql: str, params: tuple = None,
                   config: Dict = None) -> Optional[Dict]:
        """
        Run a query and return its first row.

        :return: the row as a dict, or None when the query yields no row
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchone()
                except Exception as e:
                    logger.error(f"查询失败:{sql} | {str(e)}")
                    raise

    def select_all(self, sql: str, params: tuple = None,
                   config: Dict = None) -> List[Dict]:
        """Run a query and return every row as a dict."""
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchall()
                except Exception as e:
                    logger.error(f"批查询失败:{sql} | {str(e)}")
                    raise

    def execute(self, sql: str, params: tuple = None,
                config: Dict = None) -> int:
        """
        Run one write statement and commit it immediately.

        On error the statement is rolled back, logged and the exception is
        re-raised.

        :return: the cursor's ``rowcount`` after the statement
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    conn.commit()
                    return cursor.rowcount
                except Exception as e:
                    conn.rollback()
                    logger.error(f"执行失败:{sql} | {str(e)}")
                    raise

    def batch_execute(self, sql: str, params_list: List[tuple],
                      config: Dict = None) -> int:
        """
        Run the same statement once per parameter set inside one transaction.

        Either every statement is committed or, if one raises, all of them
        are rolled back by ``transaction`` and the exception is re-raised.

        :return: sum of ``rowcount`` over all statements
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            with self._get_cursor(conn) as cursor:
                total = 0
                try:
                    for params in params_list:
                        cursor.execute(sql, params)
                        total += cursor.rowcount
                    return total
                except Exception as e:
                    logger.error(f"批执行失败:{sql} | {str(e)}")
                    raise

    def execute_in_transaction(self, operations: List[dict],
                               config: Dict = None) -> bool:
        """
        Run several statements of mixed kinds inside one transaction.

        :param operations: list of dicts shaped like
            {'sql': str, 'params': tuple, 'type': 'write/read'};
            'type' defaults to 'write', 'params' is optional
        :param config: optional data source configuration
        :return: True when every statement ran and was committed; False when
            a statement failed, in which case nothing is committed
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            try:
                with self._get_cursor(conn) as cursor:
                    for op in operations:
                        cursor.execute(op['sql'], op.get('params'))
                        if op.get('type', 'write') == 'read':
                            # Drain the result set so the next statement can run.
                            cursor.fetchall()
                return True
            except Exception as e:
                logger.error(f"复合操作失败: {str(e)}")
                # The exception is swallowed here, so transaction() will see a
                # normal exit and commit. Undo the statements that already ran
                # first, otherwise a partial transaction would be committed.
                conn.rollback()
                return False

使用示例:

# Initialise with a default data source configuration
db = DBC(DatasourceManager(), default_config=primary_config)

# Simple query against the default data source
user = db.select_one("SELECT * FROM users WHERE id=%s", (1001,))

# Work that spans several data sources
def cross_domain_update():
    """Copy per-user stats from the analytics database into the primary one, then log it."""
    # Read from the analytics database
    analytics_data = db.select_all(
        "SELECT * FROM user_behavior",
        config=analytics_config
    )

    # Write to the primary database in one transaction (default configuration).
    # DBC.execute() has no ``conn`` parameter and always opens its own
    # connection, so the statements are issued on the transaction's connection
    # directly.
    with db.transaction() as conn:
        cursor = conn.cursor()
        try:
            for data in analytics_data:
                cursor.execute(
                    "UPDATE users SET stats=%s WHERE id=%s",
                    (data['stats'], data['user_id'])
                )
        finally:
            cursor.close()

    # Record the run in the logging database
    db.execute(
        "INSERT INTO sync_log (message) VALUES (%s)",
        ("数据同步完成",),
        config=log_db_config
    )

修改亮点解析:

灵活配置传递:

  • 方法级config参数覆盖
  • 支持初始化默认配置
# config_a is the default; an individual call can override it with config_b
db = DBC(manager, default_config=config_a)
db.select_one("...")  # runs against config_a
db.execute("...", config=config_b)  # runs against config_b

高效事务控制:

  • 通过上下文管理器管理事务生命周期
with db.transaction() as conn:  # opens a transaction on the default data source
    # DBC.execute() takes no ``conn`` argument, so statements that must share
    # the transaction are run on a cursor of the yielded connection.
    cursor = conn.cursor()
    try:
        cursor.execute("...")
        cursor.execute("...")
    finally:
        cursor.close()
# Committed on normal exit, rolled back if the body raises

混合操作支持:

  • 单方法内处理多个数据源
def process_multidb():
    """Read from data source A, bulk-write to data source B, then log to data source C."""
    # Rows fetched from data source A
    rows_from_a = db.select_all("...", config=config_a)

    # One statement per fetched row, written to data source B
    db.batch_execute("...", rows_from_a, config=config_b)

    # Log entry recorded in data source C
    db.execute("...", config=config_c)

连接复用优化:

  • DBC 的方法不接受 conn 参数;要复用同一连接,请直接在 transaction() 产出的连接上创建游标执行语句
with db.transaction(config=special_config) as conn:
    # DBC.select_one()/execute() take no ``conn`` argument; to stay on the
    # transaction's connection, use a cursor created from it.
    cursor = conn.cursor()
    try:
        cursor.execute("...")  # query on the transaction's connection
        result = cursor.fetchone()
        cursor.execute("...")  # write on the same connection
    finally:
        cursor.close()

智能类型处理:

  • 通过操作字典中的 type 字段区分读取/写入操作(未指定时按写入处理)
# Mix read and write statements inside a single transaction
db.execute_in_transaction([
    {'sql': "SELECT ...", 'type': 'read'},
    {'sql': "UPDATE ...", 'params': (...)}
])

重试规则

import pymysql

import logging
from contextlib import contextmanager
from typing import Dict, Optional, List, Generator
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, before_sleep_log
from common.dbc.dsm import DatasourceManager
from common.tools.logger import LogManager

logger = LogManager.get_logger()

# Exception types that trigger a retry
RETRYABLE_EXCEPTIONS = (
    pymysql.OperationalError,
    pymysql.InterfaceError,
    pymysql.InternalError,
    ConnectionError,
    TimeoutError
)

# Shared retry decorator configuration
db_retry = retry(
    stop=stop_after_attempt(3),  # give up after 3 attempts in total (the first call plus 2 retries)
    wait=wait_fixed(1),  # wait 1 second between attempts
    retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
    before_sleep=before_sleep_log(logger, logging.WARNING),  # log at WARNING level before each wait
    reraise=True  # re-raise the last exception itself rather than tenacity's RetryError
)


class DBC(object):
    """General-purpose database helper that can target several data sources."""

    def __init__(self, default_config: Dict = None):
        """
        :param default_config: optional configuration used when a call does
            not pass its own ``config``
        """
        self.ds = DatasourceManager()
        self.default_config = default_config

    def _resolve_config(self, config: Dict = None) -> Dict:
        """
        Pick the configuration a call should use.

        The per-call ``config`` wins over the instance default.

        :raises ValueError: if neither is set
        """
        if config is not None:
            return config
        if self.default_config is not None:
            return self.default_config
        raise ValueError("必须指定数据源配置或设置默认配置")

    @contextmanager
    def _get_cursor(self, conn) -> Generator[pymysql.cursors.DictCursor, None, None]:
        """Yield a DictCursor on ``conn`` and close it on exit."""
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            yield cursor
        finally:
            cursor.close()

    # No @db_retry here: decorating a generator function only wraps the call
    # that creates the generator, which never raises, so nothing was ever
    # retried. The body of a ``with`` block belongs to the caller and cannot
    # be re-run from here anyway.
    @contextmanager
    def transaction(self, config: Dict = None):
        """
        Yield a connection whose work is committed on normal exit.

        If the ``with`` body raises, the connection is rolled back, the error
        is logged and the exception is re-raised.
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            try:
                yield conn
                conn.commit()
            except Exception as e:
                conn.rollback()
                logger.error(f"事务回滚: {str(e)}")
                raise

    @db_retry
    def select_one(self, sql: str, params: tuple = None, config: Dict = None) -> Optional[Dict]:
        """
        Run a query and return its first row.

        :return: the row as a dict, or None when the query yields no row
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchone()
                except Exception as e:
                    logger.error(f"查询失败:{sql} | {str(e)}")
                    raise

    @db_retry
    def select_all(self, sql: str, params: tuple = None, config: Dict = None) -> List[Dict]:
        """Run a query and return every row as a dict."""
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchall()
                except Exception as e:
                    logger.error(f"批查询失败:{sql} | {str(e)}")
                    raise

    @db_retry
    def execute(self, sql: str, params: tuple = None, config: Dict = None) -> int:
        """
        Run one write statement and commit it immediately.

        On error the statement is rolled back, logged and the exception is
        re-raised (which lets ``db_retry`` try again for retryable errors).

        :return: the cursor's ``rowcount`` after the statement
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    conn.commit()
                    return cursor.rowcount
                except Exception as e:
                    conn.rollback()
                    logger.error(f"执行失败:{sql} | {str(e)}")
                    raise

    @db_retry
    def batch_execute(self, sql: str, params_list: List[tuple], config: Dict = None) -> int:
        """
        Run the same statement once per parameter set inside one transaction.

        Either every statement is committed or, if one raises, all of them
        are rolled back by ``transaction`` and the exception is re-raised.

        :return: sum of ``rowcount`` over all statements
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            with self._get_cursor(conn) as cursor:
                total = 0
                try:
                    for params in params_list:
                        cursor.execute(sql, params)
                        total += cursor.rowcount
                    return total
                except Exception as e:
                    logger.error(f"批执行失败:{sql} | {str(e)}")
                    raise

    @db_retry
    def execute_in_transaction(self, operations: List[dict], config: Dict = None) -> bool:
        """
        Run several statements of mixed kinds inside one transaction.

        Statement failures are reported through the return value rather than
        raised, so they do not trigger ``db_retry``.

        :param operations: list of dicts shaped like
            {'sql': str, 'params': tuple, 'type': 'write/read'};
            'type' defaults to 'write', 'params' is optional
        :param config: optional data source configuration
        :return: True when every statement ran and was committed; False when
            a statement failed, in which case nothing is committed
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            try:
                with self._get_cursor(conn) as cursor:
                    for op in operations:
                        cursor.execute(op['sql'], op.get('params'))
                        if op.get('type', 'write') == 'read':
                            # Drain the result set so the next statement can run.
                            cursor.fetchall()
                return True
            except Exception as e:
                logger.error(f"复合操作失败: {str(e)}")
                # The exception is swallowed here, so transaction() will see a
                # normal exit and commit. Undo the statements that already ran
                # first, otherwise a partial transaction would be committed.
                conn.rollback()
                return False

    def insert_dict_data(self, table_name, data: dict, config: Dict = None):
        """
        Insert one row built from a dict.

        :param table_name: target table
        :param data: mapping of column name to value for the new row
        :param config: optional data source configuration; the instance
            default is used when omitted
        :return: the ``rowcount`` reported by ``execute``
        """
        columns = list(data)
        fields_str = ", ".join(f"`{name}`" for name in columns)
        v_list_str = ", ".join(["%s"] * len(columns))
        sql = f"""
        INSERT INTO `{table_name}` ({fields_str}) VALUES ({v_list_str})
        """
        values = tuple(data[name] for name in columns)
        return self.execute(sql, values, config=config)

    @db_retry
    def insert_batch_dict_data(self, table_name, data_list, config: Dict = None):
        """
        Insert many rows with a single ``executemany`` call.

        The column list is taken from the first dict; every other dict must
        provide the same keys.

        :param table_name: target table
        :param data_list: list of dicts mapping column name to value
        :param config: optional data source configuration
        :return: {'result': True, 'affected_rows': int} on success, or
            {'result': False, 'err': str} for an empty list or a failed insert
        """
        if not data_list:
            return {'result': False, 'err': 'Empty data list'}

        fields = list(data_list[0].keys())
        fields_str = ", ".join(f"`{f}`" for f in fields)
        placeholders = ", ".join(["%s"] * len(fields))
        sql = f"INSERT INTO `{table_name}` ({fields_str}) VALUES ({placeholders})"

        values = [tuple(item[f] for f in fields) for item in data_list]
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.executemany(sql, values)
                    conn.commit()
                    return {'result': True, 'affected_rows': cursor.rowcount}
                except Exception as err:
                    # NOTE: the error is returned, not raised, so db_retry
                    # does not retry failures that happen during the insert.
                    conn.rollback()
                    return {'result': False, 'err': str(err)}

    def get_fields(self, table_name: str, config: Dict = None) -> list:
        """
        Return the column names of a table, read from its first row.

        :param table_name: table to inspect; interpolated into the SQL as-is,
            so it must come from trusted code
        :param config: optional data source configuration; the instance
            default is used when omitted
        :return: list of column names, or [] when the table has no rows
        """
        sql = f"SELECT * FROM {table_name} LIMIT 1"
        result = self.select_one(sql, config=config)
        if result is None:
            return []
        return list(result.keys())