文章
SQL解析工具sqllineage使用教程
sqllineage 是一个 Python 第三方包,主要用于解析 DML 语句中的表级血缘关系和字段级血缘关系。
from sqllineage.core.metadata.dummy import DummyMetaDataProvider
from sqllineage.runner import LineageRunner
# Column lists for every table the statement reads or writes, keyed by
# "schema.table". Handed to DummyMetaDataProvider so no database is needed.
table_columns = {
    "realtime_ods.user_info_sd": ["id", "position_id", "sd_name"],
    "realtime_ods.user_info_ah": ["id", "ah_name", "position_id"],
    "realtime_ods.position_info": ["id", "name"],
    "realtime_ods.position_info2": ["id", "name"],
    "realtime_dwd.user_dim": [
        "id",
        "name",
        "position_name",
        "status",
        "update_time",
    ],
}
provider = DummyMetaDataProvider(table_columns)

# Statement under analysis: a CTE, a UNION ALL sub-query, a LEFT JOIN and an
# INNER JOIN feeding an INSERT OVERWRITE into realtime_dwd.user_dim.
sql = """
WITH position_tmp as (
SELECT *
FROM realtime_ods.position_info
)
INSERT OVERWRITE TABLE realtime_dwd.user_dim
SELECT t1.id,
t1.name,
t2.name as position_name,
'1' as status,
current_timestamp() as update_time
FROM (
SELECT *
FROM (
SELECT id, sd_name as name, position_id
from realtime_ods.user_info_sd
UNION ALL
SELECT id, ah_name as name, position_id
from realtime_ods.user_info_ah
) a1
) t1 LEFT JOIN (
SELECT id, concat(name, id) as name
FROM position_tmp a1
WHERE a1.id <= 100
) t2 ON t1.position_id = t2.id
INNER JOIN (
SELECT a1.id
FROM (
SELECT id
FROM realtime_ods.position_info2
) a1 GROUP BY a1.id
) t3 ON t1.id = t3.id
;
"""

runner = LineageRunner(
    sql,
    metadata_provider=provider,
    verbose=True,
    dialect="hive",
)

# Column-level lineage first, then a visual separator, then table-level
# lineage, and finally the raw target / source table lists.
separator = "-" * 50
runner.print_column_lineage()
print(separator)
runner.print_table_lineage()
print(runner.target_tables)
print(runner.source_tables)
打印内容如下:
realtime_dwd.user_dim.id <- t1.id <- a1.id <- realtime_ods.user_info_ah.id
realtime_dwd.user_dim.id <- t1.id <- a1.id <- realtime_ods.user_info_sd.id
realtime_dwd.user_dim.name <- t1.name <- a1.name <- realtime_ods.user_info_ah.ah_name
realtime_dwd.user_dim.name <- t1.name <- a1.name <- realtime_ods.user_info_sd.sd_name
realtime_dwd.user_dim.position_name <- t2.name <- position_tmp.id <- realtime_ods.position_info.id
realtime_dwd.user_dim.position_name <- t2.name <- position_tmp.name <- realtime_ods.position_info.name
realtime_dwd.user_dim.status
realtime_dwd.user_dim.update_time
--------------------------------------------------
Statement #1: WITH position_tmp as ( SELECT * FROM realtim...
table read: [Table: realtime_ods.position_info, Table: realtime_ods.position_info2, Table: realtime_ods.user_info_ah, Table: realtime_ods.user_info_sd]
table write: [Table: realtime_dwd.user_dim]
table cte: [SubQuery: position_tmp]
table drop: []
table rename: []
==========
Summary:
Statements(#): 1
Source Tables:
realtime_ods.position_info
realtime_ods.position_info2
realtime_ods.user_info_ah
realtime_ods.user_info_sd
Target Tables:
realtime_dwd.user_dim
[Table: realtime_dwd.user_dim]
[Table: realtime_ods.position_info, Table: realtime_ods.position_info2, Table: realtime_ods.user_info_ah, Table: realtime_ods.user_info_sd]
连接数据库操作(通过 SQLAlchemyMetaDataProvider 从数据库中读取表的元数据)
from sqllineage.config import SQLLineageConfig
from sqllineage.core.metadata.sqlalchemy import SQLAlchemyMetaDataProvider
from sqllineage.runner import LineageRunner
# Statement under analysis. Table names are unqualified, so column lineage
# depends on the metadata provider and the default schema configured below.
sql = """
INSERT INTO user_dim
SELECT t1.id,
t1.name,
t2.name as position_name
FROM (
SELECT id, name, position_id
from user_info
) t1 LEFT JOIN (
SELECT id, concat(name, position_id) as name
FROM (
SELECT *
FROM position_info
) a1
WHERE a1.id < 100
) t2 ON t1.position_id = t2.id
"""

# The body of the `with` statement must be indented; as a flat sequence of
# lines the script does not even parse. The whole analysis, including the
# final print, is kept inside the block so that the DEFAULT_SCHEMA="main"
# override is in effect for every step.
# NOTE(review): LineageRunner appears to evaluate lineage lazily (on first
# access), which is why the print call stays inside the block -- confirm
# against the sqllineage version in use.
with SQLLineageConfig(DEFAULT_SCHEMA="main"):
    # SQLAlchemy-style URL; presumably a SQLite file named db.sqlite3 in the
    # current working directory -- adjust to the real database.
    url = r"sqlite:///db.sqlite3"
    provider = SQLAlchemyMetaDataProvider(url)
    result = LineageRunner(sql, metadata_provider=provider)
    result.print_column_lineage()