
Example: Integrating MyBatis-Flex and Druid in a Flink Job

Below is a complete Flink job example that integrates MyBatis-Flex with a Druid data source and uses a Mapper inside a RichFunction to query data.

  1. Project dependencies (pom.xml)
    First, make sure your project includes the following dependencies:
    <dependencies>
        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>com.mybatis-flex</groupId>
            <artifactId>mybatis-flex-spring-boot-starter</artifactId>
            <version>1.9.9</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.23</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>8.0.33</version>
        </dependency>
    </dependencies>

    <!-- Shade plugin configuration for packaging -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy signature files under META-INF into the shaded jar, to avoid SecurityExceptions about invalid signature files -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <!-- Factory classes contributed by connector and format dependencies would otherwise overwrite each other during shading; merge them with the ServicesResourceTransformer -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Entity class: bean/User.java

package com.learn.bean;
import com.mybatisflex.annotation.Column;
import com.mybatisflex.annotation.Id;
import com.mybatisflex.annotation.Table;
import lombok.Data;

/**
 * User entity class
 * Maps to the user table in the database
 */
@Data
@Table("user")
public class User {
    @Id
    private Long id;

    @Column("username")
    private String username;

    @Column("age")
    private Integer age;

    @Column("email")
    private String email;

    @Column("create_time")
    private String createTime;
}

Mapper interface: mapper/UserMapper.java

package com.learn.mapper;

import com.learn.bean.User;
import org.apache.ibatis.annotations.Mapper;

import java.util.List;

@Mapper
public interface UserMapper {
    List<User> getAllUser();
}

MyBatis-Flex configuration class: config/MyBatisFlexConfig.java

package com.learn.config;

import com.alibaba.druid.pool.DruidDataSource;

import com.learn.mapper.UserMapper;
import com.mybatisflex.core.MybatisFlexBootstrap;
import com.mybatisflex.core.audit.AuditManager;
import com.mybatisflex.core.audit.ConsoleMessageCollector;
import com.mybatisflex.core.mybatis.Mappers;

import javax.sql.DataSource;

public class MyBatisFlexConfig {

    private static volatile boolean initialized = false;

    public static synchronized void initialize() {
        if (initialized) {
            return;
        }

        // Enable SQL audit logging (optional)
        // AuditManager.setAuditEnable(true);
        // AuditManager.setMessageCollector(new ConsoleMessageCollector());

        // Create the Druid data source
        DataSource dataSource = createDruidDataSource();

        // Initialize MyBatis-Flex and register the mapper
        MybatisFlexBootstrap.getInstance()
                .setDataSource(dataSource)
                .addMapper(UserMapper.class)
                .start();

        initialized = true;
    }

    private static DataSource createDruidDataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://xxx.xxx.xxx.xxx:3306/xxx?useUnicode=true");
        dataSource.setUsername("xxx");
        dataSource.setPassword("xxx");
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");

        // Connection pool settings
        dataSource.setInitialSize(2);
        dataSource.setMinIdle(5);
        dataSource.setMaxActive(10);
        dataSource.setMaxWait(60000);

        // Key keep-alive settings
        dataSource.setValidationQuery("SELECT 1");          // SELECT 1 works for MySQL; use SELECT 1 FROM DUAL for Oracle
        dataSource.setTestWhileIdle(true);                  // Validate connections while they sit idle in the pool
        dataSource.setTestOnBorrow(true);                   // Validate a connection each time it is borrowed
        dataSource.setTimeBetweenEvictionRunsMillis(30000); // Run the eviction/validation check every 30 seconds
        dataSource.setMinEvictableIdleTimeMillis(600000);   // Evict connections idle for more than 10 minutes

        return dataSource;
    }

    public static UserMapper getUserMapper() {
        return Mappers.ofMapperClass(UserMapper.class);
    }
}
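
Before wiring this into a Flink function, it can help to sanity-check the data source and mapper registration in isolation. The following is only a minimal sketch (MyBatisFlexConfigSmokeTest is an illustrative name, and the JDBC URL, username and password above are placeholders you still need to fill in):

package com.learn.config;

import com.learn.bean.User;

import java.util.List;

public class MyBatisFlexConfigSmokeTest {
    public static void main(String[] args) {
        // One-time initialization of the Druid pool and MyBatis-Flex
        MyBatisFlexConfig.initialize();

        // Query through the same mapper the Flink functions will use
        List<User> users = MyBatisFlexConfig.getUserMapper().getAllUser();
        System.out.println("Fetched " + users.size() + " users");
    }
}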

Flink RichFunction implementation: function/MyProcessFunction.java

package com.learn.function;

import com.learn.bean.User;
import com.learn.config.MyBatisFlexConfig;
import com.learn.mapper.UserMapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;

public class MyProcessFunction extends ProcessFunction<String, String> {
    private transient UserMapper mapper;
    private transient long lastRefreshTime;
    private static final long REFRESH_INTERVAL = 3600000; // refresh once per hour

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Initialize MyBatis-Flex
        MyBatisFlexConfig.initialize();
        // Obtain the UserMapper instance
        mapper = MyBatisFlexConfig.getUserMapper();
        lastRefreshTime = System.currentTimeMillis();
    }

    @Override
    public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
        // Refresh the mapper once per hour
        if (System.currentTimeMillis() - lastRefreshTime > REFRESH_INTERVAL) {
            mapper = MyBatisFlexConfig.getUserMapper();
            lastRefreshTime = System.currentTimeMillis();
        }
        List<User> allUser = mapper.getAllUser();
        System.out.println("已有数据量: " + allUser.size());
        out.collect(value);
    }
}
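
The same open()-time initialization pattern works in any other RichFunction as well. Below is a minimal sketch of a RichMapFunction variant that reuses the MyBatisFlexConfig helper from above (MyEnrichMapFunction is an illustrative name, not part of the job above):

package com.learn.function;

import com.learn.bean.User;
import com.learn.config.MyBatisFlexConfig;
import com.learn.mapper.UserMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.List;

public class MyEnrichMapFunction extends RichMapFunction<String, String> {
    private transient UserMapper mapper;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Runs once per parallel subtask; initialize() is guarded, so repeated calls are harmless
        MyBatisFlexConfig.initialize();
        mapper = MyBatisFlexConfig.getUserMapper();
    }

    @Override
    public String map(String value) throws Exception {
        // Enrich each record with the current row count of the user table
        List<User> allUser = mapper.getAllUser();
        return value + " (users in DB: " + allUser.size() + ")";
    }
}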

Flink main program: job/Main.java

package com.learn.job;

import com.learn.function.MyProcessFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> ds = env.fromElements("1", "2", "3");

        SingleOutputStreamOperator<String> processDs = ds.process(new MyProcessFunction());

        processDs.print();

        env.execute();
    }
}

Mapper XML file (resources): com/learn/mapper/UserMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.learn.mapper.UserMapper">
    <!-- List<User> getAllUser(); -->
    <select id="getAllUser" resultType="com.learn.bean.User">
        SELECT *
        FROM user
    </select>
</mapper>
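
For a full-table query this simple, the handwritten XML is optional: MyBatis-Flex's BaseMapper ships with generic CRUD methods. A sketch of an XML-free alternative, assuming you register it with addMapper(...) just like UserMapper (UserBaseMapper is an illustrative name):

package com.learn.mapper;

import com.learn.bean.User;
import com.mybatisflex.core.BaseMapper;

public interface UserBaseMapper extends BaseMapper<User> {
    // No XML needed: BaseMapper<User> already provides generic methods such as
    // selectAll(), selectOneById(...) and insert(...).
    // Usage after registration:
    //   List<User> users = mapper.selectAll();
}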