文章
Flink集成MyBatis-Flex与Druid示例
下面是一个完整的 Flink 作业示例,集成了 MyBatis-Flex 和 Druid 数据源,并在 RichFunction 中使用 Mapper 进行数据查询。
- 项目依赖 (pom.xml)
首先确保你的项目包含以下依赖:
<dependencies>
    <!-- Flink runtime. NOTE(review): when the jar is submitted to a cluster these are commonly
         declared with provided scope; left at the default scope here so the job can also be
         started directly from an IDE. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.2</version>
    </dependency>
    <!-- MyBatis-Flex core only: the job bootstraps MyBatis-Flex programmatically through
         MybatisFlexBootstrap and has no Spring context, so the Spring Boot starter (and the
         Spring Boot auto-configuration/logging it pulls into the shaded jar) is not needed. -->
    <dependency>
        <groupId>com.mybatis-flex</groupId>
        <artifactId>mybatis-flex-core</artifactId>
        <version>1.9.9</version>
    </dependency>
    <!-- Plain Druid connection pool; DruidDataSource is created by hand, so the Spring Boot
         starter variant is unnecessary. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.2.23</version>
    </dependency>
    <!-- Lombok is only needed at compile time, so keep it out of the shaded jar. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
        <scope>provided</scope>
    </dependency>
    <!-- MySQL JDBC driver (com.mysql.cj.jdbc.Driver). -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.0.33</version>
    </dependency>
</dependencies>
<!-- Packaging: build a single shaded (fat) jar during the package phase. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <!-- Artifacts that must not be bundled into the shaded jar. -->
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                                <exclude>org.apache.hadoop:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signature files under META-INF; otherwise the
                                     repackaged jar fails with a SecurityException about invalid
                                     signature files. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <!-- Factory classes that connectors and formats depend on are registered
                                 in META-INF/services files, which overwrite each other when jars are
                                 merged; ServicesResourceTransformer merges them instead. -->
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
实体类定义 bean/User.java
package com.learn.bean;
import com.mybatisflex.annotation.Column;
import com.mybatisflex.annotation.Id;
import com.mybatisflex.annotation.Table;
import lombok.Data;
/**
* User entity.
* Mapped to the {@code user} database table through {@code @Table("user")}.
* Getters, setters, equals/hashCode and toString are generated by Lombok's {@code @Data}.
*/
@Data
@Table("user")
public class User {
// Primary key, marked with @Id.
@Id
private Long id;
// Bound to the "username" column.
@Column("username")
private String username;
// Bound to the "age" column.
@Column("age")
private Integer age;
// Bound to the "email" column.
@Column("email")
private String email;
// Bound to the "create_time" column and held as a String.
// NOTE(review): the column's actual type/format is not visible here - confirm before parsing it.
@Column("create_time")
private String createTime;
}
Mapper 接口定义 mapper/UserMapper.java
package com.learn.mapper;
import com.learn.bean.User;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* MyBatis mapper for {@link User} rows.
* Methods carry no SQL annotations; the statement for {@link #getAllUser()} is defined in the
* mapper XML whose namespace is this interface's fully qualified name.
*/
@Mapper
public interface UserMapper {
/**
* Loads every row of the {@code user} table.
*
* @return all users as {@link User} objects
*/
List<User> getAllUser();
}
MyBatis-Flex 配置类 config/MyBatisFlexConfig.java
package com.learn.config;
import com.alibaba.druid.pool.DruidDataSource;
import com.learn.mapper.UserMapper;
import com.mybatisflex.core.MybatisFlexBootstrap;
import com.mybatisflex.core.audit.AuditManager;
import com.mybatisflex.core.audit.ConsoleMessageCollector;
import com.mybatisflex.core.mybatis.Mappers;
import javax.sql.DataSource;
/**
 * Static bootstrap for MyBatis-Flex backed by a Druid connection pool.
 *
 * <p>All state is static: {@link #initialize()} builds the pool and starts MyBatis-Flex at most
 * once per loaded copy of this class, and {@link #getUserMapper()} hands out the mapper.
 */
public final class MyBatisFlexConfig {

    /** Becomes {@code true} once MyBatis-Flex has been started successfully. */
    private static volatile boolean initialized = false;

    /** Static holder only; not meant to be instantiated. */
    private MyBatisFlexConfig() {
    }

    /**
     * Creates the Druid data source and starts MyBatis-Flex with {@link UserMapper} registered.
     *
     * <p>Synchronized and guarded by a flag, so repeated or concurrent calls after a successful
     * start return immediately. If starting fails, the freshly created pool is closed before the
     * exception is rethrown so it is not left dangling.
     * NOTE(review): whether MybatisFlexBootstrap can be started again after a failed start is not
     * visible here - confirm before relying on retries.
     */
    public static synchronized void initialize() {
        if (initialized) {
            return;
        }
        // Optional: enable SQL auditing.
        // AuditManager.setAuditEnable(true);
        // AuditManager.setMessageCollector(new ConsoleMessageCollector());

        // Create the Druid data source.
        DruidDataSource dataSource = createDruidDataSource();
        try {
            // Start MyBatis-Flex on top of it.
            MybatisFlexBootstrap.getInstance()
                    .setDataSource(dataSource)
                    .addMapper(UserMapper.class)
                    .start();
        } catch (RuntimeException e) {
            // Release the pool we just created; nothing else holds a usable reference to it.
            dataSource.close();
            throw e;
        }
        initialized = true;
    }

    /**
     * Builds the Druid pool with the connection settings and keep-alive checks used by the job.
     *
     * @return a configured, not yet used, Druid data source
     */
    private static DruidDataSource createDruidDataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://xxx.xxx.xxx.xxx:3306/xxx?useUnicode=true");
        dataSource.setUsername("xxx");
        dataSource.setPassword("xxx");
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");

        // Pool sizing.
        dataSource.setInitialSize(2);
        dataSource.setMinIdle(5);
        dataSource.setMaxActive(10);
        dataSource.setMaxWait(60000);

        // Keep-alive / validation settings.
        dataSource.setValidationQuery("SELECT 1");          // MySQL: SELECT 1; Oracle: SELECT 1 FROM DUAL
        dataSource.setTestWhileIdle(true);                  // validate connections while they sit idle
        dataSource.setTestOnBorrow(true);                   // validate a connection when it is borrowed
        dataSource.setTimeBetweenEvictionRunsMillis(30000); // run the check every 30 seconds
        dataSource.setMinEvictableIdleTimeMillis(600000);   // reclaim connections idle for 10 minutes
        return dataSource;
    }

    /**
     * Returns the {@link UserMapper} managed by MyBatis-Flex.
     *
     * <p>Triggers {@link #initialize()} first if it has not completed yet, so the returned mapper
     * is usable even when the caller did not initialize explicitly.
     *
     * @return the mapper instance obtained from {@code Mappers.ofMapperClass}
     */
    public static UserMapper getUserMapper() {
        if (!initialized) {
            initialize();
        }
        return Mappers.ofMapperClass(UserMapper.class);
    }
}
Flink RichFunction 实现 function/MyProcessFunction.java
package com.learn.function;
import com.learn.bean.User;
import com.learn.config.MyBatisFlexConfig;
import com.learn.mapper.UserMapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.List;
/**
 * Pass-through process function that queries the user table for every incoming element,
 * prints how many rows were found and forwards the element unchanged.
 */
public class MyProcessFunction extends ProcessFunction<String, String> {

    /** Age in milliseconds after which the mapper reference is fetched again (one hour). */
    private static final long REFRESH_INTERVAL = 3600000;

    private transient UserMapper mapper;
    private transient long lastRefreshTime;

    /**
     * Initializes MyBatis-Flex and obtains the mapper when the function is opened.
     *
     * @param parameters configuration handed over by Flink
     * @throws Exception if the parent implementation or the initialization fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        MyBatisFlexConfig.initialize();
        reloadMapper();
    }

    /**
     * Re-fetches the mapper if it is older than {@link #REFRESH_INTERVAL}, loads all users,
     * prints the row count and emits the input element as-is.
     *
     * @param value the incoming element
     * @param ctx   the process function context (unused)
     * @param out   collector receiving {@code value}
     * @throws Exception if the query fails
     */
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long mapperAge = System.currentTimeMillis() - lastRefreshTime;
        if (mapperAge > REFRESH_INTERVAL) {
            reloadMapper();
        }
        List<User> users = mapper.getAllUser();
        System.out.println("已有数据量: " + users.size());
        out.collect(value);
    }

    /** Fetches the mapper from the shared configuration and records when that happened. */
    private void reloadMapper() {
        mapper = MyBatisFlexConfig.getUserMapper();
        lastRefreshTime = System.currentTimeMillis();
    }
}
Flink 主程序 job/Main.java
ipackage com.learn.job;
import com.learn.function.MyProcessFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> ds = env.fromElements("1", "2", "3");
SingleOutputStreamOperator<String> processDs = ds.process(new MyProcessFunction());
processDs.print();
env.execute();
}
}
配置文件 resources/com/learn/mapper/UserMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- SQL statements for com.learn.mapper.UserMapper; the namespace is that interface's fully qualified name. -->
<mapper namespace="com.learn.mapper.UserMapper">
<!-- Backs List<User> getAllUser(): reads all columns of all rows in the user table; each row becomes a com.learn.bean.User. -->
<select id="getAllUser" resultType="com.learn.bean.User">
SELECT *
FROM user
</select>
</mapper>