数据治理

spring-boot实现多数据源管理

技术栈:spring-boot + mybatis-plus + dynamic-datasource

实现动态添加数据源,并可执行query操作

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>

    <groupId>com.bigdata</groupId>
    <artifactId>bigdata-center</artifactId>
    <version>1.0</version>

    <properties>
        <!-- Java 8 toolchain. Each property is declared exactly once; the earlier
             duplicate declarations (8 vs 1.8) were redundant and have been removed. -->
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <!-- Versions referenced by the <dependencies> section below -->
        <mybatis-plus.version>3.5.6</mybatis-plus.version>
        <dynamic-ds.version>4.2.0</dynamic-ds.version>
        <postgresql.version>42.7.3</postgresql.version>
        <mysql.version>8.0.33</mysql.version>
        <commons-codec.version>1.15</commons-codec.version>
    </properties>

    <dependencies>
        <!-- Spring MVC / embedded web server -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- MyBatis-Plus (BaseMapper / IService / ServiceImpl used by the datasource module) -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <!-- dynamic-datasource: DynamicRoutingDataSource and the DataSourceCreator beans -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>${dynamic-ds.version}</version>
        </dependency>

        <!-- MySQL JDBC driver (the service code also uses it for Doris connections) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <!-- PostgreSQL JDBC driver -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>${postgresql.version}</version>
        </dependency>

        <!-- Lombok (@Data on the entity); no version given here, so it is expected to be managed by the parent POM -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- Apache Commons Codec, used for MD5 hashing (DigestUtils) -->
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>${commons-codec.version}</version>
        </dependency>

        <!-- DevTools: hot reload during development -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!-- Test support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds the executable Spring Boot jar -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Lombok is excluded from the repackaged artifact -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

resources/application.yml

server:
  port: 8080
  servlet:
    context-path: /

spring:
  application:
    name: datasource-manager

  # Primary datasource (stores the `datasource` table)
  datasource:
    dynamic:
      primary: default
      # Strict matching: a lookup for a datasource name that is not registered fails
      # instead of silently falling back to the primary datasource. This flag is not
      # what enables adding datasources at runtime.
      strict: true
      datasource:
        default:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3306/django_datacenter?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&useSSL=false
          username: root
          password: root
          hikari:
            connection-timeout: 30000
            idle-timeout: 600000
            max-lifetime: 1800000
            maximum-pool-size: 20
            minimum-idle: 5

  # Disable the startup banner (optional)
  main:
    banner-mode: "off"

# MyBatis-Plus settings
mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # prints SQL to stdout; intended for development
  mapper-locations: classpath:mapper/*.xml
  global-config:
    db-config:
      id-type: none  # datasource.id is a string primary key
      table-underline: true

# Logging (optional)
logging:
  level:
    # Must match the application's base package (com.bigdata.datasource);
    # the previous key com.example.datasource matched no logger in this project.
    com.bigdata.datasource: debug
    com.baomidou.dynamic: info

resources/mapper/DatasourceMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- XML companion of the DatasourceMapper interface; it declares no statements of its own,
     only a reusable result map and column list. -->
<mapper namespace="com.bigdata.datasource.mapper.DatasourceMapper">

    <!-- Column-to-property mapping for the Datasource entity -->
    <resultMap id="BaseResultMap" type="com.bigdata.datasource.entity.Datasource">
        <id property="id" column="id"/>
        <result property="sourceName" column="source_name"/>
        <result property="host" column="host"/>
        <result property="username" column="username"/>
        <result property="password" column="password"/>
        <result property="port" column="port"/>
        <result property="defaultDatabase" column="default_database"/>
        <result property="dbAlias" column="db_alias"/>
        <result property="dbType" column="db_type"/>
        <result property="deleteFlag" column="delete_flag"/>
        <result property="createTime" column="create_time"/>
        <result property="updateTime" column="update_time"/>
        <result property="schemaName" column="schema_name"/>
    </resultMap>

    <!-- Reusable SELECT column list covering every column mapped above -->
    <sql id="Base_Column_List">
        id
        ,source_name,host,username,password,port,
        default_database,db_alias,db_type,delete_flag,create_time,
        update_time,schema_name
    </sql>
</mapper>

entity目录

package com.bigdata.datasource.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.bigdata.datasource.utils.DbType;
import lombok.Data;

import java.time.LocalDateTime;
import java.util.Objects;

/**
 * Connection description of an external database, mapped to the {@code datasource} table.
 * <p>
 * Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok.
 * NOTE(review): the generated {@code toString()} includes {@link #password}; avoid logging
 * whole instances.
 */
@Data
@TableName("datasource")
public class Datasource {
    /** Primary key; the id strategy is {@code NONE}, so no value is generated by this mapping. */
    @TableId(type = IdType.NONE)
    private String id;
    /** Display name of the data source. */
    private String sourceName;
    /** Database host. */
    private String host;
    /** Login user. */
    private String username;
    /** Login password. */
    private String password;
    /** Database port. */
    private Integer port;
    /** Database used when connecting. */
    private String defaultDatabase;
    /** Schema name; only meaningful for PostgreSQL. */
    private String schemaName;
    /** Unique alias, intended as the key of the dynamic data source. */
    private String dbAlias;
    /** Database type: mysql / doris / postgresql. */
    private String dbType;
    /** Deletion flag; defaults to {@code "0"}. */
    private String deleteFlag = "0";
    private LocalDateTime createTime;
    private LocalDateTime updateTime;

    /**
     * Returns the schema that should actually be used for this data source.
     *
     * @return for PostgreSQL, {@link #schemaName}, or {@code "public"} when the schema name is
     *         {@code null} or blank; {@code null} for every other database type
     */
    public String getEffectSchema() {
        if (DbType.fromString(dbType) != DbType.POSTGRESQL) {
            return null;
        }
        // A blank schema name can never match a real schema, so treat it like "not set".
        if (schemaName == null || schemaName.trim().isEmpty()) {
            return "public";
        }
        return schemaName;
    }
}

mapper目录

package com.bigdata.datasource.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdata.datasource.entity.Datasource;
import org.apache.ibatis.annotations.Mapper;


/**
 * MyBatis-Plus mapper for {@link Datasource} rows.
 * <p>
 * Declares no methods of its own; everything available on it comes from {@link BaseMapper}.
 */
@Mapper
public interface DatasourceMapper extends BaseMapper<Datasource> {

}

service目录

package com.bigdata.datasource.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.bigdata.datasource.entity.Datasource;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;


/**
 * Service for managing stored {@link Datasource} definitions and for running SQL and
 * metadata lookups against the databases they describe.
 */
public interface DatasourceService extends IService<Datasource> {
    /**
     * Registers the given data source definition for dynamic routing.
     *
     * @param ds data source definition
     * @return the {@link DataSource} created for it
     */
    DataSource registerDataSource(Datasource ds);

    /**
     * Removes a previously registered data source.
     *
     * @param alias routing alias of the data source to remove
     */
    void unregisterDataSource(String alias);

    /**
     * Lists the table names visible through the given data source.
     *
     * @param ds data source definition
     * @return table names
     */
    List<String> getAllTables(Datasource ds);

    /**
     * Returns column metadata of a table, one map per column.
     *
     * @param ds        data source definition
     * @param tableName table to describe
     * @return column descriptions
     */
    List<Map<String, Object>> getTableColumns(Datasource ds, String tableName);

    /**
     * Returns a CREATE TABLE statement for the given table.
     *
     * @param ds        data source definition
     * @param tableName table to describe
     * @return the DDL text
     */
    String getCreateTableSql(Datasource ds, String tableName);

    /**
     * Returns the comment attached to the given table.
     *
     * @param ds        data source definition
     * @param tableName table whose comment is requested
     * @return the table comment
     */
    String getTableComment(Datasource ds, String tableName);

    /**
     * Runs a query without bind parameters.
     *
     * @param ds  data source definition
     * @param sql query text
     * @return result rows, one map per row
     */
    List<Map<String, Object>> executeQuery(Datasource ds, String sql);
    /**
     * Runs a query with bind parameters.
     *
     * @param ds     data source definition
     * @param sql    query text
     * @param params values for the statement's placeholders
     * @return result rows, one map per row
     */
    List<Map<String, Object>> executeQuery(Datasource ds, String sql, Object... params);

    /**
     * Runs a data-modifying statement without bind parameters.
     *
     * @param ds  data source definition
     * @param sql statement text
     * @return number of affected rows
     */
    int executeUpdate(Datasource ds, String sql);
    /**
     * Runs a data-modifying statement with bind parameters.
     *
     * @param ds     data source definition
     * @param sql    statement text
     * @param params values for the statement's placeholders
     * @return number of affected rows
     */
    int executeUpdate(Datasource ds, String sql, Object... params);

    /**
     * Runs DDL without bind parameters.
     *
     * @param ds  data source definition
     * @param sql DDL text
     */
    void executeDdl(Datasource ds, String sql);
    /**
     * Runs DDL with bind parameters.
     *
     * @param ds     data source definition
     * @param sql    DDL text
     * @param params values for the statement's placeholders
     */
    void executeDdl(Datasource ds, String sql, Object... params);
}
package com.bigdata.datasource.service;

import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.utils.SyncTableResult;

import java.util.Map;

/**
 * Service for copying a table structure from one data source to another.
 */
public interface SchemaSyncService {
    // Synchronize a table structure: source data source -> target data source
    SyncTableResult syncTable(Datasource sourceDs, Datasource targetDs, String sourceTable, String targetTable);

    // Generate the CREATE TABLE DDL for the target data source (without executing it)
    String generateCreateTableDdl(Datasource sourceDs, Datasource targetDs, String sourceTable, String targetTable);
}

service/impl/目录

package com.bigdata.datasource.service.impl;

import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.baomidou.dynamic.datasource.creator.DataSourceCreator;
import com.baomidou.dynamic.datasource.creator.DataSourceProperty;
import com.baomidou.dynamic.datasource.creator.hikaricp.HikariCpConfig;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.mapper.DatasourceMapper;
import com.bigdata.datasource.service.DatasourceService;
import com.bigdata.datasource.utils.DbType;
import com.bigdata.datasource.utils.DdlBuilderUtils;
import com.bigdata.datasource.utils.TableNameValidator;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import javax.sql.DataSource;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * 数据源服务实现类
 * <p>
 * 职责:
 * - 动态注册/注销数据源到 DynamicRoutingDataSource
 * - 提供统一的 SQL 执行接口(查询、DML、DDL)
 * - 获取数据库元数据(表列表、字段信息、建表语句、注释等)
 * <p>
 * 设计原则:
 * - 线程安全:依赖 DynamicRoutingDataSource 内部同步机制
 * - 重试机制:对连接失效异常自动重试一次
 * - 类型安全:按 DbType 分发不同数据库的元数据查询逻辑
 * - 日志完备:关键操作均有 info/debug/error 日志
 */
@Service
public class DataSourceServiceImpl extends ServiceImpl<DatasourceMapper, Datasource> implements DatasourceService {
    private static final Logger log = LoggerFactory.getLogger(DataSourceServiceImpl.class);

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    @Autowired
    @Qualifier("hikariDataSourceCreator")
    private DataSourceCreator dataSourceCreator;

    private final Map<String, Object> registrationLocks = new ConcurrentHashMap<>();

    // ========== 数据源注册与连接管理 ==========

    /**
     * Creates a {@link DataSource} for the given definition and adds it to the dynamic
     * routing data source under the alias derived from that definition.
     *
     * @param ds data source definition; must not be {@code null}
     * @return the newly created {@link DataSource}
     * @throws IllegalArgumentException if {@code ds} is {@code null} or the derived alias
     *                                  equals the reserved name {@code default}
     */
    @Override
    public DataSource registerDataSource(Datasource ds) {
        if (ds == null) {
            throw new IllegalArgumentException("Datasource must not be null");
        }

        final String routingKey = generateAlias(ds);
        final boolean clashesWithPrimary = "default".equals(routingKey);
        if (clashesWithPrimary) {
            throw new IllegalArgumentException("Alias 'default' is reserved for primary datasource and cannot be used");
        }

        log.debug("准备注册动态数据源: alias={}, host={}:{}, db={}",
                routingKey, ds.getHost(), ds.getPort(), ds.getDefaultDatabase());

        // Build the data source from its connection properties, then publish it to the router.
        final DataSource created = dataSourceCreator.createDataSource(buildDataSourceProperty(ds));
        dynamicRoutingDataSource.addDataSource(routingKey, created);

        log.info("动态数据源注册成功: alias={}", routingKey);
        return created;
    }

    /**
     * Removes a data source from the dynamic routing data source.
     * <p>
     * When a registration lock exists for the alias (created by {@code getConnection}), the
     * removal runs while holding that lock and only if the alias is currently present.
     * Without such a lock the removal is attempted directly.
     *
     * @param alias alias of the data source to remove
     * @throws IllegalArgumentException if {@code alias} is {@code default}
     */
    @Override
    public void unregisterDataSource(String alias) {
        if ("default".equals(alias)) {
            throw new IllegalArgumentException("Cannot remove primary datasource");
        }

        final Object guard = registrationLocks.get(alias);
        if (guard == null) {
            // No lock was ever created for this alias: attempt the removal directly.
            dynamicRoutingDataSource.removeDataSource(alias);
            log.debug("尝试移除不存在的数据源: alias={}", alias);
            return;
        }

        // Same lock object that getConnection uses while registering this alias.
        synchronized (guard) {
            final boolean present = dynamicRoutingDataSource.getDataSources().containsKey(alias);
            if (present) {
                dynamicRoutingDataSource.removeDataSource(alias);
                log.info("动态数据源已移除: alias={}", alias);
            }
        }
    }

    /**
     * Obtains a connection for the given definition, registering its data source on first use.
     * <p>
     * The alias is looked up in the router's data source map. When it is missing, the lookup
     * is repeated while holding a per-alias lock and the data source is registered if it is
     * still absent. The connection itself is requested outside the lock.
     *
     * @param ds data source definition
     * @return a connection from the data source registered for {@code ds}
     * @throws SQLException if the data source cannot provide a connection
     */
    private Connection getConnection(Datasource ds) throws SQLException {
        final String routingKey = generateAlias(ds);

        // Fast path: read the router's map directly.
        DataSource pool = dynamicRoutingDataSource.getDataSources().get(routingKey);
        if (pool != null) {
            return pool.getConnection();
        }

        synchronized (registrationLocks.computeIfAbsent(routingKey, key -> new Object())) {
            // Another thread may have registered the alias while this one waited for the lock.
            pool = dynamicRoutingDataSource.getDataSources().get(routingKey);
            if (pool == null) {
                pool = registerDataSource(ds);
            }
        }
        return pool.getConnection();
    }

    /**
     * Converts every remaining row of a {@link ResultSet} into a map keyed by column label.
     * <p>
     * Each row map keeps the column order of the result set, so callers that render the
     * rows see the columns in the order the query produced them. When two columns share a
     * label, the later column's value wins.
     *
     * @param rs result set positioned before the first row to convert
     * @return one map per row, in result-set order; empty when there are no rows
     * @throws SQLException if reading the result set fails
     */
    private List<Map<String, Object>> resultSetToList(ResultSet rs) throws SQLException {
        ResultSetMetaData meta = rs.getMetaData();
        int columnCount = meta.getColumnCount();

        // Resolve the labels once instead of once per row.
        String[] labels = new String[columnCount];
        for (int i = 0; i < columnCount; i++) {
            labels[i] = meta.getColumnLabel(i + 1);
        }

        List<Map<String, Object>> rows = new ArrayList<>();
        while (rs.next()) {
            // LinkedHashMap preserves column order; a plain HashMap would scramble it.
            Map<String, Object> row = new LinkedHashMap<>();
            for (int i = 0; i < columnCount; i++) {
                row.put(labels[i], rs.getObject(i + 1));
            }
            rows.add(row);
        }
        return rows;
    }

    // ========== SQL 执行核心逻辑 ==========

    /**
     * Runs a statement, retrying once when the first attempt fails with a connection error.
     * <p>
     * On a connection error during the first attempt the data source is unregistered, so the
     * second attempt registers it again before executing. Any other failure, or a failure of
     * the second attempt, is logged and rethrown as a {@link RuntimeException}.
     *
     * @param ds     data source definition
     * @param sql    statement text
     * @param params bind parameters, may be {@code null}
     * @param fetch  {@code true} to run the statement as a query and return its rows
     * @param isDml  {@code true} to run the statement as an update and return the row count
     * @return the query rows, a one-element list holding the {@code rowcount}, or {@code null}
     *         when the statement is neither a query nor DML
     */
    private List<Map<String, Object>> executeWithRetry(Datasource ds, String sql, Object[] params, boolean fetch, boolean isDml) {
        int attempt = 0;
        while (attempt <= 1) { // first try plus at most one retry
            try (Connection conn = getConnection(ds)) {
                return executeSqlInternal(conn, sql, params, fetch, isDml);
            } catch (SQLException e) {
                final boolean retryable = attempt == 0 && isConnectionError(e);
                if (!retryable) {
                    log.error("SQL 执行失败 (attempt={})", attempt + 1, e);
                    throw new RuntimeException("SQL 执行失败: " + e.getMessage(), e);
                }
                log.warn("检测到连接失效,尝试重新注册数据源并重试: {}", e.getMessage());
                // Drop the registration so the next attempt registers the data source afresh.
                unregisterDataSource(generateAlias(ds));
            }
            attempt++;
        }
        return null; // not reached: the second attempt either returns or throws
    }

    /**
     * Decides whether an exception indicates a broken or unavailable connection.
     * <p>
     * An exception qualifies when any of the following holds:
     * <ul>
     *   <li>it is a {@link SQLTransientConnectionException} or
     *       {@link SQLNonTransientConnectionException};</li>
     *   <li>its SQLSTATE starts with {@code 08}, the connection-exception class
     *       (SQLSTATE values are five characters long, so this must be a prefix test);</li>
     *   <li>its vendor error code is {@code 0} and its message contains one of a few
     *       well-known connection-loss phrases.</li>
     * </ul>
     *
     * @param e exception to classify
     * @return {@code true} if the failure looks connection-related
     */
    private boolean isConnectionError(SQLException e) {
        if (e instanceof SQLTransientConnectionException || e instanceof SQLNonTransientConnectionException) {
            return true;
        }

        String sqlState = e.getSQLState();
        if (sqlState != null && sqlState.startsWith("08")) {
            return true;
        }

        String rawMessage = e.getMessage();
        if (e.getErrorCode() != 0 || rawMessage == null) {
            return false;
        }
        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
        String message = rawMessage.toLowerCase(Locale.ROOT);
        return message.contains("closed")
                || message.contains("gone away")
                || message.contains("broken pipe")
                || message.contains("connection reset");
    }

    /**
     * Executes one statement on an already opened connection.
     *
     * @param conn   connection to use; it is not closed here
     * @param sql    statement text
     * @param params bind parameters in placeholder order, may be {@code null}
     * @param fetch  {@code true} to execute as a query and return its rows
     * @param isDml  {@code true} (with {@code fetch == false}) to execute as an update
     * @return the rows for a query; a one-element list whose map holds {@code rowcount} for
     *         DML; {@code null} otherwise
     * @throws SQLException if preparing or executing the statement fails
     */
    private List<Map<String, Object>> executeSqlInternal(Connection conn, String sql, Object[] params, boolean fetch, boolean isDml)
            throws SQLException {
        try (PreparedStatement statement = conn.prepareStatement(sql)) {
            final int paramCount = (params == null) ? 0 : params.length;
            for (int position = 1; position <= paramCount; position++) {
                statement.setObject(position, params[position - 1]);
            }

            if (fetch) {
                try (ResultSet cursor = statement.executeQuery()) {
                    return resultSetToList(cursor);
                }
            }

            if (!isDml) {
                // Neither query nor DML: just run it, there is nothing to report.
                statement.execute();
                return null;
            }

            final int affected = statement.executeUpdate();
            final Map<String, Object> summary = Collections.singletonMap("rowcount", affected);
            return Arrays.asList(summary);
        }
    }

    // ========== 元数据查询 ==========

    /**
     * Lists the tables of the given data source.
     * <p>
     * MySQL and Doris are queried with {@code SHOW TABLES}; PostgreSQL is queried through
     * {@code information_schema.tables}, restricted to base tables of the effective schema.
     *
     * @param ds data source definition
     * @return table names
     * @throws UnsupportedOperationException if the database type is not supported
     */
    @Override
    public List<String> getAllTables(Datasource ds) {
        DbType type = DbType.fromString(ds.getDbType());
        if (type == DbType.MYSQL || type == DbType.DORIS) {
            List<Map<String, Object>> rows = executeWithRetry(ds, "SHOW TABLES", null, true, false);
            String expectedLabel = "Tables_in_" + ds.getDefaultDatabase();
            return rows.stream()
                    .map(row -> tableNameFromShowTablesRow(row, expectedLabel))
                    .collect(Collectors.toList());
        } else if (type == DbType.POSTGRESQL) {
            String schema = ds.getEffectSchema();
            List<Map<String, Object>> rows = executeQuery(ds,
                    "SELECT table_name FROM information_schema.tables WHERE table_schema = ? AND table_type = 'BASE TABLE'",
                    schema);
            return rows.stream().map(r -> (String) r.get("table_name")).collect(Collectors.toList());
        }
        throw new UnsupportedOperationException("不支持的数据库类型");
    }

    /**
     * Extracts the table name from one {@code SHOW TABLES} row.
     * <p>
     * The column is normally labelled {@code Tables_in_<database>}, but the database part of
     * that label may differ from the configured name (for example in letter case). Because
     * the statement returns a single column, the first value of the row is used whenever
     * the expected label is missing, instead of yielding {@code null}.
     */
    private static String tableNameFromShowTablesRow(Map<String, Object> row, String expectedLabel) {
        Object value = row.get(expectedLabel);
        if (value == null && !row.isEmpty()) {
            value = row.values().iterator().next();
        }
        return value == null ? null : value.toString();
    }

    /**
     * Returns the column metadata of a table, dispatching on the database type.
     *
     * @param ds        data source definition
     * @param tableName table name; it is passed through {@code TableNameValidator.validate} first
     * @return one map per column
     * @throws UnsupportedOperationException if the database type is not supported
     */
    @Override
    public List<Map<String, Object>> getTableColumns(Datasource ds, String tableName) {
        final String checkedName = TableNameValidator.validate(tableName);
        final DbType dialect = DbType.fromString(ds.getDbType());

        if (dialect == DbType.POSTGRESQL) {
            return getTableColumnsFromPostgreSQL(ds, checkedName);
        }
        final boolean mysqlFamily = dialect == DbType.MYSQL || dialect == DbType.DORIS;
        if (mysqlFamily) {
            return getTableColumnsFromMysqlOrDoris(ds, checkedName);
        }
        throw new UnsupportedOperationException("不支持的数据库类型");
    }

    /**
     * Reads column metadata for a MySQL or Doris table from {@code information_schema.COLUMNS}.
     * <p>
     * Each returned map contains the keys {@code name}, {@code type}, {@code nullable},
     * {@code default}, {@code comment}, {@code extra} and {@code is_primary_key}.
     *
     * @param ds        data source definition; its default database is used as the table schema
     * @param tableName table name
     * @return one map per column, ordered by ordinal position
     */
    private List<Map<String, Object>> getTableColumnsFromMysqlOrDoris(Datasource ds, String tableName) {
        final String columnQuery =
                "SELECT COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE, COLUMN_DEFAULT, COLUMN_COMMENT, EXTRA, COLUMN_KEY " +
                        "FROM information_schema.COLUMNS " +
                        "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION";
        final List<Map<String, Object>> metadataRows =
                executeQuery(ds, columnQuery, ds.getDefaultDatabase(), tableName);

        final List<Map<String, Object>> columns = new ArrayList<>();
        for (Map<String, Object> source : metadataRows) {
            final Object comment = source.get("COLUMN_COMMENT");
            final Object extra = source.get("EXTRA");

            final Map<String, Object> column = new HashMap<>();
            column.put("name", source.get("COLUMN_NAME"));
            column.put("type", source.get("COLUMN_TYPE"));
            column.put("nullable", "YES".equals(source.get("IS_NULLABLE")));
            column.put("default", source.get("COLUMN_DEFAULT"));
            // Missing comment/extra values are reported as empty strings rather than null.
            column.put("comment", comment != null ? comment : "");
            column.put("extra", extra != null ? extra : "");
            column.put("is_primary_key", "PRI".equals(source.get("COLUMN_KEY")));
            columns.add(column);
        }
        return columns;
    }

    /** Shape of a PostgreSQL type name as it appears after {@code ::} in a column default. */
    private static final String PG_TYPE_NAME = "[A-Za-z_\"][\\w .\"]*(?:\\([\\d, ]+\\))?(?:\\[\\])*";

    /** A single quoted literal, optionally followed by one type cast, e.g. {@code 'abc'::character varying}. */
    private static final java.util.regex.Pattern PG_QUOTED_DEFAULT =
            java.util.regex.Pattern.compile("'((?:[^']|'')*)'(?:::" + PG_TYPE_NAME + ")?");

    /** A type cast at the very end of an expression, e.g. the tail of {@code NULL::character varying}. */
    private static final java.util.regex.Pattern PG_TRAILING_CAST =
            java.util.regex.Pattern.compile("::" + PG_TYPE_NAME + "$");

    /**
     * Reads column metadata for a PostgreSQL table.
     * <p>
     * Table and schema names are passed as bind parameters, never spliced into the SQL text.
     * Each returned map contains the keys {@code name}, {@code type}, {@code nullable},
     * {@code default}, {@code comment}, {@code extra} (always empty) and {@code is_primary_key}.
     *
     * @param ds        data source definition; its effective schema selects the table
     * @param tableName table name
     * @return one map per column, ordered by ordinal position
     */
    private List<Map<String, Object>> getTableColumnsFromPostgreSQL(Datasource ds, String tableName) {
        String schema = ds.getEffectSchema();

        String sql = "SELECT " +
                "c.column_name, " +
                "c.data_type, " +
                "c.character_maximum_length, " +
                "c.numeric_precision, " +
                "c.numeric_scale, " +
                "c.is_nullable, " +
                "c.column_default, " +
                "pg_catalog.col_description( " +
                "(SELECT oid FROM pg_class WHERE relname = ? AND relnamespace = " +
                "(SELECT oid FROM pg_namespace WHERE nspname = ?) " +
                "), c.ordinal_position " +
                ") AS column_comment, " +
                "CASE " +
                "WHEN pk_cols.column_name IS NOT NULL THEN TRUE " +
                "ELSE FALSE " +
                "END AS is_primary_key " +
                "FROM information_schema.columns c " +
                "LEFT JOIN ( " +
                "SELECT kcu.column_name " +
                "FROM information_schema.table_constraints tc " +
                "JOIN information_schema.key_column_usage kcu " +
                "ON tc.constraint_name = kcu.constraint_name " +
                "AND tc.table_schema = kcu.table_schema " +
                "WHERE tc.constraint_type = 'PRIMARY KEY' " +
                "AND tc.table_schema = ? " +
                "AND tc.table_name = ? " +
                ") pk_cols ON c.column_name = pk_cols.column_name " +
                "WHERE c.table_schema = ? AND c.table_name = ? " +
                "ORDER BY c.ordinal_position";
        // Parameter order follows the placeholders: comment lookup, primary-key lookup, column filter.
        List<Map<String, Object>> rows = executeQuery(ds, sql,
                tableName, schema, schema, tableName, schema, tableName);

        List<Map<String, Object>> columns = new ArrayList<>(rows.size());
        for (Map<String, Object> row : rows) {
            Object comment = row.get("column_comment");

            Map<String, Object> col = new HashMap<>();
            col.put("name", row.get("column_name"));
            col.put("type", toPgDisplayType(row));
            col.put("nullable", "YES".equals(row.get("is_nullable")));
            col.put("default", normalizePgDefault(row.get("column_default")));
            col.put("comment", comment != null ? comment : "");
            col.put("extra", "");
            col.put("is_primary_key", row.get("is_primary_key"));
            columns.add(col);
        }
        return columns;
    }

    /**
     * Maps the {@code information_schema} type description of one column to a short type name,
     * e.g. {@code character varying} with length 32 becomes {@code varchar(32)}.
     * Types without a special rule are returned unchanged.
     */
    private static String toPgDisplayType(Map<String, Object> row) {
        String dataType = (String) row.get("data_type");
        if (dataType == null) {
            return null;
        }
        // Read numeric metadata as Number so the mapping does not depend on the exact boxed type.
        Number charLen = (Number) row.get("character_maximum_length");
        switch (dataType) {
            case "character varying":
                return charLen != null ? "varchar(" + charLen + ")" : "varchar";
            case "character":
                return charLen != null ? "char(" + charLen + ")" : "char";
            case "numeric":
                Number precision = (Number) row.get("numeric_precision");
                Number scale = (Number) row.get("numeric_scale");
                return (precision != null && scale != null)
                        ? "numeric(" + precision + "," + scale + ")"
                        : "numeric";
            case "integer":
                return "int";
            case "double precision":
                return "double";
            case "timestamp without time zone":
                return "timestamp";
            default:
                return dataType;
        }
    }

    /**
     * Simplifies a PostgreSQL column default.
     * <ul>
     *   <li>A quoted literal with an optional cast ({@code 'it''s'::text}) is reduced to its
     *       unescaped content ({@code it's}), even when the content itself contains {@code ::}.</li>
     *   <li>A quote-free expression ending in a cast ({@code NULL::character varying}) loses
     *       that trailing cast.</li>
     *   <li>Anything else, such as {@code nextval('t_id_seq'::regclass)}, is returned unchanged
     *       so that the expression stays intact.</li>
     * </ul>
     */
    private static String normalizePgDefault(Object rawDefault) {
        if (rawDefault == null) {
            return null;
        }
        String value = rawDefault.toString();

        java.util.regex.Matcher quoted = PG_QUOTED_DEFAULT.matcher(value);
        if (quoted.matches()) {
            return quoted.group(1).replace("''", "'");
        }
        if (value.indexOf('\'') < 0) {
            return PG_TRAILING_CAST.matcher(value).replaceFirst("");
        }
        return value;
    }

    /**
     * Returns a CREATE TABLE statement for the given table.
     * <p>
     * MySQL and Doris answer {@code SHOW CREATE TABLE} directly. PostgreSQL has no such
     * statement, so the DDL is rebuilt from the column metadata and the table comment.
     *
     * @param ds        data source definition
     * @param tableName table name; it is passed through {@code TableNameValidator.validate} first
     * @return the DDL text
     * @throws UnsupportedOperationException if the database type is not supported
     */
    @Override
    public String getCreateTableSql(Datasource ds, String tableName) {
        final String checkedName = TableNameValidator.validate(tableName);
        final DbType dialect = DbType.fromString(ds.getDbType());

        if (dialect == DbType.POSTGRESQL) {
            final List<Map<String, Object>> columns = getTableColumns(ds, checkedName);
            final String tableComment = getTableComment(ds, checkedName);
            final String schema = ds.getEffectSchema();
            // Source and target dialect are the same: the DDL is rebuilt for PostgreSQL itself.
            return DdlBuilderUtils.buildCreateTableSql(
                    columns, dialect, dialect, checkedName, schema, tableComment, true);
        }

        if (dialect == DbType.MYSQL || dialect == DbType.DORIS) {
            final String showStatement = "SHOW CREATE TABLE `" + checkedName + "`";
            final List<Map<String, Object>> rows =
                    executeWithRetry(ds, showStatement, new Object[0], true, false);
            final Map<String, Object> firstRow = rows.get(0);
            return (String) firstRow.get("Create Table");
        }

        throw new UnsupportedOperationException("不支持的数据库类型");
    }

    /**
     * Returns the comment stored for a table.
     *
     * @param ds        data source definition
     * @param tableName table name; it is passed through {@code TableNameValidator.validate} first
     * @return the table comment, or {@code null} when the table is not found or the database
     *         type is not one of MySQL, Doris or PostgreSQL
     */
    @Override
    public String getTableComment(Datasource ds, String tableName) {
        final String checkedName = TableNameValidator.validate(tableName);
        final DbType dialect = DbType.fromString(ds.getDbType());

        if (dialect == DbType.POSTGRESQL) {
            final String schema = ds.getEffectSchema();
            final String commentQuery =
                    "SELECT obj_description(c.oid, 'pg_class') AS table_comment " +
                            "FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace " +
                            "WHERE c.relname = ? AND n.nspname = ?";
            final List<Map<String, Object>> found = executeQuery(ds, commentQuery, checkedName, schema);
            if (found.isEmpty()) {
                return null;
            }
            return (String) found.get(0).get("table_comment");
        }

        if (dialect == DbType.MYSQL || dialect == DbType.DORIS) {
            final String commentQuery =
                    "SELECT TABLE_COMMENT FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?";
            final List<Map<String, Object>> found =
                    executeQuery(ds, commentQuery, ds.getDefaultDatabase(), checkedName);
            if (found.isEmpty()) {
                return null;
            }
            return (String) found.get(0).get("TABLE_COMMENT");
        }

        return null;
    }

    // ========== SQL 执行公开接口 ==========

    /**
     * Runs a SELECT statement that takes no bind parameters.
     *
     * @param ds  data source definition
     * @param sql query text
     * @return result rows, one map per row
     */
    @Override
    public List<Map<String, Object>> executeQuery(Datasource ds, String sql) {
        final Object[] noParams = new Object[0];
        return executeQuery(ds, sql, noParams);
    }

    /**
     * Runs a SELECT statement with bind parameters.
     *
     * @param ds     data source definition
     * @param sql    query text; after trimming it must start with {@code SELECT} (any letter case)
     * @param params values for the statement's placeholders
     * @return result rows, one map per row
     * @throws IllegalArgumentException if the statement does not start with {@code SELECT}
     */
    @Override
    public List<Map<String, Object>> executeQuery(Datasource ds, String sql, Object... params) {
        final String normalized = sql.trim().toUpperCase();
        final boolean isSelect = normalized.startsWith("SELECT");
        if (isSelect) {
            return executeWithRetry(ds, sql, params, true, false);
        }
        throw new IllegalArgumentException("仅支持 SELECT 查询");
    }

    /**
     * Runs an INSERT, UPDATE or DELETE statement that takes no bind parameters.
     *
     * @param ds  data source definition
     * @param sql statement text
     * @return number of affected rows
     */
    @Override
    public int executeUpdate(Datasource ds, String sql) {
        return executeUpdate(ds, sql, new Object[0]);
    }

    /**
     * Runs an INSERT, UPDATE or DELETE statement with bind parameters.
     *
     * @param ds     data source definition
     * @param sql    statement text; after trimming it must start with {@code INSERT},
     *               {@code UPDATE} or {@code DELETE} (any letter case)
     * @param params values for the statement's placeholders
     * @return number of affected rows
     * @throws IllegalArgumentException if the statement is not one of the supported DML kinds
     */
    @Override
    public int executeUpdate(Datasource ds, String sql, Object... params) {
        // Locale.ROOT: with a Turkish default locale "insert" would upper-case to "İNSERT"
        // and a perfectly valid statement would be rejected.
        String upper = sql.trim().toUpperCase(Locale.ROOT);
        if (!(upper.startsWith("INSERT") || upper.startsWith("UPDATE") || upper.startsWith("DELETE"))) {
            throw new IllegalArgumentException("仅支持 DML 语句");
        }
        List<Map<String, Object>> result = executeWithRetry(ds, sql, params, false, true);
        return result == null ? 0 : (int) result.get(0).get("rowcount");
    }

    /**
     * Runs one or more DDL statements that take no bind parameters.
     *
     * @param ds  data source definition
     * @param sql DDL text; several statements may be separated by {@code ;}
     */
    @Override
    public void executeDdl(Datasource ds, String sql) {
        final Object[] noParams = new Object[0];
        executeDdl(ds, sql, noParams);
    }

    /**
     * Runs one or more DDL statements, one after another.
     * <p>
     * Each statement is checked immediately before it is executed, so statements preceding
     * a rejected one have already run. The same {@code params} are passed to every statement.
     *
     * @param ds     data source definition
     * @param sql    DDL text; several statements may be separated by {@code ;}
     * @param params values for the statements' placeholders
     * @throws IllegalArgumentException if a statement does not start with CREATE, ALTER, DROP,
     *                                  TRUNCATE or COMMENT
     */
    @Override
    public void executeDdl(Datasource ds, String sql, Object... params) {
        final List<String> allowedVerbs = Arrays.asList("CREATE", "ALTER", "DROP", "TRUNCATE", "COMMENT");
        for (String statement : splitSql(sql)) {
            final String normalized = statement.trim().toUpperCase();
            final boolean permitted = allowedVerbs.stream().anyMatch(normalized::startsWith);
            if (!permitted) {
                throw new IllegalArgumentException("DDL 仅支持 CREATE/ALTER/DROP/TRUNCATE/COMMENT");
            }
            executeWithRetry(ds, statement, params, false, false);
        }
    }

    /**
     * Splits a script into individual statements at {@code ;} separators.
     * <p>
     * Semicolons inside single-quoted literals, double-quoted identifiers or
     * backtick-quoted identifiers do not split, so a statement such as
     * {@code COMMENT ON TABLE t IS 'a;b'} stays in one piece. A doubled quote inside a
     * quoted section is handled naturally (it closes and immediately reopens the section).
     * Backslash escapes, SQL comments and PostgreSQL dollar-quoting are not interpreted.
     *
     * @param sql script text
     * @return the trimmed, non-empty statements in their original order
     */
    private List<String> splitSql(String sql) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char openQuote = 0; // 0 means "not inside a quoted section"

        for (int i = 0; i < sql.length(); i++) {
            char ch = sql.charAt(i);
            if (openQuote != 0) {
                if (ch == openQuote) {
                    openQuote = 0;
                }
            } else if (ch == '\'' || ch == '"' || ch == '`') {
                openQuote = ch;
            } else if (ch == ';') {
                String statement = current.toString().trim();
                if (!statement.isEmpty()) {
                    statements.add(statement);
                }
                current.setLength(0);
                continue;
            }
            current.append(ch);
        }

        String last = current.toString().trim();
        if (!last.isEmpty()) {
            statements.add(last);
        }
        return statements;
    }

    // ========== 工具方法 ==========

    /**
     * Derives the routing alias of a data source from its connection details.
     * <p>
     * Host, port, default database, effective schema, user name and password are combined
     * into one string; the alias is {@code dynamic_} followed by the first eight hex
     * characters of that string's MD5 digest.
     * <p>
     * NOTE(review): only 32 bits of the digest are kept, so two different definitions could
     * end up with the same alias; consider whether that risk is acceptable.
     *
     * @param ds data source definition
     * @return the alias used as key in the dynamic routing data source
     */
    private String generateAlias(Datasource ds) {
        final String host = ds.getHost();
        final Integer port = ds.getPort();
        final String database = Objects.toString(ds.getDefaultDatabase(), "");
        final String schema = Objects.toString(ds.getEffectSchema(), "");
        final String user = Objects.toString(ds.getUsername(), "");
        final String secret = Objects.toString(ds.getPassword(), "");

        final String connectionIdentity = String.format(
                "%s:%d/%s/%s?user=%s&pass=%s",
                host, port, database, schema, user, secret);

        final String digest = DigestUtils.md5Hex(connectionIdentity);
        return "dynamic_" + digest.substring(0, 8);
    }

    /**
     * Builds the JDBC URL for a data source definition.
     * <p>
     * MySQL and Doris share the {@code jdbc:mysql://} form with a fixed set of connection
     * options; PostgreSQL uses {@code jdbc:postgresql://} without options.
     *
     * @param ds data source definition
     * @return the JDBC URL
     * @throws IllegalArgumentException if the database type is not supported
     */
    private String buildJdbcUrl(Datasource ds) {
        final DbType dialect = DbType.fromString(ds.getDbType());
        final String host = ds.getHost();
        final int port = ds.getPort();
        final String database = ds.getDefaultDatabase();

        final String scheme;
        final String options;
        switch (dialect) {
            case MYSQL:
            case DORIS:
                scheme = "jdbc:mysql://";
                options = "?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&useSSL=false";
                break;
            case POSTGRESQL:
                scheme = "jdbc:postgresql://";
                options = "";
                break;
            default:
                throw new IllegalArgumentException("不支持的类型: " + dialect);
        }
        return scheme + host + ":" + port + "/" + database + options;
    }

    /**
     * Returns the JDBC driver class name for a database type.
     *
     * @param dbType database type name, resolved through {@code DbType.fromString}
     * @return the MySQL driver class for MySQL and Doris, the PostgreSQL driver class for PostgreSQL
     * @throws IllegalArgumentException if the database type is not supported
     */
    public String getDriverClass(String dbType) {
        final DbType dialect = DbType.fromString(dbType);
        final String driverClassName;
        switch (dialect) {
            case MYSQL:
            case DORIS:
                driverClassName = "com.mysql.cj.jdbc.Driver";
                break;
            case POSTGRESQL:
                driverClassName = "org.postgresql.Driver";
                break;
            default:
                throw new IllegalArgumentException("不支持的类型: " + dbType);
        }
        return driverClassName;
    }

    /**
     * Translates a data source definition into the {@link DataSourceProperty} consumed by
     * the data source creator: JDBC URL, driver class, credentials and HikariCP settings.
     *
     * @param ds data source definition
     * @return the populated property object
     */
    private DataSourceProperty buildDataSourceProperty(Datasource ds) {
        final DataSourceProperty connectionSettings = new DataSourceProperty();

        // Credentials
        connectionSettings.setUsername(ds.getUsername());
        connectionSettings.setPassword(ds.getPassword());

        // Where and how to connect
        final String jdbcUrl = buildJdbcUrl(ds);
        connectionSettings.setUrl(jdbcUrl);
        final String driverClassName = getDriverClass(ds.getDbType());
        connectionSettings.setDriverClassName(driverClassName);

        // Pool sizing and timeouts (HikariCP)
        final HikariCpConfig poolSettings = buildHikariConfig(ds);
        connectionSettings.setHikari(poolSettings);

        return connectionSettings;
    }

    /**
     * Builds the HikariCP settings applied to every dynamically registered data source.
     * <p>
     * The values are fixed for now and do not depend on {@code ds}.
     * TODO: read them from a configuration centre in the future.
     *
     * @param ds data source definition (currently unused)
     * @return the pool configuration
     */
    private HikariCpConfig buildHikariConfig(Datasource ds) {
        final long connectionTimeoutMs = 30000L; // 30 seconds
        final long idleTimeoutMs = 600000L;      // 10 minutes
        final long maxLifetimeMs = 1800000L;     // 30 minutes
        final int maxPoolSize = 20;
        final int minIdleConnections = 5;

        final HikariCpConfig poolSettings = new HikariCpConfig();
        poolSettings.setConnectionTimeout(connectionTimeoutMs);
        poolSettings.setIdleTimeout(idleTimeoutMs);
        poolSettings.setMaxLifetime(maxLifetimeMs);
        poolSettings.setMaximumPoolSize(maxPoolSize);
        poolSettings.setMinIdle(minIdleConnections);
        return poolSettings;
    }
}
package com.bigdata.datasource.service.impl;

import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.service.DatasourceService;
import com.bigdata.datasource.service.SchemaSyncService;
import com.bigdata.datasource.utils.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.regex.Pattern;

/**
 * 表结构同步服务实现类
 * <p>
 * 职责:
 * - 校验同步参数(表名、数据源等)
 * - 区分同构(相同数据库类型)与异构(不同数据库类型)同步
 * - 生成并执行目标建表语句
 * - 处理 PostgreSQL 的额外注释语句
 * - 新增能力:仅生成 DDL(供前端预览/编辑)
 * <p>
 * 注意:
 * - 本服务层不返回 Map,不处理 Web 响应格式
 * - 业务异常抛出 SchemaSyncException,由 Controller 统一捕获
 * - 系统异常(如数据库连接失败)也包装为 SchemaSyncException 向上抛出
 */
@Service
public class SchemaSyncServiceImpl implements SchemaSyncService {

    @Autowired
    private DatasourceService datasourceService;

    private static final Logger log = LoggerFactory.getLogger(SchemaSyncServiceImpl.class);

    /**
     * Copies a table's structure from a source datasource into a target datasource.
     * <p>
     * Flow: run the pre-checks, normalise both table names, then generate and execute
     * the target DDL through the homogeneous path (same database type on both sides)
     * or the heterogeneous path (different types).
     * <p>
     * NOTE(review): name validation and {@code DbType.fromString} run before the
     * try block and throw {@code IllegalArgumentException}; those are not converted
     * to {@code SchemaSyncException} here.
     *
     * @param sourceDs    source datasource
     * @param targetDs    target datasource
     * @param sourceTable source table name (required)
     * @param targetTable target table name; null or blank means "same as the source table"
     * @return the result produced by the selected sync path
     * @throws SchemaSyncException if a pre-check fails or the selected path throws
     */
    @Override
    public SyncTableResult syncTable(Datasource sourceDs, Datasource targetDs, String sourceTable, String targetTable) {
        // Step 1: pre-checks (non-blank source, no self-sync, target must not exist yet)
        validateAndPrepare(sourceDs, targetDs, sourceTable, targetTable);

        // Step 2: normalise the table names
        final String srcName = TableNameValidator.validate(sourceTable.trim());
        final String tgtName;
        if (targetTable == null || targetTable.trim().isEmpty()) {
            tgtName = srcName;
        } else {
            tgtName = TableNameValidator.validate(targetTable.trim());
        }

        // Step 3: resolve the database types on both sides
        final DbType srcKind = DbType.fromString(sourceDs.getDbType());
        final DbType tgtKind = DbType.fromString(targetDs.getDbType());

        log.info("开始同步表结构: {}({}) -> {}({})",
                sourceDs.getSourceName(), srcName, targetDs.getSourceName(), tgtName);

        // Step 4: dispatch; anything thrown by either path is wrapped
        final boolean sameEngine = (srcKind == tgtKind);
        try {
            return sameEngine
                    ? handleHomogeneousSync(sourceDs, targetDs, srcName, tgtName, srcKind)
                    : handleHeterogeneousSync(sourceDs, targetDs, srcName, tgtName, srcKind, tgtKind);
        } catch (Exception e) {
            log.error("表结构同步过程中发生未预期异常", e);
            throw new SchemaSyncException("同步失败: " + e.getMessage(), e);
        }
    }

    /**
     * Builds the CREATE TABLE script for the target datasource without executing it.
     * <p>
     * Unlike {@code syncTable}, this does not check whether the target table already
     * exists; the only pre-check is that the source table name is not blank.
     * <p>
     * NOTE(review): {@code TableNameValidator.validate} and {@code DbType.fromString}
     * throw {@code IllegalArgumentException}, which is propagated as-is.
     *
     * @param sourceDs    source datasource
     * @param targetDs    target datasource
     * @param sourceTable source table name (required)
     * @param targetTable target table name; null or blank means "same as the source table"
     * @return the generated DDL text
     * @throws SchemaSyncException if the source table name is null or blank, or if
     *                             DDL generation fails
     */
    public String generateCreateTableDdl(
            Datasource sourceDs,
            Datasource targetDs,
            String sourceTable,
            String targetTable) {

        final boolean sourceMissing = sourceTable == null || sourceTable.trim().isEmpty();
        if (sourceMissing) {
            throw new SchemaSyncException("源表名不能为空");
        }

        // Normalise both names; an omitted target falls back to the source name
        final String srcName = TableNameValidator.validate(sourceTable.trim());
        final boolean targetOmitted = targetTable == null || targetTable.trim().isEmpty();
        final String tgtName = targetOmitted
                ? srcName
                : TableNameValidator.validate(targetTable.trim());

        final DbType srcKind = DbType.fromString(sourceDs.getDbType());
        final DbType tgtKind = DbType.fromString(targetDs.getDbType());

        log.debug("生成建表 DDL: {}({}) -> {}({})", sourceDs.getSourceName(), srcName, targetDs.getSourceName(), tgtName);

        // Different engines need a rebuilt statement; same engine reuses the native DDL
        if (srcKind != tgtKind) {
            return generateHeterogeneousDdl(sourceDs, targetDs, srcName, tgtName, srcKind, tgtKind);
        }
        return generateHomogeneousDdl(sourceDs, targetDs, srcName, tgtName, srcKind);
    }

    // ==============================
    // Private: 参数校验(仅用于 syncTable,因为要检查目标表是否存在)
    // ==============================

    /**
     * Pre-checks for an actual sync run (not used for DDL preview).
     * <p>
     * Checks, in order:
     * <ol>
     *   <li>both datasources are present and the source table name is not blank;</li>
     *   <li>the sync does not target the very same table of the very same datasource;</li>
     *   <li>the target table does not already exist in the target datasource.</li>
     * </ol>
     * The effective target name (trimmed; the trimmed source name when the target is
     * null or blank) is resolved once and used by both the self-sync check and the
     * existence check, so a blank target or a padded source name can no longer slip
     * past the self-sync check.
     *
     * @param sourceDs    source datasource
     * @param targetDs    target datasource
     * @param sourceTable source table name (required)
     * @param targetTable target table name; null or blank means "same as the source table"
     * @throws SchemaSyncException if any check fails
     */
    private void validateAndPrepare(Datasource sourceDs, Datasource targetDs, String sourceTable, String targetTable) {
        if (sourceDs == null || targetDs == null) {
            throw new SchemaSyncException("数据源不能为空");
        }
        if (sourceTable == null || sourceTable.trim().isEmpty()) {
            throw new SchemaSyncException("源表名不能为空");
        }

        final String effectiveSource = sourceTable.trim();
        final String finalTargetTable = (targetTable == null || targetTable.trim().isEmpty())
                ? effectiveSource
                : targetTable.trim();

        // Self-sync: same datasource id and same effective table name
        if (Objects.equals(sourceDs.getId(), targetDs.getId())
                && effectiveSource.equals(finalTargetTable)) {
            throw new SchemaSyncException("禁止将表同步到自身");
        }

        // The target table must not exist yet (only relevant when the DDL is executed)
        List<String> targetTables = datasourceService.getAllTables(targetDs);
        if (targetTables.contains(finalTargetTable)) {
            throw new SchemaSyncException("目标表 " + finalTargetTable + " 已存在");
        }
    }

    // ==============================
    // 同构同步:执行 + 生成
    // ==============================

    /**
     * Runs a same-engine sync (e.g. MySQL to MySQL, PostgreSQL to PostgreSQL):
     * generates the DDL and executes it against the target datasource.
     * <p>
     * An execution failure is logged and reported through a failure result instead
     * of being thrown; exceptions raised while generating the DDL propagate.
     *
     * @param sourceDs    source datasource
     * @param targetDs    target datasource
     * @param sourceTable normalised source table name
     * @param targetTable normalised target table name
     * @param dbType      database type shared by both sides
     * @return success or failure result carrying the generated SQL
     */
    private SyncTableResult handleHomogeneousSync(
            Datasource sourceDs,
            Datasource targetDs,
            String sourceTable,
            String targetTable,
            DbType dbType) {

        final String ddl = generateHomogeneousDdl(sourceDs, targetDs, sourceTable, targetTable, dbType);

        SyncTableResult outcome;
        try {
            datasourceService.executeDdl(targetDs, ddl);
            log.info("同构同步成功:表 {} 已在目标数据源创建", targetTable);
            outcome = SyncTableResult.success(ddl, targetTable + " 已通过原生SQL同步创建");
        } catch (Exception e) {
            log.error("执行同构建表SQL失败", e);
            outcome = SyncTableResult.failure(ddl, "执行建表语句失败: " + e.getMessage());
        }
        return outcome;
    }

    /**
     * Generates the DDL for a same-engine sync without executing it.
     * <p>
     * The native CREATE TABLE statement is fetched from the source datasource and the
     * table reference is rewritten:
     * <ul>
     *   <li>PostgreSQL: every {@code "schema"."table"} reference to the source table
     *       becomes {@code "targetSchema"."targetTable"};</li>
     *   <li>MySQL / Doris: every back-quoted occurrence of the source table name
     *       becomes the back-quoted target table name.</li>
     * </ul>
     * The replacement text is always treated literally. The schema comes from the
     * datasource configuration and may contain {@code $} or {@code \}, which
     * {@code String.replaceAll} would otherwise interpret as group references.
     * <p>
     * NOTE(review): in the MySQL / Doris branch a column or index whose name equals
     * the source table name is renamed as well, because every back-quoted occurrence
     * is replaced.
     *
     * @param sourceDs    source datasource
     * @param targetDs    target datasource
     * @param sourceTable normalised source table name
     * @param targetTable normalised target table name
     * @param dbType      database type shared by both sides
     * @return the rewritten CREATE TABLE statement
     * @throws SchemaSyncException if the source statement cannot be obtained
     */
    private String generateHomogeneousDdl(
            Datasource sourceDs,
            Datasource targetDs,
            String sourceTable,
            String targetTable,
            DbType dbType) {

        String createSql = datasourceService.getCreateTableSql(sourceDs, sourceTable);
        if (createSql == null || createSql.trim().isEmpty()) {
            throw new SchemaSyncException("无法获取源表 " + sourceTable + " 的建表语句");
        }

        if (dbType == DbType.POSTGRESQL) {
            String srcSchema = sourceDs.getEffectSchema();
            String tgtSchema = targetDs.getEffectSchema();
            // Match "schema"."table" with optional whitespace around the dot
            String pattern = "\"" + Pattern.quote(srcSchema) + "\"\\s*\\.\\s*\"" + Pattern.quote(sourceTable) + "\"";
            String replacement = "\"" + tgtSchema + "\".\"" + targetTable + "\"";
            // quoteReplacement keeps '$' and '\' in the schema name from being read as group references
            String replacedSql = createSql.replaceAll(pattern, java.util.regex.Matcher.quoteReplacement(replacement));
            log.debug("PostgreSQL 同构 DDL 生成:替换表引用 {} -> {}", pattern, replacement);
            return replacedSql;
        } else {
            // Literal replacement of the complete back-quoted identifier; no regex needed
            String replacedSql = createSql.replace("`" + sourceTable + "`", "`" + targetTable + "`");
            log.debug("MySQL/Doris 同构 DDL 生成:替换表名 {} -> {}", sourceTable, targetTable);
            return replacedSql;
        }
    }

    // ==============================
    // 异构同步:执行 + 生成
    // ==============================

    /**
     * Runs a cross-engine sync (e.g. Doris to MySQL, MySQL to PostgreSQL): generates
     * the DDL and executes it against the target datasource.
     * <p>
     * For a PostgreSQL target the generated script holds several statements
     * (CREATE TABLE followed by COMMENT ON ...), which are executed one by one.
     * The script is split with a quote- and comment-aware scanner rather than a
     * regular expression, for two reasons:
     * <ul>
     *   <li>the {@code --} header line that precedes the COMMENT ON statements must be
     *       removed without discarding the statement that follows it;</li>
     *   <li>a {@code ;} inside a quoted comment text must not end the statement.</li>
     * </ul>
     * For other targets the script is executed as a single call.
     * <p>
     * An execution failure is logged and reported through a failure result. When a
     * later statement fails, earlier statements have already been executed.
     *
     * @param sourceDs    source datasource
     * @param targetDs    target datasource
     * @param sourceTable normalised source table name
     * @param targetTable normalised target table name
     * @param sourceType  database type of the source
     * @param targetType  database type of the target
     * @return success or failure result carrying the full generated script
     */
    private SyncTableResult handleHeterogeneousSync(
            Datasource sourceDs,
            Datasource targetDs,
            String sourceTable,
            String targetTable,
            DbType sourceType,
            DbType targetType) {

        // 1. Generate the complete script (including PostgreSQL comment statements)
        String fullSql = generateHeterogeneousDdl(sourceDs, targetDs, sourceTable, targetTable, sourceType, targetType);

        // 2. Execute
        try {
            if (targetType == DbType.POSTGRESQL) {
                for (String stmt : splitSqlStatements(fullSql)) {
                    datasourceService.executeDdl(targetDs, stmt);
                }
            } else {
                datasourceService.executeDdl(targetDs, fullSql);
            }
            log.info("异构建表成功:表 {} 已在目标数据源创建", targetTable);
            return SyncTableResult.success(fullSql, "" + targetTable + " 已成功创建于目标数据源");
        } catch (Exception e) {
            log.error("异构建表失败", e);
            return SyncTableResult.failure(fullSql, "建表失败: " + e.getMessage());
        }
    }

    /**
     * Splits a SQL script into individual statements.
     * <p>
     * A statement ends at a {@code ;} that is outside single-quoted literals and
     * double-quoted identifiers; a doubled quote inside a quoted region is treated as
     * an escaped quote. {@code --} line comments outside quoted regions are dropped.
     * Each returned statement is trimmed and keeps its terminating {@code ;};
     * blank statements are omitted.
     *
     * @param script script text, possibly containing several statements
     * @return the statements in script order
     */
    private List<String> splitSqlStatements(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        final int length = script.length();
        char openQuote = 0; // 0 = outside any quoted region, otherwise ' or "
        int i = 0;
        while (i < length) {
            char c = script.charAt(i);
            if (openQuote != 0) {
                current.append(c);
                if (c == openQuote) {
                    if (i + 1 < length && script.charAt(i + 1) == openQuote) {
                        // doubled quote: escaped quote character, still inside the region
                        current.append(openQuote);
                        i++;
                    } else {
                        openQuote = 0;
                    }
                }
            } else if (c == '\'' || c == '"') {
                openQuote = c;
                current.append(c);
            } else if (c == '-' && i + 1 < length && script.charAt(i + 1) == '-') {
                // line comment: skip up to (not including) the line break
                while (i < length && script.charAt(i) != '\n') {
                    i++;
                }
                continue;
            } else if (c == ';') {
                String stmt = current.toString().trim();
                if (!stmt.isEmpty()) {
                    statements.add(stmt + ";");
                }
                current.setLength(0);
            } else {
                current.append(c);
            }
            i++;
        }
        // trailing statement without a terminator
        String tail = current.toString().trim();
        if (!tail.isEmpty()) {
            statements.add(tail);
        }
        return statements;
    }

    /**
     * Generates the DDL for a cross-engine sync without executing it.
     * <p>
     * Steps:
     * <ol>
     *   <li>read the source table's column metadata;</li>
     *   <li>read the source table's comment;</li>
     *   <li>let {@code DdlBuilderUtils} build the CREATE TABLE statement;</li>
     *   <li>for a PostgreSQL target, append the COMMENT ON statements.</li>
     * </ol>
     * For a PostgreSQL target the schema passed to the builder is
     * {@code targetDs.getEffectSchema()}, the same value the COMMENT ON statements
     * are built with, so that both refer to the same table. Other targets keep
     * passing {@code targetDs.getSchemaName()}.
     *
     * @param sourceDs    source datasource
     * @param targetDs    target datasource
     * @param sourceTable normalised source table name
     * @param targetTable normalised target table name
     * @param sourceType  database type of the source
     * @param targetType  database type of the target
     * @return the complete script
     * @throws SchemaSyncException if the source table yields no columns
     */
    private String generateHeterogeneousDdl(
            Datasource sourceDs,
            Datasource targetDs,
            String sourceTable,
            String targetTable,
            DbType sourceType,
            DbType targetType) {

        // 1. Source columns
        List<Map<String, Object>> columns = datasourceService.getTableColumns(sourceDs, sourceTable);
        if (columns == null || columns.isEmpty()) {
            throw new SchemaSyncException("源表 " + sourceTable + " 不存在或无字段");
        }

        // 2. Table comment
        String tableComment = datasourceService.getTableComment(sourceDs, sourceTable);

        // 3. CREATE TABLE; keep the schema aligned with the COMMENT ON statements below
        String targetSchema = (targetType == DbType.POSTGRESQL)
                ? targetDs.getEffectSchema()
                : targetDs.getSchemaName();
        String createSql = DdlBuilderUtils.buildCreateTableSql(
                columns,
                sourceType,
                targetType,
                targetTable,
                targetSchema,
                tableComment,
                false  // PostgreSQL comment statements are appended by this method instead
        );

        // 4. Full script (with PostgreSQL comment statements)
        StringBuilder fullSql = new StringBuilder(createSql);

        if (targetType == DbType.POSTGRESQL) {
            List<String> commentStatements = buildPostgreSqlCommentStatements(
                    targetDs, targetTable, tableComment, columns);
            if (!commentStatements.isEmpty()) {
                fullSql.append("\n\n-- PostgreSQL 字段注释:\n")
                        .append(String.join("\n", commentStatements));
            }
        }

        return fullSql.toString();
    }

    // ==============================
    // PostgreSQL 注释辅助方法
    // ==============================

    /**
     * Builds the PostgreSQL {@code COMMENT ON} statements for a table and its columns.
     * <p>
     * Comment texts are escaped by doubling single quotes. Schema, table and column
     * names are inserted between double quotes as given; they are not escaped here.
     *
     * @param targetDs     target datasource; supplies the schema via {@code getEffectSchema()}
     * @param targetTable  target table name
     * @param tableComment table comment; skipped when null or blank
     * @param columns      column metadata maps; the {@code name} and {@code comment}
     *                     entries are read and cast to {@code String}
     * @return the statements, table comment first, then one per commented column
     */
    private List<String> buildPostgreSqlCommentStatements(
            Datasource targetDs,
            String targetTable,
            String tableComment,
            List<Map<String, Object>> columns) {

        final List<String> result = new ArrayList<>();
        final String schema = targetDs.getEffectSchema();

        // Table-level comment
        if (isNotBlank(tableComment)) {
            result.add(String.format(
                    "COMMENT ON TABLE \"%s\".\"%s\" IS '%s';",
                    schema, targetTable, escapeSingleQuote(tableComment)
            ));
        }

        // Column-level comments
        for (Map<String, Object> column : columns) {
            String columnName = (String) column.get("name");
            String columnComment = (String) column.getOrDefault("comment", "");
            if (!isNotBlank(columnComment)) {
                continue;
            }
            result.add(String.format(
                    "COMMENT ON COLUMN \"%s\".\"%s\".\"%s\" IS '%s';",
                    schema, targetTable, columnName, escapeSingleQuote(columnComment)
            ));
        }
        return result;
    }

    /**
     * Tells whether a string has content.
     *
     * @param str value to test; may be null
     * @return {@code true} when {@code str} is non-null and non-empty after {@code trim()}
     */
    private boolean isNotBlank(String str) {
        if (str == null) {
            return false;
        }
        return str.trim().length() > 0;
    }

    /**
     * Escapes a text for use inside a single-quoted PostgreSQL literal by doubling
     * every single quote ({@code '} becomes {@code ''}).
     *
     * @param input text to escape; must not be null
     * @return the escaped text
     */
    private String escapeSingleQuote(String input) {
        StringBuilder escaped = new StringBuilder(input.length() + 8);
        for (int i = 0; i < input.length(); i++) {
            char ch = input.charAt(i);
            escaped.append(ch);
            if (ch == '\'') {
                escaped.append('\'');
            }
        }
        return escaped.toString();
    }
}

utils目录

package com.bigdata.datasource.utils;

/**
 * Database engines supported by the datasource module.
 */
public enum DbType {
    MYSQL, DORIS, POSTGRESQL;

    /**
     * Parses a database type name.
     * <p>
     * Matching ignores case and surrounding whitespace. Lower-casing uses
     * {@code Locale.ROOT}: with the default-locale variant, "DORIS" lower-cases to
     * "dor\u0131s" under a Turkish default locale and would be rejected.
     *
     * @param type type name: "mysql", "doris" or "postgresql"
     * @return the matching constant
     * @throws IllegalArgumentException if {@code type} is null or not a supported name
     */
    public static DbType fromString(String type) {
        if (type == null) throw new IllegalArgumentException("dbType 不能为空");
        String normalized = type.trim().toLowerCase(java.util.Locale.ROOT);
        switch (normalized) {
            case "mysql":
                return MYSQL;
            case "doris":
                return DORIS;
            case "postgresql":
                return POSTGRESQL;
            default:
                throw new IllegalArgumentException("不支持的数据库类型:" + type);
        }
    }
}
package com.bigdata.datasource.utils;


import java.util.*;
import java.util.stream.Collectors;

public class DdlBuilderUtils {

    /**
     * Builds the CREATE TABLE statement for the target database.
     * <p>
     * The column maps are only read; they are not modified. Primary-key columns are
     * forced to NOT NULL by passing them to {@code buildColumnDef} as key columns.
     * Any target type other than MySQL or PostgreSQL is rendered with the Doris layout.
     *
     * @param columns         column list; each column is a map with the entries
     *                        name / type / nullable / default / comment / is_primary_key
     * @param sourceDbType    source database type
     * @param targetDbType    target database type
     * @param targetTable     target table name
     * @param targetSchema    target schema (PostgreSQL only); null or blank means {@code public}
     * @param tableComment    table comment; ignored when null or blank
     * @param includeComments PostgreSQL only: append COMMENT ON statements to the result
     * @return the CREATE TABLE statement (plus COMMENT ON statements when requested)
     * @throws IllegalArgumentException if {@code columns} is null or empty
     */
    public static String buildCreateTableSql(
            List<Map<String, Object>> columns,
            DbType sourceDbType,
            DbType targetDbType,
            String targetTable,
            String targetSchema,
            String tableComment,
            boolean includeComments
    ) {
        if (columns == null || columns.isEmpty()) {
            throw new IllegalArgumentException("字段列表不能为空");
        }

        List<String> pkColumns = new ArrayList<>();
        for (Map<String, Object> col : columns) {
            if (isPrimaryKeyColumn(col)) {
                pkColumns.add((String) col.get("name"));
            }
        }

        // ========== MySQL / PostgreSQL ==========
        if (targetDbType == DbType.MYSQL || targetDbType == DbType.POSTGRESQL) {
            List<String> columnDefs = new ArrayList<>(columns.size());
            for (Map<String, Object> col : columns) {
                // key flag = primary key, which makes the column NOT NULL without touching the map
                columnDefs.add(buildColumnDef(col, sourceDbType, targetDbType, isPrimaryKeyColumn(col)));
            }
            if (targetDbType == DbType.MYSQL) {
                return renderMysqlTable(columnDefs, pkColumns, targetTable, tableComment);
            }
            return renderPostgresTable(columns, columnDefs, pkColumns,
                    targetTable, targetSchema, tableComment, includeComments);
        }

        // ========== Doris ==========
        return renderDorisTable(columns, pkColumns, sourceDbType, targetTable, tableComment);
    }

    /** Tells whether the column map flags the column as primary key. */
    private static boolean isPrimaryKeyColumn(Map<String, Object> col) {
        return Boolean.TRUE.equals(col.get("is_primary_key"));
    }

    /** Tells whether a text is non-null and non-empty after trimming. */
    private static boolean hasText(String text) {
        return text != null && !text.trim().isEmpty();
    }

    /** Wraps every name in the given quote string and joins the result with ", ". */
    private static String joinQuoted(List<String> names, String quote) {
        return names.stream()
                .map(name -> quote + name + quote)
                .collect(Collectors.joining(", "));
    }

    /** Backslash-escapes backslashes and the given quote character for a quoted literal. */
    private static String escapeBackslashQuoted(String text, char quote) {
        return text.replace("\\", "\\\\").replace(String.valueOf(quote), "\\" + quote);
    }

    /** Renders the MySQL statement: columns, optional PRIMARY KEY, optional table COMMENT. */
    private static String renderMysqlTable(
            List<String> columnDefs, List<String> pkColumns, String targetTable, String tableComment) {
        StringBuilder sql = new StringBuilder();
        sql.append("CREATE TABLE `").append(targetTable).append("` (\n    ")
                .append(String.join(",\n    ", columnDefs));

        if (!pkColumns.isEmpty()) {
            sql.append(",\n    PRIMARY KEY (").append(joinQuoted(pkColumns, "`")).append(")");
        }
        sql.append("\n)");

        if (hasText(tableComment)) {
            sql.append(" COMMENT='").append(escapeBackslashQuoted(tableComment, '\'')).append("'");
        }
        sql.append(";");
        return sql.toString();
    }

    /** Renders the PostgreSQL statement and, when requested, the COMMENT ON statements. */
    private static String renderPostgresTable(
            List<Map<String, Object>> columns,
            List<String> columnDefs,
            List<String> pkColumns,
            String targetTable,
            String targetSchema,
            String tableComment,
            boolean includeComments) {
        // A blank schema would render as ""."table", so it falls back to public like null does
        String schema = hasText(targetSchema) ? targetSchema : "public";

        StringBuilder sql = new StringBuilder();
        sql.append("CREATE TABLE \"").append(schema).append("\".\"").append(targetTable).append("\" (\n    ")
                .append(String.join(",\n    ", columnDefs));

        if (!pkColumns.isEmpty()) {
            sql.append(",\n    PRIMARY KEY (").append(joinQuoted(pkColumns, "\"")).append(")");
        }
        sql.append("\n);");

        if (!includeComments) {
            return sql.toString();
        }

        List<String> statements = new ArrayList<>();
        statements.add(sql.toString());

        // table comment
        if (hasText(tableComment)) {
            statements.add(String.format(
                    "COMMENT ON TABLE \"%s\".\"%s\" IS '%s';",
                    schema, targetTable, tableComment.replace("'", "''")
            ));
        }

        // column comments
        for (Map<String, Object> col : columns) {
            String comment = (String) col.getOrDefault("comment", "");
            if (hasText(comment)) {
                statements.add(String.format(
                        "COMMENT ON COLUMN \"%s\".\"%s\".\"%s\" IS '%s';",
                        schema, targetTable, col.get("name"), comment.replace("'", "''")
                ));
            }
        }
        return String.join("\n", statements);
    }

    /**
     * Renders the Doris statement.
     * <p>
     * Key columns are listed first. With primary-key columns the table uses
     * UNIQUE KEY on them; without any, the first column becomes the DUPLICATE KEY.
     * The first key column is also the hash distribution column.
     */
    private static String renderDorisTable(
            List<Map<String, Object>> columns,
            List<String> pkColumns,
            DbType sourceDbType,
            String targetTable,
            String tableComment) {
        boolean hasExplicitPk = !pkColumns.isEmpty();

        List<Map<String, Object>> keyColumns;
        List<Map<String, Object>> valueColumns;
        if (hasExplicitPk) {
            keyColumns = new ArrayList<>();
            valueColumns = new ArrayList<>();
            for (Map<String, Object> col : columns) {
                (isPrimaryKeyColumn(col) ? keyColumns : valueColumns).add(col);
            }
        } else {
            keyColumns = Collections.singletonList(columns.get(0));
            valueColumns = columns.subList(1, columns.size());
        }

        List<Map<String, Object>> reordered = new ArrayList<>(keyColumns);
        reordered.addAll(valueColumns);

        List<String> keyNames = keyColumns.stream()
                .map(col -> (String) col.get("name"))
                .collect(Collectors.toList());
        Set<Object> keyNameSet = new HashSet<>(keyNames);
        String firstKey = keyNames.get(0);

        List<String> columnDefs = new ArrayList<>(reordered.size());
        for (Map<String, Object> col : reordered) {
            boolean isKey = keyNameSet.contains(col.get("name"));
            columnDefs.add(buildColumnDef(col, sourceDbType, DbType.DORIS, isKey));
        }

        StringBuilder sql = new StringBuilder();
        sql.append("CREATE TABLE `").append(targetTable).append("` (\n    ")
                .append(String.join(",\n    ", columnDefs))
                .append("\n)");

        if (hasExplicitPk) {
            sql.append("\nUNIQUE KEY(").append(joinQuoted(keyNames, "`")).append(")");
        } else {
            sql.append("\nDUPLICATE KEY(`").append(firstKey).append("`)");
        }

        if (hasText(tableComment)) {
            // The literal is double-quoted, so double quotes (not single quotes) must be escaped
            sql.append(" COMMENT \"").append(escapeBackslashQuoted(tableComment, '"')).append("\"");
        }

        sql.append("\nDISTRIBUTED BY HASH(`").append(firstKey).append("`) BUCKETS 32\n")
                .append("PROPERTIES(\"replication_num\" = \"1\");");

        return sql.toString();
    }

    /**
     * Maps a source default value to text that can follow {@code DEFAULT} in the
     * target database.
     * <p>
     * Rules, in order:
     * <ol>
     *   <li>{@code null} input: no default ({@code null} is returned);</li>
     *   <li>the word "null" (any case): {@code "NULL"};</li>
     *   <li>{@code now()} / {@code current_timestamp} (any case): {@code CURRENT_TIMESTAMP};</li>
     *   <li>empty text: {@code ''} for string-like column types, otherwise no default,
     *       so that a bare {@code DEFAULT} is never emitted;</li>
     *   <li>Doris target: string-like types get a single-quoted literal (already quoted
     *       values are kept as-is, others are escaped and quoted); other types keep the
     *       value only when it is one plain decimal number, otherwise it is dropped;</li>
     *   <li>any other target: the trimmed value is returned unchanged.</li>
     * </ol>
     * Case folding uses {@code Locale.ROOT} so the keyword checks do not depend on the
     * JVM default locale.
     *
     * @param defaultValue source default value; may be null
     * @param sourceDbType source database type (not used by the current rules)
     * @param targetDbType target database type
     * @param columnType   column type in the target database; used to detect string-like types
     * @return text for the DEFAULT clause, or {@code null} when no default should be emitted
     */
    private static String mapDefaultValue(String defaultValue, DbType sourceDbType, DbType targetDbType, String columnType) {
        if (defaultValue == null) return null;

        String dv = defaultValue.trim();
        String dvLower = dv.toLowerCase(Locale.ROOT);

        // NULL
        if ("null".equals(dvLower)) {
            return "NULL";
        }

        // time functions
        if (("now()".equals(dvLower) || "current_timestamp".equals(dvLower)) && targetDbType != null) {
            return "CURRENT_TIMESTAMP";
        }

        String typeLower = columnType == null ? "" : columnType.toLowerCase(Locale.ROOT);
        boolean isStringType = typeLower.contains("char")
                || typeLower.contains("string")
                || typeLower.contains("text");

        // An empty value can only be expressed as an empty string literal
        if (dv.isEmpty()) {
            return isStringType ? "''" : null;
        }

        // Doris accepts constants only
        if (targetDbType == DbType.DORIS) {
            if (isStringType) {
                if (dv.length() >= 2 && dv.startsWith("'") && dv.endsWith("'")) {
                    return dv;
                }
                return "'" + dv.replace("\\", "\\\\").replace("'", "\\'") + "'";
            }
            // numeric types: exactly one optionally signed decimal number
            return dv.matches("-?(\\d+(\\.\\d*)?|\\.\\d+)") ? dv : null;
        }

        // default: unchanged
        return dv;
    }

    /**
     * Builds the definition of a single column for the target database.
     * <p>
     * Clause order is {@code name type [NOT NULL] [DEFAULT ...] [COMMENT ...]}.
     * NOT NULL is placed before DEFAULT because Doris's column grammar fixes that
     * order; MySQL and PostgreSQL accept it as well.
     * <p>
     * Rules:
     * <ul>
     *   <li>key columns are always NOT NULL;</li>
     *   <li>a non-key column is NOT NULL when its {@code nullable} entry is
     *       {@code false}, except on Doris when the column has a default
     *       (NOTE(review): that exception is kept from the previous implementation;
     *       it may no longer be needed now that the clause order is fixed; confirm
     *       against the Doris version in use);</li>
     *   <li>{@code DEFAULT NULL} is emitted only for MySQL / Doris and only when the
     *       column is not NOT NULL;</li>
     *   <li>inline {@code COMMENT} is emitted only for MySQL / Doris, with backslashes
     *       and single quotes backslash-escaped.</li>
     * </ul>
     *
     * @param col          column map with the entries name / type / nullable / default / comment
     * @param sourceDbType source database type
     * @param targetDbType target database type
     * @param isKey        whether the column is a key column in the target table
     * @return the column definition text
     */
    private static String buildColumnDef(Map<String, Object> col, DbType sourceDbType, DbType targetDbType, boolean isKey) {
        String name = (String) col.get("name");
        String srcType = (String) col.get("type");
        Boolean nullable = (Boolean) col.get("nullable");
        String defaultValue = (String) col.get("default");
        String comment = (String) col.getOrDefault("comment", "");

        // 1. type mapping
        String mappedType = TypeMapper.mapColumnType(srcType, sourceDbType, targetDbType);

        // 2. quoted column name + type
        boolean backtickDialect = targetDbType == DbType.MYSQL || targetDbType == DbType.DORIS;
        String quote = backtickDialect ? "`" : "\"";
        StringBuilder colDef = new StringBuilder()
                .append(quote).append(name).append(quote)
                .append(' ').append(mappedType);

        // 3. nullability
        String mappedDefault = mapDefaultValue(defaultValue, sourceDbType, targetDbType, mappedType);
        boolean notNull;
        if (isKey) {
            notNull = true;
        } else if (Boolean.FALSE.equals(nullable)) {
            notNull = !(targetDbType == DbType.DORIS && mappedDefault != null);
        } else {
            notNull = false;
        }
        if (notNull) {
            colDef.append(" NOT NULL");
        }

        // 4. default value
        if (mappedDefault != null) {
            if ("NULL".equals(mappedDefault)) {
                // DEFAULT NULL contradicts NOT NULL, so it is only written for nullable columns
                if (backtickDialect && !notNull) {
                    colDef.append(" DEFAULT NULL");
                }
            } else {
                colDef.append(" DEFAULT ").append(mappedDefault);
            }
        }

        // 5. inline comment (MySQL / Doris only)
        if (backtickDialect && comment != null && !comment.trim().isEmpty()) {
            // escape backslashes first so a trailing '\' cannot swallow the closing quote
            String escaped = comment.replace("\\", "\\\\").replace("'", "\\'");
            colDef.append(" COMMENT '").append(escaped).append("'");
        }

        return colDef.toString();
    }
}
package com.bigdata.datasource.utils;

/**
 * Unchecked exception raised by the schema sync service for failed checks and
 * for errors that occur while generating or executing DDL.
 */
public class SchemaSyncException extends RuntimeException {

    /**
     * Creates an exception without a cause.
     *
     * @param message description of the failure
     */
    public SchemaSyncException(String message) {
        super(message);
    }

    /**
     * Creates an exception that wraps an underlying error.
     *
     * @param message description of the failure
     * @param cause   the error that triggered this exception
     */
    public SchemaSyncException(String message, Throwable cause) {
        super(message, cause);
    }
}
package com.bigdata.datasource.utils;

import lombok.Data;

/**
 * Immutable outcome of a table sync: a success flag, the SQL involved, and either
 * an informational message (success) or an error text (failure).
 * Instances are created through {@link #success} and {@link #failure}.
 */
@Data
public class SyncTableResult {
    private final boolean success;
    private final String sql;
    private final String message;
    private final String error;

    /**
     * Creates a failed result; {@code message} is left null.
     *
     * @param sql   SQL that was generated or attempted
     * @param error description of the failure
     * @return a result with {@code success == false}
     */
    public static SyncTableResult failure(String sql, String error) {
        return new SyncTableResult(false, sql, null, error);
    }

    /**
     * Creates a successful result; {@code error} is left null.
     *
     * @param sql     SQL that was executed
     * @param message informational message
     * @return a result with {@code success == true}
     */
    public static SyncTableResult success(String sql, String message) {
        return new SyncTableResult(true, sql, message, null);
    }

    private SyncTableResult(boolean ok, String sqlText, String info, String failure) {
        this.success = ok;
        this.sql = sqlText;
        this.message = info;
        this.error = failure;
    }

}
package com.bigdata.datasource.utils;

import java.util.regex.Pattern;

/**
 * Validates table names against a conservative identifier pattern: a letter or
 * underscore first, followed by letters, digits or underscores only.
 */
public class TableNameValidator {
    private static final Pattern VALID_NAME = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");

    /**
     * Trims and validates a table name.
     *
     * @param name candidate table name; may carry surrounding whitespace
     * @return the trimmed name
     * @throws IllegalArgumentException if the name is null, blank, or does not match the pattern
     */
    public static String validate(String name) {
        String candidate = (name == null) ? "" : name.trim();
        if (candidate.isEmpty()) {
            throw new IllegalArgumentException("表名不能为空");
        }
        if (VALID_NAME.matcher(candidate).matches()) {
            return candidate;
        }
        throw new IllegalArgumentException("表名 '" + candidate + "' 不合法:必须以字母或下划线开头,仅包含字母、数字、下划线");
    }
}
package com.bigdata.datasource.utils;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 数据库字段类型映射器
 * 支持 MySQL / Doris / PostgreSQL 之间的类型转换
 */
public class TypeMapper {

    // ========== 类型映射规则表 ==========
    // 结构: { 标准化基础类型 -> { 目标数据库 -> 映射函数 } }
    // 映射函数: (length, precScale) -> 目标类型字符串
    private static final Map<String, Map<DbType, BiFunction<Integer, String, String>>> TYPE_MAPPING = new HashMap<>();

    static {
        // 字符串类型
        putMapping("varchar", DbType.MYSQL, (len, ps) -> len != null ? "varchar(" + len + ")" : "varchar(255)");
        putMapping("varchar", DbType.DORIS, (len, ps) -> "varchar(" + Math.min(len != null ? len : 65533, 65533) + ")");
        putMapping("varchar", DbType.POSTGRESQL, (len, ps) -> len != null ? "varchar(" + len + ")" : "varchar");

        putMapping("char", DbType.MYSQL, (len, ps) -> "char(" + (len != null ? len : 1) + ")");
        putMapping("char", DbType.DORIS, (len, ps) -> "char(" + (len != null ? len : 1) + ")");
        putMapping("char", DbType.POSTGRESQL, (len, ps) -> "char(" + (len != null ? len : 1) + ")");

        // 整数类型
        putMapping("int", DbType.MYSQL, (len, ps) -> "int");
        putMapping("int", DbType.DORIS, (len, ps) -> "int");
        putMapping("int", DbType.POSTGRESQL, (len, ps) -> "int");

        putMapping("int_unsigned", DbType.MYSQL, (len, ps) -> "int unsigned");
        putMapping("int_unsigned", DbType.DORIS, (len, ps) -> "bigint"); // Doris 无 unsigned
        putMapping("int_unsigned", DbType.POSTGRESQL, (len, ps) -> "bigint");

        putMapping("bigint", DbType.MYSQL, (len, ps) -> "bigint");
        putMapping("bigint", DbType.DORIS, (len, ps) -> "bigint");
        putMapping("bigint", DbType.POSTGRESQL, (len, ps) -> "bigint");

        putMapping("bigint_unsigned", DbType.MYSQL, (len, ps) -> "bigint unsigned");
        putMapping("bigint_unsigned", DbType.DORIS, (len, ps) -> "bigint");
        putMapping("bigint_unsigned", DbType.POSTGRESQL, (len, ps) -> "numeric(20,0)");

        putMapping("smallint", DbType.MYSQL, (len, ps) -> "smallint");
        putMapping("smallint", DbType.DORIS, (len, ps) -> "smallint");
        putMapping("smallint", DbType.POSTGRESQL, (len, ps) -> "smallint");

        putMapping("tinyint", DbType.MYSQL, (len, ps) -> "tinyint(1)");
        putMapping("tinyint", DbType.DORIS, (len, ps) -> "boolean");
        putMapping("tinyint", DbType.POSTGRESQL, (len, ps) -> "boolean");

        // 浮点/定点类型
        putMapping("double", DbType.MYSQL, (len, ps) -> "double");
        putMapping("double", DbType.DORIS, (len, ps) -> "double");
        putMapping("double", DbType.POSTGRESQL, (len, ps) -> "double precision");

        putMapping("float", DbType.MYSQL, (len, ps) -> "float");
        putMapping("float", DbType.DORIS, (len, ps) -> "float");
        putMapping("float", DbType.POSTGRESQL, (len, ps) -> "real");

        putMapping("decimal", DbType.MYSQL, (len, ps) -> "decimal(" + (ps != null ? ps : "10,0") + ")");
        putMapping("decimal", DbType.DORIS, (len, ps) -> "decimal(" + (ps != null ? ps : "10,0") + ")");
        putMapping("decimal", DbType.POSTGRESQL, (len, ps) -> "numeric(" + (ps != null ? ps : "10,0") + ")");

        // 时间类型
        putMapping("timestamp", DbType.MYSQL, (len, ps) -> "timestamp");
        putMapping("timestamp", DbType.DORIS, (len, ps) -> "datetime");
        putMapping("timestamp", DbType.POSTGRESQL, (len, ps) -> "timestamp without time zone");

        putMapping("datetime", DbType.MYSQL, (len, ps) -> "datetime");
        putMapping("datetime", DbType.DORIS, (len, ps) -> "datetime");
        putMapping("datetime", DbType.POSTGRESQL, (len, ps) -> "timestamp without time zone");

        putMapping("date", DbType.MYSQL, (len, ps) -> "date");
        putMapping("date", DbType.DORIS, (len, ps) -> "date");
        putMapping("date", DbType.POSTGRESQL, (len, ps) -> "date");

        // 大对象/特殊类型
        putMapping("text", DbType.MYSQL, (len, ps) -> "text");
        putMapping("text", DbType.DORIS, (len, ps) -> "string");
        putMapping("text", DbType.POSTGRESQL, (len, ps) -> "text");

        putMapping("mediumtext", DbType.MYSQL, (len, ps) -> "mediumtext");
        putMapping("mediumtext", DbType.DORIS, (len, ps) -> "string");
        putMapping("mediumtext", DbType.POSTGRESQL, (len, ps) -> "text");

        putMapping("longtext", DbType.MYSQL, (len, ps) -> "longtext");
        putMapping("longtext", DbType.DORIS, (len, ps) -> "string");
        putMapping("longtext", DbType.POSTGRESQL, (len, ps) -> "text");

        putMapping("json", DbType.MYSQL, (len, ps) -> "json");
        putMapping("json", DbType.DORIS, (len, ps) -> "string");
        putMapping("json", DbType.POSTGRESQL, (len, ps) -> "jsonb");

        putMapping("enum", DbType.MYSQL, (len, ps) -> "varchar(255)");
        putMapping("enum", DbType.DORIS, (len, ps) -> "varchar(65533)");
        putMapping("enum", DbType.POSTGRESQL, (len, ps) -> "varchar(255)");

        putMapping("set", DbType.MYSQL, (len, ps) -> "varchar(255)");
        putMapping("set", DbType.DORIS, (len, ps) -> "varchar(65533)");
        putMapping("set", DbType.POSTGRESQL, (len, ps) -> "varchar(255)");

        putMapping("boolean", DbType.MYSQL, (len, ps) -> "tinyint(1)");
        putMapping("boolean", DbType.DORIS, (len, ps) -> "boolean");
        putMapping("boolean", DbType.POSTGRESQL, (len, ps) -> "boolean");
    }

    private static void putMapping(String baseType, DbType targetDb, BiFunction<Integer, String, String> mapper) {
        TYPE_MAPPING.computeIfAbsent(baseType, k -> new HashMap<>()).put(targetDb, mapper);
    }

    /**
     * 解析字段类型字符串,提取基础类型、长度、精度等信息
     *
     * @param colType   原始类型字符串,如 "varchar(255)", "int unsigned"
     * @param srcDbType 源数据库类型
     * @return new Object[]{baseType, length, precScale}
     */
    public static Object[] parseColumnType(String colType, DbType srcDbType) {
        if (colType == null) colType = "";
        colType = colType.trim().toLowerCase();
        srcDbType = srcDbType != null ? srcDbType : DbType.MYSQL;

        // ========== PostgreSQL 特殊处理 ==========
        if (srcDbType == DbType.POSTGRESQL) {
            // numeric / decimal
            if (colType.startsWith("numeric") || colType.startsWith("decimal")) {
                Matcher m = Pattern.compile("\\((\\d+),\\s*(\\d+)\\)").matcher(colType);
                String precScale = m.find() ? m.group(1) + "," + m.group(2) : null;
                return new Object[]{"decimal", null, precScale};
            }

            // varchar: character varying(N) 或 varchar(N)
            Matcher varcharMatch = Pattern.compile("^(?:character varying|varchar)\\s*\\((\\d+)\\)").matcher(colType);
            if (varcharMatch.find()) {
                return new Object[]{"varchar", Integer.valueOf(varcharMatch.group(1)), null};
            }
            if ("character varying".equals(colType) || "varchar".equals(colType)) {
                return new Object[]{"varchar", null, null};
            }

            // char: character(N) 或 char(N)
            Matcher charMatch = Pattern.compile("^(?:character|char)\\s*\\((\\d+)\\)").matcher(colType);
            if (charMatch.find()) {
                return new Object[]{"char", Integer.valueOf(charMatch.group(1)), null};
            }
            if ("character".equals(colType) || "char".equals(colType)) {
                return new Object[]{"char", null, null};
            }

            // 类型标准化
            Map<String, String> pgMap = new HashMap<>();
            pgMap.put("integer", "int");
            pgMap.put("bigint", "bigint");
            pgMap.put("double precision", "double");
            pgMap.put("timestamp without time zone", "timestamp");
            pgMap.put("timestamp with time zone", "timestamptz");
            pgMap.put("boolean", "boolean");
            pgMap.put("text", "text");

            String baseType = pgMap.getOrDefault(colType, colType);
            return new Object[]{baseType, null, null};
        }

        // ========== MySQL / Doris 处理 ==========
        Integer length = null;
        String precScale = null;
        String basePart;

        if (colType.contains("(")) {
            String[] parts = colType.split("\\(", 2);
            basePart = parts[0].trim();
            String inner = parts[1].replaceAll("\\)$", "").trim();

            if (inner.contains(",")) {
                precScale = inner; // e.g., "10,2"
            } else {
                try {
                    length = Integer.valueOf(inner);
                } catch (NumberFormatException ignored) {
                }
            }
        } else {
            basePart = colType;
        }

        // 提取基础类型(第一个词)
        String baseType = basePart.split("\\s+")[0];

        // 处理 unsigned
        if (srcDbType == DbType.MYSQL && colType.contains("unsigned")) {
            switch (baseType) {
                case "tinyint":
                    baseType = "smallint";
                    break;
                case "smallint":
                    baseType = "int";
                    break;
                case "int":
                    baseType = "int_unsigned";
                    break;
                case "bigint":
                    baseType = "bigint_unsigned";
                    break;
                case "mediumint":
                    baseType = "int";
                    break;
            }
        }

        return new Object[]{baseType, length, precScale};
    }

    /**
     * 映射字段类型
     *
     * @param srcTypeStr   源类型字符串,如 "varchar(255)"
     * @param srcDbType    源数据库类型
     * @param targetDbType 目标数据库类型
     * @return 目标类型字符串,如 "string" (Doris)
     */
    public static String mapColumnType(String srcTypeStr, DbType srcDbType, DbType targetDbType) {
        if (srcTypeStr == null || srcTypeStr.trim().isEmpty()) {
            return targetDbType == DbType.DORIS ? "string" : "text";
        }

        // 1. 解析源类型
        Object[] parsed = parseColumnType(srcTypeStr, srcDbType);
        String baseType = (String) parsed[0];
        Integer length = (Integer) parsed[1];
        String precScale = (String) parsed[2];

        // 2. 查找映射规则
        Map<DbType, BiFunction<Integer, String, String>> targetMappers = TYPE_MAPPING.get(baseType);
        if (targetMappers != null) {
            BiFunction<Integer, String, String> mapper = targetMappers.get(targetDbType);
            if (mapper != null) {
                return mapper.apply(length, precScale);
            }
        }

        // 3. 无匹配:安全兜底
        return targetDbType == DbType.DORIS ? "string" : "text";
    }
}

config目录

package com.bigdata.datasource.config;

import org.springframework.context.annotation.Configuration;

/**
 * Placeholder Spring {@code @Configuration} class.
 * It currently declares no beans or settings of its own.
 */
@Configuration
public class DynamicDataSourceConfig {
}

主程序

package com.bigdata.datasource;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point annotated with {@code @SpringBootApplication}.
 */
@SpringBootApplication
public class MainApplication {
    /**
     * Starts the Spring application using this class as the primary source.
     *
     * @param args command-line arguments, passed through to Spring unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class, args);
    }
}

测试:

package com.bigdata.datasource.test;

import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.mapper.DatasourceMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Smoke test for {@link DatasourceMapper}: loads one record by id and prints it.
 */
@SpringBootTest
public class TestDatasourceMapper {
    @Autowired
    private DatasourceMapper datasourceMapper;

    /** Looks up the datasource with id "1001" and prints whatever the mapper returns. */
    @Test
    public void testSelect() {
        final String datasourceId = "1001";
        Datasource loaded = datasourceMapper.selectById(datasourceId);
        System.out.println(loaded);
    }
}
package com.bigdata.datasource.test;

import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.mapper.DatasourceMapper;
import com.bigdata.datasource.service.DatasourceService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.List;
import java.util.Map;

/**
 * Manual smoke tests for {@link DatasourceService}. Each test loads a
 * datasource record by id, calls one service method and prints the result.
 */
@SpringBootTest
public class TestDatasourceService {
    @Autowired
    private DatasourceMapper datasourceMapper;

    @Autowired
    private DatasourceService datasourceService;

    /**
     * Loads a datasource definition by its id.
     *
     * @param datasourceId primary key of the datasource record
     * @return whatever the mapper returns for that id
     */
    public Datasource getDataSource(String datasourceId) {
        return datasourceMapper.selectById(datasourceId);
    }

    /** Prints all table names of datasource "1001". */
    @Test
    public void test() {
        Datasource source = getDataSource("1001");
        List<String> tableNames = datasourceService.getAllTables(source);
        System.out.println(tableNames);
    }

    /** Prints every column description of table "test_user" in datasource "3001". */
    @Test
    public void test1() {
        Datasource source = getDataSource("3001");
        List<Map<String, Object>> columnRows = datasourceService.getTableColumns(source, "test_user");
        columnRows.forEach(row -> System.out.println(row));
    }

    /** Prints the table comment of "test_user" in datasource "3001". */
    @Test
    public void test2() {
        Datasource source = getDataSource("3001");
        String tableComment = datasourceService.getTableComment(source, "test_user");
        System.out.println(tableComment);
    }

    /** Runs a parameterized query against datasource "1001" and prints each row. */
    @Test
    public void test3() {
        Datasource source = getDataSource("1001");
        List<Map<String, Object>> rows = datasourceService.executeQuery(source, "SELECT * FROM test_user WHERE name = ? AND age = ?", "bob", 18);
        rows.forEach(row -> System.out.println(row));
    }

    /** Prints the CREATE TABLE statement of "test_mysql2doris" in datasource "2001". */
    @Test
    public void test4() {
        Datasource source = getDataSource("2001");
        String createTableSql = datasourceService.getCreateTableSql(source, "test_mysql2doris");
        System.out.println(createTableSql);
    }
}
package com.bigdata.datasource.test;

import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.mapper.DatasourceMapper;
import com.bigdata.datasource.service.SchemaSyncService;
import com.bigdata.datasource.utils.SyncTableResult;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Map;

/**
 * Manual smoke tests for {@link SchemaSyncService}. The tests load datasource
 * records by id, run one schema-sync operation and print the outcome.
 */
@SpringBootTest
public class TestSchemaSyncService {
    @Autowired
    private DatasourceMapper datasourceMapper;

    @Autowired
    private SchemaSyncService schemaSyncService;

    /**
     * Loads a datasource definition by its id.
     *
     * @param datasourceId primary key of the datasource record
     * @return whatever the mapper returns for that id
     */
    public Datasource getDataSource(String datasourceId) {
        Datasource ds = datasourceMapper.selectById(datasourceId);
        return ds;
    }

    /**
     * Synchronizes one table between two datasources and prints the result.
     * Exactly one source/target pair is active; the commented-out lines are the
     * other combinations and can be swapped in by hand.
     */
    @Test
    public void test() {
        // ds1 and ds2 are only used by the commented-out alternatives below.
        Datasource ds1 = getDataSource("1001"); // mysql
        Datasource ds2 = getDataSource("1002"); // mysql
        Datasource ds3 = getDataSource("3001"); // pg
        Datasource ds4 = getDataSource("2001"); // doris
        // mysql -> mysql
//        SyncTableResult syncTableResult = schemaSyncService.syncTable(ds1, ds2, "test_user", "test_mysql2mysql");

        // mysql -> pg
//        SyncTableResult syncTableResult = schemaSyncService.syncTable(ds1, ds3, "test_user", "test_mysql2pg");

        // mysql -> doris
//        SyncTableResult syncTableResult = schemaSyncService.syncTable(ds1, ds4, "test_user", "test_mysql2doris");

        // pg -> pg
//        SyncTableResult syncTableResult = schemaSyncService.syncTable(ds3, ds3, "test_mysql2pg", "test_pg2pg");

        // pg -> mysql
//        SyncTableResult syncTableResult = schemaSyncService.syncTable(ds3, ds1, "test_mysql2pg", "test_pg2mysql");

        // pg -> doris
//        SyncTableResult syncTableResult = schemaSyncService.syncTable(ds3, ds4, "test_mysql2pg", "test_pg2doris");

        // doris -> doris
//        SyncTableResult syncTableResult = schemaSyncService.syncTable(ds4, ds4, "test_mysql2doris", "test_doris2doris");

        // doris -> mysql
//        SyncTableResult syncTableResult = schemaSyncService.syncTable(ds4, ds1, "test_mysql2doris", "test_doris2mysql");

        // doris -> pg
        SyncTableResult syncTableResult = schemaSyncService.syncTable(ds4, ds3, "test_mysql2doris", "test_doris2pg");


        System.out.println(syncTableResult);
    }

    /**
     * Generates (without executing) the CREATE TABLE DDL for a Doris -> PostgreSQL
     * sync and prints it.
     */
    @Test
    public void test2() {
        Datasource dorisSource = getDataSource("2001"); // doris
        Datasource pgTarget = getDataSource("3001"); // pg
        // The target is the PostgreSQL datasource, so the target table follows the
        // doris2pg naming used in test() rather than doris2mysql.
        String ddl = schemaSyncService.generateCreateTableDdl(dorisSource, pgTarget, "test_mysql2doris", "test_doris2pg");
        System.out.println(ddl);
    }
}