文章
spring-boot实现多数据源管理
技术栈:spring-boot + mybatis-plus + dynamic-datasource
实现动态添加数据源,并可执行query操作
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
</parent>
<groupId>com.bigdata</groupId>
<artifactId>bigdata-center</artifactId>
<version>1.0</version>
<properties>
  <!-- Java 8 toolchain; each property is declared exactly once (the original listed
       maven.compiler.source/target and project.build.sourceEncoding twice). -->
  <java.version>1.8</java.version>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <!-- Dependency versions referenced by the <dependencies> section below. -->
  <mybatis-plus.version>3.5.6</mybatis-plus.version>
  <dynamic-ds.version>4.2.0</dynamic-ds.version>
  <postgresql.version>42.7.3</postgresql.version>
  <mysql.version>8.0.33</mysql.version>
  <commons-codec.version>1.15</commons-codec.version>
</properties>
<dependencies>
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- mybatis plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- dynamic datasource -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>${dynamic-ds.version}</version>
</dependency>
<!-- mysql driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- postgresql -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Apache commons codec 用于MD5 -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>
<!-- DevTools 开发时热部署 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
resources/application.yml
server:
port: 8080
servlet:
context-path: /
spring:
application:
name: datasource-manager
# 默认数据源(用于存储 datasource 表)
datasource:
dynamic:
primary: default
strict: true # strict mode: throw when a requested datasource key is not registered instead of falling back to the primary datasource
datasource:
default:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/django_datacenter?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&useSSL=false
username: root
password: root
hikari:
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
maximum-pool-size: 20
minimum-idle: 5
# 关闭 banner(可选)
main:
banner-mode: "off"
# MyBatis-Plus 配置
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开发时可开启 SQL 日志
mapper-locations: classpath:mapper/*.xml
global-config:
db-config:
id-type: none # datasource.id 为字符串主键
table-underline: true
# 日志配置(可选)
logging:
level:
com.bigdata.datasource: debug
com.baomidou.dynamic: info
resources/mapper/DatasourceMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.bigdata.datasource.mapper.DatasourceMapper">
<resultMap id="BaseResultMap" type="com.bigdata.datasource.entity.Datasource">
<id property="id" column="id"/>
<result property="sourceName" column="source_name"/>
<result property="host" column="host"/>
<result property="username" column="username"/>
<result property="password" column="password"/>
<result property="port" column="port"/>
<result property="defaultDatabase" column="default_database"/>
<result property="dbAlias" column="db_alias"/>
<result property="dbType" column="db_type"/>
<result property="deleteFlag" column="delete_flag"/>
<result property="createTime" column="create_time"/>
<result property="updateTime" column="update_time"/>
<result property="schemaName" column="schema_name"/>
</resultMap>
<sql id="Base_Column_List">
id
,source_name,host,username,password,port,
default_database,db_alias,db_type,delete_flag,create_time,
update_time,schema_name
</sql>
</mapper>
entity目录
package com.bigdata.datasource.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.bigdata.datasource.utils.DbType;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Objects;
@Data
@TableName("datasource")
public class Datasource {
@TableId(type = IdType.NONE)
private String id;
private String sourceName;
private String host;
private String username;
private String password;
private Integer port;
private String defaultDatabase;
private String schemaName; // postgresql 专用
private String dbAlias; // 唯一别名 用于动态数据源 key
private String dbType; // mysql/doris/postgresql
private String deleteFlag = "0";
private LocalDateTime createTime;
private LocalDateTime updateTime;
/**
 * Resolves the schema to use for this datasource.
 *
 * @return for PostgreSQL datasources, {@code schemaName}, or {@code "public"} when
 *         {@code schemaName} is null or blank; {@code null} for every other database type
 */
public String getEffectSchema() {
    if (DbType.fromString(dbType) != DbType.POSTGRESQL) {
        return null;
    }
    // An empty or whitespace-only value (e.g. from an empty form field) is treated the same
    // as a missing one, so it also falls back to the default schema.
    if (schemaName == null || schemaName.trim().isEmpty()) {
        return "public";
    }
    return schemaName;
}
}
mapper目录
package com.bigdata.datasource.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdata.datasource.entity.Datasource;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis mapper for {@link Datasource} entities.
 * <p>
 * Declares no methods of its own; everything available on it is inherited from
 * {@link BaseMapper}.
 */
@Mapper
public interface DatasourceMapper extends BaseMapper<Datasource> {
}
service目录
package com.bigdata.datasource.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.bigdata.datasource.entity.Datasource;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
public interface DatasourceService extends IService<Datasource> {
/** Creates a connection pool for {@code ds} and registers it with the dynamic routing datasource. */
DataSource registerDataSource(Datasource ds);
/** Removes the datasource registered under {@code alias} from the dynamic routing datasource. */
void unregisterDataSource(String alias);
/** Lists the table names visible through {@code ds}. */
List<String> getAllTables(Datasource ds);
/** Returns one map of column metadata per column of {@code tableName}. */
List<Map<String, Object>> getTableColumns(Datasource ds, String tableName);
/** Returns a CREATE TABLE statement describing {@code tableName}. */
String getCreateTableSql(Datasource ds, String tableName);
/** Returns the table-level comment of {@code tableName}. */
String getTableComment(Datasource ds, String tableName);
/** Runs a query without bind parameters and returns each row as a column-label-to-value map. */
List<Map<String, Object>> executeQuery(Datasource ds, String sql);
/** Runs a query, binding {@code params} positionally, and returns each row as a map. */
List<Map<String, Object>> executeQuery(Datasource ds, String sql, Object... params);
/** Runs a DML statement without bind parameters and returns the affected row count. */
int executeUpdate(Datasource ds, String sql);
/** Runs a DML statement, binding {@code params} positionally, and returns the affected row count. */
int executeUpdate(Datasource ds, String sql, Object... params);
/** Runs one or more DDL statements without bind parameters. */
void executeDdl(Datasource ds, String sql);
/** Runs one or more DDL statements, binding {@code params} positionally. */
void executeDdl(Datasource ds, String sql, Object... params);
}
package com.bigdata.datasource.service;
import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.utils.SyncTableResult;
import java.util.Map;
/**
 * Copies a table definition from one datasource to another, or only generates the DDL
 * that such a copy would run.
 */
public interface SchemaSyncService {
/**
 * Synchronises a table structure from the source datasource to the target datasource.
 *
 * @param sourceDs    datasource holding the table to copy
 * @param targetDs    datasource in which the table is created
 * @param sourceTable name of the table in the source datasource
 * @param targetTable name of the table to create in the target datasource
 * @return the outcome of the synchronisation
 */
SyncTableResult syncTable(Datasource sourceDs, Datasource targetDs, String sourceTable, String targetTable);
/**
 * Generates the CREATE TABLE DDL for the target datasource without executing it.
 *
 * @param sourceDs    datasource holding the table to copy
 * @param targetDs    datasource the DDL is generated for
 * @param sourceTable name of the table in the source datasource
 * @param targetTable name the table should have in the target datasource
 * @return the generated DDL text
 */
String generateCreateTableDdl(Datasource sourceDs, Datasource targetDs, String sourceTable, String targetTable);
}
service/impl/目录
package com.bigdata.datasource.service.impl;
import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.baomidou.dynamic.datasource.creator.DataSourceCreator;
import com.baomidou.dynamic.datasource.creator.DataSourceProperty;
import com.baomidou.dynamic.datasource.creator.hikaricp.HikariCpConfig;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.mapper.DatasourceMapper;
import com.bigdata.datasource.service.DatasourceService;
import com.bigdata.datasource.utils.DbType;
import com.bigdata.datasource.utils.DdlBuilderUtils;
import com.bigdata.datasource.utils.TableNameValidator;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* 数据源服务实现类
* <p>
* 职责:
* - 动态注册/注销数据源到 DynamicRoutingDataSource
* - 提供统一的 SQL 执行接口(查询、DML、DDL)
* - 获取数据库元数据(表列表、字段信息、建表语句、注释等)
* <p>
* 设计原则:
* - 线程安全:依赖 DynamicRoutingDataSource 内部同步机制
* - 重试机制:对连接失效异常自动重试一次
* - 类型安全:按 DbType 分发不同数据库的元数据查询逻辑
* - 日志完备:关键操作均有 info/debug/error 日志
*/
@Service
public class DataSourceServiceImpl extends ServiceImpl<DatasourceMapper, Datasource> implements DatasourceService {
private static final Logger log = LoggerFactory.getLogger(DataSourceServiceImpl.class);
@Autowired
private DynamicRoutingDataSource dynamicRoutingDataSource;
@Autowired
@Qualifier("hikariDataSourceCreator")
private DataSourceCreator dataSourceCreator;
private final Map<String, Object> registrationLocks = new ConcurrentHashMap<>();
// ========== 数据源注册与连接管理 ==========
/**
 * Creates a datasource for the given configuration and adds it to the routing container
 * under the alias produced by {@code generateAlias}.
 *
 * @param ds datasource configuration entity
 * @return the newly created {@link DataSource}
 * @throws IllegalArgumentException if {@code ds} is null or its alias equals {@code "default"}
 */
@Override
public DataSource registerDataSource(Datasource ds) {
    if (ds == null) {
        throw new IllegalArgumentException("Datasource must not be null");
    }

    final String routingKey = generateAlias(ds);
    if ("default".equals(routingKey)) {
        throw new IllegalArgumentException("Alias 'default' is reserved for primary datasource and cannot be used");
    }

    log.debug("准备注册动态数据源: alias={}, host={}:{}, db={}",
            routingKey, ds.getHost(), ds.getPort(), ds.getDefaultDatabase());

    // Build the connection settings, create the datasource, then publish it under the alias.
    final DataSource created = dataSourceCreator.createDataSource(buildDataSourceProperty(ds));
    dynamicRoutingDataSource.addDataSource(routingKey, created);

    log.info("动态数据源注册成功: alias={}", routingKey);
    return created;
}
/**
 * Removes a datasource from the routing container.
 * <p>
 * When a registration lock exists for the alias (created by {@code getConnection}), the
 * removal runs while holding that lock. The log line reflects what actually happened:
 * info when a datasource was removed, debug when nothing was registered under the alias.
 *
 * @param alias datasource alias
 * @throws IllegalArgumentException if {@code alias} is null or equals {@code "default"}
 */
@Override
public void unregisterDataSource(String alias) {
    if (alias == null) {
        // ConcurrentHashMap rejects null keys; fail with a descriptive error instead of an NPE.
        throw new IllegalArgumentException("Alias must not be null");
    }
    if ("default".equals(alias)) {
        throw new IllegalArgumentException("Cannot remove primary datasource");
    }

    Object lock = registrationLocks.get(alias);
    if (lock == null) {
        // No lock recorded: the alias was never registered through getConnection.
        removeIfRegistered(alias);
        return;
    }
    synchronized (lock) {
        removeIfRegistered(alias);
    }
}

/** Removes the alias from the routing container if present and logs the actual outcome. */
private void removeIfRegistered(String alias) {
    if (dynamicRoutingDataSource.getDataSources().containsKey(alias)) {
        dynamicRoutingDataSource.removeDataSource(alias);
        log.info("动态数据源已移除: alias={}", alias);
    } else {
        log.debug("尝试移除不存在的数据源: alias={}", alias);
    }
}
/**
 * Obtains a connection for the given datasource, registering the datasource on first use.
 * <p>
 * Steps:
 * - look the alias up in the routing container's datasource map
 * - if absent, take the per-alias lock, look again, and register if still absent
 * - return a connection from the resolved datasource
 */
private Connection getConnection(Datasource ds) throws SQLException {
    final String key = generateAlias(ds);

    // Fast path: read the underlying map directly, which returns null for unknown keys.
    DataSource pool = dynamicRoutingDataSource.getDataSources().get(key);
    if (pool != null) {
        return pool.getConnection();
    }

    synchronized (registrationLocks.computeIfAbsent(key, k -> new Object())) {
        // Re-check under the lock; another thread may have registered it in the meantime.
        pool = dynamicRoutingDataSource.getDataSources().get(key);
        if (pool == null) {
            pool = registerDataSource(ds);
        }
    }
    return pool.getConnection();
}
/**
 * Converts every remaining row of a {@link ResultSet} into a map keyed by column label.
 * <p>
 * Each row map is a {@link LinkedHashMap}, so iterating it yields the columns in the order
 * the query returned them.
 *
 * @param rs result set positioned before the first row to read
 * @return one map per row, in result-set order; empty if there are no rows
 * @throws SQLException if reading metadata or row data fails
 */
private List<Map<String, Object>> resultSetToList(ResultSet rs) throws SQLException {
    ResultSetMetaData meta = rs.getMetaData();
    int columnCount = meta.getColumnCount();

    // Column labels do not change between rows, so read them once up front.
    String[] labels = new String[columnCount];
    for (int i = 0; i < columnCount; i++) {
        labels[i] = meta.getColumnLabel(i + 1);
    }

    List<Map<String, Object>> rows = new ArrayList<>();
    while (rs.next()) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columnCount; i++) {
            row.put(labels[i], rs.getObject(i + 1));
        }
        rows.add(row);
    }
    return rows;
}
// ========== SQL 执行核心逻辑 ==========
/**
 * Runs a statement on a connection for the datasource, retrying once after a connection error.
 * <p>
 * When the first attempt fails with an exception that {@code isConnectionError} accepts,
 * the datasource is unregistered so the next {@code getConnection} call re-creates it, and
 * the statement is attempted one more time. Any other failure, or a failure on the second
 * attempt, is logged and rethrown as a {@link RuntimeException} wrapping the cause.
 *
 * @param ds     datasource to run against
 * @param sql    SQL text
 * @param params positional bind values for the prepared statement, may be null
 * @param fetch  true to run as a query and return rows
 * @param isDml  true to run as an update and return the affected row count
 * @return rows for a query, a single-entry "rowcount" list for DML, or null otherwise
 */
private List<Map<String, Object>> executeWithRetry(Datasource ds, String sql, Object[] params, boolean fetch, boolean isDml) {
    int attempt = 0;
    while (true) {
        try (Connection conn = getConnection(ds)) {
            return executeSqlInternal(conn, sql, params, fetch, isDml);
        } catch (SQLException e) {
            boolean retryable = attempt == 0 && isConnectionError(e);
            if (!retryable) {
                log.error("SQL 执行失败 (attempt={})", attempt + 1, e);
                throw new RuntimeException("SQL 执行失败: " + e.getMessage(), e);
            }
            log.warn("检测到连接失效,尝试重新注册数据源并重试: {}", e.getMessage());
            unregisterDataSource(generateAlias(ds));
            attempt++;
        }
    }
}
/**
 * Decides whether an exception indicates a broken or unavailable connection.
 * <p>
 * Checked in order:
 * - JDBC connection exception subclasses
 * - SQLSTATE class {@code 08} (connection exception); SQLSTATE values are five characters
 *   long, so the class is matched by prefix rather than by equality
 * - for exceptions without a vendor error code, well-known phrases in the message
 *
 * @param e exception raised while obtaining a connection or executing a statement
 * @return true if the failure looks connection-related
 */
private boolean isConnectionError(SQLException e) {
    if (e instanceof SQLTransientConnectionException
            || e instanceof SQLNonTransientConnectionException) {
        return true;
    }

    String sqlState = e.getSQLState();
    if (sqlState != null && sqlState.startsWith("08")) {
        return true;
    }

    String message = e.getMessage();
    if (e.getErrorCode() != 0 || message == null) {
        return false;
    }
    // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
    String lower = message.toLowerCase(Locale.ROOT);
    return lower.contains("closed")
            || lower.contains("gone away")
            || lower.contains("broken pipe")
            || lower.contains("connection reset");
}
/**
 * Prepares and runs one statement on an existing connection.
 *
 * @param conn   open connection; not closed by this method
 * @param sql    SQL text
 * @param params positional bind values, may be null
 * @param fetch  true to run as a query and return its rows
 * @param isDml  when {@code fetch} is false, true to run as an update
 * @return the rows for a query; a one-element list holding a "rowcount" entry for an
 *         update; null when the statement is merely executed
 * @throws SQLException if preparing, binding or executing fails
 */
private List<Map<String, Object>> executeSqlInternal(Connection conn, String sql, Object[] params, boolean fetch, boolean isDml)
        throws SQLException {
    try (PreparedStatement stmt = conn.prepareStatement(sql)) {
        int bindCount = (params == null) ? 0 : params.length;
        for (int position = 1; position <= bindCount; position++) {
            stmt.setObject(position, params[position - 1]);
        }

        if (fetch) {
            try (ResultSet rs = stmt.executeQuery()) {
                return resultSetToList(rs);
            }
        }
        if (!isDml) {
            stmt.execute();
            return null;
        }
        int affected = stmt.executeUpdate();
        Map<String, Object> summary = Collections.singletonMap("rowcount", affected);
        return Arrays.asList(summary);
    }
}
// ========== 元数据查询 ==========
/**
 * Lists the tables of the datasource.
 * <p>
 * MySQL / Doris: runs {@code SHOW TABLES} and takes the single column of each row. The
 * column label embeds the database name, so the value is read by position instead of by
 * a label built from {@code defaultDatabase}, which would miss when the case differs.
 * <p>
 * PostgreSQL: queries {@code information_schema.tables} for base tables in the effective schema.
 *
 * @param ds datasource to inspect
 * @return table names
 * @throws UnsupportedOperationException for any other database type
 */
@Override
public List<String> getAllTables(Datasource ds) {
    DbType type = DbType.fromString(ds.getDbType());
    if (type == DbType.MYSQL || type == DbType.DORIS) {
        List<Map<String, Object>> rows = executeWithRetry(ds, "SHOW TABLES", null, true, false);
        return rows.stream()
                .filter(r -> !r.isEmpty())
                .map(r -> r.values().iterator().next())
                .filter(Objects::nonNull)
                .map(Object::toString)
                .collect(Collectors.toList());
    } else if (type == DbType.POSTGRESQL) {
        String schema = ds.getEffectSchema();
        List<Map<String, Object>> rows = executeQuery(ds,
                "SELECT table_name FROM information_schema.tables WHERE table_schema = ? AND table_type = 'BASE TABLE'",
                schema);
        return rows.stream().map(r -> (String) r.get("table_name")).collect(Collectors.toList());
    }
    throw new UnsupportedOperationException("不支持的数据库类型");
}
/**
 * Returns the column metadata of a table, dispatching on the datasource's database type.
 *
 * @param ds        datasource to inspect
 * @param tableName table name; passed through {@code TableNameValidator.validate} first
 * @return one map per column
 * @throws UnsupportedOperationException if the type is not MySQL, Doris or PostgreSQL
 */
@Override
public List<Map<String, Object>> getTableColumns(Datasource ds, String tableName) {
    final String safeName = TableNameValidator.validate(tableName);
    final DbType dialect = DbType.fromString(ds.getDbType());

    if (dialect == DbType.POSTGRESQL) {
        return getTableColumnsFromPostgreSQL(ds, safeName);
    }
    if (dialect == DbType.MYSQL || dialect == DbType.DORIS) {
        return getTableColumnsFromMysqlOrDoris(ds, safeName);
    }
    throw new UnsupportedOperationException("不支持的数据库类型");
}
/**
 * Reads column metadata for a MySQL or Doris table from {@code information_schema.COLUMNS}
 * and maps each row to the keys name, type, nullable, default, comment, extra and
 * is_primary_key.
 */
private List<Map<String, Object>> getTableColumnsFromMysqlOrDoris(Datasource ds, String tableName) {
    List<Map<String, Object>> metadataRows = executeQuery(ds,
            "SELECT COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE, COLUMN_DEFAULT, COLUMN_COMMENT, EXTRA, COLUMN_KEY " +
                    "FROM information_schema.COLUMNS " +
                    "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION",
            ds.getDefaultDatabase(), tableName);

    List<Map<String, Object>> columns = new ArrayList<>();
    for (Map<String, Object> source : metadataRows) {
        Object comment = source.get("COLUMN_COMMENT");
        Object extra = source.get("EXTRA");

        Map<String, Object> column = new HashMap<>();
        column.put("name", source.get("COLUMN_NAME"));
        column.put("type", source.get("COLUMN_TYPE"));
        column.put("nullable", "YES".equals(source.get("IS_NULLABLE")));
        column.put("default", source.get("COLUMN_DEFAULT"));
        // Missing comment / extra values are reported as empty strings.
        column.put("comment", comment == null ? "" : comment);
        column.put("extra", extra == null ? "" : extra);
        column.put("is_primary_key", "PRI".equals(source.get("COLUMN_KEY")));
        columns.add(column);
    }
    return columns;
}
/**
 * Reads column metadata for a PostgreSQL table.
 * <p>
 * The schema and table name are passed as bind parameters rather than formatted into the
 * SQL text. The primary-key sub-query also joins on {@code table_name}, because constraint
 * names are not guaranteed to be unique across the tables of a schema.
 *
 * @param ds        PostgreSQL datasource
 * @param tableName table name (already validated by the caller)
 * @return one map per column with the keys name, type, nullable, default, comment, extra
 *         and is_primary_key
 */
private List<Map<String, Object>> getTableColumnsFromPostgreSQL(Datasource ds, String tableName) {
    String schema = ds.getEffectSchema();
    String sql =
            "SELECT " +
            "c.column_name, " +
            "c.data_type, " +
            "c.character_maximum_length, " +
            "c.numeric_precision, " +
            "c.numeric_scale, " +
            "c.is_nullable, " +
            "c.column_default, " +
            "pg_catalog.col_description( " +
            "(SELECT oid FROM pg_class WHERE relname = ? AND relnamespace = " +
            "(SELECT oid FROM pg_namespace WHERE nspname = ?) " +
            "), c.ordinal_position " +
            ") AS column_comment, " +
            "CASE " +
            "WHEN pk_cols.column_name IS NOT NULL THEN TRUE " +
            "ELSE FALSE " +
            "END AS is_primary_key " +
            "FROM information_schema.columns c " +
            "LEFT JOIN ( " +
            "SELECT kcu.column_name " +
            "FROM information_schema.table_constraints tc " +
            "JOIN information_schema.key_column_usage kcu " +
            "ON tc.constraint_name = kcu.constraint_name " +
            "AND tc.table_schema = kcu.table_schema " +
            "AND tc.table_name = kcu.table_name " +
            "WHERE tc.constraint_type = 'PRIMARY KEY' " +
            "AND tc.table_schema = ? " +
            "AND tc.table_name = ? " +
            ") pk_cols ON c.column_name = pk_cols.column_name " +
            "WHERE c.table_schema = ? AND c.table_name = ? " +
            "ORDER BY c.ordinal_position ";

    // Bind order follows the placeholders: relname, nspname, then (schema, table) twice.
    List<Map<String, Object>> rows = executeQuery(ds, sql,
            tableName, schema, schema, tableName, schema, tableName);

    List<Map<String, Object>> columns = new ArrayList<>(rows.size());
    for (Map<String, Object> row : rows) {
        Object comment = row.get("column_comment");

        Map<String, Object> col = new HashMap<>();
        col.put("name", row.get("column_name"));
        col.put("type", pgDisplayType(row));
        col.put("nullable", "YES".equals(row.get("is_nullable")));
        col.put("default", pgDefaultLiteral(row.get("column_default")));
        col.put("comment", comment != null ? comment : "");
        col.put("extra", "");
        col.put("is_primary_key", row.get("is_primary_key"));
        columns.add(col);
    }
    return columns;
}

/** Maps an information_schema data type (plus length/precision columns) to a short display type. */
private String pgDisplayType(Map<String, Object> row) {
    String dataType = (String) row.get("data_type");
    // Read numeric metadata as Number so the exact boxed type returned by the driver does not matter.
    Number charLen = (Number) row.get("character_maximum_length");
    switch (dataType) {
        case "character varying":
            return charLen != null ? "varchar(" + charLen.intValue() + ")" : "varchar";
        case "character":
            return charLen != null ? "char(" + charLen.intValue() + ")" : "char";
        case "numeric": {
            Number prec = (Number) row.get("numeric_precision");
            Number scale = (Number) row.get("numeric_scale");
            return (prec != null && scale != null)
                    ? "numeric(" + prec.intValue() + "," + scale.intValue() + ")"
                    : "numeric";
        }
        case "integer":
            return "int";
        case "double precision":
            return "double";
        case "timestamp without time zone":
            return "timestamp";
        default:
            return dataType;
    }
}

/**
 * Normalises a PostgreSQL column default: strips one trailing {@code ::type} cast and, for
 * a quoted string literal, removes the outer quotes and un-doubles embedded quotes.
 * <p>
 * Only a cast at the very end is removed, so defaults such as
 * {@code nextval('t_id_seq'::regclass)} and literals that contain {@code ::} stay intact.
 */
private String pgDefaultLiteral(Object rawDefault) {
    if (rawDefault == null) {
        return null;
    }
    String value = rawDefault.toString().replaceFirst(
            "::[A-Za-z_\"][A-Za-z0-9_ .\"\\[\\]]*(\\(\\s*\\d+(\\s*,\\s*\\d+)?\\s*\\))?$", "");
    if (value.length() >= 2 && value.startsWith("'") && value.endsWith("'")) {
        value = value.substring(1, value.length() - 1).replace("''", "'");
    }
    return value;
}
/**
 * Returns a CREATE TABLE statement for the table.
 * <p>
 * MySQL / Doris: the "Create Table" column of {@code SHOW CREATE TABLE}.
 * PostgreSQL: rebuilt from column metadata and the table comment via
 * {@code DdlBuilderUtils.buildCreateTableSql}, since there is no native equivalent.
 *
 * @param ds        datasource to inspect
 * @param tableName table name; passed through {@code TableNameValidator.validate} first
 * @return the CREATE TABLE statement
 * @throws UnsupportedOperationException for any other database type
 */
@Override
public String getCreateTableSql(Datasource ds, String tableName) {
    final String safeName = TableNameValidator.validate(tableName);
    final DbType dialect = DbType.fromString(ds.getDbType());

    if (dialect == DbType.MYSQL || dialect == DbType.DORIS) {
        String showSql = "SHOW CREATE TABLE `" + safeName + "`";
        Map<String, Object> firstRow = executeWithRetry(ds, showSql, new Object[0], true, false).get(0);
        return (String) firstRow.get("Create Table");
    }
    if (dialect == DbType.POSTGRESQL) {
        List<Map<String, Object>> columns = getTableColumns(ds, safeName);
        String tableComment = getTableComment(ds, safeName);
        return DdlBuilderUtils.buildCreateTableSql(
                columns, dialect, dialect, safeName, ds.getEffectSchema(), tableComment, true);
    }
    throw new UnsupportedOperationException("不支持的数据库类型");
}
/**
 * Returns the table-level comment.
 *
 * @param ds        datasource to inspect
 * @param tableName table name; passed through {@code TableNameValidator.validate} first
 * @return the comment, or null when the lookup returns no row or the database type is
 *         not MySQL, Doris or PostgreSQL
 */
@Override
public String getTableComment(Datasource ds, String tableName) {
    final String safeName = TableNameValidator.validate(tableName);
    final DbType dialect = DbType.fromString(ds.getDbType());

    final List<Map<String, Object>> found;
    final String column;
    if (dialect == DbType.MYSQL || dialect == DbType.DORIS) {
        column = "TABLE_COMMENT";
        found = executeQuery(ds,
                "SELECT TABLE_COMMENT FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
                ds.getDefaultDatabase(), safeName);
    } else if (dialect == DbType.POSTGRESQL) {
        column = "table_comment";
        String schema = ds.getEffectSchema();
        found = executeQuery(ds,
                "SELECT obj_description(c.oid, 'pg_class') AS table_comment " +
                        "FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace " +
                        "WHERE c.relname = ? AND n.nspname = ?",
                safeName, schema);
    } else {
        return null;
    }

    if (found.isEmpty()) {
        return null;
    }
    return (String) found.get(0).get(column);
}
// ========== SQL 执行公开接口 ==========
/**
 * Runs a SELECT without bind parameters.
 *
 * @see #executeQuery(Datasource, String, Object...)
 */
@Override
public List<Map<String, Object>> executeQuery(Datasource ds, String sql) {
    Object[] noParams = new Object[0];
    return executeQuery(ds, sql, noParams);
}
/**
 * Runs a SELECT with positional bind parameters.
 *
 * @param ds     datasource to run against
 * @param sql    statement text; after trimming it must start with SELECT (case-insensitive)
 * @param params positional bind values
 * @return one map per row, keyed by column label
 * @throws IllegalArgumentException if the statement does not start with SELECT
 */
@Override
public List<Map<String, Object>> executeQuery(Datasource ds, String sql, Object... params) {
    String normalized = sql.trim().toUpperCase();
    boolean isSelect = normalized.startsWith("SELECT");
    if (isSelect) {
        return executeWithRetry(ds, sql, params, true, false);
    }
    throw new IllegalArgumentException("仅支持 SELECT 查询");
}
/**
 * Runs a DML statement without bind parameters.
 *
 * @see #executeUpdate(Datasource, String, Object...)
 */
@Override
public int executeUpdate(Datasource ds, String sql) {
    return executeUpdate(ds, sql, new Object[0]);
}
/**
 * Runs an INSERT, UPDATE or DELETE with positional bind parameters.
 *
 * @param ds     datasource to run against
 * @param sql    statement text; after trimming it must start with INSERT, UPDATE or DELETE
 *               (case-insensitive)
 * @param params positional bind values
 * @return the affected row count, or 0 when no result was produced
 * @throws IllegalArgumentException if the statement is not one of the accepted DML kinds
 */
@Override
public int executeUpdate(Datasource ds, String sql, Object... params) {
    // Locale.ROOT: with a Turkish default locale "insert".toUpperCase() yields a dotted
    // capital I and would never match "INSERT".
    String upper = sql.trim().toUpperCase(Locale.ROOT);
    if (!(upper.startsWith("INSERT") || upper.startsWith("UPDATE") || upper.startsWith("DELETE"))) {
        throw new IllegalArgumentException("仅支持 DML 语句");
    }
    List<Map<String, Object>> result = executeWithRetry(ds, sql, params, false, true);
    if (result == null || result.isEmpty()) {
        return 0;
    }
    return ((Number) result.get(0).get("rowcount")).intValue();
}
/**
 * Runs one or more DDL statements without bind parameters.
 *
 * @see #executeDdl(Datasource, String, Object...)
 */
@Override
public void executeDdl(Datasource ds, String sql) {
    Object[] noParams = new Object[0];
    executeDdl(ds, sql, noParams);
}
/**
 * Splits the text into statements with {@code splitSql} and runs them one after another.
 * <p>
 * Each statement is checked just before it runs, so statements preceding an invalid one
 * have already been executed when the exception is thrown.
 *
 * @param ds     datasource to run against
 * @param sql    one or more statements separated by semicolons
 * @param params positional bind values, applied to every statement
 * @throws IllegalArgumentException if a statement does not start with CREATE, ALTER, DROP,
 *                                  TRUNCATE or COMMENT (case-insensitive)
 */
@Override
public void executeDdl(Datasource ds, String sql, Object... params) {
    final String[] allowedPrefixes = {"CREATE", "ALTER", "DROP", "TRUNCATE", "COMMENT"};
    for (String statement : splitSql(sql)) {
        String head = statement.trim().toUpperCase();
        boolean allowed = false;
        for (String prefix : allowedPrefixes) {
            if (head.startsWith(prefix)) {
                allowed = true;
                break;
            }
        }
        if (!allowed) {
            throw new IllegalArgumentException("DDL 仅支持 CREATE/ALTER/DROP/TRUNCATE/COMMENT");
        }
        executeWithRetry(ds, statement, params, false, false);
    }
}
/**
 * Splits SQL text into individual statements on semicolons that are outside quotes.
 * <p>
 * Semicolons inside single-quoted literals, double-quoted identifiers or backtick-quoted
 * identifiers do not end a statement, so text such as {@code COMMENT 'a; b'} stays in one
 * piece. A doubled quote inside a literal ({@code ''}) is handled naturally: the first
 * quote closes the literal and the second reopens it. Backslash escapes and PostgreSQL
 * dollar-quoting are not interpreted.
 *
 * @param sql text containing zero or more statements
 * @return the trimmed, non-empty statements in order, without their terminating semicolons
 */
private List<String> splitSql(String sql) {
    List<String> statements = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    char openQuote = 0; // 0 means "not inside a quoted section"

    for (int i = 0; i < sql.length(); i++) {
        char c = sql.charAt(i);
        if (openQuote != 0) {
            if (c == openQuote) {
                openQuote = 0;
            }
        } else if (c == '\'' || c == '"' || c == '`') {
            openQuote = c;
        } else if (c == ';') {
            flushStatement(current, statements);
            continue;
        }
        current.append(c);
    }
    flushStatement(current, statements);
    return statements;
}

/** Appends the buffered statement to {@code out} if it is non-blank, then clears the buffer. */
private void flushStatement(StringBuilder buffer, List<String> out) {
    String statement = buffer.toString().trim();
    if (!statement.isEmpty()) {
        out.add(statement);
    }
    buffer.setLength(0);
}
// ========== 工具方法 ==========
/**
 * Derives the routing alias for a datasource from its connection details.
 * <p>
 * Host, port, database, effective schema, username and password are formatted into one
 * string; the alias is {@code "dynamic_"} followed by the first eight hex characters of
 * that string's MD5 digest.
 */
private String generateAlias(Datasource ds) {
    final String database = Objects.toString(ds.getDefaultDatabase(), "");
    final String effectiveSchema = Objects.toString(ds.getEffectSchema(), "");
    final String user = Objects.toString(ds.getUsername(), "");
    final String secret = Objects.toString(ds.getPassword(), "");

    final String connectionKey = String.format(
            "%s:%d/%s/%s?user=%s&pass=%s",
            ds.getHost(), ds.getPort(), database, effectiveSchema, user, secret);

    final String digest = DigestUtils.md5Hex(connectionKey);
    return "dynamic_" + digest.substring(0, 8);
}
/**
 * Builds the JDBC URL for a datasource.
 * <p>
 * MySQL and Doris share the {@code jdbc:mysql://} form with a fixed set of connection
 * options; PostgreSQL uses {@code jdbc:postgresql://} without options.
 *
 * @throws IllegalArgumentException for any other database type
 */
private String buildJdbcUrl(Datasource ds) {
    final DbType dialect = DbType.fromString(ds.getDbType());
    final String hostName = ds.getHost();
    final int portNumber = ds.getPort();
    final String database = ds.getDefaultDatabase();

    final String scheme;
    final String options;
    switch (dialect) {
        case MYSQL:
        case DORIS:
            scheme = "jdbc:mysql://";
            options = "?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&useSSL=false";
            break;
        case POSTGRESQL:
            scheme = "jdbc:postgresql://";
            options = "";
            break;
        default:
            throw new IllegalArgumentException("不支持的类型: " + dialect);
    }
    return scheme + hostName + ":" + portNumber + "/" + database + options;
}
/**
 * Returns the JDBC driver class name for a database type.
 *
 * @param dbType database type name, resolved through {@code DbType.fromString}
 * @return the MySQL driver class for MySQL and Doris, the PostgreSQL driver class for PostgreSQL
 * @throws IllegalArgumentException for any other database type
 */
public String getDriverClass(String dbType) {
    final String driverClassName;
    switch (DbType.fromString(dbType)) {
        case POSTGRESQL:
            driverClassName = "org.postgresql.Driver";
            break;
        case MYSQL:
        case DORIS:
            driverClassName = "com.mysql.cj.jdbc.Driver";
            break;
        default:
            throw new IllegalArgumentException("不支持的类型: " + dbType);
    }
    return driverClassName;
}
/**
 * Assembles the {@link DataSourceProperty} for a datasource: credentials, JDBC URL,
 * driver class and HikariCP pool settings.
 */
private DataSourceProperty buildDataSourceProperty(Datasource ds) {
    // Resolve the derived values first, in the same order the setters consume them.
    final String jdbcUrl = buildJdbcUrl(ds);
    final String driverClassName = getDriverClass(ds.getDbType());
    final HikariCpConfig poolConfig = buildHikariConfig(ds);

    final DataSourceProperty settings = new DataSourceProperty();
    settings.setUsername(ds.getUsername());
    settings.setPassword(ds.getPassword());
    settings.setUrl(jdbcUrl);
    settings.setDriverClassName(driverClassName);
    settings.setHikari(poolConfig);
    return settings;
}
/**
 * Creates the HikariCP pool settings applied to every dynamically registered datasource.
 * <p>
 * The values are fixed; the {@code ds} argument is currently not consulted.
 * TODO: read these from a configuration source in the future.
 */
private HikariCpConfig buildHikariConfig(Datasource ds) {
    final HikariCpConfig pool = new HikariCpConfig();
    pool.setConnectionTimeout(30_000L);   // 30 seconds
    pool.setIdleTimeout(600_000L);        // 10 minutes
    pool.setMaxLifetime(1_800_000L);      // 30 minutes
    pool.setMaximumPoolSize(20);
    pool.setMinIdle(5);
    return pool;
}
}
package com.bigdata.datasource.service.impl;
import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.service.DatasourceService;
import com.bigdata.datasource.service.SchemaSyncService;
import com.bigdata.datasource.utils.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.regex.Pattern;
/**
* 表结构同步服务实现类
* <p>
* 职责:
* - 校验同步参数(表名、数据源等)
* - 区分同构(相同数据库类型)与异构(不同数据库类型)同步
* - 生成并执行目标建表语句
* - 处理 PostgreSQL 的额外注释语句
* - 新增能力:仅生成 DDL(供前端预览/编辑)
* <p>
* 注意:
* - 本服务层不返回 Map,不处理 Web 响应格式
* - 业务异常抛出 SchemaSyncException,由 Controller 统一捕获
* - 系统异常(如数据库连接失败)也包装为 SchemaSyncException 向上抛出
*/
@Service
public class SchemaSyncServiceImpl implements SchemaSyncService {
@Autowired
private DatasourceService datasourceService;
private static final Logger log = LoggerFactory.getLogger(SchemaSyncServiceImpl.class);
/**
 * Copies a table structure from the source datasource to the target datasource.
 * <p>
 * Flow:
 * 1. run {@code validateAndPrepare} on the raw arguments
 * 2. normalise both table names through {@code TableNameValidator.validate}
 * 3. delegate to the homogeneous path when both datasources have the same database type,
 *    otherwise to the heterogeneous path
 *
 * @param sourceDs    source datasource
 * @param targetDs    target datasource
 * @param sourceTable source table name (required)
 * @param targetTable target table name; null or blank means "same as the source table"
 * @return the synchronisation result returned by the chosen path
 * @throws SchemaSyncException if validation fails, or wrapping any exception raised while
 *                             dispatching to the chosen path
 */
@Override
public SyncTableResult syncTable(Datasource sourceDs, Datasource targetDs, String sourceTable, String targetTable) {
    validateAndPrepare(sourceDs, targetDs, sourceTable, targetTable);

    final String srcName = TableNameValidator.validate(sourceTable.trim());
    final boolean targetBlank = targetTable == null || targetTable.trim().isEmpty();
    final String tgtName = targetBlank ? srcName : TableNameValidator.validate(targetTable.trim());

    final DbType sourceType = DbType.fromString(sourceDs.getDbType());
    final DbType targetType = DbType.fromString(targetDs.getDbType());

    log.info("开始同步表结构: {}({}) -> {}({})",
            sourceDs.getSourceName(), srcName, targetDs.getSourceName(), tgtName);

    try {
        // Same type: reuse the source DDL. Different types: rebuild from column metadata.
        return sourceType == targetType
                ? handleHomogeneousSync(sourceDs, targetDs, srcName, tgtName, sourceType)
                : handleHeterogeneousSync(sourceDs, targetDs, srcName, tgtName, sourceType, targetType);
    } catch (Exception e) {
        log.error("表结构同步过程中发生未预期异常", e);
        throw new SchemaSyncException("同步失败: " + e.getMessage(), e);
    }
}
/**
 * Generates the CREATE TABLE DDL for the target datasource without executing it.
 * <p>
 * Unlike {@code syncTable}, this does not check whether the target table already exists.
 *
 * @param sourceDs    source datasource
 * @param targetDs    target datasource
 * @param sourceTable source table name (required)
 * @param targetTable target table name; null or blank means "same as the source table"
 * @return the generated DDL text
 * @throws SchemaSyncException if a datasource is null, the source table name is blank, or
 *                             DDL generation reports a problem
 */
@Override
public String generateCreateTableDdl(
        Datasource sourceDs,
        Datasource targetDs,
        String sourceTable,
        String targetTable) {
    // 1. Argument checks: report missing inputs as SchemaSyncException rather than an NPE.
    if (sourceDs == null || targetDs == null) {
        throw new SchemaSyncException("源数据源和目标数据源不能为空");
    }
    if (sourceTable == null || sourceTable.trim().isEmpty()) {
        throw new SchemaSyncException("源表名不能为空");
    }

    // 2. Normalise table names.
    sourceTable = TableNameValidator.validate(sourceTable.trim());
    targetTable = (targetTable == null || targetTable.trim().isEmpty())
            ? sourceTable
            : TableNameValidator.validate(targetTable.trim());

    // 3. Resolve database types.
    DbType sourceType = DbType.fromString(sourceDs.getDbType());
    DbType targetType = DbType.fromString(targetDs.getDbType());
    log.debug("生成建表 DDL: {}({}) -> {}({})", sourceDs.getSourceName(), sourceTable, targetDs.getSourceName(), targetTable);

    // 4. Same type reuses the source DDL; different types rebuild it from column metadata.
    if (sourceType == targetType) {
        return generateHomogeneousDdl(sourceDs, targetDs, sourceTable, targetTable, sourceType);
    } else {
        return generateHeterogeneousDdl(sourceDs, targetDs, sourceTable, targetTable, sourceType, targetType);
    }
}
// ==============================
// Private: 参数校验(仅用于 syncTable,因为要检查目标表是否存在)
// ==============================
/**
 * Validates the arguments of a real synchronisation run.
 * <p>
 * Checks, in order:
 * - both datasources are present and the source table name is not blank
 * - the operation does not copy a table onto itself (same datasource id and same name)
 * - the target table does not already exist in the target datasource
 * <p>
 * The effective target name is resolved once (blank or null falls back to the trimmed
 * source name) and used by both the self-sync check and the existence check.
 *
 * @throws SchemaSyncException if any check fails
 */
private void validateAndPrepare(Datasource sourceDs, Datasource targetDs, String sourceTable, String targetTable) {
    if (sourceDs == null || targetDs == null) {
        throw new SchemaSyncException("源数据源和目标数据源不能为空");
    }
    if (sourceTable == null || sourceTable.trim().isEmpty()) {
        throw new SchemaSyncException("源表名不能为空");
    }

    String finalSourceTable = sourceTable.trim();
    String finalTargetTable = (targetTable == null || targetTable.trim().isEmpty())
            ? finalSourceTable
            : targetTable.trim();

    // Two datasources without an id (not persisted yet) are not assumed to be the same one;
    // the existence check below still covers a genuine self-copy in that case.
    boolean sameDatasource = sourceDs.getId() != null
            && Objects.equals(sourceDs.getId(), targetDs.getId());
    if (sameDatasource && finalSourceTable.equals(finalTargetTable)) {
        throw new SchemaSyncException("禁止将表同步到自身");
    }

    List<String> targetTables = datasourceService.getAllTables(targetDs);
    if (targetTables.contains(finalTargetTable)) {
        throw new SchemaSyncException("目标表 " + finalTargetTable + " 已存在");
    }
}
// ==============================
// 同构同步:执行 + 生成
// ==============================
/**
 * Runs a same-type synchronisation (e.g. MySQL to MySQL, PostgreSQL to PostgreSQL):
 * generates the DDL and executes it on the target datasource.
 *
 * @return a success result carrying the executed SQL, or a failure result carrying the SQL
 *         and the error message when execution throws
 */
private SyncTableResult handleHomogeneousSync(
        Datasource sourceDs,
        Datasource targetDs,
        String sourceTable,
        String targetTable,
        DbType dbType) {
    // DDL generation happens outside the try block, so its exceptions propagate to the caller.
    final String ddl = generateHomogeneousDdl(sourceDs, targetDs, sourceTable, targetTable, dbType);

    SyncTableResult outcome;
    try {
        datasourceService.executeDdl(targetDs, ddl);
        log.info("同构同步成功:表 {} 已在目标数据源创建", targetTable);
        outcome = SyncTableResult.success(ddl, "表 " + targetTable + " 已通过原生SQL同步创建");
    } catch (Exception e) {
        log.error("执行同构建表SQL失败", e);
        outcome = SyncTableResult.failure(ddl, "执行建表语句失败: " + e.getMessage());
    }
    return outcome;
}
/**
 * Generates the DDL for a same-type synchronisation without executing it.
 * <p>
 * The source table's CREATE TABLE statement is fetched and its table reference rewritten:
 * - PostgreSQL: every {@code "schema"."table"} reference to the source table
 * - MySQL / Doris: every backtick-quoted occurrence of the source table name
 * <p>
 * Replacement text is always inserted literally, so characters such as {@code $} or
 * {@code \} in the target schema or table name are not interpreted as regex replacement
 * syntax.
 *
 * @throws SchemaSyncException if no CREATE TABLE statement could be obtained for the source table
 */
private String generateHomogeneousDdl(
        Datasource sourceDs,
        Datasource targetDs,
        String sourceTable,
        String targetTable,
        DbType dbType) {
    String createSql = datasourceService.getCreateTableSql(sourceDs, sourceTable);
    if (createSql == null || createSql.trim().isEmpty()) {
        throw new SchemaSyncException("无法获取源表 " + sourceTable + " 的建表语句");
    }
    if (dbType == DbType.POSTGRESQL) {
        String srcSchema = sourceDs.getEffectSchema();
        String tgtSchema = targetDs.getEffectSchema();
        // The pattern tolerates whitespace around the dot, so a regex is required here.
        String pattern = "\"" + Pattern.quote(srcSchema) + "\"\\s*\\.\\s*\"" + Pattern.quote(sourceTable) + "\"";
        String replacement = "\"" + tgtSchema + "\".\"" + targetTable + "\"";
        String replacedSql = createSql.replaceAll(pattern, java.util.regex.Matcher.quoteReplacement(replacement));
        log.debug("PostgreSQL 同构 DDL 生成:替换表引用 {} -> {}", pattern, replacement);
        return replacedSql;
    } else {
        // Both sides are plain text, so a literal replace is sufficient.
        String replacedSql = createSql.replace("`" + sourceTable + "`", "`" + targetTable + "`");
        log.debug("MySQL/Doris 同构 DDL 生成:替换表名 {} -> {}", sourceTable, targetTable);
        return replacedSql;
    }
}
// ==============================
// 异构同步:执行 + 生成
// ==============================
/**
 * Runs a cross-engine sync (e.g. Doris to MySQL, MySQL to PostgreSQL).
 * <p>
 * The DDL is generated and then executed against the target datasource.
 * <p>
 * Design notes:
 * <ul>
 *   <li>When syncing from Doris to MySQL/PostgreSQL no PRIMARY KEY is meant to be
 *       generated by default, because Doris key columns are logical de-duplication
 *       keys and are not necessarily suitable as a primary key.</li>
 *   <li>Primary-key handling is intended to become an explicit user setting
 *       (not supported in this version).</li>
 * </ul>
 * For PostgreSQL targets the script is split into individual statements because
 * it contains trailing {@code COMMENT ON} statements. The split is quote-aware
 * and strips {@code --} line comments, so the generated header comment no longer
 * causes the first {@code COMMENT ON} statement to be skipped, and a {@code ;}
 * inside a quoted comment no longer cuts a statement in half.
 *
 * @param sourceDs    datasource the table definition is read from
 * @param targetDs    datasource the table is created in
 * @param sourceTable name of the table to copy
 * @param targetTable name of the table to create
 * @param sourceType  engine of the source datasource
 * @param targetType  engine of the target datasource
 * @return the outcome of the execution together with the full generated script
 */
private SyncTableResult handleHeterogeneousSync(
        Datasource sourceDs,
        Datasource targetDs,
        String sourceTable,
        String targetTable,
        DbType sourceType,
        DbType targetType) {
    // 1. Generate the full script (including PostgreSQL comments).
    String fullSql = generateHeterogeneousDdl(sourceDs, targetDs, sourceTable, targetTable, sourceType, targetType);
    // 2. Execute it.
    try {
        if (targetType == DbType.POSTGRESQL) {
            // PostgreSQL: run statement by statement.
            for (String stmt : splitSqlStatements(fullSql)) {
                datasourceService.executeDdl(targetDs, stmt);
            }
        } else {
            // MySQL / Doris: the script is a single statement.
            datasourceService.executeDdl(targetDs, fullSql);
        }
        log.info("异构建表成功:表 {} 已在目标数据源创建", targetTable);
        return SyncTableResult.success(fullSql, "表 " + targetTable + " 已成功创建于目标数据源");
    } catch (Exception e) {
        log.error("异构建表失败", e);
        return SyncTableResult.failure(fullSql, "建表失败: " + e.getMessage());
    }
}

/**
 * Splits a SQL script into executable statements.
 * <p>
 * A {@code ;} ends a statement only outside single-quoted literals and
 * double-quoted identifiers (a doubled quote inside a quoted region is an
 * escaped quote). {@code --} line comments outside quotes are removed.
 * Dollar-quoted strings and block comments are not recognised; the scripts
 * generated by this service do not contain them.
 *
 * @param script the script to split
 * @return the non-empty statements, trimmed, each keeping its terminating {@code ;} if it had one
 */
private static List<String> splitSqlStatements(String script) {
    List<String> statements = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    char quote = 0; // 0 means "outside any quoted region"
    int length = script.length();
    int i = 0;
    while (i < length) {
        char c = script.charAt(i);
        if (quote != 0) {
            current.append(c);
            if (c == quote) {
                if (i + 1 < length && script.charAt(i + 1) == quote) {
                    // Doubled quote: an escaped quote character, still inside the quoted region.
                    current.append(quote);
                    i++;
                } else {
                    quote = 0;
                }
            }
        } else if (c == '\'' || c == '"') {
            quote = c;
            current.append(c);
        } else if (c == '-' && i + 1 < length && script.charAt(i + 1) == '-') {
            // Line comment: drop everything up to (not including) the line break.
            int eol = script.indexOf('\n', i);
            i = eol < 0 ? length : eol;
            continue;
        } else if (c == ';') {
            current.append(c);
            String stmt = current.toString().trim();
            if (!";".equals(stmt)) {
                statements.add(stmt);
            }
            current.setLength(0);
        } else {
            current.append(c);
        }
        i++;
    }
    String tail = current.toString().trim();
    if (!tail.isEmpty()) {
        statements.add(tail);
    }
    return statements;
}
/**
 * Generates the DDL for a cross-engine sync without executing it.
 * <p>
 * Steps:
 * <ol>
 *   <li>read the column metadata of the source table;</li>
 *   <li>read the table comment;</li>
 *   <li>let {@code DdlBuilderUtils} build the base CREATE TABLE statement;</li>
 *   <li>for PostgreSQL targets, append the {@code COMMENT ON} statements.</li>
 * </ol>
 *
 * @param sourceDs    datasource the metadata is read from
 * @param targetDs    datasource the table will be created in
 * @param sourceTable name of the table to copy
 * @param targetTable name of the table to create
 * @param sourceType  engine of the source datasource
 * @param targetType  engine of the target datasource
 * @return the complete script
 * @throws SchemaSyncException if the source table has no columns or does not exist
 */
private String generateHeterogeneousDdl(
        Datasource sourceDs,
        Datasource targetDs,
        String sourceTable,
        String targetTable,
        DbType sourceType,
        DbType targetType) {
    // 1. Source column metadata.
    final List<Map<String, Object>> sourceColumns = datasourceService.getTableColumns(sourceDs, sourceTable);
    if (sourceColumns == null || sourceColumns.isEmpty()) {
        throw new SchemaSyncException("源表 " + sourceTable + " 不存在或无字段");
    }

    // 2. Table comment.
    final String comment = datasourceService.getTableComment(sourceDs, sourceTable);

    // 3. Base CREATE TABLE statement. The last argument is false because this service
    //    appends the PostgreSQL comment statements itself (step 4).
    // NOTE(review): the schema passed here is getSchemaName(), while the comment statements
    // below use getEffectSchema() - confirm both always resolve to the same schema.
    final String baseDdl = DdlBuilderUtils.buildCreateTableSql(
            sourceColumns, sourceType, targetType, targetTable, targetDs.getSchemaName(), comment, false);

    // 4. Full script, with PostgreSQL comments where applicable.
    final StringBuilder script = new StringBuilder(baseDdl);
    if (targetType != DbType.POSTGRESQL) {
        return script.toString();
    }
    final List<String> commentSql = buildPostgreSqlCommentStatements(targetDs, targetTable, comment, sourceColumns);
    if (commentSql.isEmpty()) {
        return script.toString();
    }
    script.append("\n\n-- PostgreSQL 字段注释:\n");
    script.append(String.join("\n", commentSql));
    return script.toString();
}
// ==============================
// PostgreSQL 注释辅助方法
// ==============================
/**
 * Builds the PostgreSQL {@code COMMENT ON} statements for a table and its columns.
 * <p>
 * Comment text is wrapped in single quotes, so embedded single quotes are escaped
 * with {@code escapeSingleQuote}. Schema, table and column names are inserted as-is
 * between double quotes.
 * NOTE(review): the original comment states that names are validated by
 * TableNameValidator; that validation is not visible from here - confirm for column names.
 *
 * @param targetDs     datasource supplying the effective schema
 * @param targetTable  table the comments are attached to
 * @param tableComment table comment; skipped when blank
 * @param columns      column metadata maps; the "name" and "comment" entries are read
 * @return the statements, table comment first, then one per commented column
 */
private List<String> buildPostgreSqlCommentStatements(
        Datasource targetDs,
        String targetTable,
        String tableComment,
        List<Map<String, Object>> columns) {
    final List<String> result = new ArrayList<>();
    final String schemaName = targetDs.getEffectSchema();

    // Table-level comment.
    if (isNotBlank(tableComment)) {
        result.add(String.format(
                "COMMENT ON TABLE \"%s\".\"%s\" IS '%s';",
                schemaName, targetTable, escapeSingleQuote(tableComment)));
    }

    // Column-level comments.
    for (Map<String, Object> column : columns) {
        final String columnName = (String) column.get("name");
        final String columnComment = (String) column.getOrDefault("comment", "");
        if (!isNotBlank(columnComment)) {
            continue;
        }
        result.add(String.format(
                "COMMENT ON COLUMN \"%s\".\"%s\".\"%s\" IS '%s';",
                schemaName, targetTable, columnName, escapeSingleQuote(columnComment)));
    }
    return result;
}
/**
 * Tells whether a string has content.
 *
 * @param str the string to test, may be null
 * @return true when {@code str} is non-null and still non-empty after trimming
 */
private boolean isNotBlank(String str) {
    if (str == null) {
        return false;
    }
    return str.trim().length() > 0;
}
/**
 * Escapes single quotes for use inside a PostgreSQL single-quoted literal
 * by doubling them ({@code '} becomes {@code ''}).
 *
 * @param input the raw text, must not be null
 * @return the text with every single quote doubled
 */
private String escapeSingleQuote(String input) {
    final StringBuilder escaped = new StringBuilder(input.length() + 8);
    for (int i = 0; i < input.length(); i++) {
        final char ch = input.charAt(i);
        if (ch == '\'') {
            escaped.append("''");
        } else {
            escaped.append(ch);
        }
    }
    return escaped.toString();
}
}
utils目录
package com.bigdata.datasource.utils;
/**
 * Database engines supported by the schema-sync tooling.
 */
public enum DbType {
    MYSQL, DORIS, POSTGRESQL;

    /**
     * Resolves an engine from its textual name.
     * <p>
     * Matching ignores case and surrounding whitespace. Lowercasing uses the root
     * locale so the result does not depend on the JVM default locale (under a
     * Turkish locale "DORIS" would otherwise lowercase to "dor\u0131s" and be rejected).
     *
     * @param type the engine name: "mysql", "doris" or "postgresql"
     * @return the matching constant
     * @throws IllegalArgumentException if {@code type} is null or not a supported engine
     */
    public static DbType fromString(String type) {
        if (type == null) {
            throw new IllegalArgumentException("dbType 不能为空");
        }
        switch (type.trim().toLowerCase(java.util.Locale.ROOT)) {
            case "mysql":
                return MYSQL;
            case "doris":
                return DORIS;
            case "postgresql":
                return POSTGRESQL;
            default:
                throw new IllegalArgumentException("不支持的数据库类型:" + type);
        }
    }
}
package com.bigdata.datasource.utils;
import java.util.*;
import java.util.stream.Collectors;
public class DdlBuilderUtils {
/**
 * Builds the CREATE TABLE statement for the target database.
 * <p>
 * Primary-key columns are always emitted as NOT NULL. The caller's column maps
 * are not modified; the method works on copies.
 * <p>
 * Any {@code targetDbType} other than MYSQL or POSTGRESQL is rendered as Doris DDL:
 * primary-key columns become the UNIQUE KEY, otherwise the first column becomes the
 * DUPLICATE KEY; key columns are moved to the front of the column list.
 *
 * @param columns         column list; each column is a map with the entries
 *                        name/type/nullable/default/comment/is_primary_key
 * @param sourceDbType    engine of the source database
 * @param targetDbType    engine of the target database
 * @param targetTable     name of the table to create
 * @param targetSchema    target schema (PostgreSQL only; "public" when null or blank)
 * @param tableComment    table comment, may be null
 * @param includeComments whether COMMENT ON statements are appended to the SQL (PostgreSQL only)
 * @return the complete CREATE TABLE SQL
 * @throws IllegalArgumentException if {@code columns} is null or empty
 */
public static String buildCreateTableSql(
        List<Map<String, Object>> columns,
        DbType sourceDbType,
        DbType targetDbType,
        String targetTable,
        String targetSchema,
        String tableComment,
        boolean includeComments
) {
    if (columns == null || columns.isEmpty()) {
        throw new IllegalArgumentException("字段列表不能为空");
    }
    // Copy each column so that forcing NOT NULL on primary keys does not leak into the caller's data.
    List<Map<String, Object>> cols = new ArrayList<>(columns.size());
    List<String> pkColumns = new ArrayList<>();
    for (Map<String, Object> col : columns) {
        Map<String, Object> copy = new LinkedHashMap<>(col);
        if (Boolean.TRUE.equals(copy.get("is_primary_key"))) {
            copy.put("nullable", false);
            pkColumns.add((String) copy.get("name"));
        }
        cols.add(copy);
    }
    if (targetDbType == DbType.MYSQL) {
        return buildMysqlCreateTable(cols, pkColumns, sourceDbType, targetTable, tableComment);
    }
    if (targetDbType == DbType.POSTGRESQL) {
        return buildPostgresCreateTable(
                cols, pkColumns, sourceDbType, targetTable, targetSchema, tableComment, includeComments);
    }
    return buildDorisCreateTable(cols, sourceDbType, targetTable, tableComment);
}

/** Renders the MySQL CREATE TABLE statement, with an inline table comment when present. */
private static String buildMysqlCreateTable(
        List<Map<String, Object>> cols,
        List<String> pkColumns,
        DbType sourceDbType,
        String targetTable,
        String tableComment) {
    List<String> columnDefs = new ArrayList<>();
    for (Map<String, Object> col : cols) {
        columnDefs.add(buildColumnDef(col, sourceDbType, DbType.MYSQL, false));
    }
    StringBuilder sql = new StringBuilder();
    sql.append("CREATE TABLE `").append(targetTable).append("` (\n ")
            .append(String.join(",\n ", columnDefs));
    if (!pkColumns.isEmpty()) {
        sql.append(",\n PRIMARY KEY (").append(quoteNames(pkColumns, "`")).append(")");
    }
    sql.append("\n)");
    if (hasText(tableComment)) {
        // Backslashes first, otherwise a trailing backslash would swallow the closing quote.
        String escaped = tableComment.replace("\\", "\\\\").replace("'", "\\'");
        sql.append(" COMMENT='").append(escaped).append("'");
    }
    sql.append(";");
    return sql.toString();
}

/** Renders the PostgreSQL CREATE TABLE statement, optionally followed by COMMENT ON statements. */
private static String buildPostgresCreateTable(
        List<Map<String, Object>> cols,
        List<String> pkColumns,
        DbType sourceDbType,
        String targetTable,
        String targetSchema,
        String tableComment,
        boolean includeComments) {
    String schema = hasText(targetSchema) ? targetSchema : "public";
    List<String> columnDefs = new ArrayList<>();
    for (Map<String, Object> col : cols) {
        columnDefs.add(buildColumnDef(col, sourceDbType, DbType.POSTGRESQL, false));
    }
    StringBuilder sql = new StringBuilder();
    sql.append("CREATE TABLE \"").append(schema).append("\".\"").append(targetTable).append("\" (\n ")
            .append(String.join(",\n ", columnDefs));
    if (!pkColumns.isEmpty()) {
        sql.append(",\n PRIMARY KEY (").append(quoteNames(pkColumns, "\"")).append(")");
    }
    sql.append("\n);");
    if (!includeComments) {
        return sql.toString();
    }
    List<String> statements = new ArrayList<>();
    statements.add(sql.toString());
    // Table comment.
    if (hasText(tableComment)) {
        statements.add(String.format(
                "COMMENT ON TABLE \"%s\".\"%s\" IS '%s';",
                schema, targetTable, tableComment.replace("'", "''")));
    }
    // Column comments.
    for (Map<String, Object> col : cols) {
        String comment = (String) col.getOrDefault("comment", "");
        if (hasText(comment)) {
            statements.add(String.format(
                    "COMMENT ON COLUMN \"%s\".\"%s\".\"%s\" IS '%s';",
                    schema, targetTable, col.get("name"), comment.replace("'", "''")));
        }
    }
    return String.join("\n", statements);
}

/** Renders the Doris CREATE TABLE statement (key columns first, hash-distributed on the first key). */
private static String buildDorisCreateTable(
        List<Map<String, Object>> cols,
        DbType sourceDbType,
        String targetTable,
        String tableComment) {
    List<Map<String, Object>> keyColumns = new ArrayList<>();
    List<Map<String, Object>> valueColumns = new ArrayList<>();
    for (Map<String, Object> col : cols) {
        if (Boolean.TRUE.equals(col.get("is_primary_key"))) {
            keyColumns.add(col);
        } else {
            valueColumns.add(col);
        }
    }
    boolean hasExplicitPk = !keyColumns.isEmpty();
    if (!hasExplicitPk) {
        // No primary key: the first column serves as the DUPLICATE KEY.
        keyColumns.add(valueColumns.remove(0));
    }
    List<String> keyNames = keyColumns.stream()
            .map(col -> (String) col.get("name"))
            .collect(Collectors.toList());
    String firstKey = keyNames.get(0);

    List<String> columnDefs = new ArrayList<>();
    for (Map<String, Object> col : keyColumns) {
        columnDefs.add(buildColumnDef(col, sourceDbType, DbType.DORIS, true));
    }
    for (Map<String, Object> col : valueColumns) {
        columnDefs.add(buildColumnDef(col, sourceDbType, DbType.DORIS, false));
    }

    StringBuilder sql = new StringBuilder();
    sql.append("CREATE TABLE `").append(targetTable).append("` (\n ")
            .append(String.join(",\n ", columnDefs))
            .append("\n)");
    if (hasExplicitPk) {
        sql.append("\nUNIQUE KEY(").append(quoteNames(keyNames, "`")).append(")");
    } else {
        sql.append("\nDUPLICATE KEY(`").append(firstKey).append("`)");
    }
    if (hasText(tableComment)) {
        // The literal is double-quoted, so backslashes and double quotes are what must be escaped.
        String escaped = tableComment.replace("\\", "\\\\").replace("\"", "\\\"");
        sql.append(" COMMENT \"").append(escaped).append("\"");
    }
    sql.append("\nDISTRIBUTED BY HASH(`").append(firstKey).append("`) BUCKETS 32\n")
            .append("PROPERTIES(\"replication_num\" = \"1\");");
    return sql.toString();
}

/** Wraps every name in the given quote character and joins them with ", ". */
private static String quoteNames(List<String> names, String quote) {
    return names.stream()
            .map(name -> quote + name + quote)
            .collect(Collectors.joining(", "));
}

/** Returns true when the value is non-null and not blank. */
private static boolean hasText(String value) {
    return value != null && !value.trim().isEmpty();
}
/**
 * Maps a source default value to an expression the target database accepts.
 * <ul>
 *   <li>{@code null} in, {@code null} out (no default);</li>
 *   <li>the text "null" (any case) becomes {@code NULL};</li>
 *   <li>{@code now()} / {@code current_timestamp} become {@code CURRENT_TIMESTAMP};</li>
 *   <li>PostgreSQL boolean columns: {@code 0} / {@code 1} (optionally single-quoted)
 *       become {@code FALSE} / {@code TRUE}, because PostgreSQL rejects an integer
 *       default on a boolean column;</li>
 *   <li>Doris: string-typed columns get a single-quoted, escaped literal; other
 *       columns keep the value only if it is a plain decimal number, otherwise the
 *       default is dropped;</li>
 *   <li>everything else is returned trimmed but otherwise unchanged.</li>
 * </ul>
 *
 * @param defaultValue the default as reported by the source database, may be null
 * @param sourceDbType engine of the source database (currently unused)
 * @param targetDbType engine of the target database
 * @param columnType   the already-mapped target column type, may be null
 * @return the default expression to emit, or null for "no DEFAULT clause"
 */
private static String mapDefaultValue(String defaultValue, DbType sourceDbType, DbType targetDbType, String columnType) {
    if (defaultValue == null) {
        return null;
    }
    String dv = defaultValue.trim();
    String dvLower = dv.toLowerCase(java.util.Locale.ROOT);
    String typeLower = columnType == null ? "" : columnType.toLowerCase(java.util.Locale.ROOT);
    // NULL
    if ("null".equals(dvLower)) {
        return "NULL";
    }
    // Time functions
    if ("now()".equals(dvLower) || "current_timestamp".equals(dvLower)) {
        if (targetDbType == DbType.MYSQL || targetDbType == DbType.POSTGRESQL || targetDbType == DbType.DORIS) {
            return "CURRENT_TIMESTAMP";
        }
    }
    // PostgreSQL boolean columns need boolean literals, not integers.
    if (targetDbType == DbType.POSTGRESQL && "boolean".equals(typeLower)) {
        String bare = dv;
        if (bare.length() >= 2 && bare.startsWith("'") && bare.endsWith("'")) {
            bare = bare.substring(1, bare.length() - 1).trim();
        }
        if ("0".equals(bare)) {
            return "FALSE";
        }
        if ("1".equals(bare)) {
            return "TRUE";
        }
    }
    // Doris only supports constants
    if (targetDbType == DbType.DORIS) {
        boolean isStringType = typeLower.contains("char")
                || typeLower.contains("string")
                || typeLower.contains("text");
        if (isStringType) {
            if (dv.length() >= 2 && dv.startsWith("'") && dv.endsWith("'")) {
                return dv; // already a quoted literal
            }
            // Escape backslashes first, then quotes, so the wrapped literal stays well-formed.
            return "'" + dv.replace("\\", "\\\\").replace("'", "\\'") + "'";
        }
        // Numeric types: a single optional sign, digits, at most one decimal point.
        // An empty value, a lone "-" or "." and inputs like "1.2.3" are dropped.
        if (dv.matches("-?(\\d+(\\.\\d*)?|\\.\\d+)")) {
            return dv;
        }
        return null; // drop unsupported default
    }
    // Default: return as-is
    return dv;
}
/**
 * Builds the definition of a single column for the target database.
 * <p>
 * The result has the form {@code <quoted name> <type> [DEFAULT ...] [NOT NULL] [COMMENT '...']}.
 * Names are back-quoted for MySQL/Doris and double-quoted for PostgreSQL; inline
 * comments are emitted for MySQL/Doris only.
 *
 * @param col          column map; the entries name/type/nullable/default/comment are read
 * @param sourceDbType engine of the source database
 * @param targetDbType engine of the target database
 * @param isKey        whether the column is a (Doris) key column; key columns are always NOT NULL
 * @return the column definition fragment
 */
private static String buildColumnDef(Map<String, Object> col, DbType sourceDbType, DbType targetDbType, boolean isKey) {
    String name = (String) col.get("name");
    String srcType = (String) col.get("type");
    Boolean nullable = (Boolean) col.get("nullable");
    String defaultValue = (String) col.get("default");
    String comment = (String) col.getOrDefault("comment", "");
    boolean backtickDialect = targetDbType == DbType.MYSQL || targetDbType == DbType.DORIS;
    boolean notNull = isKey || Boolean.FALSE.equals(nullable);

    // 1. Type mapping
    String mappedType = TypeMapper.mapColumnType(srcType, sourceDbType, targetDbType);

    // 2. Quoted column name
    String quote = backtickDialect ? "`" : "\"";
    StringBuilder colDef = new StringBuilder();
    colDef.append(quote).append(name).append(quote).append(' ').append(mappedType);

    // 3. Default value
    String mappedDefault = mapDefaultValue(defaultValue, sourceDbType, targetDbType, mappedType);
    if ("NULL".equals(mappedDefault)) {
        if (notNull) {
            // A NOT NULL column cannot default to NULL; "DEFAULT NULL NOT NULL" is rejected
            // by MySQL, so the contradictory default is treated as "no default".
            mappedDefault = null;
        } else if (backtickDialect) {
            colDef.append(" DEFAULT NULL");
        }
        // PostgreSQL: DEFAULT NULL is implicit and therefore omitted.
    } else if (mappedDefault != null) {
        colDef.append(" DEFAULT ").append(mappedDefault);
    }

    // 4. NOT NULL constraint
    if (isKey) {
        colDef.append(" NOT NULL");
    } else if (Boolean.FALSE.equals(nullable)) {
        // Rule kept from the original implementation: on Doris, a non-key column that
        // carries a default is emitted without NOT NULL.
        boolean skipForDoris = targetDbType == DbType.DORIS && mappedDefault != null;
        if (!skipForDoris) {
            colDef.append(" NOT NULL");
        }
    }

    // 5. Inline comment (MySQL/Doris only)
    if (backtickDialect && comment != null && !comment.trim().isEmpty()) {
        // Backslashes first, otherwise a trailing backslash would escape the closing quote.
        String escaped = comment.replace("\\", "\\\\").replace("'", "\\'");
        colDef.append(" COMMENT '").append(escaped).append("'");
    }
    return colDef.toString();
}
}
package com.bigdata.datasource.utils;
/**
 * Unchecked exception raised when a table structure cannot be synchronised.
 */
public class SchemaSyncException extends RuntimeException {

    /**
     * Creates an exception without an underlying cause.
     *
     * @param message description of the failure
     */
    public SchemaSyncException(final String message) {
        super(message);
    }

    /**
     * Creates an exception that wraps an underlying cause.
     *
     * @param message description of the failure
     * @param cause   the error that triggered this one
     */
    public SchemaSyncException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
package com.bigdata.datasource.utils;
import lombok.Data;
/**
 * Immutable outcome of a table sync: whether it succeeded, the SQL involved,
 * and either a success message or an error text.
 * Instances are created through {@link #success} and {@link #failure} only.
 */
@Data
public class SyncTableResult {
    /** True when the sync completed. */
    private final boolean success;
    /** The SQL that was generated for the sync. */
    private final String sql;
    /** Success message; null for failures. */
    private final String message;
    /** Error text; null for successes. */
    private final String error;

    /**
     * Creates a successful result.
     *
     * @param sql     the SQL that was generated
     * @param message description of what was done
     * @return a result with {@code success == true} and no error
     */
    public static SyncTableResult success(String sql, String message) {
        return new SyncTableResult(true, sql, message, null);
    }

    /**
     * Creates a failed result.
     *
     * @param sql   the SQL that was generated
     * @param error description of what went wrong
     * @return a result with {@code success == false} and no message
     */
    public static SyncTableResult failure(String sql, String error) {
        return new SyncTableResult(false, sql, null, error);
    }

    private SyncTableResult(boolean success, String sql, String message, String error) {
        this.success = success;
        this.sql = sql;
        this.message = message;
        this.error = error;
    }
}
package com.bigdata.datasource.utils;
import java.util.regex.Pattern;
/**
 * Validates table names before they are placed into generated SQL.
 */
public class TableNameValidator {
    /** A letter or underscore followed by any number of letters, digits or underscores. */
    private static final Pattern VALID_NAME = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");

    /**
     * Checks a table name and returns it trimmed.
     *
     * @param name the candidate table name
     * @return the name with surrounding whitespace removed
     * @throws IllegalArgumentException if the name is null, blank, or contains
     *                                  anything other than letters, digits and underscores,
     *                                  or starts with a digit
     */
    public static String validate(String name) {
        final String candidate = (name == null) ? "" : name.trim();
        if (candidate.isEmpty()) {
            throw new IllegalArgumentException("表名不能为空");
        }
        if (VALID_NAME.matcher(candidate).matches()) {
            return candidate;
        }
        throw new IllegalArgumentException("表名 '" + candidate + "' 不合法:必须以字母或下划线开头,仅包含字母、数字、下划线");
    }
}
package com.bigdata.datasource.utils;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Maps column type declarations between MySQL, Doris and PostgreSQL.
 * <p>
 * A source declaration is first parsed into a normalized base type plus optional
 * length and precision/scale ({@link #parseColumnType}); the base type is then
 * looked up in a rule table to produce the target declaration
 * ({@link #mapColumnType}). Types without a rule fall back to {@code string}
 * (Doris) or {@code text} (MySQL / PostgreSQL).
 */
public class TypeMapper {
    /**
     * Rule table: normalized base type -> target database -> mapper.
     * A mapper receives (length, "precision,scale") - either may be null - and
     * returns the target type declaration.
     */
    private static final Map<String, Map<DbType, BiFunction<Integer, String, String>>> TYPE_MAPPING = new HashMap<>();

    /** PostgreSQL type names normalized to the base types used as rule-table keys. */
    private static final Map<String, String> PG_BASE_TYPES = new HashMap<>();

    /** "(p)" or "(p,s)" arguments of a PostgreSQL numeric/decimal declaration. */
    private static final Pattern PG_NUMERIC_ARGS = Pattern.compile("\\(\\s*(\\d+)\\s*(?:,\\s*(\\d+)\\s*)?\\)");
    /** character varying(N) or varchar(N). */
    private static final Pattern PG_VARCHAR = Pattern.compile("^(?:character varying|varchar)\\s*\\((\\d+)\\)");
    /** character(N) or char(N). */
    private static final Pattern PG_CHAR = Pattern.compile("^(?:character|char)\\s*\\((\\d+)\\)");
    /** "precision,scale" inside the parentheses of a MySQL/Doris declaration. */
    private static final Pattern PREC_SCALE = Pattern.compile("(\\d+)\\s*,\\s*(\\d+)");

    static {
        PG_BASE_TYPES.put("integer", "int");
        PG_BASE_TYPES.put("bigint", "bigint");
        PG_BASE_TYPES.put("double precision", "double");
        PG_BASE_TYPES.put("real", "float");
        PG_BASE_TYPES.put("timestamp without time zone", "timestamp");
        PG_BASE_TYPES.put("timestamp with time zone", "timestamptz");
        PG_BASE_TYPES.put("boolean", "boolean");
        PG_BASE_TYPES.put("text", "text");

        // String types
        putMapping("varchar", DbType.MYSQL, (len, ps) -> len != null ? "varchar(" + len + ")" : "varchar(255)");
        putMapping("varchar", DbType.DORIS, (len, ps) -> "varchar(" + Math.min(len != null ? len : 65533, 65533) + ")");
        putMapping("varchar", DbType.POSTGRESQL, (len, ps) -> len != null ? "varchar(" + len + ")" : "varchar");
        putMapping("char", DbType.MYSQL, (len, ps) -> "char(" + (len != null ? len : 1) + ")");
        putMapping("char", DbType.DORIS, (len, ps) -> "char(" + (len != null ? len : 1) + ")");
        putMapping("char", DbType.POSTGRESQL, (len, ps) -> "char(" + (len != null ? len : 1) + ")");
        // Integer types
        putMapping("int", DbType.MYSQL, (len, ps) -> "int");
        putMapping("int", DbType.DORIS, (len, ps) -> "int");
        putMapping("int", DbType.POSTGRESQL, (len, ps) -> "int");
        putMapping("int_unsigned", DbType.MYSQL, (len, ps) -> "int unsigned");
        putMapping("int_unsigned", DbType.DORIS, (len, ps) -> "bigint"); // Doris has no unsigned types
        putMapping("int_unsigned", DbType.POSTGRESQL, (len, ps) -> "bigint");
        putMapping("mediumint", DbType.MYSQL, (len, ps) -> "mediumint");
        putMapping("mediumint", DbType.DORIS, (len, ps) -> "int");
        putMapping("mediumint", DbType.POSTGRESQL, (len, ps) -> "int");
        putMapping("bigint", DbType.MYSQL, (len, ps) -> "bigint");
        putMapping("bigint", DbType.DORIS, (len, ps) -> "bigint");
        putMapping("bigint", DbType.POSTGRESQL, (len, ps) -> "bigint");
        putMapping("bigint_unsigned", DbType.MYSQL, (len, ps) -> "bigint unsigned");
        putMapping("bigint_unsigned", DbType.DORIS, (len, ps) -> "bigint");
        putMapping("bigint_unsigned", DbType.POSTGRESQL, (len, ps) -> "numeric(20,0)");
        putMapping("smallint", DbType.MYSQL, (len, ps) -> "smallint");
        putMapping("smallint", DbType.DORIS, (len, ps) -> "smallint");
        putMapping("smallint", DbType.POSTGRESQL, (len, ps) -> "smallint");
        // Only tinyint(1) is the boolean convention; any other tinyint holds a real number
        // and must keep an integer type on the target.
        putMapping("tinyint", DbType.MYSQL, (len, ps) -> isBooleanWidth(len) ? "tinyint(1)" : "tinyint");
        putMapping("tinyint", DbType.DORIS, (len, ps) -> isBooleanWidth(len) ? "boolean" : "tinyint");
        putMapping("tinyint", DbType.POSTGRESQL, (len, ps) -> isBooleanWidth(len) ? "boolean" : "smallint");
        // Floating-point / fixed-point types
        putMapping("double", DbType.MYSQL, (len, ps) -> "double");
        putMapping("double", DbType.DORIS, (len, ps) -> "double");
        putMapping("double", DbType.POSTGRESQL, (len, ps) -> "double precision");
        putMapping("float", DbType.MYSQL, (len, ps) -> "float");
        putMapping("float", DbType.DORIS, (len, ps) -> "float");
        putMapping("float", DbType.POSTGRESQL, (len, ps) -> "real");
        putMapping("decimal", DbType.MYSQL, (len, ps) -> "decimal(" + (ps != null ? ps : "10,0") + ")");
        putMapping("decimal", DbType.DORIS, (len, ps) -> "decimal(" + (ps != null ? ps : "10,0") + ")");
        putMapping("decimal", DbType.POSTGRESQL, (len, ps) -> "numeric(" + (ps != null ? ps : "10,0") + ")");
        // Date/time types
        putMapping("timestamp", DbType.MYSQL, (len, ps) -> "timestamp");
        putMapping("timestamp", DbType.DORIS, (len, ps) -> "datetime");
        putMapping("timestamp", DbType.POSTGRESQL, (len, ps) -> "timestamp without time zone");
        putMapping("timestamptz", DbType.MYSQL, (len, ps) -> "timestamp");
        putMapping("timestamptz", DbType.DORIS, (len, ps) -> "datetime");
        putMapping("timestamptz", DbType.POSTGRESQL, (len, ps) -> "timestamp with time zone");
        putMapping("datetime", DbType.MYSQL, (len, ps) -> "datetime");
        putMapping("datetime", DbType.DORIS, (len, ps) -> "datetime");
        putMapping("datetime", DbType.POSTGRESQL, (len, ps) -> "timestamp without time zone");
        putMapping("date", DbType.MYSQL, (len, ps) -> "date");
        putMapping("date", DbType.DORIS, (len, ps) -> "date");
        putMapping("date", DbType.POSTGRESQL, (len, ps) -> "date");
        // Large objects / special types
        putMapping("text", DbType.MYSQL, (len, ps) -> "text");
        putMapping("text", DbType.DORIS, (len, ps) -> "string");
        putMapping("text", DbType.POSTGRESQL, (len, ps) -> "text");
        putMapping("mediumtext", DbType.MYSQL, (len, ps) -> "mediumtext");
        putMapping("mediumtext", DbType.DORIS, (len, ps) -> "string");
        putMapping("mediumtext", DbType.POSTGRESQL, (len, ps) -> "text");
        putMapping("longtext", DbType.MYSQL, (len, ps) -> "longtext");
        putMapping("longtext", DbType.DORIS, (len, ps) -> "string");
        putMapping("longtext", DbType.POSTGRESQL, (len, ps) -> "text");
        putMapping("json", DbType.MYSQL, (len, ps) -> "json");
        putMapping("json", DbType.DORIS, (len, ps) -> "string");
        putMapping("json", DbType.POSTGRESQL, (len, ps) -> "jsonb");
        putMapping("enum", DbType.MYSQL, (len, ps) -> "varchar(255)");
        putMapping("enum", DbType.DORIS, (len, ps) -> "varchar(65533)");
        putMapping("enum", DbType.POSTGRESQL, (len, ps) -> "varchar(255)");
        putMapping("set", DbType.MYSQL, (len, ps) -> "varchar(255)");
        putMapping("set", DbType.DORIS, (len, ps) -> "varchar(65533)");
        putMapping("set", DbType.POSTGRESQL, (len, ps) -> "varchar(255)");
        putMapping("boolean", DbType.MYSQL, (len, ps) -> "tinyint(1)");
        putMapping("boolean", DbType.DORIS, (len, ps) -> "boolean");
        putMapping("boolean", DbType.POSTGRESQL, (len, ps) -> "boolean");
    }

    /** Registers one mapping rule. */
    private static void putMapping(String baseType, DbType targetDb, BiFunction<Integer, String, String> mapper) {
        TYPE_MAPPING.computeIfAbsent(baseType, k -> new HashMap<>()).put(targetDb, mapper);
    }

    /** Returns true for the display width 1, i.e. the {@code tinyint(1)} boolean convention. */
    private static boolean isBooleanWidth(Integer len) {
        return len != null && len == 1;
    }

    /**
     * Parses a column type declaration into base type, length and precision/scale.
     * <p>
     * For MySQL/Doris only the content of the first pair of parentheses is read, so
     * trailing attributes such as {@code unsigned} never leak into the result
     * ({@code decimal(10,2) unsigned} yields precision/scale {@code "10,2"}).
     * A decimal declared with a precision only ({@code decimal(18)}) yields
     * {@code "18,0"}. MySQL unsigned integers are widened to the next larger base type.
     *
     * @param colType   raw type declaration, e.g. "varchar(255)" or "int unsigned"; null is treated as ""
     * @param srcDbType source database engine; null is treated as MySQL
     * @return {@code new Object[]{baseType (String), length (Integer or null), precScale (String or null)}}
     */
    public static Object[] parseColumnType(String colType, DbType srcDbType) {
        if (colType == null) {
            colType = "";
        }
        // Root locale: type keywords must not be affected by locale-specific case rules.
        colType = colType.trim().toLowerCase(java.util.Locale.ROOT);
        srcDbType = srcDbType != null ? srcDbType : DbType.MYSQL;

        if (srcDbType == DbType.POSTGRESQL) {
            return parsePostgresType(colType);
        }

        // ========== MySQL / Doris ==========
        Integer length = null;
        String precScale = null;
        String basePart = colType;
        int open = colType.indexOf('(');
        if (open >= 0) {
            basePart = colType.substring(0, open).trim();
            int close = colType.indexOf(')', open + 1);
            String inner = (close >= 0 ? colType.substring(open + 1, close) : colType.substring(open + 1)).trim();
            Matcher ps = PREC_SCALE.matcher(inner);
            if (ps.matches()) {
                precScale = ps.group(1) + "," + ps.group(2); // e.g. "10,2"
            } else {
                try {
                    length = Integer.valueOf(inner);
                } catch (NumberFormatException ignored) {
                    // Not a plain number (e.g. an enum value list): there is no length to report.
                }
            }
        }
        // The base type is the first word before the parentheses.
        String baseType = basePart.split("\\s+")[0];
        if ("decimal".equals(baseType) && precScale == null && length != null) {
            // decimal(p) means decimal(p,0)
            precScale = length + ",0";
        }
        // MySQL unsigned integers need a wider type on engines without unsigned support.
        if (srcDbType == DbType.MYSQL && colType.contains("unsigned")) {
            switch (baseType) {
                case "tinyint":
                    baseType = "smallint";
                    break;
                case "smallint":
                    baseType = "int";
                    break;
                case "int":
                    baseType = "int_unsigned";
                    break;
                case "bigint":
                    baseType = "bigint_unsigned";
                    break;
                case "mediumint":
                    baseType = "int";
                    break;
                default:
                    break;
            }
        }
        return new Object[]{baseType, length, precScale};
    }

    /** Parses an already lower-cased, trimmed PostgreSQL type declaration. */
    private static Object[] parsePostgresType(String colType) {
        // numeric / decimal
        if (colType.startsWith("numeric") || colType.startsWith("decimal")) {
            Matcher m = PG_NUMERIC_ARGS.matcher(colType);
            String precScale = null;
            if (m.find()) {
                precScale = m.group(1) + "," + (m.group(2) != null ? m.group(2) : "0");
            }
            return new Object[]{"decimal", null, precScale};
        }
        // varchar: character varying(N) or varchar(N)
        Matcher varcharMatch = PG_VARCHAR.matcher(colType);
        if (varcharMatch.find()) {
            return new Object[]{"varchar", Integer.valueOf(varcharMatch.group(1)), null};
        }
        if ("character varying".equals(colType) || "varchar".equals(colType)) {
            return new Object[]{"varchar", null, null};
        }
        // char: character(N) or char(N)
        Matcher charMatch = PG_CHAR.matcher(colType);
        if (charMatch.find()) {
            return new Object[]{"char", Integer.valueOf(charMatch.group(1)), null};
        }
        if ("character".equals(colType) || "char".equals(colType)) {
            return new Object[]{"char", null, null};
        }
        // Everything else: normalize known aliases, pass unknown names through unchanged.
        return new Object[]{PG_BASE_TYPES.getOrDefault(colType, colType), null, null};
    }

    /**
     * Maps a source column type to the equivalent type on the target database.
     *
     * @param srcTypeStr   source type declaration, e.g. "varchar(255)"
     * @param srcDbType    source database engine
     * @param targetDbType target database engine
     * @return the target type declaration; {@code string} (Doris) or {@code text}
     *         (other targets) when the source type is blank or has no mapping rule
     */
    public static String mapColumnType(String srcTypeStr, DbType srcDbType, DbType targetDbType) {
        String fallback = targetDbType == DbType.DORIS ? "string" : "text";
        if (srcTypeStr == null || srcTypeStr.trim().isEmpty()) {
            return fallback;
        }
        // 1. Parse the source type
        Object[] parsed = parseColumnType(srcTypeStr, srcDbType);
        String baseType = (String) parsed[0];
        Integer length = (Integer) parsed[1];
        String precScale = (String) parsed[2];
        // 2. Look up the rule
        Map<DbType, BiFunction<Integer, String, String>> targetMappers = TYPE_MAPPING.get(baseType);
        if (targetMappers != null) {
            BiFunction<Integer, String, String> mapper = targetMappers.get(targetDbType);
            if (mapper != null) {
                return mapper.apply(length, precScale);
            }
        }
        // 3. No rule: safe fallback
        return fallback;
    }
}
config目录
package com.bigdata.datasource.config;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration class for the dynamic datasource setup.
 * It currently declares no beans and no settings of its own.
 */
@Configuration
public class DynamicDataSourceConfig {
}主程序
package com.bigdata.datasource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the datasource application.
 */
@SpringBootApplication
public class MainApplication {
/**
 * Starts the Spring application.
 *
 * @param args command-line arguments, passed through to Spring Boot
 */
public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
}
测试:
package com.bigdata.datasource.test;
import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.mapper.DatasourceMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Manual smoke test for {@code DatasourceMapper}; it prints the result instead of asserting.
 */
@SpringBootTest
public class TestDatasourceMapper {

    @Autowired
    private DatasourceMapper datasourceMapper;

    /** Loads the datasource record with id "1001" and prints it. */
    @Test
    public void testSelect() {
        final Datasource loaded = datasourceMapper.selectById("1001");
        System.out.println(loaded);
    }
}
package com.bigdata.datasource.test;
import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.mapper.DatasourceMapper;
import com.bigdata.datasource.service.DatasourceService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
import java.util.Map;
/**
 * Manual smoke tests for {@code DatasourceService}; each test prints what the
 * service returns instead of asserting on it.
 */
@SpringBootTest
public class TestDatasourceService {

    @Autowired
    private DatasourceMapper datasourceMapper;

    @Autowired
    private DatasourceService datasourceService;

    /**
     * Loads a datasource record by id.
     *
     * @param datasourceId id of the record
     * @return the record as returned by the mapper
     */
    public Datasource getDataSource(String datasourceId) {
        return datasourceMapper.selectById(datasourceId);
    }

    /** Prints all table names of datasource "1001". */
    @Test
    public void test() {
        final Datasource source = getDataSource("1001");
        final List<String> tableNames = datasourceService.getAllTables(source);
        System.out.println(tableNames);
    }

    /** Prints the column metadata of table test_user in datasource "3001", one column per line. */
    @Test
    public void test1() {
        final Datasource source = getDataSource("3001");
        final List<Map<String, Object>> columnInfos = datasourceService.getTableColumns(source, "test_user");
        for (int i = 0; i < columnInfos.size(); i++) {
            System.out.println(columnInfos.get(i));
        }
    }

    /** Prints the table comment of test_user in datasource "3001". */
    @Test
    public void test2() {
        final Datasource source = getDataSource("3001");
        final String tableComment = datasourceService.getTableComment(source, "test_user");
        System.out.println(tableComment);
    }

    /** Runs a parameterised query against datasource "1001" and prints every row. */
    @Test
    public void test3() {
        final Datasource source = getDataSource("1001");
        final List<Map<String, Object>> rows = datasourceService.executeQuery(
                source, "SELECT * FROM test_user WHERE name = ? AND age = ?", "bob", 18);
        for (int i = 0; i < rows.size(); i++) {
            System.out.println(rows.get(i));
        }
    }

    /** Prints the CREATE TABLE statement of test_mysql2doris in datasource "2001". */
    @Test
    public void test4() {
        final Datasource source = getDataSource("2001");
        final String createSql = datasourceService.getCreateTableSql(source, "test_mysql2doris");
        System.out.println(createSql);
    }
}
package com.bigdata.datasource.test;
import com.bigdata.datasource.entity.Datasource;
import com.bigdata.datasource.mapper.DatasourceMapper;
import com.bigdata.datasource.service.SchemaSyncService;
import com.bigdata.datasource.utils.SyncTableResult;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Map;
/**
 * Manual smoke tests for {@code SchemaSyncService}; results are printed, not asserted.
 */
@SpringBootTest
public class TestSchemaSyncService {

    @Autowired
    private DatasourceMapper datasourceMapper;

    @Autowired
    private SchemaSyncService schemaSyncService;

    /**
     * Loads a datasource record by id.
     *
     * @param datasourceId id of the record
     * @return the record as returned by the mapper
     */
    public Datasource getDataSource(String datasourceId) {
        return datasourceMapper.selectById(datasourceId);
    }

    /**
     * Runs one table sync and prints the result. Exactly one of the source/target
     * combinations below is active; the others are kept as a checklist to enable by hand.
     */
    @Test
    public void test() {
        final Datasource mysqlA = getDataSource("1001"); // mysql
        final Datasource mysqlB = getDataSource("1002"); // mysql
        final Datasource postgres = getDataSource("3001"); // pg
        final Datasource doris = getDataSource("2001"); // doris

        // mysql -> mysql
        // SyncTableResult result = schemaSyncService.syncTable(mysqlA, mysqlB, "test_user", "test_mysql2mysql");
        // mysql -> pg
        // SyncTableResult result = schemaSyncService.syncTable(mysqlA, postgres, "test_user", "test_mysql2pg");
        // mysql -> doris
        // SyncTableResult result = schemaSyncService.syncTable(mysqlA, doris, "test_user", "test_mysql2doris");
        // pg -> pg
        // SyncTableResult result = schemaSyncService.syncTable(postgres, postgres, "test_mysql2pg", "test_pg2pg");
        // pg -> mysql
        // SyncTableResult result = schemaSyncService.syncTable(postgres, mysqlA, "test_mysql2pg", "test_pg2mysql");
        // pg -> doris
        // SyncTableResult result = schemaSyncService.syncTable(postgres, doris, "test_mysql2pg", "test_pg2doris");
        // doris -> doris
        // SyncTableResult result = schemaSyncService.syncTable(doris, doris, "test_mysql2doris", "test_doris2doris");
        // doris -> mysql
        // SyncTableResult result = schemaSyncService.syncTable(doris, mysqlA, "test_mysql2doris", "test_doris2mysql");

        // doris -> pg
        final SyncTableResult result = schemaSyncService.syncTable(doris, postgres, "test_mysql2doris", "test_doris2pg");
        System.out.println(result);
    }

    /** Generates (without executing) the DDL for a Doris to PostgreSQL sync and prints it. */
    @Test
    public void test2() {
        final Datasource mysqlA = getDataSource("1001"); // mysql
        final Datasource mysqlB = getDataSource("1002"); // mysql
        final Datasource postgres = getDataSource("3001"); // pg
        final Datasource doris = getDataSource("2001"); // doris

        final String generatedDdl = schemaSyncService.generateCreateTableDdl(
                doris, postgres, "test_mysql2doris", "test_doris2mysql");
        System.out.println(generatedDdl);
    }
}