大数据

Python MySQL 连接池操作

import logging
import threading
import time
import hashlib
from contextlib import contextmanager

import pymysql
from typing import Dict, Any, Optional, List
from dbutils.pooled_db import PooledDB

# Configure root logging once at import time: records at INFO and above are
# sent both to the file "database.log" and to a default StreamHandler.
logging.basicConfig(
    handlers=[logging.FileHandler("database.log"), logging.StreamHandler()],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-wide logger shared by the classes in this file.
logger = logging.getLogger('DBManager')


class DynamicDataSourceManager(object):
    """
    Singleton that owns the currently active MySQL connection pool.

    Features:
    - switch between data sources at runtime through ``configure``
    - connection pooling based on ``dbutils.pooled_db.PooledDB``
    - manager state is guarded by a re-entrant lock
    - pools are cached per configuration fingerprint
    - periodic ``SELECT 1`` health check with one rebuild attempt on failure
    """

    _instance = None  # the single shared instance (singleton)
    _lock = threading.RLock()  # re-entrant: configure() and the health check nest

    # Keys every configuration must provide.
    _REQUIRED_KEYS = ('host', 'port', 'user', 'password', 'database')
    # Optional pool-sizing keys and the defaults used when they are absent.
    _POOL_DEFAULTS = (('mincached', 2), ('maxcached', 5), ('maxconnections', 20))

    def __new__(cls):
        """Return the shared instance, creating and initialising it on first use."""
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                instance._init_()
                # Publish only after initialisation succeeded, so a failed
                # _init_ never leaves a half-built singleton behind.
                cls._instance = instance
            return cls._instance

    def _init_(self):
        """Initialise the manager state (runs once, from ``__new__``)."""
        # Configuration currently in effect.
        self._active_config: Optional[Dict[str, Any]] = None
        # Pool serving the active configuration.
        self._connection_pool: Optional["PooledDB"] = None
        # Pools keyed by the SHA-256 fingerprint of their configuration.
        self._config_cache: Dict[str, "PooledDB"] = {}

        # Health-check parameters.
        self.health_check_interval = 300  # seconds
        self.last_health_check = 0

    def _create_pool(self, config: Dict[str, Any]) -> "PooledDB":
        """
        Build a new connection pool from ``config``.

        :param config: connection parameters; must contain the required keys,
            the pool-sizing keys are optional.
        :return: the newly created pool.
        :raises Exception: anything raised while building the pool, after it
            has been logged.
        """
        pool_options = {
            key: config.get(key, default) for key, default in self._POOL_DEFAULTS
        }
        try:
            return PooledDB(
                creator=pymysql,
                host=config['host'],
                port=config['port'],
                user=config['user'],
                password=config['password'],
                database=config['database'],
                charset='utf8mb4',
                autocommit=False,
                **pool_options
            )
        except Exception as e:
            logger.error(f"连接池创建失败:{str(e)}")
            raise

    @classmethod
    def _get_config_hash(cls, config: Dict) -> str:
        """
        Return the SHA-256 fingerprint of a configuration.

        The fingerprint covers every value that is passed to the pool
        (endpoint, credentials, database and pool sizing), so changing any of
        them is recognised as a different configuration.

        :param config: configuration containing all required keys.
        :return: hexadecimal digest string.
        :raises KeyError: if a required key is missing.
        """
        values = [str(config[key]) for key in cls._REQUIRED_KEYS]
        values.extend(
            str(config.get(key, default)) for key, default in cls._POOL_DEFAULTS
        )
        # A separator character keeps adjacent fields from running together.
        return hashlib.sha256("\x1f".join(values).encode()).hexdigest()

    def configure(self, new_config: Dict[str, Any]):
        """
        Switch the manager to a new data-source configuration.

        :param new_config: complete connection parameters for the new source.
        :raises ValueError: if a required key is missing.
        :raises Exception: anything raised while creating the new pool; the
            previously active pool is left untouched in that case.
        """
        with self._lock:
            # Validate before touching any state.
            if any(key not in new_config for key in self._REQUIRED_KEYS):
                raise ValueError("缺少必要配置参数")

            config_hash = self._get_config_hash(new_config)
            # Skip only when a live pool already serves this configuration;
            # after a failure (pool is None) the same configuration must be
            # allowed to rebuild the pool.
            unchanged = (
                self._connection_pool is not None
                and self._active_config is not None
                and self._get_config_hash(self._active_config) == config_hash
            )
            if unchanged:
                logger.info("配置未发生变化")
                return

            previous_pool = self._connection_pool

            # Obtain the new pool first so that a creation failure does not
            # cost us the pool that is currently working.
            new_pool = self._config_cache.get(config_hash)
            if new_pool is not None:
                # NOTE(review): a cached pool may have been close()d when it
                # was replaced earlier; reusing it assumes PooledDB stays
                # usable after close() - confirm against the DBUtils docs.
                logger.info("复用缓存连接池")
            else:
                new_pool = self._create_pool(new_config)
                self._config_cache[config_hash] = new_pool
                logger.info(f"新建连接池:{new_config['host']}")

            if previous_pool is not None and previous_pool is not new_pool:
                previous_pool.close()
                logger.info("旧连接池已关闭")

            self._connection_pool = new_pool
            # Keep a private copy so later caller-side mutation has no effect.
            self._active_config = new_config.copy()

            # Initial health check for the freshly activated pool.
            self._perform_health_check()

    @contextmanager
    def get_connection(self):
        """
        Context manager yielding a connection from the active pool.

        The manager lock is held only while the connection is obtained, not
        while the caller uses it, so threads do not serialise their database
        work on the manager. The connection is closed when the block exits.

        :raises RuntimeError: if no pool is available (never configured, or a
            failed health check could not rebuild it).
        :raises Exception: anything raised while obtaining the connection,
            after it has been logged.
        """
        with self._lock:
            if self._connection_pool is None:
                raise RuntimeError("连接池未初始化")
            # Periodic health check; on failure it may replace or drop the pool.
            if self._needs_health_check():
                self._perform_health_check()
            if self._connection_pool is None:
                raise RuntimeError("连接池未初始化")
            try:
                conn = self._connection_pool.connection()
            except Exception as e:
                logger.error(f"获取连接失败:{str(e)}")
                raise

        # Lock released: errors raised by the caller's block propagate as-is
        # and are not reported as connection-acquisition failures.
        try:
            yield conn
        finally:
            conn.close()

    def _needs_health_check(self):
        """Return True when more than ``health_check_interval`` seconds have
        passed since the last successful health check."""
        return (time.time() - self.last_health_check) > self.health_check_interval

    def _perform_health_check(self):
        """
        Run ``SELECT 1`` on a pooled connection.

        On success the check timestamp is refreshed. Any failure is logged
        and handed to ``_handle_connection_failure``.
        """
        try:
            conn = self._connection_pool.connection()
            try:
                cursor = conn.cursor()
                try:
                    cursor.execute("SELECT 1")
                finally:
                    cursor.close()
            finally:
                # Always give the probe connection back, even if the query failed.
                conn.close()
            self.last_health_check = time.time()
            logger.debug("健康检查通过")
        except Exception as e:
            logger.error(f"健康检查失败: {str(e)}")
            self._handle_connection_failure()

    def _handle_connection_failure(self):
        """
        Discard the failed pool and try once to rebuild it.

        The rebuild creates the pool directly instead of going through
        ``configure``: ``configure`` would either skip the unchanged
        configuration or re-enter the health check and recurse. If the
        rebuild fails, the manager is left without a pool; callers can retry
        with ``configure``.
        """
        with self._lock:
            failed_pool = self._connection_pool
            self._connection_pool = None
            if failed_pool is not None:
                # Make sure the broken pool can never be handed out again.
                self._config_cache = {
                    key: pool
                    for key, pool in self._config_cache.items()
                    if pool is not failed_pool
                }
                failed_pool.close()

            if not self._active_config:
                return

            try:
                rebuilt = self._create_pool(self._active_config)
            except Exception:
                # Best-effort reconnect: _create_pool has already logged the
                # cause, and get_connection reports the missing pool.
                return
            self._config_cache[self._get_config_hash(self._active_config)] = rebuilt
            self._connection_pool = rebuilt

    @classmethod
    def load_config_from_db(cls, config_name: str) -> Dict[str, Any]:
        """
        Load a data-source configuration by name.

        Example stub: ``config_name`` is currently ignored and a fixed
        configuration is returned. A real implementation would read it from a
        system database, which requires a system data-source connection to be
        set up beforehand.

        :param config_name: name of the configuration to load (unused here).
        :return: configuration dict accepted by ``configure``.
        """
        # Sketch of a real implementation; adjust to the actual schema of the
        # data-source table:
        # sys_conn = mysql.connector.connect(
        #     host="system_db_host",
        #     user="admin",
        #     password="admin_pass",
        #     database="config_db"
        # )
        # cursor = sys_conn.cursor(dictionary=True)
        # cursor.execute(
        #     "SELECT * FROM datasource WHERE name=%s",
        #     (config_name,)
        # )
        # config = cursor.fetchone()
        # cursor.close()
        # sys_conn.close()
        # return config
        # NOTE(review): hard-coded credentials; move them to secure
        # configuration before production use.
        return {
            'host': '192.168.0.130',
            'port': 3306,
            'user': 'task_center',
            'password': 'task_center',
            'database': 'task_center_dev',
            'mincached': 3,
            'maxconnections': 15
        }


class BaseCRUD(object):
    """Generic read/write helpers that run SQL through a data-source manager."""

    def __init__(self, data_source: DynamicDataSourceManager):
        # Manager that hands out connections via get_connection().
        self.ds = data_source

    @contextmanager
    def _get_cursor(self, conn):
        """Yield a ``DictCursor`` opened on ``conn`` and close it on exit."""
        dict_cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            yield dict_cursor
        finally:
            dict_cursor.close()

    def select_one(self, sql: str, params: tuple = None) -> Optional[Dict]:
        """
        Run a query and return its first row.

        :param sql: statement with ``%s`` placeholders.
        :param params: placeholder values; None means no parameters.
        :return: the first row, or None when the fetch result is empty.
        :raises Exception: any execution error, re-raised after logging.
        """
        with self.ds.get_connection() as conn, self._get_cursor(conn) as cursor:
            try:
                cursor.execute(sql, params or ())
                first_row = cursor.fetchone()
            except Exception as exc:
                logger.error(f"查询失败:{sql} | {exc!s}")
                raise
            return first_row or None

    def select_all(self, sql: str, params: tuple = None) -> List[Dict]:
        """
        Run a query and return every row as a list.

        :param sql: statement with ``%s`` placeholders.
        :param params: placeholder values; None means no parameters.
        :return: list of all fetched rows.
        :raises Exception: any execution error, re-raised after logging.
        """
        with self.ds.get_connection() as conn, self._get_cursor(conn) as cursor:
            try:
                cursor.execute(sql, params or ())
                rows = list(cursor.fetchall())
            except Exception as exc:
                logger.error(f"批查询失败:{sql} | {exc!s}")
                raise
            return rows

    def execute(self, sql: str, params: tuple = None) -> int:
        """
        Run a single write statement and commit it.

        :param sql: statement with ``%s`` placeholders.
        :param params: placeholder values; None means no parameters.
        :return: the cursor's ``rowcount`` after the commit.
        :raises Exception: any error, re-raised after rollback and logging.
        """
        with self.ds.get_connection() as conn, self._get_cursor(conn) as cursor:
            try:
                cursor.execute(sql, params or ())
                conn.commit()
                affected = cursor.rowcount
            except Exception as exc:
                # Undo the partial work before reporting the failure.
                conn.rollback()
                logger.error(f"执行失败:{sql} | {exc!s}")
                raise
            return affected

    def batch_execute(self, sql: str, params_list: List[tuple]) -> int:
        """
        Run the same statement once per parameter tuple in one transaction.

        :param sql: statement with ``%s`` placeholders.
        :param params_list: one parameter tuple per execution.
        :return: sum of the cursor's ``rowcount`` over all executions.
        :raises Exception: any error, re-raised after rollback and logging.
        """
        with self.ds.get_connection() as conn, self._get_cursor(conn) as cursor:
            affected = 0
            try:
                for row_params in params_list:
                    cursor.execute(sql, row_params)
                    affected = affected + cursor.rowcount
                # Commit once, after every statement has succeeded.
                conn.commit()
            except Exception as exc:
                conn.rollback()
                logger.error(f"批执行失败:{sql} | {exc!s}")
                raise
            return affected


class UserCRUD(BaseCRUD):
    """Operations on the ``users`` table, built on the BaseCRUD helpers."""

    def get_by_email(self, email: str) -> Optional[Dict]:
        """Return the result of ``select_one`` for the user with ``email``."""
        query = "SELECT * FROM users WHERE email = %s"
        return self.select_one(query, (email,))

    def create_user(self, name: str, email: str) -> int:
        """Insert a user with ``name`` and ``email``; return ``execute``'s result."""
        statement = "INSERT INTO users (name, email) VALUES (%s, %s)"
        return self.execute(statement, (name, email))

    def get_all_users(self):
        """Return every row of the ``users`` table via ``select_all``."""
        query = "SELECT * FROM users"
        return self.select_all(query)


if __name__ == '__main__':
    # Initial configuration; the values below are placeholders.
    initial_config = dict(
        host='xxxx.xxx.xxx.xxx',
        port=3306,
        user='xxx',
        password='xxx',
        database='xxx',
        mincached=3,
        maxconnections=15,
    )

    # Obtain the singleton manager and point it at the data source.
    manager = DynamicDataSourceManager()
    manager.configure(initial_config)

    user_crud = UserCRUD(manager)

    # Transactional write and single-row lookup examples (disabled):
    # res = user_crud.execute(
    #     "INSERT INTO users (name, email) VALUES (%s, %s)",
    #     ("TOM", "tom@qq.com")
    # )
    # res = user_crud.get_by_email("bob@qq.com")
    # print(res)
    all_users = user_crud.get_all_users()
    print(all_users)