文章
flink集成spring-boot及mybatis plus
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>press.huang</groupId>
<artifactId>flink-spring-boot-demo</artifactId>
<version>1.0.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.7.18</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 指定编译器 JDK8 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>resources/application.yml
spring:
datasource:
url: jdbc:mysql://localhost:3306/flink_etl?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
minimum-idle: 5
maximum-pool-size: 20
# 10分钟无活动回收连接
idle-timeout: 600000
# 30分钟最大生命周期,到点重建
max-lifetime: 1800000
# 每5分钟测试连接可用性
keepalive-time: 300000
connection-timeout: 30000
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
configuration:
map-underscore-to-camel-case: trueresources/mapper/UserMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="press.huang.dev.mapper.UserMapper">
</mapper>DDL
CREATE TABLE `user_info` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(100) DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4press.huang.dev.entity
package press.huang.dev.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("user_info")
public class User {
private Long id;
private String name;
private Integer age;
}
press.huang.dev.functions
package press.huang.dev.functions;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import press.huang.dev.entity.User;
import press.huang.dev.mapper.UserMapper;
import press.huang.dev.utils.SpringContextUtil;
/**
* 富函数,用于 MySQL 查询维表数据
* 注意:Mapper由 Spring管理,连接池自动维护,无需关闭连接
*/
public class UserRichMapFunction extends RichMapFunction<Long, String> {
private transient UserMapper userMapper;
@Override
public void open(Configuration parameters) throws Exception {
// 只初始化一次,避免频繁创建连接
userMapper = SpringContextUtil.getBean(UserMapper.class);
}
@Override
public String map(Long userId) throws Exception {
User user = userMapper.selectById(userId);
return user == null ? "User Not found" : user.getName() + " | age=" + user.getAge();
}
}
press.huang.dev.mapper
package press.huang.dev.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import press.huang.dev.entity.User;
@Mapper
public interface UserMapper extends BaseMapper<User> {
}
press.huang.dev.utils
package press.huang.dev.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* 让 Flink 运行时动态获取 Spring Bean(Mapper)
*/
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext CONTEXT;
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
CONTEXT = ctx;
}
public static <T> T getBean(Class<T> type) {
return CONTEXT.getBean(type);
}
}
主程序
package press.huang.dev;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import press.huang.dev.functions.UserRichMapFunction;
/**
* Flink + Spring Boot + MyBatis Plus 启动入口
*/
@SpringBootApplication
public class FlinkApplication {
public static void main(String[] args) throws Exception {
// 启动 Spring 容器
SpringApplication.run(FlinkApplication.class, args);
// 初始化 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟输入数据流
env.fromElements(1L, 2L, 999L)
.map(new UserRichMapFunction())
.print();
env.execute("Flink With SpringBoot MyBatis Plus Demo");
}
}