Skip to content

Commit

Permalink
[INLONG-11401][Manager] Support Dolphinscheduler schedule engine
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Nov 7, 2024
1 parent 58fe6ee commit 41cf5ad
Show file tree
Hide file tree
Showing 17 changed files with 1,747 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.dolphinschedule;

import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
public class DSTaskDefinition {

@ApiModelProperty("DolphinScheduler task definition code")
@SerializedName("code")
private long code;

@ApiModelProperty("DolphinScheduler task definition code")
@SerializedName("delayTime")
private String delayTime;

@ApiModelProperty("DolphinScheduler task definition description")
@SerializedName("description")
private String description;

@ApiModelProperty("DolphinScheduler task definition environment code")
@SerializedName("environmentCode")
private int environmentCode;

@ApiModelProperty("DolphinScheduler task fail retry interval")
@SerializedName("failRetryInterval")
private String failRetryInterval;

@ApiModelProperty("DolphinScheduler task definition fail retry times")
@SerializedName("failRetryTimes")
private String failRetryTimes;

@ApiModelProperty("DolphinScheduler task definition flag")
@SerializedName("flag")
private String flag;

@ApiModelProperty("DolphinScheduler task definition isCache")
@SerializedName("isCache")
private String isCache;

@ApiModelProperty("DolphinScheduler task definition name")
@SerializedName("name")
private String name;

@ApiModelProperty("DolphinScheduler task definition params")
@SerializedName("taskParams")
private DSTaskParams taskParams;

@ApiModelProperty("DolphinScheduler task definition priority")
@SerializedName("taskPriority")
private String taskPriority;

@ApiModelProperty("DolphinScheduler task definition type")
@SerializedName("taskType")
private String taskType;

@ApiModelProperty("DolphinScheduler task definition timeout")
@SerializedName("timeout")
private int timeout;

@ApiModelProperty("DolphinScheduler task definition timeout flag")
@SerializedName("timeoutFlag")
private String timeoutFlag;

@ApiModelProperty("DolphinScheduler task definition timeout notify strategy")
@SerializedName("timeoutNotifyStrategy")
private String timeoutNotifyStrategy;

@ApiModelProperty("DolphinScheduler task definition worker group")
@SerializedName("workerGroup")
private String workerGroup;

@ApiModelProperty("DolphinScheduler task definition apu quota")
@SerializedName("cpuQuota")
private int cpuQuota;

@ApiModelProperty("DolphinScheduler task definition memory max")
@SerializedName("memoryMax")
private int memoryMax;

@ApiModelProperty("DolphinScheduler task definition execute type")
@SerializedName("taskExecuteType")
private String taskExecuteType;

public DSTaskDefinition() {
this.delayTime = "0";
this.description = "";
this.environmentCode = -1;
this.failRetryInterval = "1";
this.failRetryTimes = "0";
this.flag = "YES";
this.isCache = "NO";
this.taskPriority = "MEDIUM";
this.taskType = "SHELL";
this.timeoutFlag = "CLOSE";
this.timeoutNotifyStrategy = "";
this.workerGroup = "default";
this.cpuQuota = -1;
this.memoryMax = -1;
this.taskExecuteType = "BATCH";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.dolphinschedule;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.util.ArrayList;
import java.util.List;
@Data
public class DSTaskParams {

@ApiModelProperty("DolphinScheduler task params local params")
@JsonProperty("localParams")
private List<Object> localParams;

@ApiModelProperty("DolphinScheduler task params raw script")
@JsonProperty("rawScript")
private String rawScript;

@ApiModelProperty("DolphinScheduler task params resource list")
@JsonProperty("resourceList")
private List<Object> resourceList;

public DSTaskParams() {
this.localParams = new ArrayList<>();
this.resourceList = new ArrayList<>();
this.rawScript = "";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.dolphinschedule;

import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
public class DSTaskRelation {

@ApiModelProperty("DolphinScheduler task relation name")
@SerializedName("name")
private String name;

@ApiModelProperty("DolphinScheduler task relation pre-task code")
@SerializedName("preTaskCode")
private int preTaskCode;

@ApiModelProperty("DolphinScheduler task relation pre-task version")
@SerializedName("preTaskVersion")
private int preTaskVersion;

@ApiModelProperty("DolphinScheduler task relation post-task code")
@SerializedName("postTaskCode")
private long postTaskCode;

@ApiModelProperty("DolphinScheduler task relation post-task version")
@SerializedName("postTaskVersion")
private int postTaskVersion;

@ApiModelProperty("DolphinScheduler task relation condition type")
@SerializedName("conditionType")
private String conditionType;

@ApiModelProperty("DolphinScheduler task relation condition params")
@SerializedName("conditionParams")
private Object conditionParams;

public DSTaskRelation() {
this.name = "";
this.conditionType = "NONE";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.dolphinschedule;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
public class DScheduleInfo {

@ApiModelProperty("DolphinScheduler schedule start time")
@JsonProperty("startTime")
private String startTime;

@ApiModelProperty("DolphinScheduler schedule end time")
@JsonProperty("endTime")
private String endTime;

@ApiModelProperty("DolphinScheduler schedule crontab expression")
@JsonProperty("crontab")
private String crontab;

@ApiModelProperty("DolphinScheduler schedule timezone id")
@JsonProperty("timezoneId")
private String timezoneId;

}
6 changes: 6 additions & 0 deletions inlong-manager/manager-schedule/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,11 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
public enum ScheduleEngineType {

NONE("None"),
QUARTZ("Quartz");
QUARTZ("Quartz"),
DOLPHINSCHEDULER("DolphinScheduler");

private final String type;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.schedule.dolphinscheduler;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.schedule.ScheduleEngineClient;
import org.apache.inlong.manager.schedule.ScheduleEngineType;

import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
* Built-in implementation of third-party schedule engine client corresponding with {@link DolphinScheduleEngine}.
* DolphinScheduleClient simply invokes the {@link DolphinScheduleEngine} to register/unregister/update
* schedule info, all the logic for invoking the remote scheduling service is implemented in {@link DolphinScheduleEngine}
*/
@Service
public class DolphinScheduleClient implements ScheduleEngineClient {

@Resource
public DolphinScheduleEngine scheduleEngine;

@Override
public boolean accept(String engineType) {
return ScheduleEngineType.DOLPHINSCHEDULER.getType().equalsIgnoreCase(engineType);
}

@Override
public boolean register(ScheduleInfo scheduleInfo) {
return scheduleEngine.handleRegister(scheduleInfo);
}

@Override
public boolean unregister(String groupId) {
return scheduleEngine.handleUnregister(groupId);
}

@Override
public boolean update(ScheduleInfo scheduleInfo) {
return scheduleEngine.handleUpdate(scheduleInfo);
}
}
Loading

0 comments on commit 41cf5ad

Please sign in to comment.