文章
python 实现任务依赖排序
该示例主要是为了解决数仓任务调度中目标表及依赖表在dolphinscheduler中的任务顺序编排问题,方便顺序执行。
方法一:
# coding: utf-8
def sort_tasks(task_refs: dict):
"""
task_refs 格式:
{
dwd_1: (ods_1, ods_2),
dwd_2: (ods_3, ods_4),
}
"""
# 目标表
target_table_list = task_refs.keys()
# 依赖表
source_tables_list = task_refs.values()
# 用于追加 最开始的没有依赖的任务
# 也就是ods任务也参与排序(当然也可以不添加 ods定时执行到点再执行其他层)
new_task_refs = dict(zip(target_table_list, source_tables_list))
result_list = []
for source_tables in source_tables_list:
if type(source_tables) is tuple:
for source_table in source_tables:
if source_table not in new_task_refs:
# 如果来源表 在任务依赖中找不到相关 信息 表示 当前表没有来源
new_task_refs[source_table] = ""
print(new_task_refs)
return clear_null_ref(list(new_task_refs.keys()), list(new_task_refs.values()), result_list)
def clear_null_ref(target_table_list, source_tables_list, result_list):
"""
逐个处理 "" 移出没有来源依赖的表 也就是优先执行的表
"""
if "" in source_tables_list: # 查看依赖表中 没有依赖是 "" 的
null_index = source_tables_list.index("") # 找到依赖是 "" 的索引位置
source_tables_list.pop(null_index) # 移出这个 "" 空依赖 source_tables_list 元素一直再减少
target_table = target_table_list.pop(null_index) # 找到 没有来源依赖 "" 对应的是哪个表 并移出
result_list.append(target_table) # 将这个表添加到 结果集中 优先执行
# 上面找到的表已优先执行 后面 依赖关系中 存在该表的 需要删除该 依赖表
# 将 target_table 从 source_tables_list 中移出
# 处理后 可能出现 source_tables 变为 ""
source_tables_list = clear_task_from_ref(target_table, source_tables_list)
# 继续查找没有来源依赖的表
return clear_null_ref(target_table_list, source_tables_list, result_list) # 注意这里要return结果
elif len(source_tables_list) == 0:
return result_list # 依赖表无元素的时候就是 所有 target_table排序结束的时候
else:
print("存在循环依赖: ", set(source_tables_list))
return
def clear_task_from_ref(target_table, source_tables_list):
"""
将 target_table 从 source_tables_list 中移出
"""
# 遍历 source_tables 列表
for source_tables in source_tables_list:
# 判断 需要移出的 target_table 是否在当前 依赖列表中
if type(source_tables) is tuple and target_table in source_tables:
# 如果存在 target_table 那么需要修改source_tables 移出target_table 所以需要转换成list类型
source_list = list(source_tables)
target_table_index = source_list.index(target_table) # 找到target_table索引位置
source_list.pop(target_table_index) # 移除 target_table 成为新的source_list
# 找到原来的 source_tables 在 source_tables_list 的位置 使用去掉target_table的新的source_list替换
source_tables_index = source_tables_list.index(source_tables)
if len(source_list) == 0: # 如果 移除 target_table后 没有其他依赖了 那么就设置 当前依赖来源为 ""
source_tables_list[source_tables_index] = "" # source_tables变为 "" 空依赖 也就可以优先执行对应的target_table了
else:
source_tables_list[source_tables_index] = tuple(source_list) # 使用去掉target_table的新的source_list替换
return source_tables_list
if __name__ == '__main__':
# 通过字典结构来表示依赖关系
data = {
"dws_1": ("dwd_1", "dwd_2"),
"dws_2": ("ods_4", "dwd_3"),
"dws_3": ("dwd_4",),
"dwd_1": ("ods_1", "ods_2"),
"dwd_2": ("ods_3", "ods_4"),
"dwd_3": ("ods_5",),
"dwd_4": ("ods_6", "dwd_3"),
}
res = sort_tasks(data)
print(res)
方法二:
# coding: utf-8
class Task:
def __init__(self, name):
self.name = name
self.dependencies = []
def __str__(self):
return self.name
class DAG:
def __init__(self):
self.tasks = {} # 存储所有任务
def add_task(self, task_name):
task = Task(task_name)
self.tasks[task_name] = task
def add_dependency(self, task_name, dependency_name):
if task_name in self.tasks and dependency_name in self.tasks:
self.tasks[task_name].dependencies.append(self.tasks[dependency_name]) # 添加依赖关系
def topological_sort(self):
visited = set() # 记录已访问的任务
output = [] # 存储拓扑排序的结果
def dfs(task):
# 先判断之前是否已经执行过 已经执行过不能再执行
if task.name in visited:
return
visited.add(task.name) # 标记为已访问
for dependency in task.dependencies:
# 从顶部节点开始往下遍历依赖节点
# 依赖节点 再遍历 依赖的依赖
# 当最后没有依赖的时候退出for循环 之后执行后面的output操作
dfs(dependency) # 遍历依赖任务
# 从最底层没有依赖的节点开始 退出for循环后 将任务名添加到output
# 也就是先添加最底层的节点 依次向上添加到output
output.append(task.name) # 将当前任务添加到结果列表
for task in self.tasks.values():
dfs(task) # 对所有任务进行dfs
return output
def execute_tasks(self):
sorted_tasks = self.topological_sort() # 获取拓扑排序结果
print(sorted_tasks)
for task_name in sorted_tasks:
print(f"Executing task: {task_name}") # 执行任务
if __name__ == '__main__':
tasks = ["dws_1", "ods_1", "ods_2", "ods_3", "ods_4", "ods_5", "dwd_1", "dwd_2", "dwd_3", "dwd_4", "dws_2",
"dws_3", ]
# 定义任务及其依赖关系
task_ref = {
"dwd_1": ("ods_1", "ods_2"),
"dwd_2": ("ods_3", "ods_4"),
"dwd_3": ("ods_5", "dws_1"),
"dwd_4": ("ods_5", "dwd_3"),
"dws_1": ("dwd_1", "dwd_2"),
"dws_2": ("ods_4", "dwd_3"),
"dws_3": ("dwd_4",),
}
# 生成依赖 关系列表 [(child, parent), (child2, parent)]
dps = []
for parent, child_list in task_ref.items():
for child in child_list:
dps.append((child, parent))
# 创建dag对象
dag = DAG()
# 添加任务
for task in tasks:
dag.add_task(task)
# 添加依赖关系
for dp in dps:
dag.add_dependency(dp[1], dp[0])
dag.execute_tasks()