大数据

SQL解析工具sqllineage使用教程

sqllineage 是一个 Python 第三方包,主要用于解析 DML 语句中的表级血缘关系和字段级血缘关系。

from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.runner import LineageRunner

# The Hive statement whose lineage is analysed: a CTE feeding an
# INSERT OVERWRITE that joins a UNION ALL sub-query with two other sub-queries.
sql = """
WITH position_tmp as ( 
    SELECT *
    FROM realtime_ods.position_info
)
INSERT OVERWRITE TABLE realtime_dwd.user_dim
SELECT t1.id,
       t1.name,
       t2.name as position_name,
       '1' as status,
       current_timestamp() as update_time
FROM (
    SELECT *
    FROM (
        SELECT id, sd_name as name, position_id
        from realtime_ods.user_info_sd
        UNION ALL
        SELECT id, ah_name as name, position_id
        from realtime_ods.user_info_ah
    ) a1
) t1 LEFT JOIN (
    SELECT id, concat(name, id) as name
    FROM position_tmp a1
    WHERE a1.id <= 100
) t2 ON t1.position_id = t2.id
INNER JOIN (
    SELECT a1.id
    FROM (
        SELECT id
        FROM realtime_ods.position_info2
    ) a1 GROUP BY a1.id
) t3 ON t1.id = t3.id
;
"""

# Hand-written table metadata: "schema.table" -> list of column names.
# It is passed to the runner through an in-memory provider, so no database
# connection is involved in this example.
metadata = {
    "realtime_ods.user_info_sd": ["id", "position_id", "sd_name"],
    "realtime_ods.user_info_ah": ["id", "ah_name", "position_id"],
    "realtime_ods.position_info": ["id", "name"],
    "realtime_ods.position_info2": ["id", "name"],
    "realtime_dwd.user_dim": [
        "id",
        "name",
        "position_name",
        "status",
        "update_time",
    ],
}
provider = DummyMetaDataProvider(metadata)

result = LineageRunner(
    sql,
    dialect="hive",
    verbose=True,
    metadata_provider=provider,
)

# Column-level lineage first, then a 50-dash divider, then table-level lineage.
result.print_column_lineage()
divider = "-" * 50
print(divider)
result.print_table_lineage()

# The table sets are also available programmatically on the runner.
print(result.target_tables)
print(result.source_tables)

打印内容如下:

realtime_dwd.user_dim.id <- t1.id <- a1.id <- realtime_ods.user_info_ah.id
realtime_dwd.user_dim.id <- t1.id <- a1.id <- realtime_ods.user_info_sd.id
realtime_dwd.user_dim.name <- t1.name <- a1.name <- realtime_ods.user_info_ah.ah_name
realtime_dwd.user_dim.name <- t1.name <- a1.name <- realtime_ods.user_info_sd.sd_name
realtime_dwd.user_dim.position_name <- t2.name <- position_tmp.id <- realtime_ods.position_info.id
realtime_dwd.user_dim.position_name <- t2.name <- position_tmp.name <- realtime_ods.position_info.name
realtime_dwd.user_dim.status
realtime_dwd.user_dim.update_time
--------------------------------------------------
Statement #1: WITH position_tmp as (    SELECT *    FROM realtim...
    table read: [Table: realtime_ods.position_info, Table: realtime_ods.position_info2, Table: realtime_ods.user_info_ah, Table: realtime_ods.user_info_sd]
    table write: [Table: realtime_dwd.user_dim]
    table cte: [SubQuery: position_tmp]
    table drop: []
    table rename: []
==========
Summary:
Statements(#): 1
Source Tables:
    realtime_ods.position_info
    realtime_ods.position_info2
    realtime_ods.user_info_ah
    realtime_ods.user_info_sd
Target Tables:
    realtime_dwd.user_dim

[Table: realtime_dwd.user_dim]
[Table: realtime_ods.position_info, Table: realtime_ods.position_info2, Table: realtime_ods.user_info_ah, Table: realtime_ods.user_info_sd]

连接数据库操作(通过 SQLAlchemy 连接数据库获取元数据)

from sqllineage.config import SQLLineageConfig
from sqllineage.core.metadata.sqlalchemy import SQLAlchemyMetaDataProvider
from sqllineage.runner import LineageRunner

# SQLAlchemy connection URL of the database the metadata provider is built from.
url = "sqlite:///db.sqlite3"

# Statement to analyse; none of its table names carry a schema qualifier.
sql = """
INSERT INTO user_dim
SELECT t1.id,
       t1.name,
       t2.name as position_name
FROM (
    SELECT id, name, position_id
    from user_info
) t1 LEFT JOIN (
    SELECT id, concat(name, position_id) as name
    FROM (
        SELECT *
        FROM position_info
    ) a1
    WHERE a1.id < 100
) t2 ON t1.position_id = t2.id
"""

# Provider creation, runner creation and printing all happen inside the config
# context, so the DEFAULT_SCHEMA="main" override is active for each of them.
# NOTE(review): presumably this is what qualifies the bare table names above
# with the "main" schema -- confirm against the sqllineage configuration docs.
with SQLLineageConfig(DEFAULT_SCHEMA="main"):
    provider = SQLAlchemyMetaDataProvider(url)
    result = LineageRunner(sql, metadata_provider=provider)
    result.print_column_lineage()