文章
Java 操作 DolphinScheduler
TaskNodeDto
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* 任务节点 DTO,对应 DolphinScheduler 中的一个任务
*/
@Data
public class TaskNodeDto {
private String name; // 任务名称
private String sqlScript; // SQL 脚本
private Long datasourceId; // 数据源 ID(DolphinScheduler 中已注册的)
private List<String> params; // 参数列表,如 ["p_dt", "p1d"]
private Integer x = 100; // UI X 坐标
private Integer y = 100; // UI Y 坐标
private Integer order = 0; // 任务排序
/**
 * Creates a task node whose parameter list starts out as an empty, mutable list.
 */
public TaskNodeDto() {
    params = new ArrayList<>();
}
}
DolphinSchedulerService
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.datacenter.ds.model.TaskNodeDto;
import org.apache.hc.client5.http.classic.methods.*;
import org.apache.hc.client5.http.entity.UrlEncodedFormEntity;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.*;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.hc.core5.net.URIBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
* DolphinScheduler API 客户端(完整版,支持 sessionId 复用与自动刷新)
* 功能对标 Python 版本:登录、查询、删除、创建、参数解析、依赖等
*
* 改进点:
* - 使用 cachedSessionId 缓存有效会话
* - 通过 /users/get-user-info 接口验证 sessionId 是否有效
* - 自动重新登录并更新缓存(线程安全)
*/
@Service
public class DolphinSchedulerService {
private final String dsApiBaseUrl;
private final String dsUsername;
private final String dsPassword;
private final Long defaultDatasourceId;
private final Long defaultProjectCode;
// 缓存当前有效的 sessionId,volatile 保证可见性
private volatile String cachedSessionId = null;
/**
 * Builds the client from externalized configuration values.
 *
 * @param dsApiBaseUrl value of {@code dolphinscheduler.api-base-url}; prefix of every request URL
 * @param dsUsername   value of {@code dolphinscheduler.username}; sent as {@code userName} on login
 * @param dsPassword   value of {@code dolphinscheduler.password}; sent as {@code userPassword} on login
 * @param datasourceId value of {@code dolphinscheduler.datasource-id}; stored as the default datasource
 * @param projectCode  value of {@code dolphinscheduler.project-code}; used in every project-scoped URL
 */
public DolphinSchedulerService(
        @Value("${dolphinscheduler.api-base-url}") String dsApiBaseUrl,
        @Value("${dolphinscheduler.username}") String dsUsername,
        @Value("${dolphinscheduler.password}") String dsPassword,
        @Value("${dolphinscheduler.datasource-id}") Long datasourceId,
        @Value("${dolphinscheduler.project-code}") Long projectCode
) {
    // Project-scoped defaults.
    this.defaultProjectCode = projectCode;
    this.defaultDatasourceId = datasourceId;
    // Connection and credential settings.
    this.dsPassword = dsPassword;
    this.dsUsername = dsUsername;
    this.dsApiBaseUrl = dsApiBaseUrl;
}
// ==================== 1. 登录 & Session 管理 ====================
/**
 * Logs in with the configured credentials and returns the session id issued by the server.
 *
 * @return the non-empty {@code sessionId} taken from the {@code data} object of the login response
 * @throws RuntimeException if the response has no {@code code}, a non-zero {@code code},
 *                          or carries no {@code sessionId}
 * @throws Exception        on transport or response-parsing failures
 */
private String doLogin() throws Exception {
    HttpPost post = new HttpPost(dsApiBaseUrl + "/login");
    List<NameValuePair> form = Arrays.asList(
            new BasicNameValuePair("userName", dsUsername),
            new BasicNameValuePair("userPassword", dsPassword)
    );
    post.setEntity(new UrlEncodedFormEntity(form, StandardCharsets.UTF_8));
    try (CloseableHttpClient client = HttpClients.createDefault();
         CloseableHttpResponse resp = client.execute(post)) {
        String body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
        JSONObject json = JSONObject.parseObject(body);
        // Read the status as a boxed Integer first: comparing a missing "code" with == 0
        // would unbox null and fail with an uninformative NullPointerException.
        Integer code = (json == null) ? null : json.getInteger("code");
        if (code == null) {
            throw new RuntimeException("登录失败: " + body);
        }
        if (code != 0) {
            throw new RuntimeException("登录失败: " + json.getString("msg"));
        }
        JSONObject data = json.getJSONObject("data");
        String sessionId = (data == null) ? null : data.getString("sessionId");
        // Never hand back a null/empty id: the caller would cache it as if it were a valid session.
        if (sessionId == null || sessionId.isEmpty()) {
            throw new RuntimeException("登录失败: 响应中没有 sessionId");
        }
        return sessionId;
    }
}
/**
 * Checks whether a session id is still accepted by calling {@code /users/get-user-info}
 * with the id in the {@code Sessionid} header.
 *
 * @param sessionId the session id to probe; {@code null} or empty is reported as invalid
 * @return {@code true} only when the response's {@code code} is 0; {@code false} otherwise,
 *         including when the request or the response parsing throws
 */
private boolean isSessionValid(String sessionId) {
    boolean missing = (sessionId == null) || sessionId.isEmpty();
    if (missing) {
        return false;
    }
    HttpGet probe = new HttpGet(dsApiBaseUrl + "/users/get-user-info");
    probe.setHeader("Sessionid", sessionId);
    try (CloseableHttpClient client = HttpClients.createDefault();
         CloseableHttpResponse resp = client.execute(probe)) {
        String payload = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
        JSONObject parsed = JSONObject.parseObject(payload);
        // code == 0 means the call succeeded, i.e. the session is still valid.
        int status = parsed.getInteger("code");
        return status == 0;
    } catch (Exception ignored) {
        // Any failure (network, parsing, missing fields) is deliberately treated as "invalid".
        return false;
    }
}
/**
 * Returns a usable session id, reusing the cached one when the server still accepts it
 * and logging in again otherwise.
 *
 * <p>The method is {@code synchronized}, so only one thread at a time validates or
 * refreshes the cache. Each call validates the cached id through {@link #isSessionValid}.
 *
 * @return the cached session id if still valid, otherwise the id from a fresh login
 * @throws Exception if a fresh login is needed and fails
 */
private synchronized String getSessionId() throws Exception {
    String current = cachedSessionId;
    if (current == null || !isSessionValid(current)) {
        // Cache is empty or stale: log in again and remember the new id.
        current = doLogin();
        cachedSessionId = current;
    }
    return current;
}
// ==================== 2. 工具方法 ====================
/**
 * Executes a request and verifies that the JSON response reports success ({@code code == 0}).
 * Prints a success line to standard output when it does.
 *
 * @param client the HTTP client used to execute the request; not closed by this method
 * @param req    the fully prepared request (URL, headers and entity already set)
 * @param action human-readable action name used in the success and failure messages
 * @throws RuntimeException if the response has no {@code code} or a non-zero {@code code}
 * @throws Exception        on transport or response-parsing failures
 */
private void sendAndCheck(CloseableHttpClient client, ClassicHttpRequest req, String action) throws Exception {
    try (CloseableHttpResponse resp = client.execute(req)) {
        String body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
        JSONObject result = JSONObject.parseObject(body);
        // Keep the status boxed: an empty body or a missing "code" would otherwise
        // surface as a bare NullPointerException from auto-unboxing.
        Integer code = (result == null) ? null : result.getInteger("code");
        if (code == null) {
            // No status at all: report the raw body so the failure can be diagnosed.
            throw new RuntimeException(action + "失败: " + body);
        }
        if (code != 0) {
            throw new RuntimeException(action + "失败: " + result.getString("msg"));
        }
        System.out.println("✅ " + action + "成功");
    }
}
// ==================== 3. 查询工作流 code ====================
/**
 * Looks up the code of the process definition whose name equals {@code processName} exactly.
 *
 * <p>Only the first page (10 entries) of the search result for {@code processName} is inspected.
 *
 * @param processName the exact process definition name to look for
 * @return the matching definition's {@code code}, or {@code null} when the response code is
 *         non-zero or no entry on the first page has that exact name
 * @throws Exception on login, transport or response-parsing failures
 */
public Long getProcessDefinitionCode(String processName) throws Exception {
    String sessionId = getSessionId();
    String endpoint = dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/process-definition";
    URIBuilder uri = new URIBuilder(endpoint);
    uri.setParameter("pageSize", "10");
    uri.setParameter("pageNo", "1");
    uri.setParameter("searchVal", processName);
    HttpGet request = new HttpGet(uri.build());
    request.setHeader("Sessionid", sessionId);
    try (CloseableHttpClient client = HttpClients.createDefault();
         CloseableHttpResponse resp = client.execute(request)) {
        String payload = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
        JSONObject parsed = JSONObject.parseObject(payload);
        if (parsed.getInteger("code") != 0) {
            return null;
        }
        JSONArray candidates = parsed.getJSONObject("data").getJSONArray("totalList");
        int count = candidates.size();
        for (int idx = 0; idx < count; idx++) {
            JSONObject candidate = candidates.getJSONObject(idx);
            // The search value may match other names too, so require an exact name match.
            if (processName.equals(candidate.getString("name"))) {
                return candidate.getLong("code");
            }
        }
        return null;
    }
}
// ==================== 4. 删除工作流 ====================
/**
 * Deletes the process definition with the given code from the configured project.
 *
 * @param processCode code of the process definition to delete
 * @throws RuntimeException if the server reports a non-zero result code
 * @throws Exception        on login or transport failures
 */
public void deleteProcessDefinition(Long processCode) throws Exception {
    String sessionId = getSessionId();
    String target = dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/process-definition/" + processCode;
    try (CloseableHttpClient client = HttpClients.createDefault()) {
        HttpDelete request = new HttpDelete(target);
        request.setHeader("Sessionid", sessionId);
        sendAndCheck(client, request, "删除工作流");
    }
}
// ==================== 5. 验证工作流名称是否可用 ====================
/**
 * Asks the server's {@code verify-name} endpoint to check a process definition name.
 *
 * @param processName the name to verify
 * @throws RuntimeException if the server answers with a non-zero result code
 * @throws Exception        on login, transport or response-parsing failures
 */
public void verifyProcessName(String processName) throws Exception {
    String sessionId = getSessionId();
    String endpoint = dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/process-definition/verify-name";
    URIBuilder uri = new URIBuilder(endpoint);
    uri.setParameter("name", processName);
    HttpGet request = new HttpGet(uri.build());
    request.setHeader("Sessionid", sessionId);
    try (CloseableHttpClient client = HttpClients.createDefault();
         CloseableHttpResponse resp = client.execute(request)) {
        String payload = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
        JSONObject parsed = JSONObject.parseObject(payload);
        boolean accepted = parsed.getInteger("code") == 0;
        if (!accepted) {
            throw new RuntimeException("工作流名称验证失败: " + parsed.getString("msg"));
        }
    }
}
// ==================== 6. 生成任务 code ====================
/**
 * Requests one freshly generated task code from the server.
 *
 * @return the first element of the {@code data} array in the response
 * @throws RuntimeException if the server answers with a non-zero result code
 * @throws Exception        on login, transport or response-parsing failures
 */
public Long genTaskCode() throws Exception {
    String sessionId = getSessionId();
    String endpoint = dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/task-definition/gen-task-codes";
    URIBuilder uri = new URIBuilder(endpoint);
    uri.setParameter("genNum", "1");
    HttpGet request = new HttpGet(uri.build());
    request.setHeader("Sessionid", sessionId);
    try (CloseableHttpClient client = HttpClients.createDefault();
         CloseableHttpResponse resp = client.execute(request)) {
        String payload = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
        JSONObject parsed = JSONObject.parseObject(payload);
        if (parsed.getInteger("code") != 0) {
            throw new RuntimeException("生成 task code 失败: " + parsed.getString("msg"));
        }
        // Exactly one code was requested (genNum=1), so take the first array element.
        return parsed.getJSONArray("data").getLong(0);
    }
}
// ==================== 7. 构建任务参数(含参数解析) ====================
/**
 * Builds the {@code taskParams} JSON object for one SQL task node.
 *
 * <p>Each supported parameter name becomes an {@code IN} local parameter of type
 * {@code VARCHAR} whose value is a date expression:
 * {@code p0d} maps to {@code $[yyyy-MM-dd]}, and {@code p1d}/{@code p7d}/{@code p30d}
 * map to {@code $[yyyy-MM-dd-N]} with N = 1, 7, 30.
 *
 * <p>The node's own {@code datasourceId} is used when set; otherwise the configured
 * default datasource is used. A {@code null} parameter list is treated as empty.
 *
 * @param node the task node supplying the SQL script, parameter names and optional datasource id
 * @return the task parameter object
 * @throws IllegalArgumentException if the node lists a parameter name other than the four above
 */
private JSONObject buildTaskParams(TaskNodeDto node) {
    JSONArray localParams = new JSONArray();
    List<String> declared = node.getParams();
    if (declared != null) {
        for (String param : declared) {
            String value;
            switch (param) {
                case "p1d":
                    value = "$[yyyy-MM-dd-1]";
                    break;
                case "p0d":
                    value = "$[yyyy-MM-dd]";
                    break;
                case "p7d":
                    value = "$[yyyy-MM-dd-7]";
                    break;
                case "p30d":
                    value = "$[yyyy-MM-dd-30]";
                    break;
                default:
                    throw new IllegalArgumentException("不支持的参数: " + param);
            }
            JSONObject p = new JSONObject();
            p.put("prop", param);
            p.put("direct", "IN");
            p.put("type", "VARCHAR");
            p.put("value", value);
            localParams.add(p);
        }
    }

    // Prefer the datasource chosen for this node; fall back to the configured default.
    Long datasourceId = (node.getDatasourceId() != null) ? node.getDatasourceId() : this.defaultDatasourceId;

    JSONObject params = new JSONObject();
    params.put("localParams", localParams);
    params.put("resourceList", new JSONArray());
    params.put("type", "MYSQL");
    params.put("datasource", datasourceId);
    params.put("sql", node.getSqlScript());
    params.put("sqlType", "1");
    params.put("preStatements", new JSONArray());
    params.put("postStatements", new JSONArray());
    params.put("segmentSeparator", ";");
    params.put("display_rows", 10);
    return params;
}
// ==================== 8. 创建工作流 ====================
/**
 * Creates a process definition named {@code processName} whose SQL tasks are chained
 * one after another in ascending {@code order}.
 *
 * <p>Steps: optionally delete an existing definition with the same name, verify the name,
 * load the task nodes, generate a task code per node, build the task definition, relation
 * and location JSON, then POST the definition in {@code OFFLINE} state.
 *
 * @param processName name of the process definition to create
 * @param deletePre   when {@code true}, an existing definition with the same name is deleted first
 * @return the {@code code} of the newly created process definition
 * @throws RuntimeException if the server rejects any step with a non-zero result code
 * @throws Exception        on login, transport or response-parsing failures
 */
public Long createProcess(String processName, boolean deletePre) throws Exception {
    if (deletePre) {
        Long existingCode = getProcessDefinitionCode(processName);
        if (existingCode != null) {
            deleteProcessDefinition(existingCode);
        }
    }
    verifyProcessName(processName);

    JSONArray taskDefinitions = new JSONArray();
    JSONArray taskRelations = new JSONArray();
    JSONArray locations = new JSONArray();
    int x = 100, y = 100;
    List<Long> taskCodes = new ArrayList<>();
    List<TaskNodeDto> tasks = loadJobInfos(processName);
    tasks.sort(Comparator.comparing(TaskNodeDto::getOrder));

    for (int i = 0; i < tasks.size(); i++) {
        TaskNodeDto node = tasks.get(i);
        Long taskCode = genTaskCode(); // obtains its own session id internally
        taskCodes.add(taskCode);

        JSONObject taskDef = new JSONObject();
        taskDef.put("code", taskCode);
        taskDef.put("name", node.getName());
        taskDef.put("taskType", "SQL");
        taskDef.put("failRetryTimes", 3);
        taskDef.put("failRetryInterval", 1);
        taskDef.put("timeoutFlag", "CLOSE");
        taskDef.put("timeout", 0);
        taskDef.put("flag", "YES");
        taskDef.put("taskParams", buildTaskParams(node));
        taskDefinitions.add(taskDef);

        // The first task gets predecessor code 0; every later task follows the one before it.
        Long preTaskCode = (i == 0) ? 0L : taskCodes.get(i - 1);
        JSONObject relation = new JSONObject();
        relation.put("name", "");
        relation.put("preTaskCode", preTaskCode);
        relation.put("preTaskVersion", 0);
        relation.put("postTaskCode", taskCode);
        relation.put("postTaskVersion", 0);
        relation.put("conditionType", "NONE");
        relation.put("conditionParams", new JSONObject());
        taskRelations.add(relation);

        JSONObject loc = new JSONObject();
        loc.put("taskCode", taskCode);
        loc.put("x", x);
        loc.put("y", y);
        locations.add(loc);
        // Stack nodes vertically and start a new column once y exceeds 2000.
        y += 100;
        if (y > 2000) {
            x += 270;
            y = 100;
        }
    }

    List<NameValuePair> formData = new ArrayList<>();
    formData.add(new BasicNameValuePair("name", processName));
    formData.add(new BasicNameValuePair("description", ""));
    formData.add(new BasicNameValuePair("tenantCode", "default"));
    formData.add(new BasicNameValuePair("executionType", "PARALLEL"));
    formData.add(new BasicNameValuePair("timeout", "0"));
    formData.add(new BasicNameValuePair("globalParams", "[]"));
    formData.add(new BasicNameValuePair("taskDefinitionJson", taskDefinitions.toJSONString()));
    formData.add(new BasicNameValuePair("taskRelationJson", taskRelations.toJSONString()));
    formData.add(new BasicNameValuePair("locations", locations.toJSONString()));
    formData.add(new BasicNameValuePair("releaseState", "OFFLINE"));

    // Fetch the session id only now: the calls above may each have re-logged in and
    // replaced the cached id, so an id captured at method entry could be stale here.
    String sessionId = getSessionId();
    try (CloseableHttpClient client = HttpClients.createDefault()) {
        HttpPost post = new HttpPost(dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/process-definition");
        post.setHeader("Sessionid", sessionId);
        post.setHeader("Content-Type", "application/x-www-form-urlencoded");
        post.setEntity(new UrlEncodedFormEntity(formData, StandardCharsets.UTF_8));
        try (CloseableHttpResponse resp = client.execute(post)) {
            String body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
            JSONObject result = JSONObject.parseObject(body);
            if (result.getInteger("code") != 0) {
                throw new RuntimeException("创建工作流失败: " + result.getString("msg"));
            }
            Long newCode = result.getJSONObject("data").getLong("code");
            System.out.println("✅ 工作流 [" + processName + "] 创建成功,code=" + newCode);
            return newCode;
        }
    }
}
// ==================== 9. 模拟从数据库加载任务 ====================
/**
 * Returns a fixed pair of sample task nodes; stands in for loading them from a database.
 *
 * @param processName currently unused
 * @return a mutable list holding {@code task_1} (order 2) followed by {@code task_2} (order 1)
 */
private List<TaskNodeDto> loadJobInfos(String processName) {
    TaskNodeDto first = new TaskNodeDto();
    first.setOrder(2);
    first.setParams(Arrays.asList("p1d"));
    first.setSqlScript("SELECT 1;");
    first.setName("task_1");

    TaskNodeDto second = new TaskNodeDto();
    second.setOrder(1);
    second.setParams(Arrays.asList("p0d", "p7d"));
    second.setSqlScript("INSERT INTO test VALUES (2);");
    second.setName("task_2");

    // Copy into an ArrayList so the caller can sort the result in place.
    return new ArrayList<>(Arrays.asList(first, second));
}
// ==================== 10. 上线工作流 ====================
/**
 * Sets the release state of a process definition.
 *
 * @param processCode code of the process definition
 * @param processName name of the process definition, sent as the {@code name} form field
 * @param state       release state sent as {@code releaseState} (callers in this file pass {@code "ONLINE"})
 * @throws RuntimeException if the server reports a non-zero result code
 * @throws Exception        on login or transport failures
 */
public void releaseProcess(Long processCode, String processName, String state) throws Exception {
    String sessionId = getSessionId();
    String target = dsApiBaseUrl + "/projects/" + this.defaultProjectCode
            + "/process-definition/" + processCode + "/release";
    try (CloseableHttpClient client = HttpClients.createDefault()) {
        HttpPost request = new HttpPost(target);
        request.setHeader("Sessionid", sessionId);
        List<NameValuePair> fields = Arrays.asList(
                new BasicNameValuePair("name", processName),
                new BasicNameValuePair("releaseState", state)
        );
        request.setEntity(new UrlEncodedFormEntity(fields, StandardCharsets.UTF_8));
        String action = "设置工作流状态为 " + state;
        sendAndCheck(client, request, action);
    }
}
/**
 * Creates a schedule for a process definition, running from "now" for 100 years.
 *
 * <p>The start and end timestamps are computed in the same time zone that is declared
 * in the schedule ({@code Asia/Shanghai}), so they do not depend on the JVM's default zone.
 *
 * @param processCode code of the process definition to schedule
 * @param cronExp     cron expression sent as the schedule's {@code crontab}
 * @throws RuntimeException if the server reports a non-zero result code
 * @throws Exception        on login or transport failures
 */
public void createSchedule(Long processCode, String cronExp) throws Exception {
    String sessionId = getSessionId();
    try (CloseableHttpClient client = HttpClients.createDefault()) {
        HttpPost post = new HttpPost(dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/schedules/");
        post.setHeader("Sessionid", sessionId);

        String timezoneId = "Asia/Shanghai";
        // Take "now" in the declared schedule zone rather than the JVM default zone,
        // otherwise the timestamps and timezoneId disagree on hosts outside that zone.
        LocalDateTime now = LocalDateTime.now(java.time.ZoneId.of(timezoneId));
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        String startTime = now.format(formatter);
        String endTime = now.plusYears(100).format(formatter);

        JSONObject schedule = new JSONObject();
        schedule.put("startTime", startTime);
        schedule.put("endTime", endTime);
        schedule.put("crontab", cronExp);
        schedule.put("timezoneId", timezoneId);

        List<NameValuePair> form = Arrays.asList(
                new BasicNameValuePair("processDefinitionCode", processCode.toString()),
                new BasicNameValuePair("environmentCode", ""),
                new BasicNameValuePair("workerGroup", "default"),
                new BasicNameValuePair("warningGroupId", "0"),
                new BasicNameValuePair("processInstancePriority", "MEDIUM"),
                new BasicNameValuePair("warningType", "NONE"),
                new BasicNameValuePair("failureStrategy", "CONTINUE"),
                new BasicNameValuePair("schedule", schedule.toString())
        );
        post.setEntity(new UrlEncodedFormEntity(form, StandardCharsets.UTF_8));
        sendAndCheck(client, post, processCode + " 添加调度: " + cronExp);
    }
}
/**
 * Returns the id of the first schedule listed for a process definition.
 *
 * @param processCode code of the process definition whose schedules are queried
 * @return the {@code id} of the first entry in the result list, or {@code null} when the
 *         response code is non-zero or the list is empty
 * @throws Exception on login, transport or response-parsing failures
 */
public Integer getScheduleId(Long processCode) throws Exception {
    String sessionId = getSessionId();
    String endpoint = dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/schedules";
    URIBuilder uri = new URIBuilder(endpoint);
    uri.setParameter("pageSize", "10");
    uri.setParameter("pageNo", "1");
    uri.setParameter("processDefinitionCode", processCode.toString());
    HttpGet request = new HttpGet(uri.build());
    request.setHeader("Sessionid", sessionId);
    try (CloseableHttpClient client = HttpClients.createDefault();
         CloseableHttpResponse resp = client.execute(request)) {
        String payload = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
        JSONObject parsed = JSONObject.parseObject(payload);
        if (parsed.getInteger("code") != 0) {
            return null;
        }
        JSONArray schedules = parsed.getJSONObject("data").getJSONArray("totalList");
        if (schedules.isEmpty()) {
            return null;
        }
        return schedules.getJSONObject(0).getInteger("id");
    }
}
/**
 * Brings a schedule online.
 *
 * @param scheduleId id of the schedule, placed in the request path
 * @throws RuntimeException if the server reports a non-zero result code
 * @throws Exception        on login or transport failures
 */
public void scheduleOnline(Integer scheduleId) throws Exception {
    String sessionId = getSessionId();
    String target = dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/schedules/" + scheduleId + "/online";
    try (CloseableHttpClient client = HttpClients.createDefault()) {
        HttpPost request = new HttpPost(target);
        request.setHeader("Sessionid", sessionId);
        String action = "调度Id: " + scheduleId + "上线";
        sendAndCheck(client, request, action);
    }
}
/**
 * Finds the process instance id of a run of the given process by querying the first page
 * (10 entries) of task instances filtered by {@code processInstanceName}.
 *
 * <p>An entry matches when its {@code processInstanceName}, with a trailing
 * {@code -<digits>-<17 digits>} suffix removed, equals {@code processName}.
 * Entries without a {@code processInstanceName} are skipped.
 *
 * @param processName the process name to match
 * @return the {@code processInstanceId} of the first matching entry, or {@code null} when the
 *         response code is non-zero, the result list is missing or empty, or nothing matches
 * @throws Exception on login, transport or response-parsing failures
 */
public Integer getWorkflowInstanceId(String processName) throws Exception {
    String sessionId = getSessionId();
    URIBuilder builder = new URIBuilder(dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/task-instances")
            .setParameter("pageSize", "10")
            .setParameter("pageNo", "1")
            .setParameter("searchVal", "")
            .setParameter("processInstanceId", "")
            .setParameter("host", "")
            .setParameter("stateType", "")
            .setParameter("startDate", "")
            .setParameter("endDate", "")
            .setParameter("executorName", "")
            .setParameter("processInstanceName", processName);
    HttpGet get = new HttpGet(builder.build());
    get.setHeader("Sessionid", sessionId);
    try (CloseableHttpClient client = HttpClients.createDefault();
         CloseableHttpResponse resp = client.execute(get)) {
        String body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8);
        JSONObject json = JSONObject.parseObject(body);
        if (json.getInteger("code") != 0) {
            return null;
        }
        JSONObject data = json.getJSONObject("data");
        JSONArray list = (data == null) ? null : data.getJSONArray("totalList");
        if (list == null) {
            return null;
        }
        for (Object o : list) {
            JSONObject obj = JSONObject.from(o);
            String processInstanceName = obj.getString("processInstanceName");
            if (processInstanceName == null) {
                // Without a name there is nothing to compare; calling replaceAll would throw.
                continue;
            }
            // Strip the trailing "-<digits>-<17 digits>" suffix to recover the bare process name.
            String instanceName = processInstanceName.replaceAll("-\\d+-\\d{17}$", "");
            if (instanceName.equals(processName)) {
                return obj.getInteger("processInstanceId");
            }
        }
    }
    return null;
}
/**
 * Starts a process instance for the given process definition, with today's midnight as both
 * the complement start and end date.
 *
 * <p>Failures while building or sending the start request are reported on standard error
 * and turned into a {@code false} result. A failure to obtain a session id is propagated.
 *
 * @param processCode code of the process definition to run
 * @return {@code true} when the server accepted the request, {@code false} when the request failed
 * @throws Exception if obtaining a session id fails
 */
public Boolean runProcess(Long processCode) throws Exception{
    String sessionId = getSessionId();
    try (CloseableHttpClient client = HttpClients.createDefault()) {
        HttpPost post = new HttpPost(dsApiBaseUrl + "/projects/" + this.defaultProjectCode + "/executors/start-process-instance");
        post.setHeader("Sessionid", sessionId);

        // Start and end of the complement window are the same instant: today at 00:00:00.
        String today = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        String midnight = today + " 00:00:00";
        JSONObject scheduleTime = new JSONObject();
        scheduleTime.put("complementStartDate", midnight);
        scheduleTime.put("complementEndDate", midnight);

        List<NameValuePair> form = Arrays.asList(
                new BasicNameValuePair("processDefinitionCode", processCode.toString()),
                new BasicNameValuePair("failureStrategy", "CONTINUE"),
                new BasicNameValuePair("warningType", "NONE"),
                new BasicNameValuePair("warningGroupId", ""),
                new BasicNameValuePair("execType", "START_PROCESS"),
                new BasicNameValuePair("startNodeList", ""),
                new BasicNameValuePair("taskDependType", "TASK_POST"),
                new BasicNameValuePair("complementDependentMode", "OFF_MODE"),
                new BasicNameValuePair("runMode", "RUN_MODE_SERIAL"),
                new BasicNameValuePair("processInstancePriority", "MEDIUM"),
                new BasicNameValuePair("workerGroup", "default"),
                new BasicNameValuePair("environmentCode", ""),
                new BasicNameValuePair("startParams", ""),
                new BasicNameValuePair("expectedParallelismNumber", ""),
                new BasicNameValuePair("dryRun", ""),
                new BasicNameValuePair("scheduleTime", scheduleTime.toJSONString())
        );
        post.setEntity(new UrlEncodedFormEntity(form, StandardCharsets.UTF_8));
        sendAndCheck(client, post, "运行任务:" + processCode);
        return true;
    } catch (Exception e) {
        // Keep the best-effort contract (return false), but do not hide why the run failed.
        System.err.println("运行任务:" + processCode + "失败: " + e);
        return false;
    }
}
}
TestDolphinScheduler
import com.datacenter.ds.service.DolphinSchedulerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication(scanBasePackages = "com.datacenter.ds")
public class TestDolphinScheduler {
/**
 * Demo entry point: starts a non-web Spring context, creates the sample workflow, brings it
 * online, adds a schedule and brings that schedule online.
 *
 * <p>The context is closed even when one of the calls throws. The schedule is only brought
 * online when its id could be looked up.
 *
 * @param args command-line arguments forwarded to Spring Boot
 * @throws Exception if any scheduler call fails
 */
public static void main(String[] args) throws Exception {
    SpringApplication app = new SpringApplication(TestDolphinScheduler.class);
    app.setWebApplicationType(WebApplicationType.NONE);
    // try-with-resources closes the context on both the success and the failure path.
    try (ConfigurableApplicationContext context = app.run(args)) {
        DolphinSchedulerService ds = context.getBean(DolphinSchedulerService.class);
        String processName = "测试Test-01";
        Long code = ds.createProcess(processName, true);
        // Optional: bring the workflow online and schedule it.
        if (code != null) {
            ds.releaseProcess(code, processName, "ONLINE");
            ds.createSchedule(code, "0 0 * * 1 ? *");
            Integer scheduleId = ds.getScheduleId(code);
            System.out.println("scheduleId: " + scheduleId);
            if (scheduleId != null) {
                ds.scheduleOnline(scheduleId);
            } else {
                // A null id would otherwise be sent as the literal path segment "null".
                System.err.println("scheduleId not found for process code " + code + "; schedule left offline");
            }
        }
    }
}
}
application.yml
dolphinscheduler:
api-base-url: http://xxx.xxx.xxx.xxx:12345/dolphinscheduler
username: xxx
password: xxx
project-code: 123456789
datasource-id: 1