数据治理

binlog json 元数据校验

技术栈:spring-boot + mybatis plus + kafka + json-path + doris

整体思路: db binlog -> kafka binlog topic -> 主程序 -> kafka result topic -> doris

校验引擎:doris json函数

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>
    <groupId>com.bigdata</groupId>
    <artifactId>binlog-monitor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Each property is declared exactly once; the compiler level is Java 8 (1.8). -->
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <mybatis-plus.version>3.5.6</mybatis-plus.version>
        <dynamic-ds.version>4.2.0</dynamic-ds.version>
        <postgresql.version>42.7.3</postgresql.version>
        <mysql.version>8.0.33</mysql.version>
        <commons-codec.version>1.15</commons-codec.version>
    </properties>

    <dependencies>
        <!-- web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- mybatis plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <!-- dynamic datasource -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>${dynamic-ds.version}</version>
        </dependency>

        <!-- mysql driver (also used to talk to Doris over the MySQL protocol) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- Apache commons codec, used for MD5 -->
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>${commons-codec.version}</version>
        </dependency>

        <!-- DevTools: hot reload during development -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!-- Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- JSON Path (field extraction) -->
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!-- Gson (JSON handling) -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <!-- Kafka core dependency (declared explicitly) -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- kafka-clients declared explicitly (recommended) -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <!-- Version is managed by Spring Boot; normally no need to set it -->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <!-- Lombok is compile-time only; keep it out of the repackaged jar -->
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

resources/application.yml

server:
  port: 8088
  servlet:
    context-path: /

spring:
  datasource:
    dynamic:
      primary: mysql  # default data source (used for MyBatis Plus CRUD)
      strict: false   # strict mode off, intended to avoid errors for undeclared data sources
      datasource:
        # Primary data source: MySQL (stores monitor_rule / monitor_result)
        mysql:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://localhost:3306/datacenter?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
          username: root
          password: root
          hikari:
            connection-timeout: 30000
            idle-timeout: 600000
            max-lifetime: 1800000
            # Rules are reloaded only occasionally, so a large pool is not needed.
            # NOTE(review): this comment originally said "every 10 minutes", but RuleService
            # uses a 30-minute refresh interval - confirm which one is intended.
            maximum-pool-size: 5
            # Keep connections available for the refresh
            minimum-idle: 2

        # Doris data source: used for rule evaluation
        doris:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://hadoop002:9030/test?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: root
          hikari:
            # Fail fast so threads do not block
            connection-timeout: 10000
            # Reclaim connections idle for 5 minutes (Doris connections are lightweight)
            idle-timeout: 300000
            # Maximum connection lifetime: 20 minutes
            max-lifetime: 1200000
            # Sized for high concurrency; tune by worker count: 1 -> 20, 2 -> 15, 3 -> 10
            maximum-pool-size: 20
            # Pre-warm connections so the first query is not slow
            minimum-idle: 5


  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: binlog-monitor-group
      auto-offset-reset: latest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual

monitor:
  initial-topics: binlog-monitor
  violation-topic: monitor-violation-events

entity目录

package com.bigdata.monitor.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;

/**
 * One row of the {@code monitor_rule} table: a single validation rule for a monitored table.
 */
@Data
@TableName("monitor_rule")
public class MonitorRule {
    @TableId(type = IdType.AUTO)
    private Long id;
    private String ruleCode;      // business rule code, e.g. linkman_01
    private String ruleType;      // TYPE/RANGE/ENUM/LENGTH/CUSTOM (NOTE: RANGE handling is commented out in RuleConditionConverter)
    private String tableName;     // name of the monitored table
    private String ruleName;      // rule display name
    private String simpleCondition; // simplified condition for the basic rule types
    private String triggerCondition; // Doris expression (CUSTOM type)
    private String outputFields;  // fields to output
    private String primaryField = "id";  // primary key column of the monitored table
    private Integer enabled;      // whether the rule is enabled
}
package com.bigdata.monitor.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

/**
 * One rule violation, mapped to the {@code monitor_result} table.
 */
@Data
@TableName("monitor_result")
public class MonitorResult {
    @TableId(type = IdType.AUTO)
    private Long id;
    private String tableName;
    private String primaryKey;
    private String batchId;
    private String ruleName;
    private String ruleCode;      // rule code
    private String changedFields; // JSON string
}

mapper目录

package com.bigdata.monitor.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdata.monitor.entity.MonitorRule;
import org.apache.ibatis.annotations.Mapper;

import java.util.List;

/**
 * MyBatis-Plus mapper for {@link MonitorRule}.
 */
@Mapper
public interface MonitorRuleMapper extends BaseMapper<MonitorRule> {
    // NOTE(review): no SQL annotation is declared on this method here; presumably an XML
    // statement with this id exists elsewhere - confirm, otherwise calling it will fail.
    List<MonitorRule> selectActiveRulesByTable(String tableName);
}
package com.bigdata.monitor.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdata.monitor.entity.MonitorResult;
import org.apache.ibatis.annotations.Mapper;

/**
 * MyBatis-Plus mapper for {@link MonitorResult}; declares no methods beyond those of BaseMapper.
 */
@Mapper
public interface MonitorResultMapper extends BaseMapper<MonitorResult> {
}

service目录

package com.bigdata.monitor.service;

import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.mapper.MonitorRuleMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Rule cache service.
 * <ul>
 *   <li>Loads all enabled rules at startup.</li>
 *   <li>Groups them by monitored table.</li>
 *   <li>Within a table, basic rules (every type other than CUSTOM) are ordered before CUSTOM rules.</li>
 *   <li>Refresh is lazy: a lookup triggers a reload once the cache is older than 30 minutes.
 *       After a failed reload the previous cache is kept and the next attempt is delayed by
 *       one minute, so a database outage does not cause a query per consumed message.</li>
 * </ul>
 */
@Slf4j
@Service
public class RuleService {

    @Autowired
    private MonitorRuleMapper ruleMapper;

    // Cache: tableName -> rules of that table, already sorted.
    private volatile Map<String, List<MonitorRule>> ruleCache = new ConcurrentHashMap<>();

    // Timestamp (ms) of the last successful refresh.
    private volatile long lastRefreshTime = 0L;

    // Earliest timestamp (ms) at which a lookup may trigger the next refresh attempt.
    private volatile long nextRefreshTime = 0L;

    // Refresh interval after a successful load: 30 minutes (in milliseconds).
    private static final long REFRESH_INTERVAL_MILLIS = 30 * 60 * 1000L;

    // Delay before retrying after a failed load: 1 minute (in milliseconds).
    private static final long RETRY_INTERVAL_MILLIS = 60 * 1000L;

    // Orders non-CUSTOM rules before CUSTOM ones; List.sort is stable, so the
    // database order is preserved inside each group.
    private static final Comparator<MonitorRule> BASIC_BEFORE_CUSTOM =
            Comparator.comparing(rule -> "CUSTOM".equals(rule.getRuleType()));

    /**
     * Populates the rule cache when the application starts.
     */
    @PostConstruct
    public void init() {
        refreshRules();
    }

    /**
     * Reloads all enabled rules and replaces the cache (used at startup, by the lazy
     * refresh, and for manual triggering).
     * <p>
     * Rules without a table name are skipped with a warning instead of aborting the
     * whole reload. Any failure is logged and leaves the existing cache untouched.
     */
    public synchronized void refreshRules() {
        try {
            List<MonitorRule> allRules = ruleMapper.selectList(
                    new com.baomidou.mybatisplus.core.conditions.query.QueryWrapper<MonitorRule>()
                            .eq("enabled", 1)
            );

            // Group by table. A null table name cannot be used as a key, so such rows are skipped.
            Map<String, List<MonitorRule>> grouped = new HashMap<>();
            for (MonitorRule rule : allRules) {
                if (rule == null || rule.getTableName() == null) {
                    log.warn("Skipping rule without table name: {}", rule);
                    continue;
                }
                grouped.computeIfAbsent(rule.getTableName(), key -> new ArrayList<>()).add(rule);
            }

            // Sort each table's rules: basic types first, CUSTOM last.
            for (List<MonitorRule> tableRules : grouped.values()) {
                tableRules.sort(BASIC_BEFORE_CUSTOM);
            }

            long now = System.currentTimeMillis();
            this.ruleCache = new ConcurrentHashMap<>(grouped);
            this.lastRefreshTime = now;
            this.nextRefreshTime = now + REFRESH_INTERVAL_MILLIS;
            log.info("Rule cache refreshed. Tables: {}, Total rules: {}", grouped.size(), allRules.size());
        } catch (Exception e) {
            // Keep serving the previous cache and back off before the next attempt.
            this.nextRefreshTime = System.currentTimeMillis() + RETRY_INTERVAL_MILLIS;
            log.error("Failed to refresh rule cache", e);
        }
    }

    /**
     * Returns the sorted rules of the given table, reloading the cache first when it is due.
     *
     * @param tableName monitored table name; {@code null} yields an empty list
     * @return the cached rules of the table, or an empty list when none are configured
     */
    public List<MonitorRule> getSortedRulesByTable(String tableName) {
        // Check the deadline again under the lock so concurrent callers trigger one reload only.
        if (System.currentTimeMillis() >= nextRefreshTime) {
            synchronized (this) {
                if (System.currentTimeMillis() >= nextRefreshTime) {
                    refreshRules();
                }
            }
        }
        if (tableName == null) {
            // ConcurrentHashMap rejects null keys.
            return Collections.emptyList();
        }
        return ruleCache.getOrDefault(tableName, Collections.emptyList());
    }
}
package com.bigdata.monitor.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.bigdata.monitor.entity.MonitorResult;

/**
 * MyBatis-Plus service interface for {@link MonitorResult}; adds no methods of its own.
 */
public interface MonitorResultService extends IService<MonitorResult> {
}

service/impl目录

package com.bigdata.monitor.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.mapper.MonitorResultMapper;
import com.bigdata.monitor.service.MonitorResultService;
import org.springframework.stereotype.Service;

/**
 * Default {@link MonitorResultService} implementation backed by {@link MonitorResultMapper}.
 */
@Service
public class MonitorResultServiceImpl extends ServiceImpl<MonitorResultMapper, MonitorResult> implements MonitorResultService {
}

util目录

package com.bigdata.monitor.util;

import com.bigdata.monitor.entity.MonitorRule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Builds one combined validation SQL statement per binlog message
 * (1 message -> 1 query evaluating every rule).
 */
@Slf4j
@Component
public class CombinedRuleSqlBuilder {

    // Rule codes become column aliases, so only plain identifier characters are allowed.
    private static final java.util.regex.Pattern RULE_CODE_PATTERN =
            java.util.regex.Pattern.compile("^[a-zA-Z0-9_]+$");

    @Autowired
    private RuleConditionConverter converter;

    /**
     * Builds {@code SELECT (expr1) AS rule_code1, (expr2) AS rule_code2 FROM (SELECT json_parse('...') AS data) t1}.
     *
     * @param binlogJson raw binlog JSON message; embedded as an escaped SQL string literal
     * @param rules      rules to evaluate; must be non-empty
     * @return the SQL text, one boolean column per rule, aliased by rule code
     * @throws IllegalArgumentException if {@code rules} is null/empty, {@code binlogJson} is null,
     *                                  or a rule code is missing or contains illegal characters
     */
    public String buildSql(String binlogJson, List<MonitorRule> rules) {
        if (rules == null || rules.isEmpty()) {
            throw new IllegalArgumentException("Rules list is empty");
        }
        if (binlogJson == null) {
            throw new IllegalArgumentException("Binlog JSON is null");
        }

        // Build the SELECT clause.
        StringBuilder selectClause = new StringBuilder();
        for (int i = 0; i < rules.size(); i++) {
            MonitorRule rule = rules.get(i);
            if (i > 0) {
                selectClause.append(", ");
            }

            // Validate the rule code (it is used verbatim as a column alias).
            String ruleCode = rule.getRuleCode();
            if (ruleCode == null || !RULE_CODE_PATTERN.matcher(ruleCode).matches()) {
                throw new IllegalArgumentException("Invalid ruleCode: " + ruleCode);
            }

            String dorisExpr = converter.toDorisExpression(rule);
            selectClause.append("(")
                    .append(dorisExpr)
                    .append(") AS ")
                    .append(ruleCode);
        }

        return "SELECT " + selectClause
                + " FROM (SELECT json_parse('" + escapeSqlLiteral(binlogJson) + "') AS data) t1";
    }

    /**
     * Escapes text for use inside a single-quoted SQL string literal.
     * Backslashes are doubled first because the MySQL-style parser treats them as escape
     * characters; otherwise JSON escapes such as {@code \"} or {@code \n} would be
     * rewritten before json_parse sees them. Single quotes are doubled afterwards.
     */
    private static String escapeSqlLiteral(String raw) {
        return raw.replace("\\", "\\\\").replace("'", "''");
    }
}
package com.bigdata.monitor.util;

import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Extracts the before/after values of selected fields from a binlog JSON message using JsonPath.
 */
@Slf4j
@Component
public class JsonPathFieldExtractor {

    /**
     * Extracts the before/after values of the given fields.
     * Values are rendered with {@code toString()}; a missing path becomes JSON null.
     * Blank entries in {@code outputFields} (e.g. {@code "a,,b"} or {@code "a, "}) are skipped,
     * because they would otherwise produce the invalid path {@code $.before.}.
     *
     * @param binlogJson   full binlog JSON message
     * @param outputFields comma-separated field list
     * @return JSON string of the form {@code {"field": {"before": "...", "after": "..."}}},
     *         or {@code "{}"} when {@code outputFields} has no text
     */
    public String extractChangedFields(String binlogJson, String outputFields) {
        if (!StringUtils.hasText(outputFields)) {
            return "{}";
        }

        DocumentContext context = JsonPath.parse(binlogJson);
        JsonObject result = new JsonObject();

        for (String rawField : outputFields.split(",")) {
            String field = rawField.trim();
            if (field.isEmpty()) {
                continue;
            }

            Object beforeVal = safeRead(context, "$.before." + field);
            Object afterVal = safeRead(context, "$.after." + field);

            JsonObject pair = new JsonObject();
            pair.add("before", beforeVal == null ? JsonNull.INSTANCE : new JsonPrimitive(beforeVal.toString()));
            pair.add("after", afterVal == null ? JsonNull.INSTANCE : new JsonPrimitive(afterVal.toString()));
            result.add(field, pair);
        }
        return result.toString();
    }

    /**
     * Reads a path and returns {@code null} instead of failing when the path does not exist.
     */
    private Object safeRead(DocumentContext context, String path) {
        try {
            return context.read(path);
        } catch (PathNotFoundException e) {
            return null;
        }
    }
}
package com.bigdata.monitor.util;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class JsonUtils {
    public static String getTable(String json) {
        JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
        return obj.get("table").getAsString();
    }

    public static String getPrimaryKey(String json, String primaryField) {
        if (primaryField == null || primaryField.trim().isEmpty()) {
            primaryField = "id"; // 默认
        }
        JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
        JsonObject after = obj.getAsJsonObject("after");
        if (after != null && after.has(primaryField)) {
            return after.get(primaryField).getAsString();
        }
        return "unknown";
    }
}
package com.bigdata.monitor.util;

import com.bigdata.monitor.entity.MonitorRule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Converts a rule configuration into a Doris boolean expression that is TRUE when the
 * rule is violated.
 * Basic rule types supported here:
 * <ul>
 *   <li>TYPE: type check (strict, or lenient when the type name ends with {@code ?})</li>
 *   <li>ENUM: allowed-value check</li>
 *   <li>LENGTH: exact length check</li>
 * </ul>
 * RANGE is not implemented and is rejected as an unsupported rule type.
 */
@Slf4j
@Component
public class RuleConditionConverter {

    /**
     * Converts a rule into a Doris expression.
     * CUSTOM rules return their trigger condition unchanged; basic rules are built as a
     * "pass" condition and wrapped in {@code NOT (...)} to obtain the violation condition.
     *
     * @param rule rule to convert
     * @return Doris boolean expression text
     * @throws IllegalArgumentException if a basic rule has a malformed simple condition,
     *                                  an unsupported rule type, or an unsupported type name
     */
    public String toDorisExpression(MonitorRule rule) {
        if ("CUSTOM".equals(rule.getRuleType())) {
            return rule.getTriggerCondition();
        }

        // Basic rule: build the "pass" condition, then negate it into the "violation" condition.
        String dorisPassExpr = buildPassCondition(rule);
        return "NOT (" + dorisPassExpr + ")";
    }

    /**
     * Parses {@code simple_condition} ("field:spec") and dispatches on the rule type.
     */
    private String buildPassCondition(MonitorRule rule) {
        String simple = rule.getSimpleCondition();
        if (simple == null || !simple.contains(":")) {
            throw new IllegalArgumentException("Invalid simple_condition for rule: " + rule.getRuleCode());
        }
        if (rule.getRuleType() == null) {
            // switch on a null String would throw NullPointerException.
            throw new IllegalArgumentException("Unsupported rule_type: null");
        }

        String[] parts = simple.split(":", 2);
        String field = parts[0].trim();
        String spec = parts[1].trim();
        // Values are read from the "after" image. The path ends up inside a single-quoted
        // SQL literal, so it is escaped; ordinary field names are unaffected.
        String jsonPath = escapeSqlLiteral("$.after." + field);

        switch (rule.getRuleType()) {
            case "TYPE":
                return convertTypeCheck(jsonPath, spec);
            case "ENUM":
                return convertEnumCheck(jsonPath, spec);
            case "LENGTH":
                return convertLengthCheck(jsonPath, spec, rule.getRuleCode());
            default:
                throw new IllegalArgumentException("Unsupported rule_type: " + rule.getRuleType());
        }
    }

    /**
     * Builds the pass condition for a TYPE rule.
     *
     * @param jsonPath escaped JSON path of the field
     * @param typeSpec type name, optionally suffixed with {@code ?} for lenient mode
     */
    private String convertTypeCheck(String jsonPath, String typeSpec) {
        // Lenient mode is requested with a trailing '?'.
        boolean isLenient = typeSpec.endsWith("?");
        String type = isLenient ? typeSpec.substring(0, typeSpec.length() - 1) : typeSpec;

        // Locale.ROOT keeps the lookup independent of the JVM default locale.
        switch (type.toLowerCase(java.util.Locale.ROOT)) {
            case "int":
            case "integer":
                if (isLenient) {
                    // Lenient integer: a JSON number, or a string matching the integer pattern.
                    return String.format(
                            "(" +
                                    "  get_json_bigint(data, '%s') IS NOT NULL" +
                                    "  OR (" +
                                    "    get_json_string(data, '%s') IS NOT NULL" +
                                    "    AND get_json_string(data, '%s') REGEXP '^-?(0|[1-9][0-9]*)$'" +
                                    "  )" +
                                    ")",
                            jsonPath, jsonPath, jsonPath
                    );
                } else {
                    // Strict integer: JSON numbers only.
                    return String.format("get_json_bigint(data, '%s') IS NOT NULL", jsonPath);
                }

            case "double":
            case "float":
                if (isLenient) {
                    // Lenient decimal: a JSON number, or a string matching the decimal pattern.
                    // The dot is written as \\\\. so that it is still \. after both Java and SQL unescaping.
                    return String.format(
                            "(" +
                                    "  get_json_double(data, '%s') IS NOT NULL" +
                                    "  OR (" +
                                    "    get_json_string(data, '%s') IS NOT NULL" +
                                    "    AND get_json_string(data, '%s') REGEXP '^-?(0|[1-9][0-9]*)(\\\\.[0-9]+)?([eE][-+]?[0-9]+)?$'" +
                                    "  )" +
                                    ")",
                            jsonPath, jsonPath, jsonPath
                    );
                } else {
                    // Strict decimal: JSON numbers only.
                    return String.format("get_json_double(data, '%s') IS NOT NULL", jsonPath);
                }

            case "string":
                if (isLenient) {
                    throw new IllegalArgumentException("String type does not support lenient mode");
                }
                // String check.
                return String.format("get_json_string(data, '%s') IS NOT NULL", jsonPath);

            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // NOTE: RANGE rules are not implemented. A possible Doris form, kept for reference:
    //   SELECT CAST(json_unquote(get_json_string(data, '$.size')) AS DOUBLE) BETWEEN 1 AND 1.2
    //   FROM (SELECT json_parse('{"size": 1.2}') AS data) t1;

    /**
     * Builds the pass condition for an ENUM rule: the field value must be one of the
     * comma-separated values. Each value is trimmed and escaped before being quoted.
     */
    private String convertEnumCheck(String jsonPath, String enumSpec) {
        String[] values = enumSpec.split(",");
        StringBuilder inClause = new StringBuilder("get_json_string(data, '");
        inClause.append(jsonPath).append("') IN (");
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                inClause.append(", ");
            }
            inClause.append("'").append(escapeSqlLiteral(values[i].trim())).append("'");
        }
        inClause.append(")");
        return inClause.toString();
    }

    /**
     * Builds the pass condition for a LENGTH rule: the unquoted value must have exactly
     * the configured character length.
     *
     * @throws IllegalArgumentException if the length spec is not an integer
     */
    private String convertLengthCheck(String jsonPath, String lengthSpec, String ruleCode) {
        int length;
        try {
            length = Integer.parseInt(lengthSpec);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid length '" + lengthSpec + "' for rule: " + ruleCode, e);
        }
        return String.format(
                "char_length(json_unquote(json_extract(data, '%s'))) = %d",
                jsonPath, length
        );
    }

    /**
     * Escapes text for use inside a single-quoted SQL string literal: backslashes are
     * doubled first (the parser treats them as escapes), then single quotes are doubled.
     */
    private static String escapeSqlLiteral(String raw) {
        return raw.replace("\\", "\\\\").replace("'", "''");
    }
}
package com.bigdata.monitor.util;

import com.baomidou.dynamic.datasource.annotation.DS;
import com.bigdata.monitor.entity.MonitorRule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import java.util.*;

/**
 * Runs the combined validation query against Doris and reports which rules were violated.
 */
@Slf4j
@Service
public class RuleEvaluator {

    @Autowired
    private JdbcTemplate dorisJdbcTemplate;

    @Autowired
    private CombinedRuleSqlBuilder sqlBuilder;

    /**
     * Evaluates every rule for one binlog message with a single query.
     * A query failure is logged and treated as "no violations".
     *
     * @param binlogJson raw binlog JSON message
     * @param rules      rules to evaluate
     * @return the rules whose result column is truthy, in the order they were given
     */
    @DS("doris") // route this call to the Doris data source
    public List<MonitorRule> evaluateRules(String binlogJson, List<MonitorRule> rules) {
        if (rules.isEmpty()) {
            return Collections.emptyList();
        }

        final String sql = sqlBuilder.buildSql(binlogJson, rules);
        final Map<String, Object> row;
        try {
            row = dorisJdbcTemplate.queryForMap(sql);
        } catch (Exception e) {
            log.error("Doris evaluation failed. SQL: {}", sql, e);
            return Collections.emptyList();
        }

        // Each rule's outcome is stored under a column named after its rule code.
        List<MonitorRule> violated = new ArrayList<>();
        for (MonitorRule candidate : rules) {
            if (!isTrue(row.get(candidate.getRuleCode()))) {
                continue;
            }
            violated.add(candidate);
        }
        return violated;
    }

    /**
     * Interprets a result cell as a boolean: Boolean as-is, Number when its int value is 1,
     * String when it is "true" or "1" (case-insensitive); everything else is false.
     */
    private boolean isTrue(Object value) {
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue() == 1;
        }
        if (value instanceof String) {
            // Fallback in case the result arrives as text.
            String text = ((String) value).toLowerCase();
            return "true".equals(text) || "1".equals(text);
        }
        // null and any other type count as "not violated".
        return false;
    }
}

config目录

package com.bigdata.monitor.config;

import com.google.gson.Gson;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer wiring used to publish violation events, plus a shared Gson bean.
 */
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer factory with String keys and String values.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProperties());
    }

    /**
     * Template built on {@link #producerFactory()}.
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Gson instance with default settings.
     */
    @Bean
    public Gson gson() {
        return new Gson();
    }

    /**
     * Assembles the producer settings: broker list and String serializers.
     * Idempotence (ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG) is optional and left unset.
     */
    private Map<String, Object> producerProperties() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return settings;
    }
}

consumer目录

package com.bigdata.monitor.consumer;

import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.service.RuleService;
import com.bigdata.monitor.util.JsonPathFieldExtractor;
import com.bigdata.monitor.util.JsonUtils;
import com.bigdata.monitor.util.RuleEvaluator;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.UUID;

/**
 * Binlog 消息消费者
 * - 从 Kafka 读取 Flink CDC 的 Binlog JSON
 * - 根据规则校验数据
 * - 将违规结果发送到 Kafka Topic(monitor-violation-events),而非直写 Doris
 */
@Slf4j
@Component
public class BinlogConsumer {

    @Autowired
    private RuleService ruleService;

    @Autowired
    private RuleEvaluator ruleEvaluator;

    @Autowired
    private JsonPathFieldExtractor fieldExtractor;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private Gson gson; // 用于序列化 MonitorResult 为 JSON

    @Value("${monitor.initial-topics}")
    private String initialTopics;

    @Value("${monitor.violation-topic:monitor-violation-events}")
    private String violationTopic;

    @KafkaListener(topics = "#{'${monitor.initial-topics}'.split(',')}", groupId = "binlog-monitor-group")
    public void consume(String binlogJsonMessage, Acknowledgment ack) {
        try {
            String tableName = JsonUtils.getTable(binlogJsonMessage);
            List<MonitorRule> rules = ruleService.getSortedRulesByTable(tableName);
            if (rules.isEmpty()) {
                ack.acknowledge(); // 无规则也应提交 offset
                return;
            }

            // 1. 针对当前binlog json一次性执行所有校验规则,将不符合要求的规则返回
            List<MonitorRule> failedRules = ruleEvaluator.evaluateRules(binlogJsonMessage, rules);

            // 2. 将每个违规结果发送到 Kafka
            for (MonitorRule rule : failedRules) {
                // 根据配置获取需要输出的字段 并获取变更前后的值
                String changedFields = fieldExtractor.extractChangedFields(binlogJsonMessage, rule.getOutputFields());
                MonitorResult result = new MonitorResult();
                result.setTableName(tableName);
                result.setPrimaryKey(JsonUtils.getPrimaryKey(binlogJsonMessage, rule.getPrimaryField()));
                result.setBatchId(UUID.randomUUID().toString());
                result.setRuleName(rule.getRuleName());
                result.setRuleCode(rule.getRuleCode());
                result.setChangedFields(changedFields);

                // 序列化为 JSON 并发送到违规事件 Topic
                String violationJson = gson.toJson(result);
                kafkaTemplate.send(violationTopic, violationJson);

                log.info("Rule violated: {} ({}) for pk={}", rule.getRuleName(), rule.getRuleCode(), result.getPrimaryKey());
            }
        } catch (Exception e) {
            log.error("Error processing message", e);
        } finally {
            ack.acknowledge(); // 手动提交 offset,确保 at-least-once
        }
    }
}

主程序

package com.bigdata.monitor;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point; scans {@code com.bigdata.monitor.mapper} for MyBatis mappers.
 */
@SpringBootApplication
@MapperScan("com.bigdata.monitor.mapper")
public class BinlogMonitorApplication {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(BinlogMonitorApplication.class);
        application.run(args);
    }
}

简单测试:

package com.bigdata.monitor.test;

import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.mapper.MonitorRuleMapper;
import com.bigdata.monitor.service.RuleService;
import com.bigdata.monitor.util.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.List;
import java.util.UUID;

/**
 * Manual smoke tests: each test prints its result for inspection and asserts nothing.
 */
@SpringBootTest
public class Test01 {

    // Sample update event shared by the SQL-building and evaluation tests.
    private static final String SAMPLE_BINLOG = "{ \"table\": \"linkman_information\", \"op\": \"u\", \"before\": { \"id\": \"1001\", \"contact_info\": \"12345678\", \"delete_flag\": 0, \"phone\": \"13812345678\" }, \"after\": { \"id\": \"1001\", \"contact_info\": \"0632-12345678\", \"delete_flag\": \"0\", \"phone\": \"1381234567\" } }";

    @Autowired
    private MonitorRuleMapper monitorRuleMapper;


    @Autowired
    private RuleConditionConverter ruleConditionConverter;

    @Autowired
    private CombinedRuleSqlBuilder combinedRuleSqlBuilder;

    @Autowired
    private RuleEvaluator ruleEvaluator;

    @Autowired
    private JsonPathFieldExtractor fieldExtractor;

    @Autowired
    private RuleService ruleService;

    /** Loads rule 1 and prints it. */
    @Test
    public void test01() {
        System.out.println(monitorRuleMapper.selectById(1));
    }

    /** Prints the Doris expression generated for rule 1. */
    @Test
    public void test02() {
        MonitorRule firstRule = monitorRuleMapper.selectById(1);
        System.out.println(ruleConditionConverter.toDorisExpression(firstRule));
    }

    /** Prints the combined SQL built from every stored rule. */
    @Test
    public void test03() {
        List<MonitorRule> allRules = monitorRuleMapper.selectList(null);
        System.out.println(combinedRuleSqlBuilder.buildSql(SAMPLE_BINLOG, allRules));
    }

    /** Evaluates every stored rule against the sample event and prints one result per violation. */
    @Test
    public void test04() {
        // 1. Run all rules in one go.
        List<MonitorRule> allRules = monitorRuleMapper.selectList(null);
        List<MonitorRule> violated = ruleEvaluator.evaluateRules(SAMPLE_BINLOG, allRules);

        // 2. Build and print a result record for each violated rule.
        for (MonitorRule rule : violated) {
            System.out.println(toResult("linkman_information", rule));
        }
    }

    /** Prints the cached, sorted rules of the sample table. */
    @Test
    public void test05() {
        System.out.println(ruleService.getSortedRulesByTable("linkman_information"));
    }

    /** Assembles the violation record for one rule from the sample event. */
    private MonitorResult toResult(String tableName, MonitorRule rule) {
        String changedFields = fieldExtractor.extractChangedFields(SAMPLE_BINLOG, rule.getOutputFields());
        MonitorResult record = new MonitorResult();
        record.setTableName(tableName);
        record.setPrimaryKey(JsonUtils.getPrimaryKey(SAMPLE_BINLOG, rule.getPrimaryField()));
        record.setBatchId(UUID.randomUUID().toString());
        record.setRuleName(rule.getRuleName());
        record.setRuleCode(rule.getRuleCode());
        record.setChangedFields(changedFields);
        return record;
    }

}

说明:

## monitor_rule(规则配置表)

```sql
CREATE TABLE `monitor_rule` (
    `id` bigint NOT NULL AUTO_INCREMENT,
    `rule_code` varchar(64) NOT NULL COMMENT '业务规则编码,全局唯一',
    `rule_type` varchar(20) NOT NULL DEFAULT 'CUSTOM' COMMENT '规则类型: TYPE/ENUM/LENGTH/CUSTOM',
    `table_name` varchar(64) NOT NULL COMMENT '监控的表名',
    `rule_name` varchar(128) NOT NULL COMMENT '规则名称',
    `simple_condition` varchar(255) DEFAULT NULL COMMENT '简化条件(仅基础类型)',
    `trigger_condition` text COMMENT 'Doris表达式(仅CUSTOM类型)',
    `output_fields` varchar(255) DEFAULT NULL COMMENT '输出字段列表,逗号分隔',
    `enabled` int DEFAULT '1' COMMENT '是否启用',
    `primary_field` varchar(100) DEFAULT 'id' COMMENT 'primary key field',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

## monitor_result(监控结果表)

```sql
CREATE TABLE `datacenter`.`monitor_result` (
    `id` bigint NOT NULL AUTO_INCREMENT,
    `table_name` varchar(64) NOT NULL,
    `primary_key` varchar(128) NOT NULL COMMENT '主键值',
    `batch_id` varchar(64) NOT NULL COMMENT '批次ID',
    `rule_name` varchar(128) NOT NULL COMMENT '规则名称',
    `rule_code` varchar(64) NOT NULL COMMENT '规则编码',
    `changed_fields` json DEFAULT NULL COMMENT '变更字段详情',
    `create_time` datetime DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

## 校验示例
### 1. 插入规则配置

```sql
-- 基础校验:delete_flag 必须是整数
INSERT INTO monitor_rule (id, rule_code, rule_type, table_name, simple_condition, rule_name, output_fields)
VALUES (1, 'linkman_01', 'TYPE', 'linkman_information', 'delete_flag:int', 'delete_flag 类型校验', 'delete_flag');
-- 宽松模式(类型后加 ?):值为数字格式的字符串(整数、小数)时也视为通过
-- VALUES (1, 'linkman_01', 'TYPE', 'linkman_information', 'delete_flag:int?', 'delete_flag 类型校验', 'delete_flag');

-- 基础校验:phone 长度=11
INSERT INTO monitor_rule (id, rule_code, rule_type, table_name, simple_condition, rule_name, output_fields)
VALUES (2, 'linkman_02', 'LENGTH', 'linkman_information', 'phone:11', '手机号长度校验', 'phone');
-- 业务校验:联系方式(contact_info)发生变更
INSERT INTO monitor_rule (id, rule_code, rule_type, table_name, trigger_condition, rule_name, output_fields)
VALUES (
   3,
   'contact_01',
   'CUSTOM',
   'linkman_information',
   "get_json_string(data, '$.before.contact_info') != get_json_string(data, '$.after.contact_info')",
   '联系方式变更',
   'contact_info'
);
```

### 2. 测试 Binlog JSON

```sql
CREATE TABLE IF NOT EXISTS cdc_data (
    id BIGINT,
    data JSON COMMENT 'cdc变更记录'
) ENGINE=OLAP
unique KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num" = "1");
```

```json
{
  "table": "linkman_information",
  "op": "u",
  "before": {
    "id": "1001",
    "contact_info": "12345678",
    "delete_flag": 0,
    "phone": "13812345678"
  },
  "after": {
    "id": "1001",
    "contact_info": "0632-12345678",
    "delete_flag": "0",
    "phone": "1381234567"
  }
}
```

```sql
INSERT INTO test.cdc_data (id, data)
VALUES(1, '{ "table": "linkman_information", "op": "u", "before": { "id": "1001", "contact_info": "12345678", "delete_flag": 0, "phone": "13812345678" }, "after": { "id": "1001", "contact_info": "0632-12345678", "delete_flag": "0", "phone": "1381234567" } }');



-- json基础类型判断
-- 条件判断
SELECT if(get_json_bigint(data, '$.age') IS NOT NULL, 1, 0) as bigint_,
       if(get_json_double(data, '$.score') IS NOT NULL, 1, 0) as double_,
       if(get_json_string(data, '$.name') IS NOT NULL, 1, 0) as string_,
       if(get_json_string(data, '$.name') = 'bob' and get_json_string(data, '$.age') > 18, 1, 0) as contact_01
FROM (
         SELECT json_parse('{"name": "bob", "age": 20, "score": 99.99}') data
     ) t1;
```

### 程序运行 输出sql

```sql
SELECT (get_json_bigint(data, '$.after.delete_flag') IS NOT NULL)                                        AS linkman_01,
       (CHAR_LENGTH(json_unquote(json_extract(data, '$.after.phone'))) = 11)                             AS linkman_02,
       (get_json_string(data, '$.before.contact_info') != get_json_string(data, '$.after.contact_info')) AS contact_01
FROM (SELECT json_parse('{ "table": "linkman_information", "op": "u", "before": { "id": "1001", "contact_info": "12345678", "delete_flag": 0, "phone": "13812345678" }, "after": { "id": "1001", "contact_info": "0632-12345678", "delete_flag": "0", "phone": "1381234567" } }') AS data) t1

```

- 宽松模式

```sql
SELECT ((get_json_bigint(data, '$.after.delete_flag') IS NOT NULL OR
         (get_json_string(data, '$.after.delete_flag') IS NOT NULL AND
          get_json_string(data, '$.after.delete_flag') REGEXP '^-?(0|[1-9][0-9]*)$')))                   AS linkman_01,
       (char_length(json_unquote(json_extract(data, '$.after.phone'))) = 11)                             AS linkman_02,
       (get_json_string(data, '$.before.contact_info') != get_json_string(data, '$.after.contact_info')) AS contact_01,
       (get_json_string(data, '$.after.delete_flag') IN ('0', '1'))                                      AS linkman_03
FROM (SELECT json_parse('{ "table": "linkman_information", "op": "u", "before": { "id": "1001", "contact_info": "12345678", "delete_flag": 0, "phone": "13812345678" }, "after": { "id": "1001", "contact_info": "0632-12345678", "delete_flag": "0", "phone": "1381234567" } }') AS data) t1

```

- 严格模式
```sql
SELECT (char_length(json_unquote(json_extract(data, '$.after.phone'))) = 11)                             AS linkman_02,
       (get_json_string(data, '$.before.contact_info') != get_json_string(data, '$.after.contact_info')) AS contact_01,
       (get_json_string(data, '$.after.delete_flag') IN ('0', '1'))                                      AS linkman_03,
       (get_json_bigint(data, '$.after.delete_flag') IS NOT NULL)                                        AS linkman_01
FROM (SELECT json_parse('{ "table": "linkman_information", "op": "u", "before": { "id": "1001", "contact_info": "12345678", "delete_flag": 0, "phone": "13812345678" }, "after": { "id": "1001", "contact_info": "0632-12345678", "delete_flag": "0", "phone": "1381234567" } }') AS data) t1

```

### 3. 预期结果

- 规则 linkman_01 触发(delete_flag 非整数)
- 规则 linkman_02 触发(phone 长度≠11)
- 规则 contact_01 触发(contact_info 发生变更:12345678 → 0632-12345678)

### kafka command

-- start server

```shell
/data/zookeeper/bin/zkServer.sh start
/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
```

-- list topics

```shell
/data/kafka/bin/kafka-topics.sh --list  --bootstrap-server localhost:9092
```

-- create topic

```shell
/data/kafka/bin/kafka-topics.sh --create --topic binlog-monitor  --bootstrap-server localhost:9092 --partitions 3
/data/kafka/bin/kafka-topics.sh --create --topic monitor-violation-events  --bootstrap-server localhost:9092 --partitions 3
```

-- producer

```shell
/data/kafka/bin/kafka-console-producer.sh --topic binlog-monitor --bootstrap-server localhost:9092
```

-- consumer

```shell
/data/kafka/bin/kafka-console-consumer.sh --topic monitor-violation-events --bootstrap-server localhost:9092
```

sqlite引擎解析版

package com.bigdata.monitor.util;

import lombok.extern.slf4j.Slf4j;
import org.sqlite.SQLiteConfig;

import java.sql.*;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 基于 SQLite in-memory 的 JSON 规则表达式求值器。
 * 支持使用 JSON Path(如 $.before.field)在 SQL 条件中进行字段比较。
 * 在执行前自动校验所有 JSON 路径是否存在,避免因路径错误导致规则静默失效。
 */
@Slf4j
public class SqliteJsonRuleEvaluator {

    /**
     * SQLite 内存数据库连接 URL
     */
    private static final String SQLITE_URL = "jdbc:sqlite::memory:";

    /**
     * 每个线程持有一个只读的 SQLite 内存数据库连接
     */
    private static final ThreadLocal<Connection> connHolder = ThreadLocal.withInitial(() -> {
        try {
            SQLiteConfig config = new SQLiteConfig();
            config.setReadOnly(true); // 必须在创建连接前设置只读
            Connection conn = config.createConnection(SQLITE_URL);
            // 注册 REGEXP 函数
            org.sqlite.Function.create(conn, "REGEXP", new org.sqlite.Function() {
                @Override
                protected void xFunc() throws SQLException {
                    String pattern = value_text(0);
                    String value = value_text(1);
                    if (value == null) {
                        result(0);
                        return;
                    }
                    try {
                        result(java.util.regex.Pattern.matches(pattern, value) ? 1 : 0);
                    } catch (Exception e) {
                        result(0);
                    }
                }
            });
            return conn;
        } catch (SQLException e) {
            throw new IllegalStateException("Failed to initialize SQLite in-memory database connection", e);
        }
    });

    /**
     * 用于匹配标准点号风格的 JSON Path,例如:$.a.b, $.before.contact_info
     * 要求:
     * - 以 "$." 开头
     * - 后续由字母、数字、下划线、点号组成(不包含空格、引号、方括号等)
     * - 不允许连续点(..)或结尾为点
     */
    private static final Pattern JSON_PATH_PATTERN = Pattern.compile("\\$\\.[\\w.]+");

    /**
     * 从给定的条件表达式中提取所有符合 $.xxx.yyy 格式的 JSON 路径
     *
     * @param expression 包含 JSON Path 的条件表达式,例如:
     *                   "$.before.contact_info != $.after.contact_info"
     * @return 去重后的路径集合,例如:{"$.before.contact_info", "$.after.contact_info"}
     */
    private static Set<String> extractJsonPaths(String expression) {
        if (expression == null) {
            return Collections.emptySet();
        }
        Set<String> paths = new LinkedHashSet<>();
        Matcher matcher = JSON_PATH_PATTERN.matcher(expression);
        while (matcher.find()) {
            String path = matcher.group();
            // 过滤非法形式:不能以点结尾,不能包含连续点
            if (!path.endsWith(".") && !path.contains("..")) {
                paths.add(path);
            }
        }
        return paths;
    }

    /**
     * 校验 JSON 路径是否存在,采用快速失败策略:只要发现一个路径不存在,立即返回 false。
     *
     * @param jsonPayload JSON 字符串
     * @param paths       要校验的 JSON 路径集合
     * @return true 表示所有路径均存在;false 表示至少第一个缺失路径已记录
     */
    private static boolean validateJsonPaths(String jsonPayload, Set<String> paths) {
        if (paths == null || paths.isEmpty()) {
            return true;
        }

        Connection conn = connHolder.get();

        for (String path : paths) {
            PreparedStatement stmt = null;
            ResultSet rs = null;
            try {
                stmt = conn.prepareStatement("SELECT json_type(?, ?)");
                stmt.setString(1, jsonPayload);
                stmt.setString(2, path);
                rs = stmt.executeQuery();

                if (!rs.next() || rs.getString(1) == null) {
                    log.warn("JSON path does not exist: {}, json preview: {}",
                            path,
                            jsonPayload != null ? jsonPayload.substring(0, Math.min(100, jsonPayload.length())) : "null");
                    return false; // ✅ 快速失败:发现一个缺失就返回
                }
            } catch (SQLException e) {
                log.warn("Error while validating JSON path: {}", path, e);
                return false;
            } finally {
                // 安全关闭资源
                if (rs != null) {
                    try { rs.close(); } catch (SQLException ignored) {}
                }
                if (stmt != null) {
                    try { stmt.close(); } catch (SQLException ignored) {}
                }
            }
        }
        return true;
    }

    /**
     * 执行 JSON 条件表达式求值
     *
     * @param jsonPayload      待评估的 JSON 字符串,必须为合法 JSON
     * @param jsonPathCondition SQL WHERE 子句,允许使用 "data" 作为 JSON 列名,并引用 JSON Path,
     *                         例如:json_extract(data, '$.before.contact_info') != json_extract(data, '$.after.contact_info')
     *                         或更简洁地,由调用方保证表达式中路径已转换为 json_extract 形式。
     *                         ⚠️ 注意:本方法会自动提取 $.xxx 路径做存在性校验,但表达式本身必须是合法 SQLite 表达式。
     * @return 如果条件成立返回 true,否则返回 false;执行出错或路径缺失也返回 false
     */
    public static boolean evaluate(String jsonPayload, String jsonPathCondition) {
        if (jsonPayload == null || jsonPathCondition == null) {
            return false;
        }

        // 1. 从条件表达式中提取所有 $.xxx.yyy 形式的路径
        Set<String> requiredPaths = extractJsonPaths(jsonPathCondition);

        // 2. 校验所有路径在当前 JSON 中是否存在
        if (!validateJsonPaths(jsonPayload, requiredPaths)) {
            // 路径缺失已通过 warn 日志上报,此处不再执行表达式,直接返回 false
            return false;
        }

        // 3. 构造可执行的 SQL:将 jsonPayload 作为虚拟表的 data 列
        String sql = "SELECT 1 FROM (SELECT ? AS data) WHERE " + jsonPathCondition;

        Connection conn = connHolder.get();
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, jsonPayload);
            try (ResultSet rs = stmt.executeQuery()) {
                return rs.next(); // 有结果即条件成立
            }
        } catch (SQLException e) {
            log.warn("SQLite evaluation failed. Condition: {}, JSON preview: {}",
                    jsonPathCondition,
                    jsonPayload.length() > 100 ? jsonPayload.substring(0, 100) : jsonPayload,
                    e);
            return false;
        }
    }
}
package com.bigdata.monitor.util;

import com.bigdata.monitor.entity.MonitorRule;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

/**
 * Translates a {@link MonitorRule} into a SQLite condition and evaluates it against a binlog
 * JSON message through {@link SqliteJsonRuleEvaluator}.
 *
 * <p>CUSTOM rules supply the condition themselves ({@code trigger_condition}). TYPE, ENUM and
 * LENGTH rules are described by {@code simple_condition} in the form {@code field:spec}; a
 * "pass" condition on {@code $.after.<field>} is generated and negated.
 */
@Component
@RequiredArgsConstructor
public class UnifiedRuleEngine {

    /**
     * Evaluates one rule against one binlog message.
     *
     * @param binlogJson binlog event as JSON text
     * @param rule       rule to evaluate
     * @return the result of {@link SqliteJsonRuleEvaluator#evaluate(String, String)} for the
     *         rule's condition; for TYPE/ENUM/LENGTH rules the condition is the negated pass
     *         condition, so true means the value did not pass the check
     * @throws IllegalArgumentException if a non-CUSTOM rule has a malformed
     *                                  {@code simple_condition} or an unsupported type
     */
    public boolean evaluate(String binlogJson, MonitorRule rule) {
        String expr;
        if ("CUSTOM".equals(rule.getRuleType())) {
            // CUSTOM rules are evaluated exactly as configured.
            expr = rule.getTriggerCondition();
        } else {
            // Basic rules: build the pass condition, then negate it to obtain the violation condition.
            expr = "NOT (" + buildPassCondition(rule) + ")";
        }
        return SqliteJsonRuleEvaluator.evaluate(binlogJson, expr);
    }

    /**
     * Builds the pass condition of a TYPE/ENUM/LENGTH rule from its {@code field:spec} setting.
     *
     * @throws IllegalArgumentException if {@code simple_condition} is missing or has no colon,
     *                                  or the rule type is null or unsupported
     */
    private String buildPassCondition(MonitorRule rule) {
        String simple = rule.getSimpleCondition();
        if (simple == null || !simple.contains(":")) {
            throw new IllegalArgumentException("Invalid simple_condition for rule: " + rule.getRuleCode());
        }

        String[] parts = simple.split(":", 2);
        String field = parts[0].trim();
        String spec = parts[1].trim();
        String jsonPath = "$.after." + field; // basic rules always inspect the "after" image

        String ruleType = rule.getRuleType();
        if (ruleType == null) {
            // switch on a null String would throw a NullPointerException.
            throw new IllegalArgumentException("Unsupported rule_type: " + ruleType);
        }
        switch (ruleType) {
            case "TYPE":
                return convertTypeCheck(jsonPath, spec);
            case "ENUM":
                return convertEnumCheck(jsonPath, spec);
            case "LENGTH":
                return convertLengthCheck(jsonPath, spec);
            default:
                throw new IllegalArgumentException("Unsupported rule_type: " + ruleType);
        }
    }

    /**
     * Builds a type check. A trailing {@code ?} on the type (e.g. {@code int?}) selects lenient
     * mode, in which a string whose content parses as the expected JSON type also passes.
     *
     * @throws IllegalArgumentException for an unknown type, or for lenient mode on {@code string}
     */
    private String convertTypeCheck(String jsonPath, String typeSpec) {
        boolean isLenient = typeSpec.endsWith("?");
        String type = isLenient ? typeSpec.substring(0, typeSpec.length() - 1).trim() : typeSpec;

        // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
        switch (type.toLowerCase(java.util.Locale.ROOT)) {
            case "int":
            case "integer":
                return typeCondition(jsonPath, "integer", isLenient);

            case "double":
            case "float":
                return typeCondition(jsonPath, "real", isLenient);

            case "string":
                if (isLenient) {
                    throw new IllegalArgumentException("String type does not support lenient mode");
                }
                return typeCondition(jsonPath, "text", false);

            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    /**
     * Renders the SQLite expression for a type check.
     *
     * <p>Strict mode asks for the JSON type of the element itself. Lenient mode re-parses the
     * extracted value as JSON; because {@code json_type} raises an error on text that is not
     * well-formed JSON, the value is first tested with {@code json_valid} inside a CASE so that
     * such text fails the check instead of aborting the query.
     */
    private static String typeCondition(String jsonPath, String sqliteType, boolean lenient) {
        String path = sqlLiteral(jsonPath);
        if (!lenient) {
            return "json_type(data, " + path + ") = '" + sqliteType + "'";
        }
        String extracted = "json_extract(data, " + path + ")";
        return "CASE WHEN json_valid(" + extracted + ") = 0 THEN 0"
                + " ELSE json_type(" + extracted + ") = '" + sqliteType + "' END";
    }

    /**
     * Builds an enum check from a comma-separated list of allowed values.
     *
     * <p>The extracted value is cast to TEXT because the allowed values are text literals and
     * SQLite never considers an INTEGER/REAL value equal to a TEXT value; without the cast a
     * JSON number such as {@code 0} would not match the allowed value {@code '0'}.
     */
    private String convertEnumCheck(String jsonPath, String enumSpec) {
        String[] values = enumSpec.split(",");
        StringBuilder inClause = new StringBuilder("CAST(json_extract(data, ");
        inClause.append(sqlLiteral(jsonPath)).append(") AS TEXT) IN (");
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                inClause.append(", ");
            }
            inClause.append(sqlLiteral(values[i].trim()));
        }
        inClause.append(")");
        return inClause.toString();
    }

    /**
     * Builds an exact-length check.
     *
     * @throws IllegalArgumentException if {@code lengthSpec} is not an integer
     */
    private String convertLengthCheck(String jsonPath, String lengthSpec) {
        int length;
        try {
            length = Integer.parseInt(lengthSpec);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid length spec: " + lengthSpec, e);
        }
        return "length(json_extract(data, " + sqlLiteral(jsonPath) + ")) = " + length;
    }

    /** Quotes a value as a SQL string literal, doubling any embedded single quote. */
    private static String sqlLiteral(String value) {
        return "'" + value.replace("'", "''") + "'";
    }
}

主程序

package com.bigdata.monitor.consumer;

import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.service.RuleService;
import com.bigdata.monitor.util.JsonPathFieldExtractor;
import com.bigdata.monitor.util.JsonUtils;
import com.bigdata.monitor.util.UnifiedRuleEngine;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.UUID;

/**
 * Kafka consumer for binlog change events.
 *
 * <ul>
 *   <li>Reads binlog JSON messages (containing table/op/before/after) from the configured topics.</li>
 *   <li>Looks up the monitoring rules of the message's table and evaluates them one by one
 *       through {@link UnifiedRuleEngine#evaluate(String, MonitorRule)}.</li>
 *   <li>For every rule whose evaluation returns true, builds a {@link MonitorResult},
 *       serializes it to JSON and sends it to the violation topic.</li>
 * </ul>
 *
 * <p>Rules are evaluated independently of each other; the combined-SQL approach
 * (CombinedRuleSqlBuilder) is not used here.
 */
@Slf4j
@Component
public class BinlogConsumer {

    /** Maximum number of message characters echoed into error logs. */
    private static final int LOG_PREVIEW_LENGTH = 200;

    @Autowired
    private RuleService ruleService;

    @Autowired
    private UnifiedRuleEngine unifiedRuleEngine;

    @Autowired
    private JsonPathFieldExtractor fieldExtractor;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private Gson gson;

    // Not read inside this class; the listener annotation below resolves the same property itself.
    @Value("${monitor.initial-topics}")
    private String initialTopics;

    @Value("${monitor.violation-topic:monitor-violation-events}")
    private String violationTopic;

    /**
     * Consumes one binlog JSON message from the initially configured topics.
     *
     * <p>The offset is acknowledged exactly once, in {@code finally}, whether or not processing
     * succeeded. A message that cannot be processed is therefore logged and skipped rather than
     * redelivered, so failures must be caught through log monitoring.
     *
     * @param binlogJsonMessage complete binlog JSON text, e.g.
     *                          {@code {"table":"user","op":"u","before":{...},"after":{...}}}
     * @param ack               manual acknowledgment handle for the message's offset
     */
    @KafkaListener(topics = "#{'${monitor.initial-topics}'.split(',')}", groupId = "binlog-monitor-group")
    public void consume(String binlogJsonMessage, Acknowledgment ack) {
        try {
            // 1. The table name selects the rule set.
            String tableName = JsonUtils.getTable(binlogJsonMessage);
            if (tableName == null || tableName.isEmpty()) {
                log.warn("Skip message: table name not found in JSON: {}", binlogJsonMessage);
                return; // the offset is still acknowledged in finally
            }

            // 2. No rules for this table: nothing to check.
            List<MonitorRule> rules = ruleService.getSortedRulesByTable(tableName);
            if (rules == null || rules.isEmpty()) {
                return;
            }

            // 3. Evaluate each rule on its own; all violations of one message share a batch id.
            String batchId = UUID.randomUUID().toString();
            for (MonitorRule rule : rules) {
                evaluateAndReport(binlogJsonMessage, tableName, batchId, rule);
            }
        } catch (Exception e) {
            // Failure affecting the whole message, e.g. the JSON could not be parsed.
            log.error("Fatal error processing Kafka message", e);
        } finally {
            ack.acknowledge();
        }
    }

    /**
     * Evaluates a single rule and publishes a violation event when the rule is violated.
     * Any exception is logged and swallowed so that the remaining rules are still evaluated.
     */
    private void evaluateAndReport(String binlogJsonMessage, String tableName, String batchId, MonitorRule rule) {
        try {
            // The engine returns true when the rule's violation condition holds.
            boolean violated = unifiedRuleEngine.evaluate(binlogJsonMessage, rule);
            if (!violated) {
                return;
            }

            // Primary key value identifying the offending record.
            String primaryKey = JsonUtils.getPrimaryKey(binlogJsonMessage, rule.getPrimaryField());

            // Values of the fields listed in the rule's output_fields setting.
            String changedFields = fieldExtractor.extractChangedFields(binlogJsonMessage, rule.getOutputFields());

            MonitorResult result = new MonitorResult();
            result.setTableName(tableName);
            result.setPrimaryKey(primaryKey);
            result.setBatchId(batchId);
            result.setRuleName(rule.getRuleName());
            result.setRuleCode(rule.getRuleCode());
            result.setChangedFields(changedFields);

            // NOTE(review): the send result is not inspected here, so a failed send goes unnoticed.
            String violationJson = gson.toJson(result);
            kafkaTemplate.send(violationTopic, violationJson);

            log.info("Rule violated: {} ({}) for table={}, pk={}",
                    rule.getRuleName(), rule.getRuleCode(), tableName, primaryKey);
        } catch (Exception e) {
            log.error("Error evaluating rule: {} ({}), table: {}, JSON preview: {}",
                    rule.getRuleName(), rule.getRuleCode(), tableName,
                    binlogJsonMessage != null
                            ? binlogJsonMessage.substring(0, Math.min(LOG_PREVIEW_LENGTH, binlogJsonMessage.length()))
                            : "null",
                    e);
        }
    }
}

测试

package com.bigdata.monitor.test;

import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.service.RuleService;
import com.bigdata.monitor.util.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.List;
import java.util.UUID;

/**
 * Smoke tests for the SQLite-based rule evaluation.
 *
 * <p>Results are printed to standard output; nothing is asserted.
 */
@SpringBootTest
public class Test02 {
    @Autowired
    private JsonSchemaRuleEvaluator jsonSchemaRuleEvaluator;

    @Autowired
    private RuleService ruleService;

    @Autowired
    private UnifiedRuleEngine unifiedRuleEngine;

    @Autowired
    private JsonPathFieldExtractor fieldExtractor;

    /** Runs the JSON-schema evaluator with the condition {@code id:1000-1002} and prints the outcome. */
    @Test
    public void test01() {
        String event = "{ \"table\": \"linkman_information\", \"op\": \"u\", \"before\": { \"id\": \"1001\", \"contact_info\": \"12345678\", \"delete_flag\": 0, \"phone\": \"13812345678\" }, \"after\": { \"id\": \"1001\", \"contact_info\": \"0632-12345678\", \"delete_flag\": \"0\", \"phone\": \"13812345678\" } }";
        System.out.println(jsonSchemaRuleEvaluator.evaluate(event, "id:1000-1002"));
    }

    /** Runs one SQLite condition directly against a sample event and prints the outcome. */
    @Test
    public void test02() {
        String event = "{ \"table\": \"linkman_information\", \"op\": \"u\", \"before\": { \"id\": \"1001\", \"contact_info\": \"1836668860\", \"delete_flag\": 0, \"phone\": \"13812345678\" }, \"after\": { \"id\": \"1001\", \"contact_info\": \"0632-12345678\", \"delete_flag\": \"0\", \"phone\": \"13812345678\" } }";
        // Alternative conditions to try:
        //   regex:   "json_extract(data, '$.before.contact_info') REGEXP '^1[3-9]\\d{9}$'"
        //   lenient: "json_type(json_extract(data, '$.after.delete_flag')) = 'integer'"
        // Strict type check, the variant currently exercised:
        String strictCondition = "json_type(data, '$.after.delete_flag') = 'integer'";
        System.out.println(SqliteJsonRuleEvaluator.evaluate(event, strictCondition));
    }

    /** Evaluates every rule of the event's table and prints a result for each rule that fires. */
    @Test
    public void test03() {
        String event = "{ \"table\": \"linkman_information\", \"op\": \"u\", \"before\": { \"id\": \"1001\", \"contact_info\": \"12345678\", \"delete_flag\": 0, \"phone\": \"13812345678\" }, \"after\": { \"id\": \"1001\", \"contact_info\": \"0632-12345678\", \"delete_flag\": \"0\", \"phone\": \"1381234567\" } }";
        String tableName = JsonUtils.getTable(event);
        for (MonitorRule current : ruleService.getSortedRulesByTable(tableName)) {
            try {
                // A true result from the engine means the rule fired for this event.
                if (!unifiedRuleEngine.evaluate(event, current)) {
                    continue;
                }

                // Primary key value identifying the offending record.
                String primaryKey = JsonUtils.getPrimaryKey(event, current.getPrimaryField());

                // Values of the fields listed in the rule's output_fields setting.
                String changedFields = fieldExtractor.extractChangedFields(event, current.getOutputFields());

                MonitorResult record = new MonitorResult();
                record.setTableName(tableName);
                record.setPrimaryKey(primaryKey);
                record.setBatchId(UUID.randomUUID().toString());
                record.setRuleName(current.getRuleName());
                record.setRuleCode(current.getRuleCode());
                record.setChangedFields(changedFields);

                System.out.println(record);
            } catch (Exception e) {
                // A failing rule must not stop the remaining rules from being evaluated.
                System.out.println("Error evaluating rule: " + current.getRuleName());
            }
        }
    }
}