Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11400][Manager] Support Airflow schedule engine #11479

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "Full representation of the connection.")
public class AirflowConnection {

@JsonProperty("connection_id")
@ApiModelProperty("The connection ID.")
private String connectionId;

@JsonProperty("conn_type")
@ApiModelProperty("The connection type.")
private String connType;

@JsonProperty("description")
@ApiModelProperty("The description of the connection.")
private String description;

@JsonProperty("host")
@ApiModelProperty("Host of the connection.")
private String host;

@JsonProperty("login")
@ApiModelProperty("Login of the connection.")
private String login;

@JsonProperty("schema")
@ApiModelProperty("Schema of the connection.")
private String schema;

@JsonProperty("port")
@ApiModelProperty("Port of the connection.")
private Integer port;

@JsonProperty("password")
@ApiModelProperty("Password of the connection.")
private String password;

@JsonProperty("extra")
@ApiModelProperty("Additional information description of the connection.")
private String extra;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "DAG Description Information.")
public class DAG {

@JsonProperty("dag_id")
@ApiModelProperty("The ID of the DAG.")
private String dagId;

@JsonProperty("root_dag_id")
@ApiModelProperty("If the DAG is SubDAG then it is the top level DAG identifier. Otherwise, null.")
private String rootDagId;

@JsonProperty("is_paused")
@ApiModelProperty("Whether the DAG is paused.")
private Boolean isPaused;

@JsonProperty("is_active")
@ApiModelProperty("Whether the DAG is currently seen by the scheduler(s).")
private Boolean isActive;

@JsonProperty("description")
@ApiModelProperty("User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents.")
private String description;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.util.List;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "Collection of DAGs.")
public class DAGCollection {

@JsonProperty("dags")
@ApiModelProperty("List of DAGs.")
private List<DAG> dags = null;

@JsonProperty("total_entries")
@ApiModelProperty("The length of DAG list.")
private Integer totalEntries;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "DAGRun Description Information.")
public class DAGRun {

@JsonProperty("conf")
@ApiModelProperty("JSON object describing additional configuration parameters.")
private Object conf;

@JsonProperty("dag_id")
@ApiModelProperty("Airflow DAG id.")
private String dagId;

@JsonProperty("dag_run_id")
@ApiModelProperty("Airflow DAGRun id (Nullable).")
private String dagRunId;

@JsonProperty("end_date")
@ApiModelProperty("The end time of this DAGRun.")
private String endDate;

@JsonProperty("start_date")
@ApiModelProperty("The start time of this DAGRun.")
private String startDate;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "DAGRunConf Description Information.")
public class DAGRunConf {

@JsonProperty("inlong_group_id")
@ApiModelProperty("Specify the Inlong group ID")
private String inlongGroupId;

@JsonProperty("start_time")
@ApiModelProperty("The start time of DAG scheduling.")
private long startTime;

@JsonProperty("end_time")
@ApiModelProperty("The end time of DAG scheduling.")
private long endTime;

@JsonProperty("boundary_type")
@ApiModelProperty("The offline task boundary type.")
private String boundaryType;

@JsonProperty("cron_expr")
@ApiModelProperty("Cron expression.")
private String cronExpr;

@JsonProperty("seconds_interval")
@ApiModelProperty("Time interval (in seconds).")
private String secondsInterval;

@JsonProperty("connection_id")
@ApiModelProperty("Airflow Connection Id of Inlong Manager.")
private String connectionId;

@JsonProperty("timezone")
@ApiModelProperty("The timezone.")
private String timezone;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.math.BigDecimal;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "[RFC7807](https://tools.ietf.org/html/rfc7807) compliant response. ")
public class Error {

@JsonProperty("detail")
@ApiModelProperty("Error Details.")
private String detail;

@JsonProperty("instance")
@ApiModelProperty("Error of the instance.")
private String instance;

@JsonProperty("status")
@ApiModelProperty("Error of the status.")
private BigDecimal status;

@JsonProperty("title")
@ApiModelProperty("Error of the title.")
private String title;

@JsonProperty("type")
@ApiModelProperty("Error of the type.")
private String type;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum ScheduleEngineType {

NONE("None"),
QUARTZ("Quartz"),
AIRFLOW("Airflow"),
DOLPHINSCHEDULER("DolphinScheduler");

private final String type;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.schedule.airflow;

/**
* Contains constants for interacting with the Airflow API.
*/
public class AirFlowAPIConstant {

public static final String DEFAULT_TIMEZONE = "Asia/Shanghai";
public static final String INLONG_OFFLINE_DAG_TASK_PREFIX = "inlong_offline_task_";
public static final String SUBMIT_OFFLINE_JOB_URI = "/inlong/manager/api/group/submitOfflineJob";

// AirflowConnection
public static final String LIST_CONNECTIONS_URI = "/api/v1/connections";
public static final String GET_CONNECTION_URI = "/api/v1/connections/{connection_id}";

// DAG
public static final String LIST_DAGS_URI = "/api/v1/dags";
public static final String UPDATE_DAG_URI = "/api/v1/dags/{dag_id}";

// DAGRun
public static final String TRIGGER_NEW_DAG_RUN_URI = "/api/v1/dags/{dag_id}/dagRuns";

}
Loading
Loading