大数据

Java 操作 DolphinScheduler

TaskNodeDto

import lombok.Data;

import java.util.ArrayList;
import java.util.List;

/**
 * Task node DTO; corresponds to one task inside a DolphinScheduler workflow.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class TaskNodeDto {
    /** Task name. */
    private String name;

    /** SQL script of the task. */
    private String sqlScript;

    /** Data source ID (one that is already registered in DolphinScheduler). */
    private Long datasourceId;

    /** Parameter list, e.g. ["p_dt", "p1d"]. */
    private List<String> params;

    /** X coordinate in the UI. */
    private Integer x;

    /** Y coordinate in the UI. */
    private Integer y;

    /** Sort position of the task. */
    private Integer order;

    /** Creates a node with an empty parameter list, UI position (100, 100) and order 0. */
    public TaskNodeDto() {
        this.x = 100;
        this.y = 100;
        this.order = 0;
        this.params = new ArrayList<>();
    }
}

DolphinSchedulerService

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.datacenter.ds.model.TaskNodeDto;
import org.apache.hc.client5.http.classic.methods.*;
import org.apache.hc.client5.http.entity.UrlEncodedFormEntity;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.*;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.hc.core5.net.URIBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;

/**
 * DolphinScheduler API 客户端(完整版,支持 sessionId 复用与自动刷新)
 * 功能对标 Python 版本:登录、查询、删除、创建、参数解析、依赖等
 *
 * 改进点:
 * - 使用 cachedSessionId 缓存有效会话
 * - 通过 /users/get-user-info 接口验证 sessionId 是否有效
 * - 自动重新登录并更新缓存(线程安全)
 */
@Service
public class DolphinSchedulerService {

    /** Base URL that every request path in this class is appended to. */
    private final String dsApiBaseUrl;
    /** User name sent to the login endpoint. */
    private final String dsUsername;
    /** Password sent to the login endpoint. */
    private final String dsPassword;
    /** Data source id written into the SQL task parameters. */
    private final Long defaultDatasourceId;
    /** Project code used in every {@code /projects/{code}/...} request path. */
    private final Long defaultProjectCode;

    /** Most recently obtained session id; volatile so that updates are visible across threads. */
    private volatile String cachedSessionId;

    /**
     * Creates the service from the {@code dolphinscheduler.*} configuration properties.
     *
     * @param dsApiBaseUrl value of {@code dolphinscheduler.api-base-url}
     * @param dsUsername   value of {@code dolphinscheduler.username}
     * @param dsPassword   value of {@code dolphinscheduler.password}
     * @param datasourceId value of {@code dolphinscheduler.datasource-id}
     * @param projectCode  value of {@code dolphinscheduler.project-code}
     */
    public DolphinSchedulerService(
            @Value("${dolphinscheduler.api-base-url}") String dsApiBaseUrl,
            @Value("${dolphinscheduler.username}") String dsUsername,
            @Value("${dolphinscheduler.password}") String dsPassword,
            @Value("${dolphinscheduler.datasource-id}") Long datasourceId,
            @Value("${dolphinscheduler.project-code}") Long projectCode
    ) {
        // Connection settings.
        this.dsApiBaseUrl = dsApiBaseUrl;
        this.dsUsername = dsUsername;
        this.dsPassword = dsPassword;
        // Defaults applied to every request.
        this.defaultProjectCode = projectCode;
        this.defaultDatasourceId = datasourceId;
    }

    // ==================== 1. 登录 & Session 管理 ====================

    /**
     * Logs in with the configured credentials and returns the session id from the response.
     *
     * @return the non-empty {@code sessionId} found under {@code data} in the login response
     * @throws RuntimeException if the response body is empty, its {@code code} is not 0,
     *                          or it carries no session id
     * @throws Exception        if the HTTP call or the JSON parsing fails
     */
    private String doLogin() throws Exception {
        HttpPost post = new HttpPost(dsApiBaseUrl + "/login");
        List<NameValuePair> form = Arrays.asList(
                new BasicNameValuePair("userName", dsUsername),
                new BasicNameValuePair("userPassword", dsPassword)
        );
        post.setEntity(new UrlEncodedFormEntity(form, StandardCharsets.UTF_8));

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse resp = client.execute(post)) {
            String body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
            JSONObject json = JSONObject.parseObject(body);
            if (json == null) {
                // An empty body parses to null; report it instead of failing with an NPE below.
                throw new RuntimeException("登录失败: 响应为空 (HTTP " + resp.getCode() + ")");
            }
            // Compare against a boxed 0 so that a missing "code" field is reported as a
            // login failure rather than an unboxing NullPointerException.
            if (!Integer.valueOf(0).equals(json.getInteger("code"))) {
                throw new RuntimeException("登录失败: " + json.getString("msg"));
            }
            JSONObject data = json.getJSONObject("data");
            String sessionId = (data == null) ? null : data.getString("sessionId");
            if (sessionId == null || sessionId.isEmpty()) {
                // Never hand a null/blank session id to the cache.
                throw new RuntimeException("登录失败: 响应中缺少 sessionId");
            }
            return sessionId;
        }
    }

    /**
     * Checks whether {@code sessionId} is still accepted by calling {@code /users/get-user-info}.
     *
     * @param sessionId the session id to probe; null or empty is reported as invalid without a request
     * @return {@code true} only when the response {@code code} is 0; any exception yields {@code false}
     */
    private boolean isSessionValid(String sessionId) {
        if (sessionId == null || sessionId.length() == 0) {
            return false;
        }
        boolean valid;
        try (CloseableHttpClient http = HttpClients.createDefault()) {
            HttpGet probe = new HttpGet(dsApiBaseUrl + "/users/get-user-info");
            probe.setHeader("Sessionid", sessionId);
            try (CloseableHttpResponse reply = http.execute(probe)) {
                String payload = EntityUtils.toString(reply.getEntity(), StandardCharsets.UTF_8);
                JSONObject parsed = JSONObject.parseObject(payload);
                // code == 0 means the call succeeded, i.e. the session is valid.
                valid = parsed.getInteger("code") == 0;
            }
        } catch (Exception e) {
            // Any failure (network, parsing, ...) is treated as an invalid session.
            valid = false;
        }
        return valid;
    }

    /**
     * Returns a usable session id, reusing the cached one when it still validates and
     * logging in again otherwise.
     *
     * <p>The method is {@code synchronized} so that concurrent callers do not log in repeatedly.
     *
     * @return the cached or freshly obtained session id
     * @throws Exception if a new login is required and fails
     */
    private synchronized String getSessionId() throws Exception {
        String current = cachedSessionId;
        if (current == null || !isSessionValid(current)) {
            // Cache is missing or rejected by the server: log in again and remember the result.
            current = doLogin();
            cachedSessionId = current;
        }
        return current;
    }

    // ==================== 2. 工具方法 ====================

    /**
     * Executes {@code req}, parses the JSON response and fails unless its {@code code} is 0.
     *
     * @param client the HTTP client used to execute the request
     * @param req    the fully prepared request (URL, headers, entity)
     * @param action short description of the operation, used in the success and failure messages
     * @throws RuntimeException if the response body is empty or its {@code code} is missing or non-zero
     * @throws Exception        if the HTTP call or the JSON parsing fails
     */
    private void sendAndCheck(CloseableHttpClient client, ClassicHttpRequest req, String action) throws Exception {
        try (CloseableHttpResponse resp = client.execute(req)) {
            String body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
            JSONObject result = JSONObject.parseObject(body);
            if (result == null) {
                // An empty body parses to null; surface it as a failure of this action.
                throw new RuntimeException(action + "失败: 响应为空 (HTTP " + resp.getCode() + ")");
            }
            // Boxed comparison: a missing "code" counts as failure instead of throwing an NPE.
            if (!Integer.valueOf(0).equals(result.getInteger("code"))) {
                throw new RuntimeException(action + "失败: " + result.getString("msg"));
            }
            System.out.println(action + "成功");
        }
    }

    // ==================== 3. 查询工作流 code ====================

    /**
     * Looks up the code of the workflow whose name equals {@code processName}.
     *
     * <p>Requests the first page (10 entries) of the process-definition list filtered by
     * {@code searchVal} and returns the first entry with an exactly matching name.
     *
     * @param processName the workflow name to look for
     * @return the workflow code, or {@code null} when the response code is non-zero or no entry matches
     * @throws Exception if obtaining a session, the HTTP call or the JSON parsing fails
     */
    public Long getProcessDefinitionCode(String processName) throws Exception {
        String sessionId = getSessionId();
        String listUrl = dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/process-definition";
        URIBuilder query = new URIBuilder(listUrl)
                .setParameter("pageSize", "10")
                .setParameter("pageNo", "1")
                .setParameter("searchVal", processName);
        HttpGet request = new HttpGet(query.build());
        request.setHeader("Sessionid", sessionId);

        try (CloseableHttpClient http = HttpClients.createDefault();
             CloseableHttpResponse reply = http.execute(request)) {
            String payload = EntityUtils.toString(reply.getEntity(), StandardCharsets.UTF_8);
            JSONObject parsed = JSONObject.parseObject(payload);
            if (parsed.getInteger("code") != 0) {
                return null;
            }
            JSONArray definitions = parsed.getJSONObject("data").getJSONArray("totalList");
            int count = definitions.size();
            for (int idx = 0; idx < count; idx++) {
                JSONObject definition = definitions.getJSONObject(idx);
                // The search is filtered by searchVal, but only an exact name match is accepted here.
                if (processName.equals(definition.getString("name"))) {
                    return definition.getLong("code");
                }
            }
            return null;
        }
    }

    // ==================== 4. 删除工作流 ====================

    /**
     * Deletes the workflow definition identified by {@code processCode}.
     *
     * @param processCode code of the workflow to delete
     * @throws Exception if obtaining a session or the request fails, or the response code is non-zero
     */
    public void deleteProcessDefinition(Long processCode) throws Exception {
        String sessionId = getSessionId();
        String target = dsApiBaseUrl + "/projects/" + this.defaultProjectCode
                + "/process-definition/" + processCode;
        HttpDelete request = new HttpDelete(target);
        request.setHeader("Sessionid", sessionId);
        try (CloseableHttpClient http = HttpClients.createDefault()) {
            sendAndCheck(http, request, "删除工作流");
        }
    }

    // ==================== 5. 验证工作流名称是否可用 ====================

    /**
     * Asks the {@code verify-name} endpoint whether {@code processName} is accepted.
     *
     * @param processName the workflow name to verify
     * @throws RuntimeException if the response code is non-zero
     * @throws Exception        if obtaining a session, the HTTP call or the JSON parsing fails
     */
    public void verifyProcessName(String processName) throws Exception {
        String sessionId = getSessionId();
        String verifyUrl = dsApiBaseUrl + "/projects/" + this.defaultProjectCode
                + "/process-definition/verify-name";
        URIBuilder query = new URIBuilder(verifyUrl).setParameter("name", processName);
        HttpGet request = new HttpGet(query.build());
        request.setHeader("Sessionid", sessionId);

        try (CloseableHttpClient http = HttpClients.createDefault();
             CloseableHttpResponse reply = http.execute(request)) {
            JSONObject parsed = JSONObject.parseObject(
                    EntityUtils.toString(reply.getEntity(), StandardCharsets.UTF_8));
            boolean accepted = parsed.getInteger("code") == 0;
            if (!accepted) {
                throw new RuntimeException("工作流名称验证失败: " + parsed.getString("msg"));
            }
        }
    }

    // ==================== 6. 生成任务 code ====================

    /**
     * Requests one new task code from the {@code gen-task-codes} endpoint.
     *
     * @return the first element of the {@code data} array in the response
     * @throws RuntimeException if the response code is non-zero
     * @throws Exception        if obtaining a session, the HTTP call or the JSON parsing fails
     */
    public Long genTaskCode() throws Exception {
        String sessionId = getSessionId();
        String genUrl = dsApiBaseUrl + "/projects/" + this.defaultProjectCode
                + "/task-definition/gen-task-codes";
        URIBuilder query = new URIBuilder(genUrl).setParameter("genNum", "1");
        HttpGet request = new HttpGet(query.build());
        request.setHeader("Sessionid", sessionId);

        try (CloseableHttpClient http = HttpClients.createDefault();
             CloseableHttpResponse reply = http.execute(request)) {
            String payload = EntityUtils.toString(reply.getEntity(), StandardCharsets.UTF_8);
            JSONObject parsed = JSONObject.parseObject(payload);
            if (parsed.getInteger("code") != 0) {
                throw new RuntimeException("生成 task code 失败: " + parsed.getString("msg"));
            }
            // genNum=1 was requested, so the single generated code is at index 0.
            return parsed.getJSONArray("data").getLong(0);
        }
    }

    // ==================== 7. 构建任务参数(含参数解析) ====================

    /**
     * Builds the {@code taskParams} JSON of a SQL task from {@code node}.
     *
     * <p>Every entry of {@code node.getParams()} becomes an {@code IN} local parameter of type
     * {@code VARCHAR} whose value is the date expression mapped to that parameter name.
     * The task uses the node's own data source id when one is set and falls back to the
     * configured default otherwise.
     *
     * @param node the task description; a null parameter list is treated as empty
     * @return the task parameter object
     * @throws IllegalArgumentException if a parameter name is not one of p0d, p1d, p7d, p30d
     */
    private JSONObject buildTaskParams(TaskNodeDto node) {
        JSONObject params = new JSONObject();

        JSONArray localParams = new JSONArray();
        List<String> names = node.getParams();
        if (names != null) {
            for (String param : names) {
                JSONObject p = new JSONObject();
                p.put("prop", param);
                p.put("direct", "IN");
                p.put("type", "VARCHAR");
                p.put("value", dateExpressionFor(param));
                localParams.add(p);
            }
        }
        params.put("localParams", localParams);
        params.put("resourceList", new JSONArray());

        // Honor the data source chosen on the node; only fall back to the configured
        // default when the node does not specify one.
        Long datasourceId = node.getDatasourceId() != null ? node.getDatasourceId() : this.defaultDatasourceId;

        params.put("type", "MYSQL");
        params.put("datasource", datasourceId);
        params.put("sql", node.getSqlScript());
        params.put("sqlType", "1");
        params.put("preStatements", new JSONArray());
        params.put("postStatements", new JSONArray());
        params.put("segmentSeparator", ";");
        params.put("display_rows", 10);

        return params;
    }

    /** Maps a supported parameter name to the date expression used as its value. */
    private static String dateExpressionFor(String param) {
        switch (param) {
            case "p0d":
                return "$[yyyy-MM-dd]";
            case "p1d":
                return "$[yyyy-MM-dd-1]";
            case "p7d":
                return "$[yyyy-MM-dd-7]";
            case "p30d":
                return "$[yyyy-MM-dd-30]";
            default:
                throw new IllegalArgumentException("不支持的参数: " + param);
        }
    }

    // ==================== 8. 创建工作流 ====================

    /**
     * Creates a workflow named {@code processName} whose SQL tasks are chained one after another.
     *
     * <p>Steps: optionally delete an existing workflow of the same name, verify the name,
     * load the tasks and sort them by {@code order}, generate one task code per task, then
     * post the task definitions, relations and UI locations as one process definition in
     * {@code OFFLINE} state.
     *
     * @param processName name of the workflow to create
     * @param deletePre   when {@code true}, an existing workflow with the same name is deleted first
     * @return the code of the newly created workflow
     * @throws RuntimeException if any API call reports a non-zero code or the create response has no code
     * @throws Exception        if a session cannot be obtained or an HTTP call / JSON parsing fails
     */
    public Long createProcess(String processName, boolean deletePre) throws Exception {
        if (deletePre) {
            Long existingCode = getProcessDefinitionCode(processName);
            if (existingCode != null) {
                deleteProcessDefinition(existingCode);
            }
        }

        verifyProcessName(processName);

        JSONArray taskDefinitions = new JSONArray();
        JSONArray taskRelations = new JSONArray();
        JSONArray locations = new JSONArray();

        int x = 100, y = 100;
        List<Long> taskCodes = new ArrayList<>();

        List<TaskNodeDto> tasks = loadJobInfos(processName);
        // Tasks without an order go last instead of breaking the sort with an NPE.
        tasks.sort(Comparator.comparing(TaskNodeDto::getOrder, Comparator.nullsLast(Comparator.naturalOrder())));

        for (int i = 0; i < tasks.size(); i++) {
            TaskNodeDto node = tasks.get(i);
            Long taskCode = genTaskCode();
            taskCodes.add(taskCode);

            JSONObject taskDef = new JSONObject();
            taskDef.put("code", taskCode);
            taskDef.put("name", node.getName());
            taskDef.put("taskType", "SQL");
            taskDef.put("failRetryTimes", 3);
            taskDef.put("failRetryInterval", 1);
            taskDef.put("timeoutFlag", "CLOSE");
            taskDef.put("timeout", 0);
            taskDef.put("flag", "YES");
            taskDef.put("taskParams", buildTaskParams(node));
            taskDefinitions.add(taskDef);

            // The first task has predecessor code 0; every later task depends on the one before it.
            Long preTaskCode = (i == 0) ? 0L : taskCodes.get(i - 1);
            JSONObject relation = new JSONObject();
            relation.put("name", "");
            relation.put("preTaskCode", preTaskCode);
            relation.put("preTaskVersion", 0);
            relation.put("postTaskCode", taskCode);
            relation.put("postTaskVersion", 0);
            relation.put("conditionType", "NONE");
            relation.put("conditionParams", new JSONObject());
            taskRelations.add(relation);

            JSONObject loc = new JSONObject();
            loc.put("taskCode", taskCode);
            loc.put("x", x);
            loc.put("y", y);
            locations.add(loc);

            // Lay nodes out top to bottom; start a new column once y exceeds 2000.
            y += 100;
            if (y > 2000) {
                x += 270;
                y = 100;
            }
        }

        List<NameValuePair> formData = new ArrayList<>();
        formData.add(new BasicNameValuePair("name", processName));
        formData.add(new BasicNameValuePair("description", ""));
        formData.add(new BasicNameValuePair("tenantCode", "default"));
        formData.add(new BasicNameValuePair("executionType", "PARALLEL"));
        formData.add(new BasicNameValuePair("timeout", "0"));
        formData.add(new BasicNameValuePair("globalParams", "[]"));
        formData.add(new BasicNameValuePair("taskDefinitionJson", taskDefinitions.toJSONString()));
        formData.add(new BasicNameValuePair("taskRelationJson", taskRelations.toJSONString()));
        formData.add(new BasicNameValuePair("locations", locations.toJSONString()));
        formData.add(new BasicNameValuePair("releaseState", "OFFLINE"));

        // Fetch the session id only now: the calls above each go through getSessionId() and
        // may have re-logged in, so an id captured at method entry could already be stale.
        String sessionId = getSessionId();

        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpPost post = new HttpPost(dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/process-definition");
            post.setHeader("Sessionid", sessionId);
            post.setHeader("Content-Type", "application/x-www-form-urlencoded");
            post.setEntity(new UrlEncodedFormEntity(formData, StandardCharsets.UTF_8));

            try (CloseableHttpResponse resp = client.execute(post)) {
                String body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
                JSONObject result = JSONObject.parseObject(body);
                if (result == null) {
                    throw new RuntimeException("创建工作流失败: 响应为空 (HTTP " + resp.getCode() + ")");
                }
                // Boxed comparison: a missing "code" counts as failure instead of throwing an NPE.
                if (!Integer.valueOf(0).equals(result.getInteger("code"))) {
                    throw new RuntimeException("创建工作流失败: " + result.getString("msg"));
                }
                JSONObject data = result.getJSONObject("data");
                Long newCode = (data == null) ? null : data.getLong("code");
                if (newCode == null) {
                    throw new RuntimeException("创建工作流失败: 响应中缺少工作流 code");
                }
                System.out.println("✅ 工作流 [" + processName + "] 创建成功,code=" + newCode);
                return newCode;
            }
        }
    }

    // ==================== 9. 模拟从数据库加载任务 ====================

    /**
     * Returns a fixed list of two sample tasks (stand-in for loading them from a database).
     *
     * @param processName workflow name; not used by this stub
     * @return a mutable list containing {@code task_1} (order 2) followed by {@code task_2} (order 1)
     */
    private List<TaskNodeDto> loadJobInfos(String processName) {
        TaskNodeDto first = new TaskNodeDto();
        first.setName("task_1");
        first.setSqlScript("SELECT 1;");
        first.setParams(Arrays.asList("p1d"));
        first.setOrder(2);

        TaskNodeDto second = new TaskNodeDto();
        second.setName("task_2");
        second.setSqlScript("INSERT INTO test VALUES (2);");
        second.setParams(Arrays.asList("p0d", "p7d"));
        second.setOrder(1);

        List<TaskNodeDto> samples = new ArrayList<>();
        samples.add(first);
        samples.add(second);
        return samples;
    }

    // ==================== 10. 上线工作流 ====================

    /**
     * Posts a release-state change for a workflow.
     *
     * @param processCode code of the workflow
     * @param processName name of the workflow, sent as the {@code name} form field
     * @param state       target state, sent as the {@code releaseState} form field
     * @throws Exception if obtaining a session or the request fails, or the response code is non-zero
     */
    public void releaseProcess(Long processCode, String processName, String state) throws Exception {
        String sessionId = getSessionId();
        String releaseUrl = dsApiBaseUrl + "/projects/" + this.defaultProjectCode
                + "/process-definition/" + processCode + "/release";

        List<NameValuePair> fields = new ArrayList<>();
        fields.add(new BasicNameValuePair("name", processName));
        fields.add(new BasicNameValuePair("releaseState", state));

        HttpPost request = new HttpPost(releaseUrl);
        request.setHeader("Sessionid", sessionId);
        request.setEntity(new UrlEncodedFormEntity(fields, StandardCharsets.UTF_8));

        try (CloseableHttpClient http = HttpClients.createDefault()) {
            sendAndCheck(http, request, "设置工作流状态为 " + state);
        }
    }

    /**
     * Creates a schedule for a workflow, valid from now for the next 100 years.
     *
     * @param processCode code of the workflow to schedule
     * @param cronExp     cron expression sent as the schedule's {@code crontab}
     * @throws Exception if obtaining a session or the request fails, or the response code is non-zero
     */
    public void createSchedule(Long processCode, String cronExp) throws Exception {
        String sessionId = getSessionId();
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpPost post = new HttpPost(dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/schedules/");
            post.setHeader("Sessionid", sessionId);

            // Take "now" in the same zone that is declared as timezoneId below, so the
            // start/end wall-clock times are correct even when the JVM's default zone
            // differs from the schedule's zone.
            String timezoneId = "Asia/Shanghai";
            LocalDateTime now = LocalDateTime.now(java.time.ZoneId.of(timezoneId));
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            String startTime = now.format(formatter);
            String endTime = now.plusYears(100).format(formatter);

            JSONObject schedule = new JSONObject();
            schedule.put("startTime", startTime);
            schedule.put("endTime", endTime);
            schedule.put("crontab", cronExp);
            schedule.put("timezoneId", timezoneId);

            List<NameValuePair> form = Arrays.asList(
                    new BasicNameValuePair("processDefinitionCode", processCode.toString()),
                    new BasicNameValuePair("environmentCode", ""),
                    new BasicNameValuePair("workerGroup", "default"),
                    new BasicNameValuePair("warningGroupId", "0"),
                    new BasicNameValuePair("processInstancePriority", "MEDIUM"),
                    new BasicNameValuePair("warningType", "NONE"),
                    new BasicNameValuePair("failureStrategy", "CONTINUE"),
                    new BasicNameValuePair("schedule", schedule.toString())
            );
            post.setEntity(new UrlEncodedFormEntity(form, StandardCharsets.UTF_8));
            sendAndCheck(client, post, processCode + " 添加调度: " + cronExp);
        }
    }

    /**
     * Returns the id of the first schedule listed for a workflow.
     *
     * @param processCode code of the workflow whose schedules are queried
     * @return the {@code id} of the first entry, or {@code null} when the response code is
     *         non-zero or the list is empty
     * @throws Exception if obtaining a session, the HTTP call or the JSON parsing fails
     */
    public Integer getScheduleId(Long processCode) throws Exception {
        String sessionId = getSessionId();
        String listUrl = dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/schedules";
        URIBuilder query = new URIBuilder(listUrl)
                .setParameter("pageSize", "10")
                .setParameter("pageNo", "1")
                .setParameter("processDefinitionCode", processCode.toString());
        HttpGet request = new HttpGet(query.build());
        request.setHeader("Sessionid", sessionId);

        try (CloseableHttpClient http = HttpClients.createDefault();
             CloseableHttpResponse reply = http.execute(request)) {
            String payload = EntityUtils.toString(reply.getEntity(), StandardCharsets.UTF_8);
            JSONObject parsed = JSONObject.parseObject(payload);
            if (parsed.getInteger("code") != 0) {
                return null;
            }
            JSONArray schedules = parsed.getJSONObject("data").getJSONArray("totalList");
            if (schedules.isEmpty()) {
                return null;
            }
            // Only the first listed schedule is considered.
            JSONObject firstSchedule = schedules.getJSONObject(0);
            return firstSchedule.getInteger("id");
        }
    }

    /**
     * Posts to the {@code online} endpoint of a schedule.
     *
     * @param scheduleId id of the schedule to bring online
     * @throws Exception if obtaining a session or the request fails, or the response code is non-zero
     */
    public void scheduleOnline(Integer scheduleId) throws Exception {
        String sessionId = getSessionId();
        String onlineUrl = dsApiBaseUrl + "/projects/" + this.defaultProjectCode
                + "/schedules/" + scheduleId + "/online";
        HttpPost request = new HttpPost(onlineUrl);
        request.setHeader("Sessionid", sessionId);
        try (CloseableHttpClient http = HttpClients.createDefault()) {
            sendAndCheck(http, request, "调度Id: " + scheduleId + "上线");
        }
    }
    
    /**
     * Finds the process instance id of a workflow run by searching the task-instance list.
     *
     * <p>Requests the first page (10 entries) of task instances filtered by
     * {@code processInstanceName}, strips a trailing {@code -<digits>-<17 digits>} suffix from
     * each entry's instance name and returns the {@code processInstanceId} of the first entry
     * whose remaining name equals {@code processName}.
     *
     * @param processName the workflow name to match
     * @return the matching process instance id, or {@code null} when the response is empty or
     *         unsuccessful, or no entry matches
     * @throws Exception if obtaining a session, the HTTP call or the JSON parsing fails
     */
    public Integer getWorkflowInstanceId(String processName) throws Exception {
        String sessionId = getSessionId();
        URIBuilder builder = new URIBuilder(dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/task-instances")
                .setParameter("pageSize", "10")
                .setParameter("pageNo", "1")
                .setParameter("searchVal", "")
                .setParameter("processInstanceId", "")
                .setParameter("host", "")
                .setParameter("stateType", "")
                .setParameter("startDate", "")
                .setParameter("endDate", "")
                .setParameter("executorName", "")
                .setParameter("processInstanceName", processName);
        HttpGet get = new HttpGet(builder.build());
        get.setHeader("Sessionid", sessionId);

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse resp = client.execute(get)) {
            String body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
            JSONObject json = JSONObject.parseObject(body);
            // Boxed comparison: an empty body or a missing "code" means "not found", not an NPE.
            if (json == null || !Integer.valueOf(0).equals(json.getInteger("code"))) {
                return null;
            }
            JSONObject data = json.getJSONObject("data");
            JSONArray list = (data == null) ? null : data.getJSONArray("totalList");
            if (list == null) {
                return null;
            }
            for (Object o : list) {
                if (o == null) {
                    continue;
                }
                JSONObject obj = JSONObject.from(o);
                String processInstanceName = obj.getString("processInstanceName");
                if (processInstanceName == null) {
                    // Entries without an instance name cannot match; skip them instead of failing.
                    continue;
                }
                String instanceName = processInstanceName.replaceAll("-\\d+-\\d{17}$", "");
                if (instanceName.equals(processName)) {
                    return obj.getInteger("processInstanceId");
                }
            }
        }
        return null;
    }
    
    /**
     * Starts a run of the workflow via {@code /executors/start-process-instance}.
     *
     * <p>The schedule time sent with the request spans today 00:00:00 to today 00:00:00.
     *
     * @param processCode code of the workflow to run
     * @return {@code true} when the start request succeeded; {@code false} when building or
     *         sending it failed (the cause is printed to standard error)
     * @throws Exception if a session id cannot be obtained
     */
    public Boolean runProcess(Long processCode) throws Exception {
        String sessionId = getSessionId();
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpPost post = new HttpPost(dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/executors/start-process-instance");
            post.setHeader("Sessionid", sessionId);

            // Start and end are the same instant: midnight of the current day.
            String today = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
            String startDate = today + " 00:00:00";
            String endDate = today + " 00:00:00";
            JSONObject scheduleTime = new JSONObject();
            scheduleTime.put("complementStartDate", startDate);
            scheduleTime.put("complementEndDate", endDate);

            List<NameValuePair> form = Arrays.asList(
                    new BasicNameValuePair("processDefinitionCode", processCode.toString()),
                    new BasicNameValuePair("failureStrategy", "CONTINUE"),
                    new BasicNameValuePair("warningType", "NONE"),
                    new BasicNameValuePair("warningGroupId", ""),
                    new BasicNameValuePair("execType", "START_PROCESS"),
                    new BasicNameValuePair("startNodeList", ""),
                    new BasicNameValuePair("taskDependType", "TASK_POST"),
                    new BasicNameValuePair("complementDependentMode", "OFF_MODE"),
                    new BasicNameValuePair("runMode", "RUN_MODE_SERIAL"),
                    new BasicNameValuePair("processInstancePriority", "MEDIUM"),
                    new BasicNameValuePair("workerGroup", "default"),
                    new BasicNameValuePair("environmentCode", ""),
                    new BasicNameValuePair("startParams", ""),
                    new BasicNameValuePair("expectedParallelismNumber", ""),
                    new BasicNameValuePair("dryRun", ""),
                    new BasicNameValuePair("scheduleTime", scheduleTime.toJSONString())
            );
            post.setEntity(new UrlEncodedFormEntity(form, StandardCharsets.UTF_8));
            sendAndCheck(client, post, "运行任务:" + processCode);
            return true;
        } catch (Exception e) {
            // Keep the best-effort contract (false on failure) but do not hide the cause.
            System.err.println("运行任务:" + processCode + " 失败: " + e);
            return false;
        }
    }
}

TestDolphinScheduler

import com.datacenter.ds.service.DolphinSchedulerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

/**
 * Manual end-to-end driver: creates a workflow, sets it ONLINE, adds a schedule and
 * brings that schedule online, using the {@link DolphinSchedulerService} bean.
 */
@SpringBootApplication(scanBasePackages = "com.datacenter.ds")
public class TestDolphinScheduler {
    /**
     * Boots a non-web Spring context, runs the workflow setup and closes the context.
     *
     * @param args command-line arguments passed through to Spring Boot
     * @throws Exception if any DolphinScheduler call fails
     */
    public static void main(String[] args) throws Exception {
        SpringApplication app = new SpringApplication(TestDolphinScheduler.class);
        app.setWebApplicationType(WebApplicationType.NONE);

        // try-with-resources closes the context even when one of the calls below throws.
        try (ConfigurableApplicationContext context = app.run(args)) {
            DolphinSchedulerService ds = context.getBean(DolphinSchedulerService.class);
            String processName = "测试Test-01";
            Long code = ds.createProcess(processName, true);
            // Optional: bring the workflow online and schedule it.
            if (code != null) {
                ds.releaseProcess(code, processName, "ONLINE");
                ds.createSchedule(code, "0 0 * * 1 ? *");
                Integer scheduleId = ds.getScheduleId(code);
                System.out.println("scheduleId: " + scheduleId);
                if (scheduleId != null) {
                    ds.scheduleOnline(scheduleId);
                } else {
                    // getScheduleId returns null when no schedule is found; do not build
                    // a ".../schedules/null/online" request from it.
                    System.err.println("未找到工作流 " + code + " 的调度,跳过调度上线");
                }
            }
        }
    }
}

application.yml

# Settings injected into DolphinSchedulerService via @Value("${dolphinscheduler.*}").
# All values below are placeholders.
dolphinscheduler:
  # Base URL of the REST API; request paths such as /login are appended to it.
  api-base-url: http://xxx.xxx.xxx.xxx:12345/dolphinscheduler
  # Credentials posted to /login as userName / userPassword.
  username: xxx
  password: xxx
  # Project code used in every /projects/{project-code}/... request path.
  project-code: 123456789
  # Data source id written into the "datasource" field of generated SQL tasks.
  datasource-id: 1