Skip to content

Commit

Permalink
[INLONG-11400][Manager] Support Airflow schedule engine
Browse files Browse the repository at this point in the history
  • Loading branch information
ZKpLo committed Nov 12, 2024
1 parent 99dec05 commit 5bfe4d8
Show file tree
Hide file tree
Showing 39 changed files with 2,829 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ApiModel(description = "Full representation of the connection.")
public class Connection {

@SerializedName("connection_id")
@ApiModelProperty("The connection ID.")
private String connectionId;

@SerializedName("conn_type")
@ApiModelProperty("The connection type.")
private String connType;

@SerializedName("description")
@ApiModelProperty("The description of the connection.")
private String description;

@SerializedName("host")
@ApiModelProperty("Host of the connection.")
private String host;

@SerializedName("login")
@ApiModelProperty("Login of the connection.")
private String login;

@SerializedName("schema")
@ApiModelProperty("Schema of the connection.")
private String schema;

@SerializedName("port")
@ApiModelProperty("Port of the connection.")
private Integer port;

@SerializedName("password")
@ApiModelProperty("Password of the connection.")
private String password;

@SerializedName("extra")
@ApiModelProperty("Additional information description of the connection.")
private String extra;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.util.List;

@Data
@ApiModel(description = "Collection List of connections.")
public class ConnectionCollection {

@SerializedName("connections")
@ApiModelProperty("Airflow connection list.")
private List<Connection> connections;

@SerializedName("total_entries")
@ApiModelProperty("Number of Airflow Connections.")
private Integer totalEntries;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
@ApiModel(description = "DAG Description Information.")
public class DAG {

@SerializedName("dag_id")
@ApiModelProperty("The ID of the DAG.")
private String dagId;

@SerializedName("root_dag_id")
@ApiModelProperty("If the DAG is SubDAG then it is the top level DAG identifier. Otherwise, null.")
private String rootDagId;

@SerializedName("is_paused")
@ApiModelProperty("Whether the DAG is paused.")
private Boolean isPaused;

@SerializedName("is_active")
@ApiModelProperty("Whether the DAG is currently seen by the scheduler(s).")
private Boolean isActive;

@SerializedName("description")
@ApiModelProperty("User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents.")
private String description;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.util.List;

@Data
@ApiModel(description = "Collection of DAGs.")
public class DAGCollection {

@SerializedName("dags")
@ApiModelProperty("List of DAGs.")
private List<DAG> dags = null;

@SerializedName("total_entries")
@ApiModelProperty("The length of DAG list.")
private Integer totalEntries;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
@ApiModel(description = "DAGRun Description Information.")
public class DAGRun {

@SerializedName("conf")
@ApiModelProperty("JSON object describing additional configuration parameters.")
private Object conf;

@SerializedName("dag_id")
@ApiModelProperty("Airflow DAG id.")
private String dagId;

@SerializedName("dag_run_id")
@ApiModelProperty("Airflow DAGRun id (Nullable).")
private String dagRunId;

@SerializedName("end_date")
@ApiModelProperty("The end time of this DAGRun.")
private String endDate;

@SerializedName("start_date")
@ApiModelProperty("The start time of this DAGRun.")
private String startDate;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ApiModel(description = "DAGRunConf Description Information.")
public class DAGRunConf {

@SerializedName("inlong_group_id")
@ApiModelProperty("Specify the Inlong group ID")
private String inlongGroupId;

@SerializedName("start_time")
@ApiModelProperty("The start time of DAG scheduling.")
private long startTime;

@SerializedName("end_time")
@ApiModelProperty("The end time of DAG scheduling.")
private long endTime;

@SerializedName("boundary_type")
@ApiModelProperty("The offline task boundary type.")
private String boundaryType;

@SerializedName("cron_expr")
@ApiModelProperty("Cron expression.")
private String cronExpr;

@SerializedName("seconds_interval")
@ApiModelProperty("Time interval (in seconds).")
private String secondsInterval;

@SerializedName("connection_id")
@ApiModelProperty("Connection of Inlong Manager.")
private String connectionId;

@SerializedName("timezone")
@ApiModelProperty("The timezone.")
private String timezone;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.math.BigDecimal;

@Data
@ApiModel(description = "[RFC7807](https://tools.ietf.org/html/rfc7807) compliant response. ")
public class Error {

@SerializedName("detail")
@ApiModelProperty("Error Details.")
private String detail;

@SerializedName("instance")
@ApiModelProperty("Error of the instance.")
private String instance;

@SerializedName("status")
@ApiModelProperty("Error of the status.")
private BigDecimal status;

@SerializedName("title")
@ApiModelProperty("Error of the title.")
private String title;

@SerializedName("type")
@ApiModelProperty("Error of the type.")
private String type;
}
6 changes: 6 additions & 0 deletions inlong-manager/manager-schedule/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,11 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
public enum ScheduleEngineType {

NONE("None"),
AIRFLOW("Airflow"),
QUARTZ("Quartz");

private final String type;
Expand Down
Loading

0 comments on commit 5bfe4d8

Please sign in to comment.