大数据

python 实现任务依赖排序

该示例主要是为了解决数仓任务调度中目标表及依赖表在dolphinscheduler中的任务顺序编排问题,方便顺序执行。

方法一:

# coding: utf-8
def sort_tasks(task_refs: dict):
    """
    task_refs 格式:
     {
        dwd_1: (ods_1, ods_2),
        dwd_2: (ods_3, ods_4),
    }
    """
    # 目标表
    target_table_list = task_refs.keys()
    # 依赖表
    source_tables_list = task_refs.values()
    # 用于追加 最开始的没有依赖的任务
    # 也就是ods任务也参与排序(当然也可以不添加 ods定时执行到点再执行其他层)
    new_task_refs = dict(zip(target_table_list, source_tables_list))
    result_list = []
    for source_tables in source_tables_list:
        if type(source_tables) is tuple:
            for source_table in source_tables:
                if source_table not in new_task_refs:
                    # 如果来源表 在任务依赖中找不到相关 信息 表示 当前表没有来源
                    new_task_refs[source_table] = ""
    print(new_task_refs)
    return clear_null_ref(list(new_task_refs.keys()), list(new_task_refs.values()), result_list)


def clear_null_ref(target_table_list, source_tables_list, result_list):
    """
    逐个处理 "" 移出没有来源依赖的表 也就是优先执行的表
    """
    if "" in source_tables_list:  # 查看依赖表中 没有依赖是 "" 的
        null_index = source_tables_list.index("")  # 找到依赖是 "" 的索引位置
        source_tables_list.pop(null_index)  # 移出这个 "" 空依赖 source_tables_list 元素一直再减少
        target_table = target_table_list.pop(null_index)  # 找到 没有来源依赖 "" 对应的是哪个表 并移出
        result_list.append(target_table)  # 将这个表添加到 结果集中 优先执行
        # 上面找到的表已优先执行 后面 依赖关系中 存在该表的 需要删除该 依赖表
        # 将 target_table 从 source_tables_list 中移出
        # 处理后 可能出现 source_tables 变为 ""
        source_tables_list = clear_task_from_ref(target_table, source_tables_list)
        # 继续查找没有来源依赖的表
        return clear_null_ref(target_table_list, source_tables_list, result_list)  # 注意这里要return结果
    elif len(source_tables_list) == 0:
        return result_list  # 依赖表无元素的时候就是 所有 target_table排序结束的时候
    else:
        print("存在循环依赖: ", set(source_tables_list))
        return


def clear_task_from_ref(target_table, source_tables_list):
    """
    将 target_table 从 source_tables_list 中移出
    """
    # 遍历 source_tables 列表
    for source_tables in source_tables_list:
        # 判断 需要移出的 target_table 是否在当前 依赖列表中
        if type(source_tables) is tuple and target_table in source_tables:
            # 如果存在 target_table 那么需要修改source_tables 移出target_table 所以需要转换成list类型
            source_list = list(source_tables)
            target_table_index = source_list.index(target_table)  # 找到target_table索引位置
            source_list.pop(target_table_index)  # 移除 target_table 成为新的source_list
            # 找到原来的 source_tables 在 source_tables_list 的位置 使用去掉target_table的新的source_list替换
            source_tables_index = source_tables_list.index(source_tables)
            if len(source_list) == 0:  # 如果 移除 target_table后 没有其他依赖了 那么就设置 当前依赖来源为 ""
                source_tables_list[source_tables_index] = ""  # source_tables变为 "" 空依赖 也就可以优先执行对应的target_table了
            else:
                source_tables_list[source_tables_index] = tuple(source_list)  # 使用去掉target_table的新的source_list替换
    return source_tables_list


if __name__ == '__main__':
    # 通过字典结构来表示依赖关系
    data = {
        "dws_1": ("dwd_1", "dwd_2"),
        "dws_2": ("ods_4", "dwd_3"),
        "dws_3": ("dwd_4",),
        "dwd_1": ("ods_1", "ods_2"),
        "dwd_2": ("ods_3", "ods_4"),
        "dwd_3": ("ods_5",),
        "dwd_4": ("ods_6", "dwd_3"),
    }
    res = sort_tasks(data)
    print(res)

方法二:

# coding: utf-8
class Task:
    def __init__(self, name):
        self.name = name
        self.dependencies = []

    def __str__(self):
        return self.name


class DAG:
    def __init__(self):
        self.tasks = {}  # 存储所有任务

    def add_task(self, task_name):
        task = Task(task_name)
        self.tasks[task_name] = task

    def add_dependency(self, task_name, dependency_name):
        if task_name in self.tasks and dependency_name in self.tasks:
            self.tasks[task_name].dependencies.append(self.tasks[dependency_name])  # 添加依赖关系

    def topological_sort(self):
        visited = set()  # 记录已访问的任务
        output = []  # 存储拓扑排序的结果

        def dfs(task):
            # 先判断之前是否已经执行过 已经执行过不能再执行
            if task.name in visited:
                return
            visited.add(task.name)  # 标记为已访问
            for dependency in task.dependencies:
                # 从顶部节点开始往下遍历依赖节点
                # 依赖节点 再遍历 依赖的依赖
                # 当最后没有依赖的时候退出for循环 之后执行后面的output操作
                dfs(dependency)  # 遍历依赖任务
            # 从最底层没有依赖的节点开始 退出for循环后 将任务名添加到output
            # 也就是先添加最底层的节点 依次向上添加到output
            output.append(task.name)  # 将当前任务添加到结果列表

        for task in self.tasks.values():
            dfs(task)  # 对所有任务进行dfs
        return output

    def execute_tasks(self):
        sorted_tasks = self.topological_sort()  # 获取拓扑排序结果
        print(sorted_tasks)
        for task_name in sorted_tasks:
            print(f"Executing task: {task_name}")  # 执行任务


if __name__ == '__main__':
    tasks = ["dws_1", "ods_1", "ods_2", "ods_3", "ods_4", "ods_5", "dwd_1", "dwd_2", "dwd_3", "dwd_4", "dws_2",
             "dws_3", ]
    # 定义任务及其依赖关系
    task_ref = {
        "dwd_1": ("ods_1", "ods_2"),
        "dwd_2": ("ods_3", "ods_4"),
        "dwd_3": ("ods_5", "dws_1"),
        "dwd_4": ("ods_5", "dwd_3"),
        "dws_1": ("dwd_1", "dwd_2"),
        "dws_2": ("ods_4", "dwd_3"),
        "dws_3": ("dwd_4",),
    }
    # 生成依赖 关系列表 [(child, parent), (child2, parent)]
    dps = []
    for parent, child_list in task_ref.items():
        for child in child_list:
            dps.append((child, parent))
    # 创建dag对象
    dag = DAG()
    # 添加任务
    for task in tasks:
        dag.add_task(task)
    # 添加依赖关系
    for dp in dps:
        dag.add_dependency(dp[1], dp[0])
    dag.execute_tasks()