文章
binlog json 元数据校验
技术栈:spring-boot + mybatis plus + kafka + json-path + doris
整体思路: db binlog -> kafka binlog topic -> 主程序 -> kafka result topic -> doris
校验引擎:doris json函数
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
</parent>
<groupId>com.bigdata</groupId>
<artifactId>binlog-monitor</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Java level and source encoding are declared exactly once. The values kept here
     (1.8 / UTF-8) are the ones that were effective before the duplicates were removed. -->
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Third-party dependency versions -->
<mybatis-plus.version>3.5.6</mybatis-plus.version>
<dynamic-ds.version>4.2.0</dynamic-ds.version>
<postgresql.version>42.7.3</postgresql.version>
<mysql.version>8.0.33</mysql.version>
<commons-codec.version>1.15</commons-codec.version>
</properties>
<dependencies>
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- mybatis plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- dynamic datasource -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>${dynamic-ds.version}</version>
</dependency>
<!-- mysql driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Apache commons codec 用于MD5 -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>
<!-- DevTools 开发时热部署 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- JSON Path(字段提取) -->
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.9.0</version>
</dependency>
<!-- Gson(JSON 处理) -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- Kafka 核心依赖(显式声明) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 显式引入 kafka-clients(推荐) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- 版本由 Spring Boot 管理,通常无需指定 -->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
resources/application.yml
server:
  port: 8088
  servlet:
    context-path: /
spring:
  datasource:
    dynamic:
      primary: mysql   # default datasource (used for MyBatis Plus CRUD)
      strict: false    # kept off so that an undeclared datasource name does not raise an error
      datasource:
        # Primary datasource: MySQL (stores monitor_rule / monitor_result)
        mysql:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://localhost:3306/datacenter?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
          username: root
          password: root
          hikari:
            connection-timeout: 30000
            idle-timeout: 600000
            max-lifetime: 1800000
            # Rules are reloaded at most once every 30 minutes (see RuleService), so a small pool is enough
            maximum-pool-size: 5
            # Keep a couple of connections ready for the refresh
            minimum-idle: 2
        # Doris datasource: used to evaluate the validation rules
        doris:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://hadoop002:9030/test?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: root
          hikari:
            # Fail fast so consumer threads are not blocked
            connection-timeout: 10000
            # Reclaim idle connections after 5 minutes
            idle-timeout: 300000
            # Maximum connection lifetime: 20 minutes
            max-lifetime: 1200000
            # Sized for concurrent validation; tune per worker count (1 worker -> 20, 2 -> 15, 3 -> 10)
            maximum-pool-size: 20
            # Pre-warm connections so the first query is not slow
            minimum-idle: 5
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: binlog-monitor-group
      auto-offset-reset: latest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual
monitor:
  initial-topics: binlog-monitor
  violation-topic: monitor-violation-events
entity目录
package com.bigdata.monitor.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
/**
 * One validation rule, mapped to the {@code monitor_rule} table.
 * Basic rule types are described by {@code simpleCondition}; CUSTOM rules carry
 * a ready-made expression in {@code triggerCondition}.
 */
@Data
@TableName("monitor_rule")
public class MonitorRule {
@TableId(type = IdType.AUTO)
private Long id;
private String ruleCode; // Business rule code, e.g. linkman_01
private String ruleType; // TYPE/RANGE/ENUM/LENGTH/CUSTOM
private String tableName; // Name of the monitored table
private String ruleName; // Human-readable rule name
private String simpleCondition; // Simplified condition (basic rule types)
private String triggerCondition; // Doris expression (CUSTOM type)
private String outputFields; // Fields to include in the output
private String primaryField = "id"; // Primary-key field of the monitored table
private Integer enabled; // Whether the rule is enabled
}package com.bigdata.monitor.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
 * One rule violation, mapped to the {@code monitor_result} table.
 * NOTE(review): the table DDL shown in this document also has a {@code create_time}
 * column that has no field here - confirm whether that is intentional.
 */
@Data
@TableName("monitor_result")
public class MonitorResult {
@TableId(type = IdType.AUTO)
private Long id;
private String tableName;
private String primaryKey;
private String batchId;
private String ruleName;
private String ruleCode; // Rule code
private String changedFields; // JSON string
}
mapper目录
package com.bigdata.monitor.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdata.monitor.entity.MonitorRule;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
 * MyBatis-Plus mapper for {@link MonitorRule}; CRUD methods come from {@link BaseMapper}.
 */
@Mapper
public interface MonitorRuleMapper extends BaseMapper<MonitorRule> {
// NOTE(review): no SQL mapping (annotation or XML) for this method is visible in this
// document - confirm one exists before calling it.
List<MonitorRule> selectActiveRulesByTable(String tableName);
}
package com.bigdata.monitor.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdata.monitor.entity.MonitorResult;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis-Plus mapper for {@link MonitorResult}; declares no methods beyond {@link BaseMapper}.
 */
@Mapper
public interface MonitorResultMapper extends BaseMapper<MonitorResult> {
}service目录
package com.bigdata.monitor.service;
import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.mapper.MonitorRuleMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
 * Rule cache service.
 * <ul>
 *   <li>Loads all enabled rules at startup.</li>
 *   <li>Groups them by table name.</li>
 *   <li>Within a table, basic rules (any type other than CUSTOM) come before CUSTOM rules.</li>
 *   <li>Refresh is lazy, not scheduled: a lookup triggers a reload once the refresh interval
 *       (30 minutes) has elapsed. After a failed reload the previous cache is kept and the
 *       next attempt is delayed by a short retry interval, so an unavailable database is not
 *       queried again on every lookup.</li>
 * </ul>
 */
@Slf4j
@Service
public class RuleService {

    /** Interval between successful refreshes: 30 minutes, in milliseconds. */
    private static final long REFRESH_INTERVAL_MILLIS = 30 * 60 * 1000L;

    /** Delay before retrying after a failed refresh: 1 minute, in milliseconds. */
    private static final long RETRY_INTERVAL_MILLIS = 60 * 1000L;

    @Autowired
    private MonitorRuleMapper ruleMapper;

    /** Cache: tableName -> rules of that table, already sorted. Replaced as a whole on refresh. */
    private volatile Map<String, List<MonitorRule>> ruleCache = new ConcurrentHashMap<>();

    /** Earliest time (epoch millis) at which a lookup may trigger the next refresh. */
    private volatile long nextRefreshTime = 0L;

    /**
     * Populates the rule cache when the bean is created.
     */
    @PostConstruct
    public void init() {
        refreshRules();
    }

    /**
     * Reloads the rule cache unconditionally (used at startup and for manual triggering).
     * Failures are logged, not thrown; the previous cache content stays in place.
     */
    public synchronized void refreshRules() {
        try {
            List<MonitorRule> allRules = ruleMapper.selectList(
                    new com.baomidou.mybatisplus.core.conditions.query.QueryWrapper<MonitorRule>()
                            .eq("enabled", 1)
            );
            Map<String, List<MonitorRule>> grouped = allRules.stream()
                    .collect(Collectors.groupingBy(MonitorRule::getTableName));
            // Stable sort: false (basic) orders before true (CUSTOM); ties keep query order.
            for (List<MonitorRule> tableRules : grouped.values()) {
                tableRules.sort(Comparator.comparing(
                        (MonitorRule rule) -> "CUSTOM".equals(rule.getRuleType())));
            }
            this.ruleCache = new ConcurrentHashMap<>(grouped);
            this.nextRefreshTime = System.currentTimeMillis() + REFRESH_INTERVAL_MILLIS;
            log.info("Rule cache refreshed. Tables: {}, Total rules: {}", grouped.size(), allRules.size());
        } catch (Exception e) {
            // Back off instead of retrying on every lookup while the database is unavailable.
            this.nextRefreshTime = System.currentTimeMillis() + RETRY_INTERVAL_MILLIS;
            log.error("Failed to refresh rule cache", e);
        }
    }

    /**
     * Returns the sorted rules of the given table, refreshing the cache first when it is due.
     *
     * @param tableName name of the monitored table
     * @return the cached rules for the table, or an empty list when none are configured
     */
    public List<MonitorRule> getSortedRulesByTable(String tableName) {
        if (System.currentTimeMillis() >= nextRefreshTime) {
            synchronized (this) {
                // Checked again under the lock: another thread may have refreshed meanwhile.
                if (System.currentTimeMillis() >= nextRefreshTime) {
                    refreshRules();
                }
            }
        }
        return ruleCache.getOrDefault(tableName, Collections.emptyList());
    }
}
package com.bigdata.monitor.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.bigdata.monitor.entity.MonitorResult;
/**
 * Service interface for {@link MonitorResult}; declares no methods beyond {@link IService}.
 */
public interface MonitorResultService extends IService<MonitorResult> {
}
service/impl目录
package com.bigdata.monitor.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.mapper.MonitorResultMapper;
import com.bigdata.monitor.service.MonitorResultService;
import org.springframework.stereotype.Service;
/**
 * {@link MonitorResultService} implementation built on MyBatis-Plus {@link ServiceImpl}
 * and {@link MonitorResultMapper}; adds no behavior of its own.
 */
@Service
public class MonitorResultServiceImpl extends ServiceImpl<MonitorResultMapper, MonitorResult> implements MonitorResultService {
}
util目录
package com.bigdata.monitor.util;
import com.bigdata.monitor.entity.MonitorRule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Builds one combined validation SQL per message (1 message -> 1 query).
 */
@Slf4j
@Component
public class CombinedRuleSqlBuilder {

    /** Rule codes are emitted unquoted as column aliases, so only identifier characters are allowed. */
    private static final java.util.regex.Pattern RULE_CODE_PATTERN =
            java.util.regex.Pattern.compile("^[a-zA-Z0-9_]+$");

    @Autowired
    private RuleConditionConverter converter;

    /**
     * Builds {@code SELECT (expr1) AS rule_code1, (expr2) AS rule_code2 FROM (SELECT json_parse('...') AS data) t1}.
     *
     * @param binlogJson the binlog message, embedded as a SQL string literal
     * @param rules      rules to evaluate; each contributes one aliased column
     * @return the SQL text
     * @throws IllegalArgumentException if {@code rules} is null or empty, {@code binlogJson} is null,
     *                                  or a rule code is missing or contains characters outside
     *                                  {@code [a-zA-Z0-9_]}
     */
    public String buildSql(String binlogJson, List<MonitorRule> rules) {
        if (rules == null || rules.isEmpty()) {
            throw new IllegalArgumentException("Rules list is empty");
        }
        if (binlogJson == null) {
            throw new IllegalArgumentException("binlogJson is null");
        }

        // Backslash is an escape character inside string literals, so it must be doubled
        // BEFORE quotes are doubled. Otherwise JSON escapes such as \" or \n are rewritten
        // by the SQL parser, and a raw \' would terminate the literal early.
        String escapedJson = binlogJson.replace("\\", "\\\\").replace("'", "''");

        StringBuilder selectClause = new StringBuilder();
        for (MonitorRule rule : rules) {
            String ruleCode = rule.getRuleCode();
            // The alias is concatenated into the SQL text, so reject anything that is not a plain identifier.
            if (ruleCode == null || !RULE_CODE_PATTERN.matcher(ruleCode).matches()) {
                throw new IllegalArgumentException("Invalid ruleCode: " + ruleCode);
            }
            if (selectClause.length() > 0) {
                selectClause.append(", ");
            }
            selectClause.append("(")
                    .append(converter.toDorisExpression(rule))
                    .append(") AS ")
                    .append(ruleCode);
        }

        return "SELECT " + selectClause + " FROM (SELECT json_parse('" + escapedJson + "') AS data) t1";
    }
}
package com.bigdata.monitor.util;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
* 使用 JsonPath 从 Binlog JSON 提取变更字段
*/
@Slf4j
@Component
public class JsonPathFieldExtractor {
/**
* 提取指定字段的 before/after 值
* @param binlogJson 完整 Binlog JSON
* @param outputFields 逗号分隔的字段列表
* @return JSON 字符串: {"field": {"before": "...", "after": "..."}}
*/
public String extractChangedFields(String binlogJson, String outputFields) {
if (!StringUtils.hasText(outputFields)) {
return "{}";
}
DocumentContext context = JsonPath.parse(binlogJson);
JsonObject result = new JsonObject();
for (String field : outputFields.split(",")) {
field = field.trim();
String beforePath = "$.before." + field;
String afterPath = "$.after." + field;
Object beforeVal = safeRead(context, beforePath);
Object afterVal = safeRead(context, afterPath);
JsonObject pair = new JsonObject();
pair.add("before", beforeVal == null ? JsonNull.INSTANCE : new JsonPrimitive(beforeVal.toString()));
pair.add("after", afterVal == null ? JsonNull.INSTANCE : new JsonPrimitive(afterVal.toString()));
result.add(field, pair);
}
return result.toString();
}
private Object safeRead(DocumentContext context, String path) {
try {
return context.read(path);
} catch (PathNotFoundException e) {
return null;
}
}
}
package com.bigdata.monitor.util;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
public class JsonUtils {
public static String getTable(String json) {
JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
return obj.get("table").getAsString();
}
public static String getPrimaryKey(String json, String primaryField) {
if (primaryField == null || primaryField.trim().isEmpty()) {
primaryField = "id"; // 默认
}
JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
JsonObject after = obj.getAsJsonObject("after");
if (after != null && after.has(primaryField)) {
return after.get(primaryField).getAsString();
}
return "unknown";
}
}
package com.bigdata.monitor.util;
import com.bigdata.monitor.entity.MonitorRule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Converts a rule configuration into a Doris boolean expression.
 * <p>Basic monitoring covers only:
 * <ul>
 *   <li>TYPE: type check (strict, or lenient when the type name ends with {@code ?})</li>
 *   <li>ENUM: allowed-value check</li>
 *   <li>LENGTH: exact length check</li>
 * </ul>
 * CUSTOM rules supply their expression directly.
 */
@Slf4j
@Component
public class RuleConditionConverter {

    /**
     * Converts a rule into a Doris expression.
     *
     * @param rule the rule to convert
     * @return for CUSTOM rules the stored trigger condition as-is; for basic rules the
     *         "pass" condition wrapped in {@code NOT (...)}, i.e. the violation condition
     * @throws IllegalArgumentException if a basic rule has a malformed {@code simple_condition}
     *                                  or an unsupported rule type / type name
     */
    public String toDorisExpression(MonitorRule rule) {
        if ("CUSTOM".equals(rule.getRuleType())) {
            return rule.getTriggerCondition();
        }
        // Basic rules: build the "pass" condition, then negate it to obtain the violation condition.
        return "NOT (" + buildPassCondition(rule) + ")";
    }

    /** Parses {@code field:spec} from the rule and dispatches on the rule type. */
    private String buildPassCondition(MonitorRule rule) {
        String simple = rule.getSimpleCondition();
        if (simple == null || !simple.contains(":")) {
            throw new IllegalArgumentException("Invalid simple_condition for rule: " + rule.getRuleCode());
        }
        String[] parts = simple.split(":", 2);
        String field = parts[0].trim();
        String spec = parts[1].trim();
        if (field.isEmpty()) {
            throw new IllegalArgumentException("Invalid simple_condition for rule: " + rule.getRuleCode());
        }
        String ruleType = rule.getRuleType();
        if (ruleType == null) {
            // switch on a null String would throw NullPointerException
            throw new IllegalArgumentException("Unsupported rule_type: null");
        }
        // Values are read from the "after" image by default. The path ends up inside a
        // single-quoted SQL literal, so it is escaped here once for all converters.
        String jsonPath = escapeSqlLiteral("$.after." + field);
        switch (ruleType) {
            case "TYPE":
                return convertTypeCheck(jsonPath, spec);
            // case "RANGE":
            // return convertRangeCheck(jsonPath, spec);
            case "ENUM":
                return convertEnumCheck(jsonPath, spec);
            case "LENGTH":
                return convertLengthCheck(jsonPath, spec);
            default:
                throw new IllegalArgumentException("Unsupported rule_type: " + ruleType);
        }
    }

    /** Escapes text for use inside a single-quoted SQL string literal (backslashes first, then quotes). */
    private static String escapeSqlLiteral(String text) {
        return text.replace("\\", "\\\\").replace("'", "''");
    }

    /** Builds the pass condition for a TYPE rule; a trailing {@code ?} selects lenient mode. */
    private String convertTypeCheck(String jsonPath, String typeSpec) {
        boolean isLenient = typeSpec.endsWith("?");
        String type = isLenient ? typeSpec.substring(0, typeSpec.length() - 1) : typeSpec;
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
        switch (type.toLowerCase(java.util.Locale.ROOT)) {
            case "int":
            case "integer":
                if (isLenient) {
                    // Lenient integer: a JSON number, or a string matching the integer pattern.
                    return String.format(
                            "(" +
                                    " get_json_bigint(data, '%s') IS NOT NULL" +
                                    " OR (" +
                                    " get_json_string(data, '%s') IS NOT NULL" +
                                    " AND get_json_string(data, '%s') REGEXP '^-?(0|[1-9][0-9]*)$'" +
                                    " )" +
                                    ")",
                            jsonPath, jsonPath, jsonPath
                    );
                }
                // Strict integer: the value must be readable as a bigint.
                return String.format("get_json_bigint(data, '%s') IS NOT NULL", jsonPath);
            case "double":
            case "float":
                if (isLenient) {
                    // Lenient decimal: a JSON number, or a string matching the decimal pattern.
                    return String.format(
                            "(" +
                                    " get_json_double(data, '%s') IS NOT NULL" +
                                    " OR (" +
                                    " get_json_string(data, '%s') IS NOT NULL" +
                                    " AND get_json_string(data, '%s') REGEXP '^-?(0|[1-9][0-9]*)(\\\\.[0-9]+)?([eE][-+]?[0-9]+)?$'" +
                                    " )" +
                                    ")",
                            jsonPath, jsonPath, jsonPath
                    );
                }
                // Strict decimal: the value must be readable as a double.
                return String.format("get_json_double(data, '%s') IS NOT NULL", jsonPath);
            case "string":
                if (isLenient) {
                    throw new IllegalArgumentException("String type does not support lenient mode");
                }
                return String.format("get_json_string(data, '%s') IS NOT NULL", jsonPath);
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    /**
     * Disabled RANGE support, kept for reference. Sample query:
     * SELECT CAST(json_unquote(get_json_string(data, '$.size')) AS DOUBLE ) BETWEEN 1 AND 1.2
     * FROM (
     * SELECT json_parse('{"size": 1.2}') AS data
     * ) t1;
     */
    // private String convertRangeCheck(String jsonPath, String rangeSpec) {
    // if (!rangeSpec.matches("^\\d+-\\d+$")) {
    // throw new IllegalArgumentException("Invalid range format: " + rangeSpec);
    // }
    // String[] bounds = rangeSpec.split("-");
    // int min = Integer.parseInt(bounds[0]);
    // int max = Integer.parseInt(bounds[1]);
    // return String.format(
    // "CAST(get_json_bigint(data, '%s') AS INT) BETWEEN %d AND %d",
    // jsonPath, min, max
    // );
    // }

    /** Builds {@code get_json_string(data, path) IN ('v1', 'v2', ...)} from a comma-separated value list. */
    private String convertEnumCheck(String jsonPath, String enumSpec) {
        String[] values = enumSpec.split(",");
        StringBuilder inClause = new StringBuilder("get_json_string(data, '");
        inClause.append(jsonPath).append("') IN (");
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                inClause.append(", ");
            }
            // Allowed values come from rule configuration; escape them so a quote cannot end the literal.
            inClause.append("'").append(escapeSqlLiteral(values[i].trim())).append("'");
        }
        inClause.append(")");
        return inClause.toString();
    }

    /** Builds an exact character-length check; the spec must be an integer. */
    private String convertLengthCheck(String jsonPath, String lengthSpec) {
        int length;
        try {
            length = Integer.parseInt(lengthSpec);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid length spec: " + lengthSpec, e);
        }
        return String.format(
                "char_length(json_unquote(json_extract(data, '%s'))) = %d",
                jsonPath, length
        );
    }
}
package com.bigdata.monitor.util;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.bigdata.monitor.entity.MonitorRule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
/**
 * Runs the combined validation query and reports which rules were violated.
 */
@Slf4j
@Service
public class RuleEvaluator {

    @Autowired
    private JdbcTemplate dorisJdbcTemplate;

    @Autowired
    private CombinedRuleSqlBuilder sqlBuilder;

    /**
     * Evaluates all rules against one binlog message with a single query.
     *
     * @param binlogJson the binlog message
     * @param rules      rules to evaluate
     * @return the rules whose result column is truthy; an empty list when {@code rules}
     *         is empty or the query fails (the failure is logged)
     */
    @DS("doris") // route this call to the Doris datasource
    public List<MonitorRule> evaluateRules(String binlogJson, List<MonitorRule> rules) {
        if (rules.isEmpty()) {
            return Collections.emptyList();
        }

        final String sql = sqlBuilder.buildSql(binlogJson, rules);
        final Map<String, Object> row;
        try {
            row = dorisJdbcTemplate.queryForMap(sql);
        } catch (Exception e) {
            log.error("Doris evaluation failed. SQL: {}", sql, e);
            return Collections.emptyList();
        }

        // Start from all rules and drop those whose column is not truthy; order is preserved.
        List<MonitorRule> violated = new ArrayList<>(rules);
        violated.removeIf(rule -> !isTrue(row.get(rule.getRuleCode())));
        return violated;
    }

    /** Interprets a result cell as a boolean: Boolean true, numeric 1, or the text "true"/"1". */
    private boolean isTrue(Object value) {
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue() == 1;
        }
        if (value instanceof String) {
            // Fallback for drivers that hand the value back as text.
            switch (((String) value).toLowerCase()) {
                case "true":
                case "1":
                    return true;
                default:
                    return false;
            }
        }
        // null and any other type count as "not violated"
        return false;
    }
}
config目录
package com.bigdata.monitor.config;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka producer configuration (used to publish violation events to monitor-violation-events).
 */
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer factory with String keys and values, pointed at the configured brokers.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Optional: enable idempotence (prevents duplicates)
        // settings.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Template backed by {@link #producerFactory()}.
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Shared Gson instance with default settings.
     */
    @Bean
    public Gson gson() {
        return new Gson();
    }
}
consumer目录
package com.bigdata.monitor.consumer;
import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.service.RuleService;
import com.bigdata.monitor.util.JsonPathFieldExtractor;
import com.bigdata.monitor.util.JsonUtils;
import com.bigdata.monitor.util.RuleEvaluator;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.UUID;
/**
 * Binlog message consumer.
 * <ul>
 *   <li>Reads binlog JSON messages from Kafka.</li>
 *   <li>Validates each message against the rules configured for its table.</li>
 *   <li>Publishes every violation to the violation topic instead of writing to Doris directly.</li>
 * </ul>
 * Delivery semantics: the offset is acknowledged in a {@code finally} block, i.e. even when
 * processing fails. A failing message is therefore logged and skipped, not redelivered.
 */
@Slf4j
@Component
public class BinlogConsumer {

    @Autowired
    private RuleService ruleService;

    @Autowired
    private RuleEvaluator ruleEvaluator;

    @Autowired
    private JsonPathFieldExtractor fieldExtractor;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private Gson gson; // serializes MonitorResult to JSON

    @Value("${monitor.initial-topics}")
    private String initialTopics;

    @Value("${monitor.violation-topic:monitor-violation-events}")
    private String violationTopic;

    /**
     * Processes one binlog message and acknowledges it exactly once.
     *
     * @param binlogJsonMessage the binlog JSON payload
     * @param ack               manual acknowledgment handle for the record
     */
    @KafkaListener(topics = "#{'${monitor.initial-topics}'.split(',')}", groupId = "binlog-monitor-group")
    public void consume(String binlogJsonMessage, Acknowledgment ack) {
        try {
            String tableName = JsonUtils.getTable(binlogJsonMessage);
            List<MonitorRule> rules = ruleService.getSortedRulesByTable(tableName);
            if (rules.isEmpty()) {
                return; // nothing to validate; the finally block still commits the offset
            }
            // 1. Evaluate all rules for this message in one go; only violated rules come back.
            List<MonitorRule> failedRules = ruleEvaluator.evaluateRules(binlogJsonMessage, rules);
            // 2. Publish one violation event per violated rule.
            for (MonitorRule rule : failedRules) {
                publishViolation(tableName, binlogJsonMessage, rule);
            }
        } catch (Exception e) {
            log.error("Error processing message", e);
        } finally {
            // Single acknowledgment point for every path (no rules, success, failure).
            ack.acknowledge();
        }
    }

    /** Builds the violation record for one rule and sends it to the violation topic. */
    private void publishViolation(String tableName, String binlogJsonMessage, MonitorRule rule) {
        // Before/after values of the fields the rule is configured to output.
        String changedFields = fieldExtractor.extractChangedFields(binlogJsonMessage, rule.getOutputFields());

        MonitorResult result = new MonitorResult();
        result.setTableName(tableName);
        result.setPrimaryKey(JsonUtils.getPrimaryKey(binlogJsonMessage, rule.getPrimaryField()));
        result.setBatchId(UUID.randomUUID().toString());
        result.setRuleName(rule.getRuleName());
        result.setRuleCode(rule.getRuleCode());
        result.setChangedFields(changedFields);

        String violationJson = gson.toJson(result);
        // The send is asynchronous; without a failure callback a lost event would go unnoticed.
        kafkaTemplate.send(violationTopic, violationJson).addCallback(
                sent -> { },
                ex -> log.error("Failed to publish violation: {} ({}) for pk={}",
                        rule.getRuleName(), rule.getRuleCode(), result.getPrimaryKey(), ex));
        log.info("Rule violated: {} ({}) for pk={}", rule.getRuleName(), rule.getRuleCode(), result.getPrimaryKey());
    }
}
主程序
package com.bigdata.monitor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point; scans {@code com.bigdata.monitor.mapper} for MyBatis mappers.
 */
@SpringBootApplication
@MapperScan("com.bigdata.monitor.mapper")
public class BinlogMonitorApplication {
/**
 * Starts the Spring application.
 *
 * @param args command-line arguments passed through to Spring Boot
 */
public static void main(String[] args) {
SpringApplication.run(BinlogMonitorApplication.class, args);
}
}
简单测试:
package com.bigdata.monitor.test;
import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.mapper.MonitorRuleMapper;
import com.bigdata.monitor.service.RuleService;
import com.bigdata.monitor.util.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
import java.util.UUID;
/**
 * Manual smoke tests: each one prints its result for inspection and asserts nothing.
 */
@SpringBootTest
public class Test01 {

    /** Sample update event for linkman_information, shared by the SQL-building and evaluation tests. */
    private static final String SAMPLE_BINLOG_JSON = "{ \"table\": \"linkman_information\", \"op\": \"u\", \"before\": { \"id\": \"1001\", \"contact_info\": \"12345678\", \"delete_flag\": 0, \"phone\": \"13812345678\" }, \"after\": { \"id\": \"1001\", \"contact_info\": \"0632-12345678\", \"delete_flag\": \"0\", \"phone\": \"1381234567\" } }";

    private static final String SAMPLE_TABLE = "linkman_information";

    @Autowired
    private MonitorRuleMapper monitorRuleMapper;

    @Autowired
    private RuleConditionConverter ruleConditionConverter;

    @Autowired
    private CombinedRuleSqlBuilder combinedRuleSqlBuilder;

    @Autowired
    private RuleEvaluator ruleEvaluator;

    @Autowired
    private JsonPathFieldExtractor fieldExtractor;

    @Autowired
    private RuleService ruleService;

    /** Loads rule 1 and prints it. */
    @Test
    public void test01() {
        System.out.println(monitorRuleMapper.selectById(1));
    }

    /** Prints the Doris expression generated for rule 1. */
    @Test
    public void test02() {
        MonitorRule firstRule = monitorRuleMapper.selectById(1);
        System.out.println(ruleConditionConverter.toDorisExpression(firstRule));
    }

    /** Prints the combined SQL built from all rules and the sample event. */
    @Test
    public void test03() {
        List<MonitorRule> allRules = monitorRuleMapper.selectList(null);
        System.out.println(combinedRuleSqlBuilder.buildSql(SAMPLE_BINLOG_JSON, allRules));
    }

    /** Evaluates all rules against the sample event and prints one result per violated rule. */
    @Test
    public void test04() {
        // 1. Run every rule in a single evaluation
        List<MonitorRule> allRules = monitorRuleMapper.selectList(null);
        List<MonitorRule> violated = ruleEvaluator.evaluateRules(SAMPLE_BINLOG_JSON, allRules);
        // 2. Build and print the result record for each violated rule
        for (MonitorRule rule : violated) {
            MonitorResult record = new MonitorResult();
            record.setTableName(SAMPLE_TABLE);
            record.setPrimaryKey(JsonUtils.getPrimaryKey(SAMPLE_BINLOG_JSON, rule.getPrimaryField()));
            record.setBatchId(UUID.randomUUID().toString());
            record.setRuleName(rule.getRuleName());
            record.setRuleCode(rule.getRuleCode());
            record.setChangedFields(fieldExtractor.extractChangedFields(SAMPLE_BINLOG_JSON, rule.getOutputFields()));
            System.out.println(record);
        }
    }

    /** Prints the cached, sorted rules of the sample table. */
    @Test
    public void test05() {
        System.out.println(ruleService.getSortedRulesByTable(SAMPLE_TABLE));
    }
}
说明:
## monitor_rule(规则配置表)
```sql
CREATE TABLE `monitor_rule` (
`id` bigint NOT NULL AUTO_INCREMENT,
`rule_code` varchar(64) NOT NULL COMMENT '业务规则编码,全局唯一',
`rule_type` varchar(20) NOT NULL DEFAULT 'CUSTOM' COMMENT '规则类型: TYPE/ENUM/LENGTH/CUSTOM',
`table_name` varchar(64) NOT NULL COMMENT '监控的表名',
`rule_name` varchar(128) NOT NULL COMMENT '规则名称',
`simple_condition` varchar(255) DEFAULT NULL COMMENT '简化条件(仅基础类型)',
`trigger_condition` text COMMENT 'Doris表达式(仅CUSTOM类型)',
`output_fields` varchar(255) DEFAULT NULL COMMENT '输出字段列表,逗号分隔',
`enabled` int DEFAULT '1' COMMENT '是否启用',
`primary_field` varchar(100) DEFAULT 'id' COMMENT 'primary key field',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
## monitor_result(监控结果表)
```sql
CREATE TABLE `datacenter`.`monitor_result` (
`id` bigint NOT NULL AUTO_INCREMENT,
`table_name` varchar(64) NOT NULL,
`primary_key` varchar(128) NOT NULL COMMENT '主键值',
`batch_id` varchar(64) NOT NULL COMMENT '批次ID',
`rule_name` varchar(128) NOT NULL COMMENT '规则名称',
`rule_code` varchar(64) NOT NULL COMMENT '规则编码',
`changed_fields` json DEFAULT NULL COMMENT '变更字段详情',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
```
## 校验示例
### 1. 插入规则配置
```sql
-- 基础校验:delete_flag 必须是整数
INSERT INTO monitor_rule (id, rule_code, rule_type, table_name, simple_condition, rule_name, output_fields)
VALUES (1, 'linkman_01', 'TYPE', 'linkman_information', 'delete_flag:int', 'delete_flag 类型校验', 'delete_flag');
-- 宽松模式(类型名后加 ?):除 JSON 数字外,也接受符合整数/小数格式的字符串,例如 int? / double?
-- VALUES (1, 'linkman_01', 'TYPE', 'linkman_information', 'delete_flag:int?', 'delete_flag 类型校验', 'delete_flag');
-- 基础校验:phone 长度=11
INSERT INTO monitor_rule (id, rule_code, rule_type, table_name, simple_condition, rule_name, output_fields)
VALUES (2, 'linkman_02', 'LENGTH', 'linkman_information', 'phone:11', '手机号长度校验', 'phone');
-- 业务校验:联系方式发生变更(变更前后 contact_info 不一致)
INSERT INTO monitor_rule (id, rule_code, rule_type, table_name, trigger_condition, rule_name, output_fields)
VALUES (
3,
'contact_01',
'CUSTOM',
'linkman_information',
"get_json_string(data, '$.before.contact_info') != get_json_string(data, '$.after.contact_info')",
'联系方式变更',
'contact_info'
);
```
### 2. 测试 Binlog JSON
```sql
CREATE TABLE IF NOT EXISTS cdc_data (
id BIGINT,
data JSON COMMENT 'cdc变更记录'
) ENGINE=OLAP
unique KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num" = "1");
```
```json
{
"table": "linkman_information",
"op": "u",
"before": {
"id": "1001",
"contact_info": "12345678",
"delete_flag": 0,
"phone": "13812345678"
},
"after": {
"id": "1001",
"contact_info": "0632-12345678",
"delete_flag": "0",
"phone": "1381234567"
}
}
```
```sql
INSERT INTO test.cdc_data (id, data)
VALUES(1, '{ "table": "linkman_information", "op": "u", "before": { "id": "1001", "contact_info": "12345678", "delete_flag": 0, "phone": "13812345678" }, "after": { "id": "1001", "contact_info": "0632-12345678", "delete_flag": "0", "phone": "1381234567" } }');
-- json基础类型判断
-- 条件判断
SELECT if(get_json_bigint(data, '$.age') IS NOT NULL, 1, 0) as bigint_,
if(get_json_double(data, '$.score') IS NOT NULL, 1, 0) as double_,
if(get_json_string(data, '$.name') IS NOT NULL, 1, 0) as string_,
if(get_json_string(data, '$.name') = 'bob' and get_json_string(data, '$.age') > 18, 1, 0) as contact_01
FROM (
SELECT json_parse('{"name": "bob", "age": 20, "score": 99.99}') data
) t1;
```
### 程序运行 输出sql
```sql
SELECT (NOT (get_json_bigint(data, '$.after.delete_flag') IS NOT NULL)) AS linkman_01,
(NOT (char_length(json_unquote(json_extract(data, '$.after.phone'))) = 11)) AS linkman_02,
(get_json_string(data, '$.before.contact_info') != get_json_string(data, '$.after.contact_info')) AS contact_01
FROM (SELECT json_parse('{ "table": "linkman_information", "op": "u", "before": { "id": "1001", "contact_info": "12345678", "delete_flag": 0, "phone": "13812345678" }, "after": { "id": "1001", "contact_info": "0632-12345678", "delete_flag": "0", "phone": "1381234567" } }') AS data) t1
```
- 宽松模式
```sql
SELECT (NOT ((get_json_bigint(data, '$.after.delete_flag') IS NOT NULL OR
(get_json_string(data, '$.after.delete_flag') IS NOT NULL AND
get_json_string(data, '$.after.delete_flag') REGEXP '^-?(0|[1-9][0-9]*)$')))) AS linkman_01,
(NOT (char_length(json_unquote(json_extract(data, '$.after.phone'))) = 11)) AS linkman_02,
(get_json_string(data, '$.before.contact_info') != get_json_string(data, '$.after.contact_info')) AS contact_01,
(NOT (get_json_string(data, '$.after.delete_flag') IN ('0', '1'))) AS linkman_03
FROM (SELECT json_parse('{ "table": "linkman_information", "op": "u", "before": { "id": "1001", "contact_info": "12345678", "delete_flag": 0, "phone": "13812345678" }, "after": { "id": "1001", "contact_info": "0632-12345678", "delete_flag": "0", "phone": "1381234567" } }') AS data) t1
```
- 严格模式
```sql
SELECT (NOT (char_length(json_unquote(json_extract(data, '$.after.phone'))) = 11)) AS linkman_02,
(get_json_string(data, '$.before.contact_info') != get_json_string(data, '$.after.contact_info')) AS contact_01,
(NOT (get_json_string(data, '$.after.delete_flag') IN ('0', '1'))) AS linkman_03,
(NOT (get_json_bigint(data, '$.after.delete_flag') IS NOT NULL)) AS linkman_01
FROM (SELECT json_parse('{ "table": "linkman_information", "op": "u", "before": { "id": "1001", "contact_info": "12345678", "delete_flag": 0, "phone": "13812345678" }, "after": { "id": "1001", "contact_info": "0632-12345678", "delete_flag": "0", "phone": "1381234567" } }') AS data) t1
```
### 3. 预期结果
- 规则 linkman_01 触发(delete_flag 非整数)
- 规则 linkman_02 触发(phone 长度≠11)
- 规则 contact_01 触发(contact_info 由 12345678 变更为 0632-12345678)
### kafka command
-- start server
```shell
/data/zookeeper/bin/zkServer.sh start
/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
```
-- list topics
```shell
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```
- create topic
```shell
/data/kafka/bin/kafka-topics.sh --create --topic binlog-monitor --bootstrap-server localhost:9092 --partitions 3
/data/kafka/bin/kafka-topics.sh --create --topic monitor-violation-events --bootstrap-server localhost:9092 --partitions 3
```
-- producer
```shell
/data/kafka/bin/kafka-console-producer.sh --topic binlog-monitor --bootstrap-server localhost:9092
```
-- consumer
```shell
/data/kafka/bin/kafka-console-consumer.sh --topic monitor-violation-events --bootstrap-server localhost:9092
```
sqlite引擎解析版
package com.bigdata.monitor.util;
import lombok.extern.slf4j.Slf4j;
import org.sqlite.SQLiteConfig;
import java.sql.*;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 基于 SQLite in-memory 的 JSON 规则表达式求值器。
* 支持使用 JSON Path(如 $.before.field)在 SQL 条件中进行字段比较。
* 在执行前自动校验所有 JSON 路径是否存在,避免因路径错误导致规则静默失效。
*/
@Slf4j
public class SqliteJsonRuleEvaluator {
/**
 * JDBC URL that selects an SQLite in-memory database.
 */
private static final String SQLITE_URL = "jdbc:sqlite::memory:";
/**
 * Per-thread SQLite in-memory connection, created lazily the first time a thread asks for it.
 * <p>
 * The connection is opened read-only and gets a user-defined {@code REGEXP} function registered on it,
 * so rule expressions can use {@code value REGEXP 'pattern'}. The function returns 1 only when the whole
 * value matches the pattern and 0 for a NULL operand or an invalid pattern.
 * <p>
 * If registering the function fails, the half-initialized connection is closed before the failure is
 * rethrown as {@link IllegalStateException}. No code in this class closes a successfully created connection.
 */
private static final ThreadLocal<Connection> connHolder = ThreadLocal.withInitial(() -> {
    Connection conn = null;
    try {
        SQLiteConfig config = new SQLiteConfig();
        config.setReadOnly(true); // read-only has to be configured before the connection is created
        conn = config.createConnection(SQLITE_URL);
        org.sqlite.Function.create(conn, "REGEXP", new org.sqlite.Function() {
            /** Upper bound for the pattern cache so dynamically generated patterns cannot grow it forever. */
            private static final int MAX_CACHED_PATTERNS = 256;

            /** Compiled patterns by source text; only reached through the owning thread's connection. */
            private final Map<String, Pattern> compiledPatterns = new HashMap<>();

            @Override
            protected void xFunc() throws SQLException {
                // This code treats argument 0 as the pattern and argument 1 as the value,
                // matching SQLite's documented mapping of "X REGEXP Y" onto regexp(Y, X).
                String pattern = value_text(0);
                String value = value_text(1);
                if (pattern == null || value == null) {
                    result(0);
                    return;
                }
                Pattern compiled = compiledPatterns.get(pattern);
                if (compiled == null) {
                    try {
                        compiled = Pattern.compile(pattern);
                    } catch (java.util.regex.PatternSyntaxException e) {
                        // An invalid pattern is treated as "no match" rather than failing the query.
                        result(0);
                        return;
                    }
                    if (compiledPatterns.size() >= MAX_CACHED_PATTERNS) {
                        compiledPatterns.clear();
                    }
                    compiledPatterns.put(pattern, compiled);
                }
                result(compiled.matcher(value).matches() ? 1 : 0);
            }
        });
        return conn;
    } catch (SQLException e) {
        if (conn != null) {
            // Do not leak the connection when function registration failed after it was opened.
            try {
                conn.close();
            } catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
        }
        throw new IllegalStateException("Failed to initialize SQLite in-memory database connection", e);
    }
});
/**
 * Matches dot-style JSON Path candidates such as {@code $.a.b} or {@code $.before.contact_info}.
 * <p>
 * A match starts with "$." and continues over word characters (letters, digits, underscore) and dots;
 * spaces, quotes, square brackets and any other character end the match.
 * <p>
 * Note that the regex itself still accepts consecutive dots ("..") and a trailing dot.
 * Those malformed forms are filtered out afterwards by extractJsonPaths.
 */
private static final Pattern JSON_PATH_PATTERN = Pattern.compile("\\$\\.[\\w.]+");
/**
 * Collects every {@code $.xxx.yyy} style JSON path that occurs in a condition expression.
 *
 * @param expression condition text that may embed JSON paths, for example
 *                   "$.before.contact_info != $.after.contact_info"; may be null
 * @return the distinct paths in order of first appearance, for example
 *         {"$.before.contact_info", "$.after.contact_info"}; an empty set when expression is null
 */
private static Set<String> extractJsonPaths(String expression) {
    if (expression == null) {
        return Collections.emptySet();
    }
    Set<String> found = new LinkedHashSet<>();
    for (Matcher m = JSON_PATH_PATTERN.matcher(expression); m.find(); ) {
        String candidate = m.group();
        // Candidates with a trailing dot or an empty segment ("..") are dropped, not reported.
        boolean malformed = candidate.endsWith(".") || candidate.contains("..");
        if (malformed) {
            continue;
        }
        found.add(candidate);
    }
    return found;
}
/**
 * Checks that every given JSON path resolves inside the payload, failing fast on the first miss.
 * <p>
 * Each path is probed with {@code SELECT json_type(?, ?)}. A NULL result (or no row at all) is treated
 * as "path absent" and logged at WARN level together with a payload preview of at most 100 characters.
 * One prepared statement is reused for all paths; only the path parameter changes between probes.
 *
 * @param jsonPayload JSON text to probe
 * @param paths       JSON paths to check; null or empty means there is nothing to validate
 * @return true when all paths resolve (or none were given); false on the first missing path or when
 *         an SQLException is raised while probing
 */
private static boolean validateJsonPaths(String jsonPayload, Set<String> paths) {
    if (paths == null || paths.isEmpty()) {
        return true;
    }
    Connection conn = connHolder.get();
    // Remembered so the catch block can name the path being probed when the failure happened.
    String currentPath = null;
    try (PreparedStatement stmt = conn.prepareStatement("SELECT json_type(?, ?)")) {
        stmt.setString(1, jsonPayload);
        for (String path : paths) {
            currentPath = path;
            stmt.setString(2, path);
            try (ResultSet rs = stmt.executeQuery()) {
                if (!rs.next() || rs.getString(1) == null) {
                    log.warn("JSON path does not exist: {}, json preview: {}",
                            path,
                            jsonPayload != null ? jsonPayload.substring(0, Math.min(100, jsonPayload.length())) : "null");
                    return false; // fail fast: one missing path is enough
                }
            }
        }
        return true;
    } catch (SQLException e) {
        log.warn("Error while validating JSON path: {}", currentPath, e);
        return false;
    }
}
/**
 * Evaluates a condition expression against one JSON document.
 * <p>
 * The payload is bound as the single column {@code data} of a one-row virtual table and the condition is
 * appended verbatim as the WHERE clause, e.g.
 * {@code json_extract(data, '$.before.contact_info') != json_extract(data, '$.after.contact_info')}.
 * Before running it, every {@code $.xxx} path found in the condition is checked for existence in the payload.
 * <p>
 * The condition text becomes part of the SQL statement, so it must be a valid SQLite expression.
 * NOTE(review): it should only come from trusted rule configuration — confirm with the rule source.
 *
 * @param jsonPayload       JSON text to evaluate
 * @param jsonPathCondition SQLite WHERE-clause expression that refers to the payload as {@code data}
 * @return true when the condition selects the row; false when either argument is null, a referenced path
 *         is missing, or executing the statement raises an SQLException
 */
public static boolean evaluate(String jsonPayload, String jsonPathCondition) {
    boolean missingInput = jsonPayload == null || jsonPathCondition == null;
    if (missingInput) {
        return false;
    }

    // A missing path has already been reported at WARN level by the validator; just stop here.
    boolean allPathsPresent = validateJsonPaths(jsonPayload, extractJsonPaths(jsonPathCondition));
    if (!allPathsPresent) {
        return false;
    }

    String query = "SELECT 1 FROM (SELECT ? AS data) WHERE " + jsonPathCondition;
    Connection connection = connHolder.get();
    try (PreparedStatement ps = connection.prepareStatement(query)) {
        ps.setString(1, jsonPayload);
        try (ResultSet rows = ps.executeQuery()) {
            // At least one row means the WHERE clause held.
            return rows.next();
        }
    } catch (SQLException e) {
        String preview = jsonPayload.substring(0, Math.min(100, jsonPayload.length()));
        log.warn("SQLite evaluation failed. Condition: {}, JSON preview: {}",
                jsonPathCondition,
                preview,
                e);
        return false;
    }
}
}
package com.bigdata.monitor.util;
import com.bigdata.monitor.entity.MonitorRule;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class UnifiedRuleEngine {
/**
 * Decides whether a binlog message violates the given rule.
 * <p>
 * CUSTOM rules supply their violation expression directly via the trigger condition. Every other rule
 * type is first turned into a "pass" condition, which is then wrapped in {@code NOT (...)} so that it
 * becomes the violation expression.
 *
 * @param binlogJson binlog JSON message to check
 * @param rule       rule to apply
 * @return the result of evaluating the violation expression with SqliteJsonRuleEvaluator
 */
public boolean evaluate(String binlogJson, MonitorRule rule) {
    boolean custom = "CUSTOM".equals(rule.getRuleType());
    String violationExpr = custom
            ? rule.getTriggerCondition()
            : "NOT (" + buildPassCondition(rule) + ")";
    return SqliteJsonRuleEvaluator.evaluate(binlogJson, violationExpr);
}
/**
 * Translates a basic rule's {@code field:spec} simple condition into an SQLite "pass" expression.
 * <p>
 * The field is always looked up under {@code $.after}. The part after the first colon is interpreted
 * according to the rule type (TYPE, ENUM or LENGTH).
 *
 * @param rule rule whose simple condition and rule type are read
 * @return SQLite expression that is true when the data satisfies the rule
 * @throws IllegalArgumentException when the simple condition is missing, has no colon, has an empty
 *                                  field name or a field name containing a single quote, or when the
 *                                  rule type is null or not supported
 */
private String buildPassCondition(MonitorRule rule) {
    String simple = rule.getSimpleCondition();
    if (simple == null || !simple.contains(":")) {
        throw new IllegalArgumentException("Invalid simple_condition for rule: " + rule.getRuleCode());
    }
    String[] parts = simple.split(":", 2);
    String field = parts[0].trim();
    String spec = parts[1].trim();
    // An empty field would yield the invalid path "$.after." and a quote would break out of the SQL
    // string literal the path is embedded in; both would otherwise fail later without a clear message.
    if (field.isEmpty() || field.indexOf('\'') >= 0) {
        throw new IllegalArgumentException("Invalid simple_condition for rule: " + rule.getRuleCode());
    }
    String ruleType = rule.getRuleType();
    if (ruleType == null) {
        // switch on a null String would throw a bare NullPointerException.
        throw new IllegalArgumentException("Unsupported rule_type: " + ruleType);
    }
    String jsonPath = "$.after." + field; // the field is read from the "after" image
    switch (ruleType) {
        case "TYPE":
            return convertTypeCheck(jsonPath, spec);
        case "ENUM":
            return convertEnumCheck(jsonPath, spec);
        case "LENGTH":
            return convertLengthCheck(jsonPath, spec);
        default:
            throw new IllegalArgumentException("Unsupported rule_type: " + ruleType);
    }
}
/**
 * Builds the SQLite "pass" expression for a TYPE rule.
 * <p>
 * Supported specs are {@code int}/{@code integer}, {@code double}/{@code float} and {@code string},
 * matched case-insensitively. A trailing {@code ?} selects lenient mode for the numeric types:
 * <ul>
 *   <li>strict: the JSON value itself must have the requested JSON type;</li>
 *   <li>lenient: the extracted value is re-read as JSON, so a string such as "0" also counts as an
 *       integer. A value that is not valid JSON text (for example "invalid") simply fails the check.</li>
 * </ul>
 *
 * @param jsonPath JSON path of the field, e.g. {@code $.after.delete_flag}
 * @param typeSpec type name, optionally suffixed with {@code ?}
 * @return SQLite expression that is true when the value has the requested type
 * @throws IllegalArgumentException for an unknown type or for {@code string?}
 */
private String convertTypeCheck(String jsonPath, String typeSpec) {
    boolean isLenient = typeSpec.endsWith("?");
    String type = isLenient ? typeSpec.substring(0, typeSpec.length() - 1) : typeSpec;
    // Locale.ROOT keeps the lower-casing stable in every default locale (e.g. Turkish dotless i).
    switch (type.toLowerCase(java.util.Locale.ROOT)) {
        case "int":
        case "integer":
            return isLenient
                    ? lenientJsonTypeCheck(jsonPath, "integer")
                    : String.format("json_type(data, '%s') = 'integer'", jsonPath);
        case "double":
        case "float":
            return isLenient
                    ? lenientJsonTypeCheck(jsonPath, "real")
                    : String.format("json_type(data, '%s') = 'real'", jsonPath);
        case "string":
            if (isLenient) {
                throw new IllegalArgumentException("String type does not support lenient mode");
            }
            return String.format("json_type(data, '%s') = 'text'", jsonPath);
        default:
            throw new IllegalArgumentException("Unsupported type: " + type);
    }
}

/**
 * Builds a lenient type check that never raises an SQL error.
 * <p>
 * Calling {@code json_type()} directly on an extracted string that is not JSON text makes SQLite fail
 * with "malformed JSON"; the evaluator turns that error into "no violation". Guarding with
 * {@code json_valid()} makes such values fail the type check instead, so the violation is reported.
 */
private String lenientJsonTypeCheck(String jsonPath, String sqliteType) {
    return String.format(
            "CASE WHEN json_valid(json_extract(data, '%1$s')) "
                    + "THEN json_type(json_extract(data, '%1$s')) = '%2$s' ELSE 0 END",
            jsonPath, sqliteType);
}
/**
 * Builds the SQLite "pass" expression for an ENUM rule: the field must equal one of the listed values.
 * <p>
 * The extracted value is cast to TEXT before the comparison. Without the cast a numeric JSON value
 * (e.g. {@code 0}) is compared as an integer against the quoted text literals and never matches, so
 * {@code delete_flag: 0} would always be reported against the enum {@code 0,1}.
 * Single quotes inside an enum value are doubled so they cannot terminate the SQL string literal.
 *
 * @param jsonPath JSON path of the field, e.g. {@code $.after.delete_flag}
 * @param enumSpec comma-separated list of allowed values; each value is trimmed
 * @return SQLite expression of the form {@code CAST(json_extract(data, 'path') AS TEXT) IN ('a', 'b')}
 */
private String convertEnumCheck(String jsonPath, String enumSpec) {
    StringBuilder inClause = new StringBuilder("CAST(json_extract(data, '");
    inClause.append(jsonPath).append("') AS TEXT) IN (");
    String[] values = enumSpec.split(",");
    for (int i = 0; i < values.length; i++) {
        if (i > 0) {
            inClause.append(", ");
        }
        String literal = values[i].trim().replace("'", "''");
        inClause.append('\'').append(literal).append('\'');
    }
    inClause.append(')');
    return inClause.toString();
}
/**
 * Builds the SQLite "pass" expression for a LENGTH rule: the field's length must equal the given number.
 *
 * @param jsonPath   JSON path of the field, e.g. {@code $.after.phone}
 * @param lengthSpec required length as a decimal integer
 * @return SQLite expression of the form {@code length(json_extract(data, 'path')) = N}
 * @throws NumberFormatException when lengthSpec is not an integer; the message names the offending spec
 */
private String convertLengthCheck(String jsonPath, String lengthSpec) {
    final int length;
    try {
        length = Integer.parseInt(lengthSpec);
    } catch (NumberFormatException e) {
        NumberFormatException detailed =
                new NumberFormatException("Invalid length spec (expected an integer): " + lengthSpec);
        detailed.initCause(e);
        throw detailed;
    }
    // Plain concatenation always emits ASCII digits; "%d" formatting follows the default locale.
    return "length(json_extract(data, '" + jsonPath + "')) = " + length;
}
}
主程序
package com.bigdata.monitor.consumer;
import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.service.RuleService;
import com.bigdata.monitor.util.JsonPathFieldExtractor;
import com.bigdata.monitor.util.JsonUtils;
import com.bigdata.monitor.util.UnifiedRuleEngine;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.UUID;
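/**
 * Binlog message consumer.
 * - Reads Flink CDC binlog JSON (containing table/op/before/after) from Kafka.
 * - Checks each message against the cached monitoring rules, one rule at a time.
 * - When a rule is violated, builds a violation result.
 * - Serializes the result to JSON and sends it to the Kafka violation topic (e.g. monitor-violation-events).
 *
 * Architecture notes:
 * - The "batched SQL expression" approach (e.g. CombinedRuleSqlBuilder) is no longer used.
 * - Each rule is evaluated independently through UnifiedRuleEngine, which runs every rule as an
 *   SQLite in-memory expression:
 *   • basic rules (TYPE/ENUM/LENGTH) → translated into a "pass" expression and negated
 *   • custom rules (CUSTOM) → the configured trigger condition is evaluated as-is
 */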
@Slf4j
@Component
public class BinlogConsumer {
@Autowired
private RuleService ruleService;
@Autowired
private UnifiedRuleEngine unifiedRuleEngine;
@Autowired
private JsonPathFieldExtractor fieldExtractor;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private Gson gson;
@Value("${monitor.initial-topics}")
private String initialTopics;
@Value("${monitor.violation-topic:monitor-violation-events}")
private String violationTopic;
/**
 * Consumes one binlog JSON message from the configured topics and publishes a violation event for
 * every rule the message violates.
 * <p>
 * Processing steps:
 * <ol>
 *   <li>extract the table name; messages without one are skipped;</li>
 *   <li>load the rules for that table; messages without rules are skipped;</li>
 *   <li>evaluate each rule independently — a {@code true} result from UnifiedRuleEngine means the rule
 *       is violated; a failure in one rule is logged and does not stop the remaining rules.</li>
 * </ol>
 * The offset is acknowledged exactly once, in {@code finally}, whether processing succeeded, was
 * skipped or failed. A failing message is therefore acknowledged as well; failures are only visible
 * in the error log.
 *
 * @param binlogJsonMessage complete binlog JSON string, e.g.
 *                          {"table":"user","op":"u","before":{...},"after":{...}}
 * @param ack               manual acknowledgment handle for the consumed record
 */
@KafkaListener(topics = "#{'${monitor.initial-topics}'.split(',')}", groupId = "binlog-monitor-group")
public void consume(String binlogJsonMessage, Acknowledgment ack) {
    try {
        String tableName = JsonUtils.getTable(binlogJsonMessage);
        if (tableName == null || tableName.isEmpty()) {
            log.warn("Skip message: table name not found in JSON: {}", previewOf(binlogJsonMessage));
            return; // acknowledged in finally
        }

        List<MonitorRule> rules = ruleService.getSortedRulesByTable(tableName);
        if (rules == null || rules.isEmpty()) {
            return; // nothing to check; acknowledged in finally
        }

        // One batch id per message ties together all violations raised by the same record.
        String batchId = UUID.randomUUID().toString();
        for (MonitorRule rule : rules) {
            try {
                boolean violated = unifiedRuleEngine.evaluate(binlogJsonMessage, rule);
                if (violated) {
                    publishViolation(binlogJsonMessage, tableName, batchId, rule);
                }
            } catch (Exception e) {
                // A broken rule must not prevent the remaining rules from being evaluated.
                log.error("Error evaluating rule: {} ({}), table: {}, JSON preview: {}",
                        rule.getRuleName(), rule.getRuleCode(), tableName, previewOf(binlogJsonMessage), e);
            }
        }
    } catch (Exception e) {
        log.error("Fatal error processing Kafka message", e);
    } finally {
        ack.acknowledge();
    }
}

/**
 * Builds the violation result for one rule and sends it, serialized as JSON, to the violation topic.
 */
private void publishViolation(String binlogJsonMessage, String tableName, String batchId, MonitorRule rule) {
    String primaryKey = JsonUtils.getPrimaryKey(binlogJsonMessage, rule.getPrimaryField());
    String changedFields = fieldExtractor.extractChangedFields(binlogJsonMessage, rule.getOutputFields());

    MonitorResult result = new MonitorResult();
    result.setTableName(tableName);
    result.setPrimaryKey(primaryKey);
    result.setBatchId(batchId);
    result.setRuleName(rule.getRuleName());
    result.setRuleCode(rule.getRuleCode());
    result.setChangedFields(changedFields);

    // NOTE(review): the value returned by send() is ignored, so a failed delivery would go unnoticed
    // here — confirm whether a completion callback is needed.
    kafkaTemplate.send(violationTopic, gson.toJson(result));
    log.info("Rule violated: {} ({}) for table={}, pk={}",
            rule.getRuleName(), rule.getRuleCode(), tableName, primaryKey);
}

/**
 * Returns at most the first 200 characters of the message for logging, or "null" for a null message.
 */
private static String previewOf(String binlogJsonMessage) {
    if (binlogJsonMessage == null) {
        return "null";
    }
    return binlogJsonMessage.substring(0, Math.min(200, binlogJsonMessage.length()));
}
}
测试
package com.bigdata.monitor.test;
import com.bigdata.monitor.entity.MonitorResult;
import com.bigdata.monitor.entity.MonitorRule;
import com.bigdata.monitor.service.RuleService;
import com.bigdata.monitor.util.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
import java.util.UUID;
@SpringBootTest
public class Test02 {
@Autowired
private JsonSchemaRuleEvaluator jsonSchemaRuleEvaluator;
@Autowired
private RuleService ruleService;
@Autowired
private UnifiedRuleEngine unifiedRuleEngine;
@Autowired
private JsonPathFieldExtractor fieldExtractor;
/**
 * Runs the JSON-schema based evaluator with the spec "id:1000-1002" on a sample update event
 * and prints the outcome.
 */
@Test
public void test01() {
    final String sampleEvent = "{ \"table\": \"linkman_information\", \"op\": \"u\", \"before\": { \"id\": \"1001\", \"contact_info\": \"12345678\", \"delete_flag\": 0, \"phone\": \"13812345678\" }, \"after\": { \"id\": \"1001\", \"contact_info\": \"0632-12345678\", \"delete_flag\": \"0\", \"phone\": \"13812345678\" } }";
    System.out.println(jsonSchemaRuleEvaluator.evaluate(sampleEvent, "id:1000-1002"));
}
/**
 * Evaluates a strict integer type check on {@code $.after.delete_flag} with the SQLite evaluator
 * and prints the outcome.
 */
@Test
public void test02() {
    final String sampleEvent = "{ \"table\": \"linkman_information\", \"op\": \"u\", \"before\": { \"id\": \"1001\", \"contact_info\": \"1836668860\", \"delete_flag\": 0, \"phone\": \"13812345678\" }, \"after\": { \"id\": \"1001\", \"contact_info\": \"0632-12345678\", \"delete_flag\": \"0\", \"phone\": \"13812345678\" } }";
    // Other conditions that can be swapped in for manual experiments:
    //   regex match:  "json_extract(data, '$.before.contact_info') REGEXP '^1[3-9]\\d{9}$'"
    //   lenient type: "json_type(json_extract(data, '$.after.delete_flag')) = 'integer'"
    // Active condition: strict type check.
    final String strictTypeCondition = "json_type(data, '$.after.delete_flag') = 'integer'";
    boolean outcome = SqliteJsonRuleEvaluator.evaluate(sampleEvent, strictTypeCondition);
    System.out.println(outcome);
}
/**
 * Replays the consumer's per-rule loop on a sample update event without Kafka: loads the rules for
 * the event's table, evaluates each one and prints a MonitorResult for every violated rule.
 * A rule that throws is reported together with the exception and does not stop the remaining rules.
 */
@Test
public void test03() {
    String binlogJsonMessage = "{ \"table\": \"linkman_information\", \"op\": \"u\", \"before\": { \"id\": \"1001\", \"contact_info\": \"12345678\", \"delete_flag\": 0, \"phone\": \"13812345678\" }, \"after\": { \"id\": \"1001\", \"contact_info\": \"0632-12345678\", \"delete_flag\": \"0\", \"phone\": \"1381234567\" } }";
    String tableName = JsonUtils.getTable(binlogJsonMessage);
    List<MonitorRule> rules = ruleService.getSortedRulesByTable(tableName);
    for (MonitorRule rule : rules) {
        try {
            // true means the rule's violation expression matched the message.
            boolean violated = unifiedRuleEngine.evaluate(binlogJsonMessage, rule);
            if (!violated) {
                continue;
            }
            String primaryKey = JsonUtils.getPrimaryKey(binlogJsonMessage, rule.getPrimaryField());
            String changedFields = fieldExtractor.extractChangedFields(binlogJsonMessage, rule.getOutputFields());

            MonitorResult result = new MonitorResult();
            result.setTableName(tableName);
            result.setPrimaryKey(primaryKey);
            result.setBatchId(UUID.randomUUID().toString());
            result.setRuleName(rule.getRuleName());
            result.setRuleCode(rule.getRuleCode());
            result.setChangedFields(changedFields);
            System.out.println(result);
        } catch (Exception e) {
            // Keep going with the next rule, but show what went wrong instead of hiding it.
            System.out.println("Error evaluating rule: " + rule.getRuleName() + " -> " + e);
        }
    }
}
}