大数据

django-ninja动态api发布demo

动态api发布,带数据源配置以及api配置

新建django项目api_server

新建common文件夹,并在其中创建 core.py、dbc.py、dsm.py、log.py 四个文件

core.py

from ninja import NinjaAPI

# Single shared NinjaAPI instance: apis.py registers its endpoints on it and
# the root urls.py mounts `api.urls`.
api = NinjaAPI()

dsm.py

import threading
import time
import hashlib
import pymysql

from contextlib import contextmanager
from typing import Dict, Optional
from dbutils.pooled_db import PooledDB

from common.log import logger


class DatasourceManager(object):
    """
    Process-wide manager of MySQL connection pools, one pool per data source.

    Characteristics:
    - Singleton: every ``DatasourceManager()`` call returns the same instance.
    - Pools are created lazily on first use and stored under a hash of the
      connection settings, so several data sources can be used side by side.
    - Each pool is probed with ``SELECT 1`` at most once per
      ``health_check_interval`` seconds; a pool that fails the probe (or fails
      to hand out a connection) is evicted and rebuilt on the next request.
    """

    _instance = None
    # Re-entrant because locked methods call each other
    # (get_connection -> _register_pool / _health_check -> _handle_failure).
    _lock = threading.RLock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance.__init()
            return cls._instance

    def __init(self):
        """Initialise the pool store; runs once, when the singleton is created."""
        # Pool registry: {config_hash: {'pool', 'last_check', 'config'}}
        self._pool_store: Dict[str, Dict] = {}

        # Minimum number of seconds between two health checks of the same pool.
        self.health_check_interval = 300

    def _get_pool_entry(self, config_hash: str) -> Optional[Dict]:
        """Return the registry entry stored under ``config_hash``, or None."""
        return self._pool_store.get(config_hash)

    @staticmethod
    def _get_config_hash(config: Dict) -> str:
        """
        Build the registry key for a connection configuration.

        The credentials are part of the key so that two configurations that
        target the same host/port/database with different accounts do not
        silently share one pool (and one set of privileges).

        :param config: connection settings; ``host``, ``port`` and ``database``
            are required, ``user`` and ``password`` are used when present
        :return: hex SHA-256 digest identifying the configuration
        """
        identity = (
            config['host'],
            config['port'],
            config['database'],
            config.get('user', ''),
            config.get('password', ''),
        )
        # NUL-separated so that field boundaries cannot be confused.
        unique_str = "\x00".join(str(part) for part in identity)
        return hashlib.sha256(unique_str.encode()).hexdigest()

    def _validate_config(self, config: Dict):
        """
        Check that every mandatory connection setting is present.

        :raises ValueError: if any of host/port/user/password/database is missing
        """
        required = ['host', 'port', 'user', 'password', 'database']
        if missing := [k for k in required if k not in config]:
            raise ValueError(f"缺少必要配置参数: {missing}")

    def _create_pool(self, config: Dict) -> "PooledDB":
        """
        Create a new PooledDB pool of pymysql connections for ``config``.

        Pool sizing can be tuned through the optional ``mincached``,
        ``maxcached`` and ``maxconnections`` keys. Any creation error is
        logged and re-raised.
        """
        try:
            return PooledDB(
                creator=pymysql,
                host=config['host'],
                port=config['port'],
                user=config['user'],
                password=config['password'],
                database=config['database'],
                mincached=config.get('mincached', 2),
                maxcached=config.get('maxcached', 5),
                maxconnections=config.get('maxconnections', 20),
                charset='utf8mb4',
                autocommit=False
            )
        except Exception as e:
            logger.error(f"连接池创建失败:{str(e)}")
            raise

    def _register_pool(self, config: Dict) -> "PooledDB":
        """
        Return the pool for ``config``, creating and registering it if needed.

        Lookup and creation happen under the lock, so concurrent callers
        never create two pools for the same configuration.
        """
        config_hash = self._get_config_hash(config)
        with self._lock:
            if entry := self._get_pool_entry(config_hash):
                return entry['pool']

            new_pool = self._create_pool(config)
            self._pool_store[config_hash] = {
                'pool': new_pool,
                'last_check': 0,  # 0 forces a health check on first use
                'config': config.copy()
            }
            logger.info(f"注册新数据源: {config['host']}:{config['database']}")
            return new_pool

    @contextmanager
    def get_connection(self, config: Dict):
        """
        Context manager yielding a pooled connection for ``config``.

        The pool is registered on first use and health-checked when due. The
        connection is closed (handed back to the pool) when the ``with`` block
        exits. Exceptions raised inside the caller's ``with`` block propagate
        unchanged; only a failure to obtain a connection evicts the pool.

        :param config: database connection settings
        :raises ValueError: if ``config`` lacks a mandatory key
        """
        self._validate_config(config)
        config_hash = self._get_config_hash(config)

        # The lock protects only the registry bookkeeping. It is released
        # before the connection is handed out; holding it across ``yield``
        # would serialise every query of every data source.
        with self._lock:
            self._register_pool(config)
            pool_entry = self._pool_store[config_hash]
            if self._needs_check(pool_entry) and not self._health_check(pool_entry):
                # The failed check evicted the stale pool: build a fresh one
                # instead of continuing with the closed pool.
                self._register_pool(config)
                pool_entry = self._pool_store[config_hash]
            pool = pool_entry['pool']

        try:
            conn = pool.connection()
        except Exception as e:
            logger.error(f"连接获取异常: {str(e)}")
            self._handle_failure(pool_entry)
            raise

        try:
            yield conn
        finally:
            conn.close()

    def _needs_check(self, entry: Dict) -> bool:
        """Return True when the entry's last health check is older than the interval."""
        elapsed = time.time() - entry['last_check']
        return elapsed > self.health_check_interval

    def _health_check(self, entry: Dict) -> bool:
        """
        Probe the entry's pool with ``SELECT 1``.

        On success the entry's ``last_check`` timestamp is refreshed. On
        failure the error is logged and the pool is evicted.

        :return: True if the pool answered the probe, False if it was evicted
        """
        logger.debug(f"执行健康检查: {entry['config']['host']}")
        try:
            conn = entry['pool'].connection()
            try:
                cursor = conn.cursor()
                try:
                    cursor.execute("SELECT 1")
                finally:
                    cursor.close()
            finally:
                # Always hand the probe connection back, even when the probe fails.
                conn.close()
        except Exception as e:
            logger.error(f"健康检查失败: {str(e)}")
            self._handle_failure(entry)
            return False
        entry['last_check'] = time.time()
        return True

    def _handle_failure(self, entry: Dict):
        """
        Evict a failed pool from the registry and close it.

        :param entry: the registry entry (not the hash) of the failed pool
        """
        logger.warning(f"清理失效连接池: {entry['config']['host']}")
        config_hash = self._get_config_hash(entry['config'])
        with self._lock:
            # Remove only if the registry still holds this very entry: another
            # thread may already have evicted it or registered a replacement.
            if self._pool_store.get(config_hash) is entry:
                del self._pool_store[config_hash]
            entry['pool'].close()

    def close_all(self):
        """Close every registered pool and empty the registry."""
        with self._lock:
            for config_hash, entry in list(self._pool_store.items()):
                entry['pool'].close()
                del self._pool_store[config_hash]
            logger.info("已关闭所有数据源连接池")

dbc.py

import pymysql

from contextlib import contextmanager
from typing import Dict, Optional, List, Generator

from common.dsm import DatasourceManager
from common.log import logger


class DBC(object):
    """General-purpose database helper that can target multiple data sources."""

    def __init__(self, default_config: Optional[Dict] = None):
        """
        :param default_config: optional connection settings used whenever a
            call does not pass its own ``config``
        """
        self.ds = DatasourceManager()
        self.default_config = default_config

    def _resolve_config(self, config: Optional[Dict] = None) -> Dict:
        """
        Pick the configuration to use: the explicit one, else the default.

        :raises ValueError: if neither an explicit nor a default config exists
        """
        if config is not None:
            return config
        if self.default_config is not None:
            return self.default_config
        raise ValueError("必须指定数据源配置或设置默认配置")

    @contextmanager
    def _get_cursor(self, conn) -> Generator["pymysql.cursors.DictCursor", None, None]:
        """Context manager yielding a dict cursor on ``conn`` and closing it afterwards."""
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            yield cursor
        finally:
            cursor.close()

    @contextmanager
    def transaction(self, config: Optional[Dict] = None):
        """
        Transaction context: yields a connection, commits on normal exit,
        rolls back, logs and re-raises if the block raises.
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            try:
                yield conn
                conn.commit()
            except Exception as e:
                conn.rollback()
                logger.error(f"事务回滚: {str(e)}")
                raise

    def select_one(self, sql: str, params: tuple = None,
                   config: Dict = None) -> Optional[Dict]:
        """
        Run a query and return its first row as a dict (None if no row).

        Errors are logged with the SQL text and re-raised.
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchone()
                except Exception as e:
                    logger.error(f"查询失败:{sql} | {str(e)}")
                    raise

    def select_all(self, sql: str, params: tuple = None,
                   config: Dict = None) -> List[Dict]:
        """
        Run a query and return all rows as dicts.

        Errors are logged with the SQL text and re-raised.
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    return cursor.fetchall()
                except Exception as e:
                    logger.error(f"批查询失败:{sql} | {str(e)}")
                    raise

    def execute(self, sql: str, params: tuple = None,
                config: Dict = None) -> int:
        """
        Run a single write statement and commit it.

        :return: the cursor's ``rowcount`` for the statement
        :raises Exception: any execution error, after rollback and logging
        """
        config = self._resolve_config(config)
        with self.ds.get_connection(config) as conn:
            with self._get_cursor(conn) as cursor:
                try:
                    cursor.execute(sql, params or ())
                    conn.commit()
                    return cursor.rowcount
                except Exception as e:
                    conn.rollback()
                    logger.error(f"执行失败:{sql} | {str(e)}")
                    raise

    def batch_execute(self, sql: str, params_list: List[tuple],
                      config: Dict = None) -> int:
        """
        Run the same statement once per parameter tuple inside one transaction.

        :return: sum of the per-statement ``rowcount`` values
        :raises Exception: any execution error; the transaction is rolled back
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            with self._get_cursor(conn) as cursor:
                total = 0
                try:
                    for params in params_list:
                        cursor.execute(sql, params)
                        total += cursor.rowcount
                    return total
                except Exception as e:
                    logger.error(f"批执行失败:{sql} | {str(e)}")
                    raise

    def execute_in_transaction(self, operations: List[dict],
                               config: Dict = None) -> bool:
        """
        Run several heterogeneous operations inside one transaction.

        :param operations: list of dicts shaped like
            {'sql': str, 'params': tuple, 'type': 'write/read'};
            ``type`` defaults to 'write'
        :return: True if every operation succeeded and was committed; False if
            any operation failed, in which case everything is rolled back
        """
        config = self._resolve_config(config)
        with self.transaction(config) as conn:
            try:
                with self._get_cursor(conn) as cursor:
                    for op in operations:
                        cursor.execute(op['sql'], op.get('params'))
                        if op.get('type', 'write') == 'read':
                            cursor.fetchall()  # drain the result set
                return True
            except Exception as e:
                # The error is reported through the return value, so the
                # enclosing transaction() sees a normal exit and would COMMIT
                # the statements that already ran. Undo them explicitly first;
                # the commit that follows is then a no-op.
                conn.rollback()
                logger.error(f"复合操作失败: {str(e)}")
                return False

log.py

import logging

# Root logging setup: DEBUG and above, written by basicConfig's default
# stream handler (console).
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

# Module-level logger imported by the rest of the project; no extra handler
# is attached here, records go through the root configuration above.
logger = logging.getLogger(__name__)

新建app: news(需在 settings.py 的 INSTALLED_APPS 中注册该app,并在编写完 models.py 后执行 makemigrations 和 migrate)

models.py

from django.db import models


class DataSource(models.Model):
    """Connection settings of one external database that APIs can query."""

    name = models.CharField(max_length=100)
    db_type = models.CharField(max_length=20)
    host = models.CharField(max_length=100)
    port = models.IntegerField()
    username = models.CharField(max_length=100)
    password = models.CharField(max_length=100)
    database = models.CharField(max_length=100)

    def gen_config(self):
        """
        Return the connection settings as a dict with the keys
        host / port / user / password / database (``username`` is exposed
        under the key ``user``).
        """
        return dict(
            host=self.host,
            port=self.port,
            user=self.username,
            password=self.password,
            database=self.database,
        )


class APIConfig(models.Model):
    """One published API: a SQL statement bound to a data source and a URL path."""

    # Name of the API.
    api_name = models.CharField(max_length=100)
    # SQL text stored for this API.
    sql = models.TextField()
    # Data source the API belongs to; deleting the data source also deletes
    # its API configs (CASCADE).
    datasource = models.ForeignKey(DataSource, on_delete=models.CASCADE)
    # URL path stored for this API.
    api_path = models.CharField(max_length=200)
    # Pagination flag, enabled by default.
    is_paginated = models.BooleanField(default=True)
    # Default page size (10).
    default_page_size = models.IntegerField(default=10)

apis.py

from .models import APIConfig
from common.dbc import DBC
from common.core import api


@api.get("/{api_path}")
def dynamic_api(request, api_path: str, page_size: int = 10):
    """
    Serve a dynamically configured API.

    Looks up the APIConfig whose ``api_path`` equals ``/<api_path>``, runs its
    SQL against the configured data source and returns all rows.

    :param request: the incoming HTTP request
    :param api_path: path segment captured from the URL (without leading slash)
    :param page_size: accepted for interface compatibility
    :return: ``{"code": 200, "data": <rows>, "cnt": <row count>}``
    :raises Http404: if no API is configured for the path
    """
    # Function-scope import so the block does not depend on the module header.
    from django.http import Http404

    # filter().first() instead of get(): an unknown path becomes a 404 rather
    # than an unhandled DoesNotExist (HTTP 500), and duplicate paths do not
    # raise MultipleObjectsReturned. select_related fetches the data source in
    # the same query.
    config = (
        APIConfig.objects
        .select_related("datasource")
        .filter(api_path=f"/{api_path}")
        .first()
    )
    if config is None:
        raise Http404(f"API not found: /{api_path}")

    # NOTE(review): page_size is not applied to the query; every row is
    # returned. Confirm the intended pagination strategy before relying on it.
    dbc = DBC(config.datasource.gen_config())
    records = dbc.select_all(config.sql)
    return {"code": 200, "data": records, "cnt": len(records)}

编辑根路由api_server/urls.py

from django.contrib import admin
from django.urls import path
from news.apis import api

urlpatterns = [
    # Django admin site.
    path('admin/', admin.site.urls),
    # Every endpoint registered on the shared NinjaAPI instance, under /api/.
    path('api/', api.urls)
]

以上就可以了。