文章
django-ninja动态api发布demo
动态api发布,带数据源配置以及api配置
新建django项目api_server
新建common文件夹,并在其中创建core.py、dbc.py、dsm.py、log.py
core.py
from ninja import NinjaAPI
# Single NinjaAPI instance for the project; news/apis.py registers its route on
# it and api_server/urls.py mounts its URLs.
api = NinjaAPI()
dsm.py
import threading
import time
import hashlib
import pymysql
from contextlib import contextmanager
from typing import Dict, Optional
from dbutils.pooled_db import PooledDB
from common.log import logger
class DatasourceManager(object):
    """
    Singleton registry of MySQL connection pools, one pool per data source.

    Features:
    - several data sources can be used side by side
    - each distinct connection config (host, port, database, user, password)
      gets exactly one pool
    - pools are stored and managed separately, keyed by a hash of the config
    - a pool is probed with ``SELECT 1`` when a connection is requested and
      its last successful probe is older than ``health_check_interval``
    """
    _instance = None
    _lock = threading.RLock()

    def __new__(cls):
        """Return the shared instance, creating and initialising it on first use."""
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                instance.__init()
                # Publish only after initialisation succeeded.
                cls._instance = instance
            return cls._instance

    def __init(self):
        """Initialise the per-data-source storage (runs once, from ``__new__``)."""
        # Pool store: {config_hash: {'pool': ..., 'last_check': ..., 'config': ...}}
        self._pool_store: Dict[str, Dict] = {}
        # Minimum number of seconds between two health checks of the same pool.
        self.health_check_interval = 300

    def _get_pool_entry(self, config_hash: str) -> Optional[Dict]:
        """Return the pool entry stored under ``config_hash``, or ``None``."""
        return self._pool_store.get(config_hash)

    @staticmethod
    def _get_config_hash(config: Dict) -> str:
        """
        Build the key that identifies a data source.

        The credentials are part of the identity: two configs that differ only
        in ``user`` or ``password`` must not share a pool, otherwise queries
        would run under the wrong account.

        :param config: connection config containing ``host``, ``port``,
            ``database``, ``user`` and ``password``
        :return: hex SHA-256 digest of those five values
        :raises KeyError: if one of the five keys is missing
        """
        identity = tuple(
            str(config[key])
            for key in ('host', 'port', 'database', 'user', 'password')
        )
        return hashlib.sha256(repr(identity).encode()).hexdigest()

    def _validate_config(self, config: Dict):
        """
        Check that every mandatory connection parameter is present.

        :raises ValueError: listing the missing keys
        """
        required = ['host', 'port', 'user', 'password', 'database']
        if missing := [k for k in required if k not in config]:
            raise ValueError(f"缺少必要配置参数: {missing}")

    def _create_pool(self, config: Dict) -> "PooledDB":
        """
        Create a new ``PooledDB`` pool for ``config``.

        Pool sizing falls back to mincached=2, maxcached=5, maxconnections=20
        when the config does not provide those keys. Any creation error is
        logged and re-raised.
        """
        try:
            return PooledDB(
                creator=pymysql,
                host=config['host'],
                port=config['port'],
                user=config['user'],
                password=config['password'],
                database=config['database'],
                mincached=config.get('mincached', 2),
                maxcached=config.get('maxcached', 5),
                maxconnections=config.get('maxconnections', 20),
                charset='utf8mb4',
                autocommit=False
            )
        except Exception as e:
            logger.error(f"连接池创建失败:{str(e)}")
            raise

    def _register_pool(self, config: Dict) -> "PooledDB":
        """
        Return the pool for ``config``, creating and storing it if necessary.

        A new entry starts with ``last_check`` = 0 so that it is probed the
        first time a connection is requested.
        """
        config_hash = self._get_config_hash(config)
        with self._lock:
            if entry := self._get_pool_entry(config_hash):
                return entry['pool']
            new_pool = self._create_pool(config)
            self._pool_store[config_hash] = {
                'pool': new_pool,
                'last_check': 0,
                'config': config.copy()
            }
            logger.info(f"注册新数据源: {config['host']}:{config['database']}")
            return new_pool

    @contextmanager
    def get_connection(self, config: Dict):
        """
        Yield a pooled connection for the given config and release it on exit.

        The pool is registered on first use and health-checked when due. Only
        a failure to obtain the connection discards the pool; exceptions raised
        inside the caller's ``with`` body propagate unchanged and leave the
        pool in place.

        :param config: database connection config dict
        :raises ValueError: if the config is incomplete
        """
        self._validate_config(config)
        config_hash = self._get_config_hash(config)
        with self._lock:
            pool_entry = self._get_pool_entry(config_hash)
            if pool_entry is None:
                # Register the pool automatically on first use.
                self._register_pool(config)
                pool_entry = self._pool_store[config_hash]
            if self._needs_check(pool_entry) and not self._health_check(pool_entry):
                # The failed probe discarded the pool; continue with a fresh
                # one instead of the closed instance.
                self._register_pool(config)
                pool_entry = self._pool_store[config_hash]
            pool = pool_entry['pool']
        try:
            conn = pool.connection()
        except Exception as e:
            logger.error(f"连接获取异常: {str(e)}")
            self._handle_failure(pool_entry)
            raise
        try:
            yield conn
        finally:
            # Hands the connection back to the pool.
            conn.close()

    def _needs_check(self, entry: Dict) -> bool:
        """Return True when the entry's last successful check is older than the interval."""
        elapsed = time.time() - entry['last_check']
        return elapsed > self.health_check_interval

    def _health_check(self, entry: Dict) -> bool:
        """
        Probe the pool with ``SELECT 1``.

        On success ``last_check`` is updated. On failure the error is logged
        and the pool is discarded through ``_handle_failure``.

        :return: True if the probe succeeded, False otherwise
        """
        logger.debug(f"执行健康检查: {entry['config']['host']}")
        try:
            conn = entry['pool'].connection()
            try:
                cursor = conn.cursor()
                try:
                    cursor.execute("SELECT 1")
                finally:
                    cursor.close()
            finally:
                # Always release the probe connection, even if the query failed.
                conn.close()
        except Exception as e:
            logger.error(f"健康检查失败: {str(e)}")
            self._handle_failure(entry)
            return False
        entry['last_check'] = time.time()
        return True

    def _handle_failure(self, entry: Dict):
        """
        Close a broken pool and remove it from the store.

        :param entry: the pool entry (not the hash) that failed
        """
        logger.warning(f"清理失效连接池: {entry['config']['host']}")
        config_hash = self._get_config_hash(entry['config'])
        with self._lock:
            # Remove only this exact entry: it may already be gone, or have
            # been replaced by a newer pool registered under the same key.
            if self._pool_store.get(config_hash) is entry:
                del self._pool_store[config_hash]
            entry['pool'].close()

    def close_all(self):
        """Close every registered pool and empty the store."""
        with self._lock:
            for config_hash, entry in list(self._pool_store.items()):
                entry['pool'].close()
                del self._pool_store[config_hash]
            logger.info("已关闭所有数据源连接池")
dbc.py
import pymysql
from contextlib import contextmanager
from typing import Dict, Optional, List, Generator
from common.dsm import DatasourceManager
from common.log import logger
class DBC(object):
    """
    General-purpose database helper that supports multiple data sources.

    Connections are obtained from the shared ``DatasourceManager``. Every
    public method accepts an optional ``config`` that overrides the default
    config given to the constructor.
    """

    def __init__(self, default_config: Dict = None):
        """
        :param default_config: optional connection config used whenever a
            call does not pass its own ``config``
        """
        self.ds = DatasourceManager()
        self.default_config = default_config

    def _resolve_config(self, config: Dict = None) -> Dict:
        """
        Pick the config to use: the explicit one, else the default.

        :raises ValueError: if neither is available
        """
        if config is not None:
            return config
        if self.default_config is not None:
            return self.default_config
        raise ValueError("必须指定数据源配置或设置默认配置")

    @contextmanager
    def _get_cursor(self, conn) -> "Generator[pymysql.cursors.DictCursor, None, None]":
        """Yield a ``DictCursor`` on ``conn`` and close it on exit."""
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            yield cursor
        finally:
            cursor.close()

    @contextmanager
    def transaction(self, config: Dict = None):
        """
        Transaction context: commit when the body finishes normally, roll back
        and re-raise when it raises.

        :param config: optional connection config overriding the default
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            try:
                yield conn
                conn.commit()
            except Exception as e:
                conn.rollback()
                logger.error(f"事务回滚: {str(e)}")
                raise

    def select_one(self, sql: str, params: tuple = None,
                   config: Dict = None) -> Optional[Dict]:
        """
        Run a query and return the first row.

        :param sql: SQL statement, with placeholders for ``params``
        :param params: optional parameter tuple bound to the statement
        :param config: optional connection config overriding the default
        :return: result of ``cursor.fetchone()``
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchone()
                except Exception as e:
                    logger.error(f"查询失败:{sql} | {str(e)}")
                    raise

    def select_all(self, sql: str, params: tuple = None,
                   config: Dict = None) -> List[Dict]:
        """
        Run a query and return all rows.

        :param sql: SQL statement, with placeholders for ``params``
        :param params: optional parameter tuple bound to the statement
        :param config: optional connection config overriding the default
        :return: result of ``cursor.fetchall()``
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchall()
                except Exception as e:
                    logger.error(f"批查询失败:{sql} | {str(e)}")
                    raise

    def execute(self, sql: str, params: tuple = None,
                config: Dict = None) -> int:
        """
        Execute one write statement and commit it immediately.

        On error the statement is rolled back, logged and the exception is
        re-raised.

        :return: ``cursor.rowcount`` of the statement
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    conn.commit()
                    return cursor.rowcount
                except Exception as e:
                    conn.rollback()
                    logger.error(f"执行失败:{sql} | {str(e)}")
                    raise

    def batch_execute(self, sql: str, params_list: List[tuple],
                      config: Dict = None) -> int:
        """
        Execute ``sql`` once per parameter tuple inside a single transaction.

        A failure is logged and re-raised, which makes ``transaction`` roll
        the whole batch back.

        :return: sum of ``cursor.rowcount`` over all executions
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            with self._get_cursor(conn) as cursor:
                total = 0
                try:
                    for params in params_list:
                        cursor.execute(sql, params)
                        total += cursor.rowcount
                    return total
                except Exception as e:
                    logger.error(f"批执行失败:{sql} | {str(e)}")
                    raise

    def execute_in_transaction(self, operations: List[dict],
                               config: Dict = None) -> bool:
        """
        Execute several operations of different kinds in one transaction.

        :param operations: list of operation dicts, each shaped like
            {'sql': str, 'params': tuple, 'type': 'write/read'};
            ``type`` defaults to 'write'
        :param config: optional connection config overriding the default
        :return: True if every operation succeeded; False if one failed, in
            which case nothing is committed
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            try:
                with self._get_cursor(conn) as cursor:
                    for op in operations:
                        cursor.execute(op['sql'], op.get('params'))
                        if op.get('type', 'write') == 'read':
                            cursor.fetchall()  # consume the result set
                return True
            except Exception as e:
                # The error is swallowed here, so ``transaction`` sees a normal
                # exit and commits. Roll back first so that commit is a no-op
                # and no partial work is persisted.
                conn.rollback()
                logger.error(f"复合操作失败: {str(e)}")
                return False
log.py
import logging
# Basic root-logger configuration (console output), DEBUG level and above.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)
# Module-level logger imported by the other ``common`` modules.
logger = logging.getLogger(__name__)
新建app: news
models.py
from django.db import models
class DataSource(models.Model):
    """Connection settings of one external database that APIs can query."""
    name = models.CharField(max_length=100)
    db_type = models.CharField(max_length=20)
    host = models.CharField(max_length=100)
    port = models.IntegerField()
    username = models.CharField(max_length=100)
    password = models.CharField(max_length=100)
    database = models.CharField(max_length=100)

    def gen_config(self):
        """
        Build the connection config dict for this data source.

        The keys are the ones ``DatasourceManager._validate_config`` requires;
        note that the ``username`` field is exposed under the key ``user``.
        """
        return dict(
            host=self.host,
            port=self.port,
            user=self.username,
            password=self.password,
            database=self.database,
        )
class APIConfig(models.Model):
    """Definition of one dynamically published API: a SQL statement bound to a data source and a path."""
    api_name = models.CharField(max_length=100)
    # SQL text that the dynamic_api view passes to DBC.select_all.
    sql = models.TextField()
    # Data source the SQL runs against; deleting it deletes this config too.
    datasource = models.ForeignKey(DataSource, on_delete=models.CASCADE)
    # Path used for lookup; dynamic_api matches it as "/" + the URL segment,
    # so stored values need the leading slash.
    api_path = models.CharField(max_length=200)
    # NOTE(review): the two pagination fields below are not read by the
    # dynamic_api view shown in this article.
    is_paginated = models.BooleanField(default=True)
    default_page_size = models.IntegerField(default=10)
apis.py
from .models import APIConfig
from common.dbc import DBC
from common.core import api
@api.get("/{api_path}")
def dynamic_api(request, api_path: str, page_size: int = 10):
    """
    Serve a dynamically configured API.

    Looks up the ``APIConfig`` whose ``api_path`` equals ``"/" + api_path``,
    runs its SQL against the configured data source and returns all rows.

    :param request: the incoming HTTP request
    :param api_path: path segment identifying the API configuration
    :param page_size: accepted for interface compatibility; NOTE: it is not
        applied yet, the full result set is returned
    :return: ``{"code": 200, "data": <rows>, "cnt": <number of rows>}``
    :raises Http404: when no configuration exists for the path, so the client
        gets a 404 response instead of an unhandled ``DoesNotExist`` (500)
    """
    # Local import keeps this block self-contained within apis.py.
    from django.shortcuts import get_object_or_404

    # select_related loads the data source in the same query as the config.
    config = get_object_or_404(
        APIConfig.objects.select_related("datasource"),
        api_path=f"/{api_path}",
    )
    dbc = DBC(config.datasource.gen_config())
    records = dbc.select_all(config.sql)
    return {"code": 200, "data": records, "cnt": len(records)}
编辑根路由api_server/urls.py
from django.contrib import admin
from django.urls import path
from news.apis import api
# Root URL configuration of the api_server project.
urlpatterns = [
    path('admin/', admin.site.urls),
    # Mounts the NinjaAPI routes under /api/. `api` is imported from news.apis
    # so that module runs and registers dynamic_api on the shared instance.
    path('api/', api.urls)
]
以上就可以了。