文章
python mysql连接池操作
import logging
import threading
import time
import hashlib
from contextlib import contextmanager
import pymysql
from typing import Dict, Any, Optional, List
from dbutils.pooled_db import PooledDB
# 配置日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("database.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger('DBManager')
class DynamicDataSourceManager(object):
"""
动态数据源管理器
功能特性:
- 支持多数据源动态切换
- 连接池管理(基于dbutils.pooled_db)
- 线程安全设计
- 配置自动缓存
- 健康检查机制
"""
_instance = None # 单例模式
_lock = threading.RLock() # 可重入锁
def __new__(cls):
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
cls._instance._init_()
return cls._instance
def _init_(self):
"""初始化管理器"""
# 当前生效配置
self._active_config: Optional[Dict[str, Any]] = None
# 连接池实例
self._connection_pool: Optional[PooledDB] = None
# 配置缓存(使用SHA256哈希)
self._config_cache: Dict[str, PooledDB] = {}
# 健康检查参数
self.health_check_interval = 300 # 秒
self.last_health_check = 0
def _create_pool(self, config: Dict[str, Any]) -> PooledDB:
"""创建新连接池"""
try:
return PooledDB(
creator=pymysql,
host=config['host'],
port=config['port'],
user=config['user'],
password=config['password'],
database=config['database'],
mincached=config.get('mincached', 2),
maxcached=config.get('maxcached', 5),
maxconnections=config.get('maxconnections', 20),
charset='utf8mb4',
autocommit=False
)
except Exception as e:
logger.error(f"连接池创建失败:{str(e)}")
raise
@classmethod
def _get_config_hash(cls, config: Dict) -> str:
"""生成配置哈希值"""
return hashlib.sha256(
f"{config['host']}:{config['port']}@{config['database']}".encode()
).hexdigest()
def configure(self, new_config: Dict[str, Any]):
"""
更新数据源配置
:param new_config: 包含完整连接参数的新配置
"""
with self._lock: # 线程安全操作
# 参数校验
required_keys = ['host', 'port', 'user', 'password', 'database']
if not all(k in new_config for k in required_keys):
raise ValueError("缺少必要配置参数")
config_hash = self._get_config_hash(new_config)
# 配置未变化时跳过
if self._active_config and self._get_config_hash(self._active_config) == config_hash:
logger.info("配置未发生变化")
return
# 关闭旧连接池
if self._connection_pool:
self._connection_pool.close()
logger.info("旧连接池已关闭")
# 创建或获取连接池
if config_hash in self._config_cache:
logger.info("复用缓存连接池")
self._connection_pool = self._config_cache[config_hash]
else:
self._connection_pool = self._create_pool(new_config)
self._config_cache[config_hash] = self._connection_pool
logger.info(f"新建连接池:{new_config['host']}")
# 更新当前配置
self._active_config = new_config.copy()
# 执行初始健康检查
self._perform_health_check()
@contextmanager
def get_connection(self):
"""获取数据库连接"""
with self._lock:
if not self._connection_pool:
raise RuntimeError("连接池未初始化")
# 定期健康检查
if self._needs_health_check():
self._perform_health_check()
conn = None
try:
conn = self._connection_pool.connection()
yield conn
except Exception as e:
logger.error(f"获取连接失败:{str(e)}")
raise
finally:
if conn:
conn.close()
def _needs_health_check(self):
return (time.time() - self.last_health_check) > self.health_check_interval
def _perform_health_check(self):
"""执行健康检查"""
try:
conn = self._connection_pool.connection()
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.close()
conn.close()
self.last_health_check = time.time()
logger.debug("健康检查通过")
except Exception as e:
logger.error(f"健康检查失败: {str(e)}")
self._handle_connection_failure()
def _handle_connection_failure(self):
"""连接失败处理"""
# 清理当前连接池
if self._connection_pool:
self._connection_pool.close()
self._connection_pool = None
# 尝试重新连接
if self._active_config:
self.configure(self._active_config)
@classmethod
def load_config_from_db(cls, config_name: str) -> Dict[str, Any]:
"""
从数据库加载配置(需提前建立系统数据源连接)
"""
# 示例实现,实际需根据数据源表结构调整
# sys_conn = mysql.connector.connect(
# host="system_db_host",
# user="admin",
# password="admin_pass",
# database="config_db"
# )
# cursor = sys_conn.cursor(dictionary=True)
# cursor.execute(
# "SELECT * FROM datasource WHERE name=%s",
# (config_name,)
# )
# config = cursor.fetchone()
# cursor.close()
# sys_conn.close()
# return config
return {
'host': '192.168.0.130',
'port': 3306,
'user': 'task_center',
'password': 'task_center',
'database': 'task_center_dev',
'mincached': 3,
'maxconnections': 15
}
class BaseCRUD(object):
"""通用数据库操作"""
def __init__(self, data_source: DynamicDataSourceManager):
self.ds = data_source
@contextmanager
def _get_cursor(self, conn):
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
yield cursor
finally:
cursor.close()
def select_one(self, sql: str, params: tuple = None) -> Optional[Dict]:
"""查询单条记录"""
with self.ds.get_connection() as conn:
with self._get_cursor(conn) as cursor:
try:
cursor.execute(sql, params or ())
result = cursor.fetchone()
return result if result else None
except Exception as e:
logger.error(f"查询失败:{sql} | {str(e)}")
raise
def select_all(self, sql: str, params: tuple = None) -> List[Dict]:
"""查询多条记录"""
with self.ds.get_connection() as conn:
with self._get_cursor(conn) as cursor:
try:
cursor.execute(sql, params or ())
return [row for row in cursor.fetchall()]
except Exception as e:
logger.error(f"批查询失败:{sql} | {str(e)}")
raise
def execute(self, sql: str, params: tuple = None) -> int:
"""执行写操作"""
with self.ds.get_connection() as conn:
with self._get_cursor(conn) as cursor:
try:
cursor.execute(sql, params or ())
conn.commit()
return cursor.rowcount
except Exception as e:
conn.rollback()
logger.error(f"执行失败:{sql} | {str(e)}")
raise
def batch_execute(self, sql: str, params_list: List[tuple]) -> int:
"""批量执行操作"""
with self.ds.get_connection() as conn:
with self._get_cursor(conn) as cursor:
total = 0
try:
for params in params_list:
cursor.execute(sql, params)
total += cursor.rowcount
conn.commit()
return total
except Exception as e:
conn.rollback()
logger.error(f"批执行失败:{sql} | {str(e)}")
raise
class UserCRUD(BaseCRUD):
def get_by_email(self, email: str) -> Optional[Dict]:
return self.select_one(
"SELECT * FROM users WHERE email = %s",
(email,)
)
def create_user(self, name: str, email: str) -> int:
return self.execute(
"INSERT INTO users (name, email) VALUES (%s, %s)",
(name, email)
)
def get_all_users(self):
return self.select_all(
"SELECT * FROM users"
)
if __name__ == '__main__':
# 初始配置
initial_config = {
'host': 'xxxx.xxx.xxx.xxx',
'port': 3306,
'user': 'xxx',
'password': 'xxx',
'database': 'xxx',
'mincached': 3,
'maxconnections': 15
}
manager = DynamicDataSourceManager()
manager.configure(initial_config)
user_crud = UserCRUD(manager)
# 事务操作
# res = user_crud.execute(
# "INSERT INTO users (name, email) VALUES (%s, %s)",
# ("TOM", "tom@qq.com")
# )
# res = user_crud.get_by_email("bob@qq.com")
# print(res)
res = user_crud.get_all_users()
print(res)