文章
python 改进版动态数据源管理器
改进版动态数据源管理器
功能特性:
- 支持多数据源并行使用
- 管理器为全局单例,每个数据源配置对应一个独立的连接池
- 连接池按配置隔离管理
- 定时健康检查(获取连接时,若距上次成功检查已超过间隔,则执行 SELECT 1)
import threading
import time
import hashlib
import pymysql
from contextlib import contextmanager
from typing import Dict, Optional
from dbutils.pooled_db import PooledDB
from common.tools.logger import LogManager
# Module-level logger obtained from the project's LogManager (common.tools.logger).
logger = LogManager.get_logger()
class DatasourceManager(object):
    """
    Dynamic data source manager (improved version).

    Features:
    - Several data sources can be used side by side.
    - The manager itself is a process-wide singleton; every distinct
      connection identity (host, port, user, password, database) gets its
      own ``PooledDB`` instance, created lazily on first use.
    - A pool is health-checked with ``SELECT 1`` when a connection is
      requested and more than ``health_check_interval`` seconds have passed
      since its last successful check.
    """

    _instance = None
    _lock = threading.RLock()

    def __new__(cls):
        # Create the shared instance exactly once; the class-level RLock makes
        # concurrent first calls safe.
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance.__init()
        return cls._instance

    def __init(self):
        """Initialise the pool registry (runs once, when the singleton is created)."""
        # Pool registry: {config_hash: {'pool': PooledDB, 'last_check': float, 'config': dict}}
        self._pool_store: Dict[str, Dict] = {}
        # Minimum number of seconds between two health checks of the same pool
        self.health_check_interval = 300

    def _get_pool_entry(self, config_hash: str) -> Optional[Dict]:
        """Return the registry entry for ``config_hash``, or None if none is registered."""
        return self._pool_store.get(config_hash)

    @staticmethod
    def _get_config_hash(config: Dict) -> str:
        """Return a stable key identifying the connection target *and* account.

        User and password are part of the key so that two configs pointing at
        the same database with different credentials never share a pool (the
        first registered credentials would otherwise be used for both).
        ``repr`` of a tuple keeps the fields unambiguous even when a value
        contains separator characters such as ':' or '@'.
        """
        identity = (
            str(config['host']),
            str(config['port']),
            str(config.get('user', '')),
            str(config.get('password', '')),
            str(config['database']),
        )
        return hashlib.sha256(repr(identity).encode('utf-8')).hexdigest()

    def _validate_config(self, config: Dict):
        """Raise ValueError listing any required connection keys missing from ``config``."""
        required = ['host', 'port', 'user', 'password', 'database']
        if missing := [k for k in required if k not in config]:
            raise ValueError(f"缺少必要配置参数: {missing}")

    def _create_pool(self, config: Dict) -> PooledDB:
        """Create a new PyMySQL-backed PooledDB for ``config``.

        ``mincached`` (2), ``maxcached`` (5), ``maxconnections`` (20) and
        ``charset`` ('utf8mb4') may be overridden by config keys of the same
        name. Connections are opened with autocommit disabled, so writers
        must commit explicitly.

        :raises Exception: whatever PooledDB/PyMySQL raises; it is logged first
        """
        try:
            return PooledDB(
                creator=pymysql,
                host=config['host'],
                port=config['port'],
                user=config['user'],
                password=config['password'],
                database=config['database'],
                mincached=config.get('mincached', 2),
                maxcached=config.get('maxcached', 5),
                maxconnections=config.get('maxconnections', 20),
                charset=config.get('charset', 'utf8mb4'),
                autocommit=False,
            )
        except Exception as e:
            logger.error(f"连接池创建失败:{str(e)}")
            raise

    def _register_pool(self, config: Dict) -> PooledDB:
        """Return the pool for ``config``, creating and registering it if absent."""
        config_hash = self._get_config_hash(config)
        with self._lock:
            if entry := self._get_pool_entry(config_hash):
                return entry['pool']
            new_pool = self._create_pool(config)
            self._pool_store[config_hash] = {
                'pool': new_pool,
                'last_check': 0,  # 0 forces a health check on first use
                'config': config.copy(),
            }
            logger.info(f"注册新数据源: {config['host']}:{config['database']}")
            return new_pool

    @contextmanager
    def get_connection(self, config: Dict):
        """
        Borrow a pooled connection for ``config``; it goes back to its pool on exit.

        The pool is created on first use and health-checked when due. Only a
        server error while *obtaining* a connection marks the pool as broken;
        exceptions raised by the caller inside the ``with`` block propagate
        unchanged and never tear the pool down.

        :param config: dict with host/port/user/password/database (optional:
            mincached, maxcached, maxconnections, charset)
        :raises ValueError: if a required config key is missing
        """
        self._validate_config(config)
        config_hash = self._get_config_hash(config)

        # Lookup, registration and health check happen under one lock so a
        # concurrent eviction cannot leave us holding a stale/missing entry.
        with self._lock:
            entry = self._get_pool_entry(config_hash)
            if entry is None:
                self._register_pool(config)
                entry = self._pool_store[config_hash]
            if self._needs_check(entry) and not self._health_check(entry):
                # The failed check evicted the pool; build a fresh one instead
                # of handing out connections from the discarded pool.
                self._register_pool(config)
                entry = self._pool_store[config_hash]
            pool = entry['pool']

        try:
            conn = pool.connection()
        except pymysql.MySQLError as e:
            # The server refused us or is unreachable: drop this pool so the
            # next call rebuilds it.
            logger.error(f"连接获取异常: {str(e)}")
            self._handle_failure(entry)
            raise
        except Exception as e:
            # Not a server error (e.g. pool exhaustion): the pool itself is kept.
            logger.error(f"连接获取异常: {str(e)}")
            raise

        try:
            yield conn
        finally:
            conn.close()

    def _needs_check(self, entry: Dict) -> bool:
        """Return True if more than ``health_check_interval`` s passed since the last check."""
        elapsed = time.time() - entry['last_check']
        return elapsed > self.health_check_interval

    def _health_check(self, entry: Dict) -> bool:
        """Run ``SELECT 1`` on a connection from ``entry``'s pool.

        On success, record the check time and return True. On failure, log,
        evict the pool via ``_handle_failure`` and return False. The probe
        connection and cursor are always closed, even when the query fails.
        """
        logger.debug(f"执行健康检查: {entry['config']['host']}")
        try:
            conn = entry['pool'].connection()
            try:
                cursor = conn.cursor()
                try:
                    cursor.execute("SELECT 1")
                finally:
                    cursor.close()
            finally:
                # Release before any eviction so the pool can close it too.
                conn.close()
        except Exception as e:
            logger.error(f"健康检查失败: {str(e)}")
            self._handle_failure(entry)
            return False
        entry['last_check'] = time.time()
        return True

    def _handle_failure(self, entry: Dict):
        """Evict ``entry`` from the registry and close its pool.

        The registry slot is only removed if it still holds this very entry:
        another thread may already have evicted it or replaced it with a
        fresh pool, which must not be dropped.
        """
        logger.warning(f"清理失效连接池: {entry['config']['host']}")
        config_hash = self._get_config_hash(entry['config'])
        with self._lock:
            if self._pool_store.get(config_hash) is entry:
                del self._pool_store[config_hash]
            entry['pool'].close()

    def close_all(self):
        """Close every registered pool and empty the registry."""
        with self._lock:
            for config_hash in list(self._pool_store):
                self._pool_store.pop(config_hash)['pool'].close()
            logger.info("已关闭所有数据源连接池")
import pymysql
from contextlib import contextmanager
from typing import Dict, Optional, List, Generator
from common.data_tools.dsm import DatasourceManager
from common.tools.logger import LogManager
# Module-level logger obtained from the project's LogManager (common.tools.logger).
logger = LogManager.get_logger()
class DBC(object):
    """Generic database helper supporting multiple data sources.

    Each operation obtains its connection from, in order of precedence:
    an explicit ``conn`` (an already-borrowed connection, e.g. the one
    yielded by :meth:`transaction`), an explicit ``config``, or the
    ``default_config`` given at construction. Pooled connections are
    borrowed from the ``DatasourceManager`` passed in.
    """

    def __init__(self, data_source: DatasourceManager, default_config: Dict = None):
        """
        :param data_source: data source manager that hands out pooled connections
        :param default_config: optional config used when a call passes no ``config``
        """
        self.ds = data_source
        self.default_config = default_config

    def _resolve_config(self, config: Dict = None) -> Dict:
        """Return ``config`` if given, otherwise the default config.

        :raises ValueError: if neither is available
        """
        if config is not None:
            return config
        if self.default_config is not None:
            return self.default_config
        raise ValueError("必须指定数据源配置或设置默认配置")

    @contextmanager
    def _borrow(self, config: Dict = None, conn=None):
        """Yield ``conn`` as-is when given, else a pooled connection for the resolved config.

        A caller-supplied connection is neither closed nor committed here; its
        owner (typically :meth:`transaction`) controls its lifetime.
        """
        if conn is not None:
            yield conn
            return
        with self.ds.get_connection(self._resolve_config(config)) as pooled:
            yield pooled

    @contextmanager
    def _get_cursor(self, conn) -> Generator[pymysql.cursors.DictCursor, None, None]:
        """Yield a DictCursor on ``conn`` and always close it afterwards."""
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            yield cursor
        finally:
            cursor.close()

    @contextmanager
    def transaction(self, config: Dict = None):
        """Yield a connection; commit on normal exit, roll back and re-raise on error."""
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            try:
                yield conn
                conn.commit()
            except Exception as e:
                conn.rollback()
                logger.error(f"事务回滚: {str(e)}")
                raise

    def select_one(self, sql: str, params: tuple = None,
                   config: Dict = None, conn=None) -> Optional[Dict]:
        """Run a query and return the first row as a dict, or None if there is none.

        :param conn: optional existing connection (e.g. inside ``transaction``);
            when given, ``config`` is ignored
        """
        with self._borrow(config, conn) as active:
            with self._get_cursor(active) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchone()
                except Exception as e:
                    logger.error(f"查询失败:{sql} | {str(e)}")
                    raise

    def select_all(self, sql: str, params: tuple = None,
                   config: Dict = None, conn=None) -> List[Dict]:
        """Run a query and return all rows as a list of dicts.

        :param conn: optional existing connection (e.g. inside ``transaction``);
            when given, ``config`` is ignored
        """
        with self._borrow(config, conn) as active:
            with self._get_cursor(active) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchall()
                except Exception as e:
                    logger.error(f"批查询失败:{sql} | {str(e)}")
                    raise

    def execute(self, sql: str, params: tuple = None,
                config: Dict = None, conn=None) -> int:
        """Run one write statement and return the affected row count.

        Without ``conn`` the statement runs on its own pooled connection and
        is committed (rolled back on error). With ``conn`` it runs on that
        connection and commit/rollback is left to the connection's owner,
        e.g. the enclosing ``transaction``.
        """
        owns_connection = conn is None
        with self._borrow(config, conn) as active:
            with self._get_cursor(active) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    if owns_connection:
                        active.commit()
                    return cursor.rowcount
                except Exception as e:
                    if owns_connection:
                        active.rollback()
                    logger.error(f"执行失败:{sql} | {str(e)}")
                    raise

    def batch_execute(self, sql: str, params_list: List[tuple],
                      config: Dict = None, conn=None) -> int:
        """Run ``sql`` once per params entry and return the summed row count.

        Without ``conn`` all statements run in a single transaction
        (committed together, rolled back together). With ``conn`` they run on
        that connection and transaction control stays with its owner.
        """
        if conn is not None:
            context = self._borrow(conn=conn)
        else:
            context = self.transaction(config)
        with context as active:
            with self._get_cursor(active) as cursor:
                total = 0
                try:
                    for params in params_list:
                        cursor.execute(sql, params)
                        total += cursor.rowcount
                    return total
                except Exception as e:
                    logger.error(f"批执行失败:{sql} | {str(e)}")
                    raise

    def execute_in_transaction(self, operations: List[dict],
                               config: Dict = None) -> bool:
        """
        Run several statements of mixed kind in one transaction.

        :param operations: list of dicts ``{'sql': str, 'params': tuple,
            'type': 'write' | 'read'}``; ``type`` defaults to 'write', and the
            result set of a 'read' is fetched and discarded
        :return: True if every statement succeeded and was committed; False if
            one failed, in which case all of them are rolled back
        :raises ValueError: if no config can be resolved
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            try:
                with self._get_cursor(conn) as cursor:
                    for op in operations:
                        cursor.execute(op['sql'], op.get('params'))
                        if op.get('type', 'write') == 'read':
                            cursor.fetchall()  # drain the result set
                return True
            except Exception as e:
                # The error is swallowed here, so transaction() would commit
                # whatever ran before the failure: undo it explicitly.
                conn.rollback()
                logger.error(f"复合操作失败: {str(e)}")
                return False
使用示例:
# Initialise with a default data source.
# NOTE(review): primary_config / analytics_config / log_db_config are not
# defined in this example; presumably config dicts with
# host/port/user/password/database keys — define them before running.
db = DBC(DatasourceManager(), default_config=primary_config)
# Simple query against the default data source
user = db.select_one("SELECT * FROM users WHERE id=%s", (1001,))
# Operation spanning several data sources
def cross_domain_update():
    """Copy user stats from the analytics DB into the primary DB, then log to the log DB."""
    # Read from the analytics database (rows come back as dicts)
    analytics_data = db.select_all(
        "SELECT * FROM user_behavior",
        config=analytics_config
    )
    # Write to the primary database inside a single transaction
    with db.transaction() as conn:  # uses default_config
        for data in analytics_data:
            # NOTE(review): relies on DBC.execute accepting a `conn` keyword
            # argument — confirm against the DBC version in use.
            db.execute(
                "UPDATE users SET stats=%s WHERE id=%s",
                (data['stats'], data['user_id']),
                conn=conn  # reuse the transaction's connection
            )
    # Also write to the log database (separate connection, committed by execute)
    db.execute(
        "INSERT INTO sync_log (message) VALUES (%s)",
        ("数据同步完成",),
        config=log_db_config
    )
修改亮点解析:
灵活配置传递:
- 方法级config参数覆盖
- 支持初始化默认配置
# 默认使用config_a,特定操作使用config_b
db = DBC(manager, default_config=config_a)
db.select_one("...") # 使用config_a
db.execute("...", config=config_b) # 使用config_b
高效事务控制:
- 通过上下文管理器管理事务生命周期
with db.transaction() as conn: # 开启事务
db.execute("...", conn=conn)
db.execute("...", conn=conn)
# 自动提交/回滚
混合操作支持:
- 单方法内处理多个数据源
def process_multidb():
    """Read from data source A, batch-write to B and log to C (SQL shown as placeholders)."""
    # Read from data source A
    data = db.select_all("...", config=config_a)
    # Write to data source B. `data` is a list of dict rows, so the SQL
    # presumably needs %(name)s-style placeholders — verify.
    db.batch_execute("...", data, config=config_b)
    # Record a log entry in data source C
    db.execute("...", config=config_c)
连接复用优化:
- 显式传递conn参数可复用同一连接
with db.transaction(config=special_config) as conn:
result = db.select_one("...", conn=conn) # 使用事务连接查询
db.execute("...", conn=conn) # 同一连接写操作
读写类型标记:
- 通过操作字典中的 'type' 字段区分读/写(缺省视为 'write');读操作的结果集会被取出并丢弃
# 在事务中混合读写操作
db.execute_in_transaction([
{'sql': "SELECT ...", 'type': 'read'},
{'sql': "UPDATE ...", 'params': (...)}
])