From c5b9950a2fa23b03498fadf96bcb55d47607b9e8 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Tue, 8 Oct 2024 14:06:28 +0800 Subject: [PATCH] [INLONG-11153][Manager] Fix the problem of HTTP sink does not automatically allocate sort cluster (#11155) --- .../manager/common/consts/SinkType.java | 1 + .../sink/http/HttpResourceOperator.java | 57 +++++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/http/HttpResourceOperator.java diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index 16a1bfd3d81..45241383c1d 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -91,6 +91,7 @@ public class SinkType extends StreamType { public static final Set SORT_STANDALONE_SINK = new HashSet<>(); static { + SINK_TO_CLUSTER.put(HTTP, ClusterType.SORT_HTTP); SINK_TO_CLUSTER.put(CLS, ClusterType.SORT_CLS); SINK_TO_CLUSTER.put(ES, ClusterType.SORT_ES); SINK_TO_CLUSTER.put(PULSAR, ClusterType.SORT_PULSAR); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/http/HttpResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/http/HttpResourceOperator.java new file mode 100644 index 00000000000..eb73b4afdcd --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/http/HttpResourceOperator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.http; + +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.SinkStatus; +import org.apache.inlong.manager.pojo.sink.SinkInfo; +import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +/** + * Http resource operate for creating http resource + */ +@Service +public class HttpResourceOperator extends AbstractStandaloneSinkResourceOperator { + + private static final Logger LOG = LoggerFactory.getLogger(HttpResourceOperator.class); + + @Override + public Boolean accept(String sinkType) { + return SinkType.HTTP.equals(sinkType); + } + + @Override + public void createSinkResource(SinkInfo sinkInfo) { + LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId()); + if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) { + LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]"); + return; + } else if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) { + LOG.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create"); + return; + } + this.checkTaskAndConsumerGroup(sinkInfo); + this.assignCluster(sinkInfo); + } + +}