大数据

数据质量监控

数据质量实时监控方案

1. 整体架构

2. 技术指标与校验规则

技术指标校验规则校验方法实时实现示例
完整性关键字段非空率≥99%统计字段空值占比,触发阈值告警Flink SQL:COUNT(CASE WHEN field IS NULL THEN 1 END) / COUNT(*)
准确性数值字段在合理范围(如年龄≤150)范围校验、正则表达式匹配规则引擎(如Apache Griffin):WHERE age > 150 THEN 'ERROR'
一致性跨系统同一主键数据差异率<0.1%实时对比源系统和目标系统的关键字段(如订单金额)Flink双流Join:source_stream JOIN target_stream ON key WHERE amount_diff > 0
唯一性主键重复数=0实时去重计数(如基于主键的分布式计数器)Redis HyperLogLog或Flink DISTINCT(key).COUNT()
及时性数据从产生到入库延迟<10秒打时间戳并计算端到端延迟在数据中注入event_time,Flink计算PROCESS_TIME - event_time
有效性枚举值符合预定义(如性别∈['男','女'])实时校验字典表流式Join维度表:JOIN dim_gender ON raw_data.gender = dim_gender.code

技术实现工具

  • 流处理框架:Apache Flink、Spark Streaming
  • 质量规则引擎:Great Expectations、Apache Griffin
  • 实时存储:Redis(计数器)、Kafka(告警事件)

3. 业务指标与校验规则

业务指标校验规则校验方法实时实现示例
业务规则合规性贷款申请金额≤客户信用额度×2实时调用风控模型或信用额度服务Flink异步IO:调用风控API校验IF loan_amount > credit_limit*2 THEN REJECT
KPI可信度实时GMV波动率<±5%(同比昨日同时段)滑动窗口统计GMV并与历史数据对比Flink Window Aggregation:SUM(amount) OVER 1h + 同比计算
数据关联性订单状态为“已支付”时,支付流水必存在实时关联订单流和支付流,检查支付流水缺失的订单Flink Interval Join:orders LEFT JOIN payments WITHIN 5m ON order_id
业务时效性风控审批响应时间<1秒监控从请求到结果的端到端时间在审批日志中打时间戳,计算end_time - start_time
风险性指标同一IP高频注册(>5次/分钟)滑动窗口统计IP注册频次Flink Keyed State + COUNT_STATE(ip) + 阈值告警

业务规则实现工具

  • 复杂规则:Drools规则引擎、Flink CEP(复杂事件处理)
  • 外部服务调用:异步IO(如风控API)
  • 实时数仓:Kafka + Flink + HBase(关联查询)

4. 告警与修复机制

告警分级

  • P0(阻塞):数据中断、核心KPI失效(如GMV计算异常) → 短信/电话告警
  • P1(严重):业务规则违规(如超额贷款) → 企业微信/钉钉告警
  • P2(警告):技术指标异常(如字段空值率超阈值) → 邮件告警

修复流程

  1. 自动修复
    • 丢弃无效数据(如重复记录)
    • 补发延迟数据(通过死信队列重试)
  2. 人工干预
    • 数据溯源修复(如从备份系统补录)
    • 业务规则调整(如临时放宽风控阈值)

5. 可视化与报告

  • 实时看板:Grafana展示核心指标(如数据延迟、错误率)
  • 质量报告:每日/每周汇总数据质量评分(按技术+业务维度)
  • 根因分析:关联错误日志与数据血缘(如Apache Atlas)

方案优势

  1. 实时性:从T+1提升到秒级监控,快速拦截问题数据。
  2. 灵活性:支持动态加载规则(如通过配置中心更新业务阈值)。
  3. 可扩展:模块化设计,可新增技术/业务指标。

适用场景:电商实时订单、金融风控、IoT设备数据等对时效性要求高的领域。

如果需要更具体的某环节实现代码(如Flink规则逻辑),可进一步细化!

离线数仓数据质量监控方案

1. 整体架构

2. 技术指标与校验规则

技术指标校验规则校验方法实现示例
完整性关键字段空值率<5%统计空值占比,分区级校验Hive SQL:
SELECT COUNT(CASE WHEN field IS NULL THEN 1 END)/COUNT(*) FROM table WHERE dt='${date}'
准确性数值字段在合理范围(如年龄≤150)范围校验、异常值检测Spark代码:
df.filter("age > 150").count() → 触发告警
一致性跨表同一主键数据差异数=0对比两张表的关联字段(如订单ID对应的金额)Hive SQL:
SELECT a.order_id FROM table_a a LEFT JOIN table_b b ON a.order_id=b.order_id WHERE a.amount != b.amount
唯一性主键重复数=0分组计数检查Hive SQL:
SELECT user_id, COUNT(*) FROM user_table GROUP BY user_id HAVING COUNT(*) > 1
及时性每日分区数据按时生成(如9:00前)检查分区是否存在及数据量是否正常Shell脚本:
hdfs dfs -ls /data/warehouse/table/dt=${date} | wc -l
有效性枚举值符合字典表(如省份∈省份列表)关联字典表校验Spark SQL:
SELECT invalid.* FROM raw_data invalid LEFT JOIN dim_province dim ON invalid.province=dim.code WHERE dim.code IS NULL

技术实现工具

  • SQL校验:Hive、Spark SQL
  • 调度系统:Airflow(集成质量检查任务)
  • 规则引擎:Griffin、Deequ(AWS开源工具)

3. 业务指标与校验规则

业务指标校验规则校验方法实现示例
业务规则合规性订单折扣率≤30%(防刷单)计算折扣订单占比Hive SQL:
SELECT COUNT(CASE WHEN discount_amount/order_amount > 0.3 THEN 1 END)/COUNT(*) FROM orders WHERE dt='${date}'
KPI可信度日活用户数波动<±10%(同比上周同日)对比历史同期数据Spark代码:
val today_dau = spark.sql("SELECT COUNT(DISTINCT user_id) FROM log WHERE dt='${date}'")
val last_week_dau = ...
IF (abs(today_dau - last_week_dau)/last_week_dau > 0.1) THEN ALERT
数据关联性用户购买记录必须对应会员表中的用户左关联验证用户是否存在Hive SQL:
SELECT a.user_id FROM orders a LEFT JOIN members b ON a.user_id=b.user_id WHERE b.user_id IS NULL AND a.dt='${date}'
数据价值密度日志表中无效请求占比<20%过滤无效状态码(如404)并计算比例Spark SQL:
SELECT COUNT(CASE WHEN status_code='404' THEN 1 END)/COUNT(*) FROM log WHERE dt='${date}'
风险性指标同一设备号当日登录次数>50(防爬虫)分组统计设备登录频次Hive SQL:
SELECT device_id, COUNT(*) FROM login_log WHERE dt='${date}' GROUP BY device_id HAVING COUNT(*) > 50

业务规则实现工具

  • 复杂逻辑:Spark代码(Scala/Python)
  • 调度集成:Airflow的PythonOperator调用业务校验脚本
  • 外部依赖:通过JDBC查询业务数据库比对

4. 告警与修复机制

告警分级

  • P0(阻塞):核心表数据缺失、主键重复 → 电话/企业微信告警,阻塞下游任务
  • P1(严重):业务KPI异常(如GMV下跌超阈值) → 邮件+钉钉告警
  • P2(提示):字段空值率超预期 → 日志记录,次日晨会同步

修复流程

  1. 自动修复
    • 重跑失败分区(通过Airflow回调机制)
    • 数据回滚(从备份表恢复问题分区)
  2. 人工干预
    • 修正源数据(协调业务系统补录)
    • 调整规则阈值(如放宽折扣率至35%)

5. 监控与可视化

质量报告

  • 日报
## 数据质量日报(${date})
- 技术指标合格率:98.5% ✅
  - 异常表:user_info(空值率8%,超阈值5%)  
- 业务指标异常:
  - 折扣订单占比35%(阈值30%),疑似刷单 ⚠️
  • 可视化看板
    • Superset/Grafana展示趋势(如空值率、KPI波动)
    • 血缘图谱标记问题表影响范围(如Apache Atlas)

根因分析

  • 通过Hive元数据+调度日志定位:
-- 检查异常数据来源
SELECT * FROM source_table WHERE dt='${date}' AND field IS NULL;

方案优势

  1. 低成本:基于SQL和开源工具,无需实时计算资源。
  2. 强适配:与离线调度系统(如Airflow)深度集成,天然契合T+1场景。
  3. 可追溯:分区级校验结果留存,便于回溯问题。

典型执行流程

  1. 每日8:00 ETL任务完成 → 触发质量检查作业
  2. 9:00生成质量报告 → 异常告警发送
  3. 10:00前人工确认/修复 → 下游任务继续执行

如果需要具体代码模板(如Airflow DAG配置或Spark校验脚本),可进一步补充!