diff --git a/docker/docker-compose/sql/apache_inlong_manager.sql b/docker/docker-compose/sql/apache_inlong_manager.sql index 0173e596eb9..97a8aef8206 100644 --- a/docker/docker-compose/sql/apache_inlong_manager.sql +++ b/docker/docker-compose/sql/apache_inlong_manager.sql @@ -851,4 +851,225 @@ CREATE TABLE `wf_task_instance` AUTO_INCREMENT = 704 DEFAULT CHARSET = utf8mb4 COMMENT ='Task instance'; +-- ---------------------------- +-- Table structure for cluster_set +-- ---------------------------- +DROP TABLE IF EXISTS `cluster_set`; +CREATE TABLE `cluster_set` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `cn_name` varchar(256) DEFAULT NULL COMMENT 'Chinese display name', + `description` varchar(256) DEFAULT NULL COMMENT 'ClusterSet Introduction', + `middleware_type` varchar(10) DEFAULT 'Pulsar' COMMENT 'The middleware type of data storage, high throughput: Pulsar', + `in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas', + `followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas', + `status` int(11) DEFAULT '21' COMMENT 'ClusterSet status', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `creator` varchar(64) DEFAULT NULL COMMENT 'creator name', + `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier name', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_cluster_set` (`set_name`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='ClusterSet table'; + +-- ---------------------------- +-- Table structure for cluster_set_inlongid +-- ---------------------------- +DROP TABLE IF EXISTS `cluster_set_inlongid`; +CREATE TABLE `cluster_set_inlongid` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `business_identifier` varchar(128) NOT NULL COMMENT 'Business identifier, filled in by the user, undeleted ones cannot be repeated', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_cluster_set_inlongid` (`set_name`,`business_identifier`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='InlongId table'; + +-- ---------------------------- +-- Table structure for cache_cluster +-- ---------------------------- +DROP TABLE IF EXISTS `cache_cluster`; +CREATE TABLE `cache_cluster` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_cache_cluster` (`cluster_name`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='CacheCluster table'; + +-- ---------------------------- +-- Table structure for cache_cluster_ext +-- ---------------------------- +DROP TABLE IF EXISTS `cache_cluster_ext`; +CREATE TABLE `cache_cluster_ext` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore', + `key_name` varchar(64) NOT NULL COMMENT 'Configuration item name', + `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + KEY `index_cache_cluster` (`cluster_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='CacheCluster extension table'; + +-- ---------------------------- +-- Table structure for cache_topic +-- ---------------------------- +DROP TABLE IF EXISTS `cache_topic`; +CREATE TABLE `cache_topic` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `topic_name` varchar(128) NOT NULL COMMENT 'Topic name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `partition_num` int(11) NOT NULL COMMENT 'Partition number', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_cache_topic` (`topic_name`,`set_name`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='CacheTopic table'; + +-- ---------------------------- +-- Table structure for proxy_cluster +-- ---------------------------- +DROP TABLE IF EXISTS `proxy_cluster`; +CREATE TABLE `proxy_cluster` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_proxy_cluster` (`cluster_name`,`set_name`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='ProxyCluster table'; + +-- ---------------------------- +-- Table structure for proxy_cluster_to_cache_cluster +-- ---------------------------- +DROP TABLE IF EXISTS `proxy_cluster_to_cache_cluster`; +CREATE TABLE `proxy_cluster_to_cache_cluster` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `proxy_cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore', + `cache_cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_proxy_cluster_to_cache_cluster` (`proxy_cluster_name`,`cache_cluster_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='The relation table of ProxyCluster and CacheCluster'; + +-- ---------------------------- +-- Table structure for flume_source +-- ---------------------------- +DROP TABLE IF EXISTS `flume_source`; +CREATE TABLE `flume_source` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `source_name` varchar(128) NOT NULL COMMENT 'FlumeSource name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `type` varchar(128) NOT NULL COMMENT 'FlumeSource classname', + `channels` varchar(128) NOT NULL COMMENT 'The channels of FlumeSource, separated by space', + `selector_type` varchar(128) NOT NULL COMMENT 'FlumeSource channel selector classname', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_flume_source` (`source_name`,`set_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSource table'; + +-- ---------------------------- +-- Table structure for flume_source_ext +-- ---------------------------- +DROP TABLE IF EXISTS `flume_source_ext`; +CREATE TABLE `flume_source_ext` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `parent_name` varchar(128) NOT NULL COMMENT 'FlumeSource name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `key_name` varchar(64) NOT NULL COMMENT 'Configuration item name', + `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + KEY `index_flume_source_ext` (`parent_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSource extension table'; + +-- ---------------------------- +-- Table structure for flume_channel +-- ---------------------------- +DROP TABLE IF EXISTS `flume_channel`; +CREATE TABLE `flume_channel` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `channel_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `type` varchar(128) NOT NULL COMMENT 'FlumeChannel classname', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_flume_channel` (`channel_name`,`set_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeChannel table'; + +-- ---------------------------- +-- Table structure for flume_channel_ext +-- ---------------------------- +DROP TABLE IF EXISTS `flume_channel_ext`; +CREATE TABLE `flume_channel_ext` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `parent_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `key_name` varchar(64) NOT NULL COMMENT 'Configuration item name', + `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + KEY `index_flume_channel_ext` (`parent_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeChannel extension table'; + +-- ---------------------------- +-- Table structure for flume_sink +-- ---------------------------- +DROP TABLE IF EXISTS `flume_sink`; +CREATE TABLE `flume_sink` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `sink_name` varchar(128) NOT NULL COMMENT 'FlumeSink name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `type` varchar(128) NOT NULL COMMENT 'FlumeSink classname', + `channel` varchar(128) NOT NULL COMMENT 'FlumeSink channel', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_flume_sink` (`sink_name`,`set_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSink table'; + +-- ---------------------------- +-- Table structure for flume_sink_ext +-- ---------------------------- +DROP TABLE IF EXISTS `flume_sink_ext`; +CREATE TABLE `flume_sink_ext` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `parent_name` varchar(128) NOT NULL COMMENT 'FlumeSink name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `key_name` varchar(64) NOT NULL COMMENT 'Configuration item name', + `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + KEY `index_flume_sink_ext` (`parent_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSink extension table'; + SET FOREIGN_KEY_CHECKS = 1; diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/CacheClusterObject.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/CacheClusterObject.java new file mode 100644 index 00000000000..02d7f955134 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/CacheClusterObject.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +import java.util.HashMap; +import java.util.Map; + +/** + * CacheCluster + */ +public class CacheClusterObject { + private String name; + private String zone; + private Map params = new HashMap<>(); + + /** + * get name + * + * @return the name + */ + public String getName() { + return name; + } + + /** + * set name + * + * @param name the name to set + */ + public void setName(String name) { + this.name = name; + } + + /** + * get zone + * + * @return the zone + */ + public String getZone() { + return zone; + } + + /** + * set zone + * + * @param zone the zone to set + */ + public void setZone(String zone) { + this.zone = zone; + } + + /** + * get params + * + * @return the params + */ + public Map getParams() { + return params; + } + + /** + * set params + * + * @param params the params to set + */ + public void setParams(Map params) { + this.params = params; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/CacheClusterSetObject.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/CacheClusterSetObject.java new file mode 100644 index 00000000000..f6d632bda86 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/CacheClusterSetObject.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +import java.util.ArrayList; +import java.util.List; + +/** + * CacheClusterSet + */ +public class CacheClusterSetObject { + private String setName; + private String type; + private List cacheClusters = new ArrayList<>(); + private List topics = new ArrayList<>(); + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get type + * + * @return the type + */ + public String getType() { + return type; + } + + /** + * set type + * + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + + /** + * get cacheClusters + * + * @return the cacheClusters + */ + public List getCacheClusters() { + return cacheClusters; + } + + /** + * set cacheClusters + * + * @param cacheClusters the cacheClusters to set + */ + public void setCacheClusters(List cacheClusters) { + this.cacheClusters = cacheClusters; + } + + /** + * get topics + * + * @return the topics + */ + public List getTopics() { + return topics; + } + + /** + * set topics + * + * @param topics the topics to set + */ + public void setTopics(List topics) { + this.topics = topics; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/CacheTopicObject.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/CacheTopicObject.java new file mode 100644 index 00000000000..12a37984e25 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/CacheTopicObject.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +/** + * CacheTopic + */ +public class CacheTopicObject { + private String topic; + private Integer partitionNum; + + /** + * get topic + * + * @return the topic + */ + public String getTopic() { + return topic; + } + + /** + * set topic + * + * @param topic the topic to set + */ + public void setTopic(String topic) { + this.topic = topic; + } + + /** + * get partitionNum + * + * @return the partitionNum + */ + public Integer getPartitionNum() { + return partitionNum; + } + + /** + * set partitionNum + * + * @param partitionNum the partitionNum to set + */ + public void setPartitionNum(Integer partitionNum) { + this.partitionNum = partitionNum; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/DataProxyCluster.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/DataProxyCluster.java new file mode 100644 index 00000000000..c19ffd54729 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/DataProxyCluster.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +/** + * DataProxyCluster + */ +public class DataProxyCluster { + private ProxyClusterObject proxyCluster = new ProxyClusterObject(); + private CacheClusterSetObject cacheClusterSet = new CacheClusterSetObject(); + + /** + * get proxyCluster + * + * @return the proxyCluster + */ + public ProxyClusterObject getProxyCluster() { + return proxyCluster; + } + + /** + * set proxyCluster + * + * @param proxyCluster the proxyCluster to set + */ + public void setProxyCluster(ProxyClusterObject proxyCluster) { + this.proxyCluster = proxyCluster; + } + + /** + * get cacheClusterSet + * + * @return the cacheClusterSet + */ + public CacheClusterSetObject getCacheClusterSet() { + return cacheClusterSet; + } + + /** + * set cacheClusterSet + * + * @param cacheClusterSet the cacheClusterSet to set + */ + public void setCacheClusterSet(CacheClusterSetObject cacheClusterSet) { + this.cacheClusterSet = cacheClusterSet; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/DataProxyConfigResponse.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/DataProxyConfigResponse.java new file mode 100644 index 00000000000..2dca53328e1 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/DataProxyConfigResponse.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +/** + * DataProxyConfigResponse + */ +public class DataProxyConfigResponse { + public static final int SUCC = 0; // success + public static final int NOUPDATE = 1; // no update, for same md5 + public static final int FAIL = -1; // general error + public static final int REQ_PARAMS_ERROR = -101; // error request parameter + + private Boolean result; + private Integer errCode; + private String md5; + private DataProxyCluster data; + + /** + * get result + * + * @return the result + */ + public Boolean isResult() { + return result; + } + + /** + * set result + * + * @param result the result to set + */ + public void setResult(Boolean result) { + this.result = result; + } + + /** + * get errCode + * + * @return the errCode + */ + public Integer getErrCode() { + return errCode; + } + + /** + * set errCode + * + * @param errCode the errCode to set + */ + public void setErrCode(Integer errCode) { + this.errCode = errCode; + } + + /** + * get md5 + * + * @return the md5 + */ + public String getMd5() { + return md5; + } + + /** + * set md5 + * + * @param md5 the md5 to set + */ + public void setMd5(String md5) { + this.md5 = md5; + } + + /** + * get data + * + * @return the data + */ + public DataProxyCluster getData() { + return data; + } + + /** + * set data + * + * @param data the data to set + */ + public void setData(DataProxyCluster data) { + this.data = data; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/IRepository.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/IRepository.java new file mode 100644 index 00000000000..4013d8d48fa --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/IRepository.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +/** + * IRepository + */ +public interface IRepository { + public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000; + public static final String SEPARATOR = "&"; + public static final String KEY_VALUE_SEPARATOR = "="; + + void reload(); +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/InLongIdObject.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/InLongIdObject.java new file mode 100644 index 00000000000..4973c900ed7 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/InLongIdObject.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +import java.util.HashMap; +import java.util.Map; + +/** + * InLongId + */ +public class InLongIdObject { + private String inlongId; + private String topic; + private Map params = new HashMap<>(); + + /** + * get inlongId + * + * @return the inlongId + */ + public String getInlongId() { + return inlongId; + } + + /** + * set inlongId + * + * @param inlongId the inlongId to set + */ + public void setInlongId(String inlongId) { + this.inlongId = inlongId; + } + + /** + * get topic + * + * @return the topic + */ + public String getTopic() { + return topic; + } + + /** + * set topic + * + * @param topic the topic to set + */ + public void setTopic(String topic) { + this.topic = topic; + } + + /** + * get params + * + * @return the params + */ + public Map getParams() { + return params; + } + + /** + * set params + * + * @param params the params to set + */ + public void setParams(Map params) { + this.params = params; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxyChannel.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxyChannel.java new file mode 100644 index 00000000000..2d1de34f014 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxyChannel.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +import java.util.HashMap; +import java.util.Map; + +/** + * DataProxyChannel + */ +public class ProxyChannel { + private String name; + private String type; + private Map params = new HashMap<>(); + + /** + * get name + * + * @return the name + */ + public String getName() { + return name; + } + + /** + * set name + * + * @param name the name to set + */ + public void setName(String name) { + this.name = name; + } + + /** + * get type + * + * @return the type + */ + public String getType() { + return type; + } + + /** + * set type + * + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + + /** + * get params + * + * @return the params + */ + public Map getParams() { + return params; + } + + /** + * set params + * + * @param params the params to set + */ + public void setParams(Map params) { + this.params = params; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxyClusterObject.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxyClusterObject.java new file mode 100644 index 00000000000..b620b33ab80 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxyClusterObject.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +import java.util.ArrayList; +import java.util.List; + +/** + * DataProxyCluster + */ +public class ProxyClusterObject { + private String name; + private String setName; + private String zone; + private List channels = new ArrayList<>(); + private List inlongIds = new ArrayList<>(); + private List sources = new ArrayList<>(); + private List sinks = new ArrayList<>(); + + /** + * get name + * + * @return the name + */ + public String getName() { + return name; + } + + /** + * set name + * + * @param name the name to set + */ + public void setName(String name) { + this.name = name; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get zone + * + * @return the zone + */ + public String getZone() { + return zone; + } + + /** + * set zone + * + * @param zone the zone to set + */ + public void setZone(String zone) { + this.zone = zone; + } + + /** + * get channels + * + * @return the channels + */ + public List getChannels() { + return channels; + } + + /** + * set channels + * + * @param channels the channels to set + */ + public void setChannels(List channels) { + this.channels = channels; + } + + /** + * get inlongIds + * + * @return the inlongIds + */ + public List getInlongIds() { + return inlongIds; + } + + /** + * set inlongIds + * + * @param inlongIds the inlongIds to set + */ + public void setInlongIds(List inlongIds) { + this.inlongIds = inlongIds; + } + + /** + * get sources + * + * @return the sources + */ + public List getSources() { + return sources; + } + + /** + * set sources + * + * @param sources the sources to set + */ + public void setSources(List sources) { + this.sources = sources; + } + + /** + * get sinks + * + * @return the sinks + */ + public List getSinks() { + return sinks; + } + + /** + * set sinks + * + * @param sinks the sinks to set + */ + public void setSinks(List sinks) { + this.sinks = sinks; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxySink.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxySink.java new file mode 100644 index 00000000000..1f4240943f4 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxySink.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +import java.util.HashMap; +import java.util.Map; + +/** + * DataProxySink + */ +public class ProxySink { + private String name; + private String type; + private String channel; + private Map params = new HashMap<>(); + + /** + * get name + * + * @return the name + */ + public String getName() { + return name; + } + + /** + * set name + * + * @param name the name to set + */ + public void setName(String name) { + this.name = name; + } + + /** + * get type + * + * @return the type + */ + public String getType() { + return type; + } + + /** + * set type + * + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + + /** + * get channel + * + * @return the channel + */ + public String getChannel() { + return channel; + } + + /** + * set channel + * + * @param channel the channel to set + */ + public void setChannel(String channel) { + this.channel = channel; + } + + /** + * get params + * + * @return the params + */ + public Map getParams() { + return params; + } + + /** + * set params + * + * @param params the params to set + */ + public void setParams(Map params) { + this.params = params; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxySource.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxySource.java new file mode 100644 index 00000000000..15f4d49b769 --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/ProxySource.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * DataProxySource + */ +public class ProxySource { + private String name; + private String type; + private String selectorType; + private List channels = new ArrayList<>(); + private Map params = new HashMap<>(); + + /** + * get name + * + * @return the name + */ + public String getName() { + return name; + } + + /** + * set name + * + * @param name the name to set + */ + public void setName(String name) { + this.name = name; + } + + /** + * get type + * + * @return the type + */ + public String getType() { + return type; + } + + /** + * set type + * + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + + /** + * get selectorType + * + * @return the selectorType + */ + public String getSelectorType() { + return selectorType; + } + + /** + * set selectorType + * + * @param selectorType the selectorType to set + */ + public void setSelectorType(String selectorType) { + this.selectorType = selectorType; + } + + /** + * get channels + * + * @return the channels + */ + public List getChannels() { + return channels; + } + + /** + * set channels + * + * @param channels the channels to set + */ + public void setChannels(List channels) { + this.channels = channels; + } + + /** + * get params + * + * @return the params + */ + public Map getParams() { + return params; + } + + /** + * set params + * + * @param params the params to set + */ + public void setParams(Map params) { + this.params = params; + } + +} diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/RepositoryTimerTask.java b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/RepositoryTimerTask.java new file mode 100644 index 00000000000..1c20b94f0fb --- /dev/null +++ b/inlong-common/src/main/java/org/apache/inlong/commons/pojo/dataproxy/RepositoryTimerTask.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.commons.pojo.dataproxy; + +import java.util.TimerTask; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * RepositoryTimerTask + */ +public class RepositoryTimerTask extends TimerTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(RepositoryTimerTask.class); + private T repository; + + public RepositoryTimerTask(T repository) { + this.repository = repository; + } + + @Override + public void run() { + try { + repository.reload(); + } catch (Throwable e) { + LOGGER.error(e.getMessage(), e); + } + } +} diff --git a/inlong-manager/doc/sql/apache_inlong_manager.sql b/inlong-manager/doc/sql/apache_inlong_manager.sql index 7226f0c5ce4..ed55f59b8a0 100644 --- a/inlong-manager/doc/sql/apache_inlong_manager.sql +++ b/inlong-manager/doc/sql/apache_inlong_manager.sql @@ -845,4 +845,225 @@ CREATE TABLE `wf_task_instance` AUTO_INCREMENT = 704 DEFAULT CHARSET = utf8mb4 COMMENT ='Task instance'; +-- ---------------------------- +-- Table structure for cluster_set +-- ---------------------------- +DROP TABLE IF EXISTS `cluster_set`; +CREATE TABLE `cluster_set` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `cn_name` varchar(256) DEFAULT NULL COMMENT 'Chinese display name', + `description` varchar(256) DEFAULT NULL COMMENT 'ClusterSet Introduction', + `middleware_type` varchar(10) DEFAULT 'Pulsar' COMMENT 'The middleware type of data storage, high throughput: Pulsar', + `in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas', + `followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas', + `status` int(11) DEFAULT '21' COMMENT 'ClusterSet status', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `creator` varchar(64) DEFAULT NULL COMMENT 'creator name', + `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier name', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_cluster_set` (`set_name`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='ClusterSet table'; + +-- ---------------------------- +-- Table structure for cluster_set_inlongid +-- ---------------------------- +DROP TABLE IF EXISTS `cluster_set_inlongid`; +CREATE TABLE `cluster_set_inlongid` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `business_identifier` varchar(128) NOT NULL COMMENT 'Business identifier, filled in by the user, undeleted ones cannot be repeated', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_cluster_set_inlongid` (`set_name`,`business_identifier`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='InlongId table'; + +-- ---------------------------- +-- Table structure for cache_cluster +-- ---------------------------- +DROP TABLE IF EXISTS `cache_cluster`; +CREATE TABLE `cache_cluster` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_cache_cluster` (`cluster_name`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='CacheCluster table'; + +-- ---------------------------- +-- Table structure for cache_cluster_ext +-- ---------------------------- +DROP TABLE IF EXISTS `cache_cluster_ext`; +CREATE TABLE `cache_cluster_ext` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore', + `key_name` varchar(64) NOT NULL COMMENT 'Configuration item name', + `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + KEY `index_cache_cluster` (`cluster_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='CacheCluster extension table'; + +-- ---------------------------- +-- Table structure for cache_topic +-- ---------------------------- +DROP TABLE IF EXISTS `cache_topic`; +CREATE TABLE `cache_topic` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `topic_name` varchar(128) NOT NULL COMMENT 'Topic name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `partition_num` int(11) NOT NULL COMMENT 'Partition number', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_cache_topic` (`topic_name`,`set_name`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='CacheTopic table'; + +-- ---------------------------- +-- Table structure for proxy_cluster +-- ---------------------------- +DROP TABLE IF EXISTS `proxy_cluster`; +CREATE TABLE `proxy_cluster` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_proxy_cluster` (`cluster_name`,`set_name`) +) ENGINE = InnoDB + AUTO_INCREMENT = 16 + DEFAULT CHARSET = utf8mb4 COMMENT ='ProxyCluster table'; + +-- ---------------------------- +-- Table structure for proxy_cluster_to_cache_cluster +-- ---------------------------- +DROP TABLE IF EXISTS `proxy_cluster_to_cache_cluster`; +CREATE TABLE `proxy_cluster_to_cache_cluster` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `proxy_cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore', + `cache_cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_proxy_cluster_to_cache_cluster` (`proxy_cluster_name`,`cache_cluster_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='The relation table of ProxyCluster and CacheCluster'; + +-- ---------------------------- +-- Table structure for flume_source +-- ---------------------------- +DROP TABLE IF EXISTS `flume_source`; +CREATE TABLE `flume_source` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `source_name` varchar(128) NOT NULL COMMENT 'FlumeSource name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `type` varchar(128) NOT NULL COMMENT 'FlumeSource classname', + `channels` varchar(128) NOT NULL COMMENT 'The channels of FlumeSource, separated by space', + `selector_type` varchar(128) NOT NULL COMMENT 'FlumeSource channel selector classname', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_flume_source` (`source_name`,`set_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSource table'; + +-- ---------------------------- +-- Table structure for flume_source_ext +-- ---------------------------- +DROP TABLE IF EXISTS `flume_source_ext`; +CREATE TABLE `flume_source_ext` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `parent_name` varchar(128) NOT NULL COMMENT 'FlumeSource name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `key_name` varchar(64) NOT NULL COMMENT 'Configuration item name', + `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + KEY `index_flume_source_ext` (`parent_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSource extension table'; + +-- ---------------------------- +-- Table structure for flume_channel +-- ---------------------------- +DROP TABLE IF EXISTS `flume_channel`; +CREATE TABLE `flume_channel` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `channel_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `type` varchar(128) NOT NULL COMMENT 'FlumeChannel classname', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_flume_channel` (`channel_name`,`set_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeChannel table'; + +-- ---------------------------- +-- Table structure for flume_channel_ext +-- ---------------------------- +DROP TABLE IF EXISTS `flume_channel_ext`; +CREATE TABLE `flume_channel_ext` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `parent_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `key_name` varchar(64) NOT NULL COMMENT 'Configuration item name', + `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + KEY `index_flume_channel_ext` (`parent_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeChannel extension table'; + +-- ---------------------------- +-- Table structure for flume_sink +-- ---------------------------- +DROP TABLE IF EXISTS `flume_sink`; +CREATE TABLE `flume_sink` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `sink_name` varchar(128) NOT NULL COMMENT 'FlumeSink name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `type` varchar(128) NOT NULL COMMENT 'FlumeSink classname', + `channel` varchar(128) NOT NULL COMMENT 'FlumeSink channel', + PRIMARY KEY (`id`), + UNIQUE KEY `unique_flume_sink` (`sink_name`,`set_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSink table'; + +-- ---------------------------- +-- Table structure for flume_sink_ext +-- ---------------------------- +DROP TABLE IF EXISTS `flume_sink_ext`; +CREATE TABLE `flume_sink_ext` +( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', + `parent_name` varchar(128) NOT NULL COMMENT 'FlumeSink name, English, numbers and underscore', + `set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore', + `key_name` varchar(64) NOT NULL COMMENT 'Configuration item name', + `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item', + `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted', + `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time', + PRIMARY KEY (`id`), + KEY `index_flume_sink_ext` (`parent_name`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 COMMENT ='FlumeSink extension table'; + SET FOREIGN_KEY_CHECKS = 1; diff --git a/inlong-manager/manager-common/pom.xml b/inlong-manager/manager-common/pom.xml index ec8b1f7d256..84f16a4a82e 100644 --- a/inlong-manager/manager-common/pom.xml +++ b/inlong-manager/manager-common/pom.xml @@ -153,6 +153,11 @@ + + org.apache.inlong + inlong-common + ${project.version} + diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyClusterSet.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyClusterSet.java new file mode 100644 index 00000000000..8076016d7be --- /dev/null +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyClusterSet.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.common.pojo.dataproxy; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.inlong.commons.pojo.dataproxy.CacheClusterSetObject; +import org.apache.inlong.commons.pojo.dataproxy.InLongIdObject; +import org.apache.inlong.commons.pojo.dataproxy.ProxyChannel; +import org.apache.inlong.commons.pojo.dataproxy.ProxyClusterObject; +import org.apache.inlong.commons.pojo.dataproxy.ProxySink; +import org.apache.inlong.commons.pojo.dataproxy.ProxySource; + +/** + * DataProxyClusterSet + */ +public class DataProxyClusterSet { + + private String setName; + private CacheClusterSetObject cacheClusterSet = new CacheClusterSetObject(); + private List proxyClusterList = new ArrayList<>(); + private Map proxyChannelMap = new HashMap<>(); + private Map proxySourceMap = new HashMap<>(); + private Map proxySinkMap = new HashMap<>(); + private List inlongIds = new ArrayList<>(); + private Map> proxy2Cache = new HashMap<>(); + // + private String defaultConfigJson; + // Map + private Map proxyConfigJson = new HashMap<>(); + // Map + private Map md5Map = new HashMap<>(); + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get cacheClusterSet + * + * @return the cacheClusterSet + */ + public CacheClusterSetObject getCacheClusterSet() { + return cacheClusterSet; + } + + /** + * set cacheClusterSet + * + * @param cacheClusterSet the cacheClusterSet to set + */ + public void setCacheClusterSet(CacheClusterSetObject cacheClusterSet) { + this.cacheClusterSet = cacheClusterSet; + } + + /** + * get proxyClusterList + * + * @return the proxyClusterList + */ + public List getProxyClusterList() { + return proxyClusterList; + } + + /** + * set proxyClusterList + * + * @param proxyClusterList the proxyClusterList to set + */ + public void setProxyClusterList(List proxyClusterList) { + this.proxyClusterList = proxyClusterList; + } + + /** + * get proxyChannelMap + * + * @return the proxyChannelMap + */ + public Map getProxyChannelMap() { + return proxyChannelMap; + } + + /** + * set proxyChannelMap + * + * @param proxyChannelMap the proxyChannelMap to set + */ + public void setProxyChannelMap(Map proxyChannelMap) { + this.proxyChannelMap = proxyChannelMap; + } + + /** + * get defaultConfigJson + * + * @return the defaultConfigJson + */ + public String getDefaultConfigJson() { + return defaultConfigJson; + } + + /** + * set defaultConfigJson + * + * @param defaultConfigJson the defaultConfigJson to set + */ + public void setDefaultConfigJson(String defaultConfigJson) { + this.defaultConfigJson = defaultConfigJson; + } + + /** + * get proxySourceMap + * + * @return the proxySourceMap + */ + public Map getProxySourceMap() { + return proxySourceMap; + } + + /** + * set proxySourceMap + * + * @param proxySourceMap the proxySourceMap to set + */ + public void setProxySourceMap(Map proxySourceMap) { + this.proxySourceMap = proxySourceMap; + } + + /** + * get proxySinkMap + * + * @return the proxySinkMap + */ + public Map getProxySinkMap() { + return proxySinkMap; + } + + /** + * set proxySinkMap + * + * @param proxySinkMap the proxySinkMap to set + */ + public void setProxySinkMap(Map proxySinkMap) { + this.proxySinkMap = proxySinkMap; + } + + /** + * get inlongIds + * + * @return the inlongIds + */ + public List getInlongIds() { + return inlongIds; + } + + /** + * set inlongIds + * + * @param inlongIds the inlongIds to set + */ + public void setInlongIds(List inlongIds) { + this.inlongIds = inlongIds; + } + + /** + * get proxy2Cache + * + * @return the proxy2Cache + */ + public Map> getProxy2Cache() { + return proxy2Cache; + } + + /** + * set proxy2Cache + * + * @param proxy2Cache the proxy2Cache to set + */ + public void setProxy2Cache(Map> proxy2Cache) { + this.proxy2Cache = proxy2Cache; + } + + /** + * + * addProxy2Cache + * + * @param proxyClusterName + * @param cacheClusterName + */ + public void addProxy2Cache(String proxyClusterName, String cacheClusterName) { + Set cacheNameSet = this.proxy2Cache.get(proxyClusterName); + if (cacheNameSet == null) { + cacheNameSet = new HashSet<>(); + this.proxy2Cache.put(proxyClusterName, cacheNameSet); + } + cacheNameSet.add(cacheClusterName); + } + + /** + * get proxyConfigJson + * + * @return the proxyConfigJson + */ + public Map getProxyConfigJson() { + return proxyConfigJson; + } + + /** + * set proxyConfigJson + * + * @param proxyConfigJson the proxyConfigJson to set + */ + public void setProxyConfigJson(Map proxyConfigJson) { + this.proxyConfigJson = proxyConfigJson; + } + + /** + * get md5Map + * + * @return the md5Map + */ + public Map getMd5Map() { + return md5Map; + } + + /** + * set md5Map + * + * @param md5Map the md5Map to set + */ + public void setMd5Map(Map md5Map) { + this.md5Map = md5Map; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheCluster.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheCluster.java new file mode 100644 index 00000000000..9aec0c6e0e9 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheCluster.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * CacheCluster + */ +public class CacheCluster { + private String clusterName; + private String setName; + private String zone; + + /** + * get clusterName + * + * @return the clusterName + */ + public String getClusterName() { + return clusterName; + } + + /** + * set clusterName + * + * @param clusterName the clusterName to set + */ + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get zone + * + * @return the zone + */ + public String getZone() { + return zone; + } + + /** + * set zone + * + * @param zone the zone to set + */ + public void setZone(String zone) { + this.zone = zone; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheClusterExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheClusterExt.java new file mode 100644 index 00000000000..5c89df8df0f --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheClusterExt.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * CacheClusterExt + */ +public class CacheClusterExt { + + private String clusterName; + private String keyName; + private String keyValue; + private Integer isDeleted; + + /** + * get clusterName + * + * @return the clusterName + */ + public String getClusterName() { + return clusterName; + } + + /** + * set clusterName + * + * @param clusterName the clusterName to set + */ + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + /** + * get keyName + * + * @return the keyName + */ + public String getKeyName() { + return keyName; + } + + /** + * set keyName + * + * @param keyName the keyName to set + */ + public void setKeyName(String keyName) { + this.keyName = keyName; + } + + /** + * get keyValue + * + * @return the keyValue + */ + public String getKeyValue() { + return keyValue; + } + + /** + * set keyValue + * + * @param keyValue the keyValue to set + */ + public void setKeyValue(String keyValue) { + this.keyValue = keyValue; + } + + /** + * getJIsDeleted + * + * @return + */ + public Integer getJIsDeleted() { + return isDeleted; + } + + /** + * setIsDeleted + * + * @param isDeleted + */ + public void setIsDeleted(Integer isDeleted) { + this.isDeleted = isDeleted; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheTopic.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheTopic.java new file mode 100644 index 00000000000..26b5bd1efde --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/CacheTopic.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * CacheTopic + */ +public class CacheTopic { + private String topicName; + private String setName; + private Integer partitionNum; + + /** + * get topicName + * + * @return the topicName + */ + public String getTopicName() { + return topicName; + } + + /** + * set topicName + * + * @param topicName the topicName to set + */ + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get partitionNum + * + * @return the partitionNum + */ + public Integer getPartitionNum() { + return partitionNum; + } + + /** + * set partitionNum + * + * @param partitionNum the partitionNum to set + */ + public void setPartitionNum(Integer partitionNum) { + this.partitionNum = partitionNum; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ClusterSet.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ClusterSet.java new file mode 100644 index 00000000000..9344ebfd691 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ClusterSet.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * ClusterSet + */ +public class ClusterSet { + private String setName; + private String cnName; + private String description; + private String middlewareType; + private String inCharges; + private String followers; + private Integer status; + private Integer isDeleted; + private String creator; + private String modifier; + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get cnName + * + * @return the cnName + */ + public String getCnName() { + return cnName; + } + + /** + * set cnName + * + * @param cnName the cnName to set + */ + public void setCnName(String cnName) { + this.cnName = cnName; + } + + /** + * get description + * + * @return the description + */ + public String getDescription() { + return description; + } + + /** + * set description + * + * @param description the description to set + */ + public void setDescription(String description) { + this.description = description; + } + + /** + * get middlewareType + * + * @return the middlewareType + */ + public String getMiddlewareType() { + return middlewareType; + } + + /** + * set middlewareType + * + * @param middlewareType the middlewareType to set + */ + public void setMiddlewareType(String middlewareType) { + this.middlewareType = middlewareType; + } + + /** + * get inCharges + * + * @return the inCharges + */ + public String getInCharges() { + return inCharges; + } + + /** + * set inCharges + * + * @param inCharges the inCharges to set + */ + public void setInCharges(String inCharges) { + this.inCharges = inCharges; + } + + /** + * get followers + * + * @return the followers + */ + public String getFollowers() { + return followers; + } + + /** + * set followers + * + * @param followers the followers to set + */ + public void setFollowers(String followers) { + this.followers = followers; + } + + /** + * get status + * + * @return the status + */ + public Integer getStatus() { + return status; + } + + /** + * set status + * + * @param status the status to set + */ + public void setStatus(Integer status) { + this.status = status; + } + + /** + * get isDeleted + * + * @return the isDeleted + */ + public Integer getIsDeleted() { + return isDeleted; + } + + /** + * set isDeleted + * + * @param isDeleted the isDeleted to set + */ + public void setIsDeleted(Integer isDeleted) { + this.isDeleted = isDeleted; + } + + /** + * get creator + * + * @return the creator + */ + public String getCreator() { + return creator; + } + + /** + * set creator + * + * @param creator the creator to set + */ + public void setCreator(String creator) { + this.creator = creator; + } + + /** + * get modifier + * + * @return the modifier + */ + public String getModifier() { + return modifier; + } + + /** + * set modifier + * + * @param modifier the modifier to set + */ + public void setModifier(String modifier) { + this.modifier = modifier; + } + +} diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyConfig.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataProxyConfig.java similarity index 94% rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyConfig.java rename to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataProxyConfig.java index 9d84d55091f..8a5a6c54b82 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/DataProxyConfig.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataProxyConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.manager.common.pojo.dataproxy; +package org.apache.inlong.manager.dao.entity; import lombok.Data; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannel.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannel.java new file mode 100644 index 00000000000..1b19ff816b3 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannel.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * FlumeChannel + */ +public class FlumeChannel { + private String channelName; + private String setName; + private String type; + + /** + * get channelName + * + * @return the channelName + */ + public String getChannelName() { + return channelName; + } + + /** + * set channelName + * + * @param channelName the channelName to set + */ + public void setChannelName(String channelName) { + this.channelName = channelName; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get type + * + * @return the type + */ + public String getType() { + return type; + } + + /** + * set type + * + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannelExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannelExt.java new file mode 100644 index 00000000000..4fa5d7d11d9 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeChannelExt.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * FlumeChannelExt + */ +public class FlumeChannelExt { + private String parentName; + private String setName; + private String keyName; + private String keyValue; + private Integer isDeleted; + + /** + * get parentName + * + * @return the parentName + */ + public String getParentName() { + return parentName; + } + + /** + * set parentName + * + * @param parentName the parentName to set + */ + public void setParentName(String parentName) { + this.parentName = parentName; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get keyName + * + * @return the keyName + */ + public String getKeyName() { + return keyName; + } + + /** + * set keyName + * + * @param keyName the keyName to set + */ + public void setKeyName(String keyName) { + this.keyName = keyName; + } + + /** + * get keyValue + * + * @return the keyValue + */ + public String getKeyValue() { + return keyValue; + } + + /** + * set keyValue + * + * @param keyValue the keyValue to set + */ + public void setKeyValue(String keyValue) { + this.keyValue = keyValue; + } + + /** + * get isDeleted + * + * @return the isDeleted + */ + public Integer getIsDeleted() { + return isDeleted; + } + + /** + * set isDeleted + * + * @param isDeleted the isDeleted to set + */ + public void setIsDeleted(Integer isDeleted) { + this.isDeleted = isDeleted; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSink.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSink.java new file mode 100644 index 00000000000..54c53697908 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSink.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * FlumeSink + */ +public class FlumeSink { + private String sinkName; + private String setName; + private String type; + private String channel; + + /** + * get sinkName + * + * @return the sinkName + */ + public String getSinkName() { + return sinkName; + } + + /** + * set sinkName + * + * @param sinkName the sinkName to set + */ + public void setSinkName(String sinkName) { + this.sinkName = sinkName; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get type + * + * @return the type + */ + public String getType() { + return type; + } + + /** + * set type + * + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + + /** + * get channel + * + * @return the channel + */ + public String getChannel() { + return channel; + } + + /** + * set channel + * + * @param channel the channel to set + */ + public void setChannel(String channel) { + this.channel = channel; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSinkExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSinkExt.java new file mode 100644 index 00000000000..8022da8db0a --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSinkExt.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * FlumeSinkExt + */ +public class FlumeSinkExt { + private String parentName; + private String setName; + private String keyName; + private String keyValue; + private Integer isDeleted; + + /** + * get parentName + * + * @return the parentName + */ + public String getParentName() { + return parentName; + } + + /** + * set parentName + * + * @param parentName the parentName to set + */ + public void setParentName(String parentName) { + this.parentName = parentName; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get keyName + * + * @return the keyName + */ + public String getKeyName() { + return keyName; + } + + /** + * set keyName + * + * @param keyName the keyName to set + */ + public void setKeyName(String keyName) { + this.keyName = keyName; + } + + /** + * get keyValue + * + * @return the keyValue + */ + public String getKeyValue() { + return keyValue; + } + + /** + * set keyValue + * + * @param keyValue the keyValue to set + */ + public void setKeyValue(String keyValue) { + this.keyValue = keyValue; + } + + /** + * get isDeleted + * + * @return the isDeleted + */ + public Integer getIsDeleted() { + return isDeleted; + } + + /** + * set isDeleted + * + * @param isDeleted the isDeleted to set + */ + public void setIsDeleted(Integer isDeleted) { + this.isDeleted = isDeleted; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSource.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSource.java new file mode 100644 index 00000000000..0afb4a7983f --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSource.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * FlumeSource + */ +public class FlumeSource { + private String sourceName; + private String setName; + private String type; + private String channels; + private String selectorType; + + /** + * get sourceName + * + * @return the sourceName + */ + public String getSourceName() { + return sourceName; + } + + /** + * set sourceName + * + * @param sourceName the sourceName to set + */ + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get type + * + * @return the type + */ + public String getType() { + return type; + } + + /** + * set type + * + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + + /** + * get channels + * + * @return the channels + */ + public String getChannels() { + return channels; + } + + /** + * set channels + * + * @param channels the channels to set + */ + public void setChannels(String channels) { + this.channels = channels; + } + + /** + * get selectorType + * + * @return the selectorType + */ + public String getSelectorType() { + return selectorType; + } + + /** + * set selectorType + * + * @param selectorType the selectorType to set + */ + public void setSelectorType(String selectorType) { + this.selectorType = selectorType; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSourceExt.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSourceExt.java new file mode 100644 index 00000000000..5891e00f6a7 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/FlumeSourceExt.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * FlumeSourceExt + */ +public class FlumeSourceExt { + private String parentName; + private String setName; + private String keyName; + private String keyValue; + private Integer isDeleted; + + /** + * get parentName + * + * @return the parentName + */ + public String getParentName() { + return parentName; + } + + /** + * set parentName + * + * @param parentName the parentName to set + */ + public void setParentName(String parentName) { + this.parentName = parentName; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get keyName + * + * @return the keyName + */ + public String getKeyName() { + return keyName; + } + + /** + * set keyName + * + * @param keyName the keyName to set + */ + public void setKeyName(String keyName) { + this.keyName = keyName; + } + + /** + * get keyValue + * + * @return the keyValue + */ + public String getKeyValue() { + return keyValue; + } + + /** + * set keyValue + * + * @param keyValue the keyValue to set + */ + public void setKeyValue(String keyValue) { + this.keyValue = keyValue; + } + + /** + * get isDeleted + * + * @return the isDeleted + */ + public Integer getIsDeleted() { + return isDeleted; + } + + /** + * set isDeleted + * + * @param isDeleted the isDeleted to set + */ + public void setIsDeleted(Integer isDeleted) { + this.isDeleted = isDeleted; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java new file mode 100644 index 00000000000..0de815720af --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InLongId.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * InLongId + */ +public class InLongId { + private String inlongId; + private String topic; + private String params; + private String setName; + + /** + * get inlongId + * + * @return the inlongId + */ + public String getInlongId() { + return inlongId; + } + + /** + * set inlongId + * + * @param inlongId the inlongId to set + */ + public void setInlongId(String inlongId) { + this.inlongId = inlongId; + } + + /** + * get topic + * + * @return the topic + */ + public String getTopic() { + return topic; + } + + /** + * set topic + * + * @param topic the topic to set + */ + public void setTopic(String topic) { + this.topic = topic; + } + + /** + * get params + * + * @return the params + */ + public String getParams() { + return params; + } + + /** + * set params + * + * @param params the params to set + */ + public void setParams(String params) { + this.params = params; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyCluster.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyCluster.java new file mode 100644 index 00000000000..f1c0660ab25 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyCluster.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * ProxyCluster + */ +public class ProxyCluster { + private String clusterName; + private String setName; + private String zone; + + /** + * get clusterName + * + * @return the clusterName + */ + public String getClusterName() { + return clusterName; + } + + /** + * set clusterName + * + * @param clusterName the clusterName to set + */ + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + /** + * get setName + * + * @return the setName + */ + public String getSetName() { + return setName; + } + + /** + * set setName + * + * @param setName the setName to set + */ + public void setSetName(String setName) { + this.setName = setName; + } + + /** + * get zone + * + * @return the zone + */ + public String getZone() { + return zone; + } + + /** + * set zone + * + * @param zone the zone to set + */ + public void setZone(String zone) { + this.zone = zone; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyClusterToCacheCluster.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyClusterToCacheCluster.java new file mode 100644 index 00000000000..73b71c78ce3 --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ProxyClusterToCacheCluster.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.entity; + +/** + * ProxyClusterToCacheCluster + */ +public class ProxyClusterToCacheCluster { + private String proxyClusterName; + private String cacheClusterName; + + /** + * get proxyClusterName + * + * @return the proxyClusterName + */ + public String getProxyClusterName() { + return proxyClusterName; + } + + /** + * set proxyClusterName + * + * @param proxyClusterName the proxyClusterName to set + */ + public void setProxyClusterName(String proxyClusterName) { + this.proxyClusterName = proxyClusterName; + } + + /** + * get cacheClusterName + * + * @return the cacheClusterName + */ + public String getCacheClusterName() { + return cacheClusterName; + } + + /** + * set cacheClusterName + * + * @param cacheClusterName the cacheClusterName to set + */ + public void setCacheClusterName(String cacheClusterName) { + this.cacheClusterName = cacheClusterName; + } + +} diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java index a03c19be198..83431e99ac5 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java @@ -21,8 +21,8 @@ import java.util.Map; import org.apache.ibatis.annotations.Param; import org.apache.inlong.manager.common.pojo.business.BusinessPageRequest; -import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyConfig; import org.apache.inlong.manager.dao.entity.BusinessEntity; +import org.apache.inlong.manager.dao.entity.DataProxyConfig; import org.springframework.stereotype.Repository; @Repository diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java new file mode 100644 index 00000000000..74ec7153a1f --- /dev/null +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ClusterSetMapper.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.dao.mapper; + +import java.util.List; + +import org.apache.inlong.manager.dao.entity.CacheCluster; +import org.apache.inlong.manager.dao.entity.CacheClusterExt; +import org.apache.inlong.manager.dao.entity.CacheTopic; +import org.apache.inlong.manager.dao.entity.ClusterSet; +import org.apache.inlong.manager.dao.entity.FlumeChannel; +import org.apache.inlong.manager.dao.entity.FlumeChannelExt; +import org.apache.inlong.manager.dao.entity.FlumeSink; +import org.apache.inlong.manager.dao.entity.FlumeSinkExt; +import org.apache.inlong.manager.dao.entity.FlumeSource; +import org.apache.inlong.manager.dao.entity.FlumeSourceExt; +import org.apache.inlong.manager.dao.entity.InLongId; +import org.apache.inlong.manager.dao.entity.ProxyCluster; +import org.apache.inlong.manager.dao.entity.ProxyClusterToCacheCluster; +import org.springframework.stereotype.Repository; + +/** + * ClusterSetMapper + */ +@Repository +public interface ClusterSetMapper { + List selectClusterSet(); + + List selectInlongId(); + + List selectCacheCluster(); + + List selectCacheClusterExt(); + + List selectCacheTopic(); + + List selectProxyCluster(); + + List selectProxyClusterToCacheCluster(); + + List selectFlumeSource(); + + List selectFlumeSourceExt(); + + List selectFlumeChannel(); + + List selectFlumeChannelExt(); + + List selectFlumeSink(); + + List selectFlumeSinkExt(); + +} diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml new file mode 100644 index 00000000000..2bb2f238dca --- /dev/null +++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml @@ -0,0 +1,87 @@ + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java index 88f0d6f4f4f..2dc43df0786 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataProxyClusterService.java @@ -21,9 +21,9 @@ import java.util.List; import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterInfo; import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterPageRequest; -import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyConfig; import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest; import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse; +import org.apache.inlong.manager.dao.entity.DataProxyConfig; /** * DataProxy cluster service layer interface @@ -34,7 +34,7 @@ public interface DataProxyClusterService { * Save DataProxy cluster information * * @param clusterInfo Cluster information - * @param operator Current operator + * @param operator Current operator * @return ID after saving */ Integer save(DataProxyClusterInfo clusterInfo, String operator); @@ -59,7 +59,7 @@ public interface DataProxyClusterService { * Change DataProxy cluster information * * @param clusterInfo The information to be modified - * @param operator Current operator + * @param operator Current operator * @return Whether succeed */ Boolean update(DataProxyClusterInfo clusterInfo, String operator); @@ -67,7 +67,7 @@ public interface DataProxyClusterService { /** * Delete DataProxy cluster information * - * @param id Cluster ID to be deleted + * @param id Cluster ID to be deleted * @param operator Current operator * @return Whether succeed */ @@ -88,4 +88,13 @@ public interface DataProxyClusterService { */ List getConfig(); + /** + * query data proxy config by cluster id + * + * @param clusterName + * @param setName + * @param md5 + * @return data proxy config + */ + String getAllConfig(String clusterName, String setName, String md5); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java index d2fe6d61fc7..001ecbffea8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java @@ -17,36 +17,43 @@ package org.apache.inlong.manager.service.core.impl; -import com.github.pagehelper.Page; -import com.github.pagehelper.PageHelper; -import com.github.pagehelper.PageInfo; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; -import lombok.extern.slf4j.Slf4j; + import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.commons.pojo.dataproxy.DataProxyConfigResponse; import org.apache.inlong.manager.common.enums.BizErrorCodeEnum; import org.apache.inlong.manager.common.enums.EntityStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterInfo; import org.apache.inlong.manager.common.pojo.cluster.DataProxyClusterPageRequest; -import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyConfig; +import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyClusterSet; import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest; import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity; +import org.apache.inlong.manager.dao.entity.DataProxyConfig; import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper; import org.apache.inlong.manager.dao.mapper.DataProxyClusterEntityMapper; import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper; import org.apache.inlong.manager.service.core.DataProxyClusterService; +import org.apache.inlong.manager.service.repository.DataProxyConfigRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import com.github.pagehelper.Page; +import com.github.pagehelper.PageHelper; +import com.github.pagehelper.PageInfo; +import com.google.gson.Gson; + +import lombok.extern.slf4j.Slf4j; + /** * DataProxy cluster service layer implementation class */ @@ -62,6 +69,8 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService { private SourceFileDetailEntityMapper sourceFileDetailMapper; @Autowired private BusinessEntityMapper businessEntityMapper; + @Autowired + private DataProxyConfigRepository proxyRepository; @Transactional(rollbackFor = Throwable.class) @Override @@ -104,9 +113,10 @@ public PageInfo listByCondition(DataProxyClusterPageReques PageHelper.startPage(request.getPageNum(), request.getPageSize()); Page entityPage = (Page) dataProxyClusterMapper .selectByCondition(request); - List clusterList = CommonBeanUtils - .copyListProperties(entityPage, DataProxyClusterInfo::new); - // Encapsulate the paging query results into the PageInfo object to obtain related paging information + List clusterList = CommonBeanUtils.copyListProperties(entityPage, + DataProxyClusterInfo::new); + // Encapsulate the paging query results into the PageInfo object to obtain + // related paging information PageInfo page = new PageInfo<>(clusterList); page.setTotal(entityPage.getTotal()); @@ -208,4 +218,42 @@ public List getConfig() { return configList; } + + /** + * query data proxy config by cluster id + * + * @param clusterName + * @param setName + * @param md5 + * @return data proxy config + */ + public String getAllConfig(String clusterName, String setName, String md5) { + DataProxyClusterSet setObj = proxyRepository.getDataProxyClusterSet(setName); + if (setObj == null) { + return this.getErrorAllConfig(); + } + String configMd5 = setObj.getMd5Map().get(clusterName); + if (configMd5 == null || !configMd5.equals(md5)) { + return this.getErrorAllConfig(); + } + String configJson = setObj.getProxyConfigJson().get(clusterName); + if (configJson == null) { + return this.getErrorAllConfig(); + } + return configJson; + } + + /** + * getErrorAllConfig + * + * @return + */ + private String getErrorAllConfig() { + DataProxyConfigResponse response = new DataProxyConfigResponse(); + response.setResult(false); + response.setErrCode(DataProxyConfigResponse.REQ_PARAMS_ERROR); + Gson gson = new Gson(); + return gson.toJson(response); + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java new file mode 100644 index 00000000000..8150d25a3fe --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java @@ -0,0 +1,485 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.repository; + +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.inlong.commons.pojo.dataproxy.CacheClusterObject; +import org.apache.inlong.commons.pojo.dataproxy.CacheClusterSetObject; +import org.apache.inlong.commons.pojo.dataproxy.CacheTopicObject; +import org.apache.inlong.commons.pojo.dataproxy.DataProxyCluster; +import org.apache.inlong.commons.pojo.dataproxy.DataProxyConfigResponse; +import org.apache.inlong.commons.pojo.dataproxy.IRepository; +import org.apache.inlong.commons.pojo.dataproxy.InLongIdObject; +import org.apache.inlong.commons.pojo.dataproxy.ProxyChannel; +import org.apache.inlong.commons.pojo.dataproxy.ProxyClusterObject; +import org.apache.inlong.commons.pojo.dataproxy.ProxySink; +import org.apache.inlong.commons.pojo.dataproxy.ProxySource; +import org.apache.inlong.commons.pojo.dataproxy.RepositoryTimerTask; +import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyClusterSet; +import org.apache.inlong.manager.dao.entity.CacheCluster; +import org.apache.inlong.manager.dao.entity.CacheClusterExt; +import org.apache.inlong.manager.dao.entity.CacheTopic; +import org.apache.inlong.manager.dao.entity.ClusterSet; +import org.apache.inlong.manager.dao.entity.FlumeChannel; +import org.apache.inlong.manager.dao.entity.FlumeChannelExt; +import org.apache.inlong.manager.dao.entity.FlumeSink; +import org.apache.inlong.manager.dao.entity.FlumeSinkExt; +import org.apache.inlong.manager.dao.entity.FlumeSource; +import org.apache.inlong.manager.dao.entity.FlumeSourceExt; +import org.apache.inlong.manager.dao.entity.InLongId; +import org.apache.inlong.manager.dao.entity.ProxyCluster; +import org.apache.inlong.manager.dao.entity.ProxyClusterToCacheCluster; +import org.apache.inlong.manager.dao.mapper.ClusterSetMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import com.google.common.base.Splitter; +import com.google.gson.Gson; + +/** + * DataProxyConfigRepository + */ +@Repository(value = "dataProxyConfigRepository") +public class DataProxyConfigRepository implements IRepository { + private static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class); + public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(SEPARATOR).trimResults() + .withKeyValueSeparator(KEY_VALUE_SEPARATOR); + @Autowired + private ClusterSetMapper clusterSetMapper; + // Map + private Map clusterSets = new HashMap<>(); + + // Map + private Map proxyClusterMap = new HashMap<>(); + // Map + private Map cacheClusterMap = new HashMap<>(); + + private long reloadInterval; + private Timer reloadTimer; + + private Gson gson = new Gson(); + + public DataProxyConfigRepository() { + LOGGER.info("create repository for {}" + DataProxyConfigRepository.class.getSimpleName()); + try { + this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS; + reload(); + setReloadTimer(); + } catch (Throwable t) { + LOGGER.error(t.getMessage(), t); + } + } + + /** + * reload + */ + public void reload() { + LOGGER.info("start to reload config."); + List setList = clusterSetMapper.selectClusterSet(); + if (setList.size() == 0) { + return; + } + + Map newClusterSets = new HashMap<>(); + for (ClusterSet set : setList) { + String setName = set.getSetName(); + DataProxyClusterSet setObj = new DataProxyClusterSet(); + setObj.setSetName(setName); + setObj.getCacheClusterSet().setSetName(setName); + setObj.getCacheClusterSet().setType(set.getMiddlewareType()); + newClusterSets.put(setName, setObj); + } + // + this.proxyClusterMap.clear(); + this.cacheClusterMap.clear(); + // + this.reloadCacheCluster(newClusterSets); + // + this.reloadCacheClusterExt(newClusterSets); + // + this.reloadCacheTopic(newClusterSets); + // + this.reloadProxyCluster(newClusterSets); + // + this.reloadFlumeChannel(newClusterSets); + // + this.reloadFlumeChannelExt(newClusterSets); + // + this.reloadFlumeSource(newClusterSets); + // + this.reloadFlumeSourceExt(newClusterSets); + // + this.reloadFlumeSink(newClusterSets); + // + this.reloadFlumeSinkExt(newClusterSets); + // reload inlongid + this.reloadInlongId(newClusterSets); + // + this.reloadProxy2Cache(newClusterSets); + // + this.generateClusterJson(newClusterSets); + + // replace + this.clusterSets = newClusterSets; + + LOGGER.info("end to reload config."); + } + + /** + * setReloadTimer + */ + private void setReloadTimer() { + reloadTimer = new Timer(true); + TimerTask task = new RepositoryTimerTask(this); + reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval); + } + + /** + * get clusterSets + * + * @return the clusterSets + */ + public Map getClusterSets() { + return clusterSets; + } + + /** + * + * reloadCacheCluster + * + * @param newClusterSets + */ + private void reloadCacheCluster(Map newClusterSets) { + for (CacheCluster cacheCluster : clusterSetMapper.selectCacheCluster()) { + // + CacheClusterObject obj = new CacheClusterObject(); + obj.setName(cacheCluster.getClusterName()); + obj.setZone(cacheCluster.getZone()); + cacheClusterMap.put(obj.getName(), obj); + // + String setName = cacheCluster.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + setObj.getCacheClusterSet().getCacheClusters().add(obj); + } + } + + /** + * getOrCreateDataProxyClusterSet + * + * @param clusterSets + * @param setName + * @return + */ + private DataProxyClusterSet getOrCreateDataProxyClusterSet(Map clusterSets, + String setName) { + DataProxyClusterSet setObj = clusterSets.get(setName); + if (setObj == null) { + setObj = new DataProxyClusterSet(); + setObj.setSetName(setName); + clusterSets.put(setName, setObj); + } + return setObj; + } + + /** + * + * reloadCacheClusterExt + * + * @param newClusterSets + */ + private void reloadCacheClusterExt(Map newClusterSets) { + for (CacheClusterExt ext : clusterSetMapper.selectCacheClusterExt()) { + String clusterName = ext.getClusterName(); + CacheClusterObject cacheClusterObject = cacheClusterMap.get(clusterName); + if (cacheClusterObject != null) { + cacheClusterObject.getParams().put(ext.getKeyName(), ext.getKeyValue()); + } + } + } + + /** + * + * reloadCacheTopic + * + * @param newClusterSets + */ + private void reloadCacheTopic(Map newClusterSets) { + for (CacheTopic cacheTopic : clusterSetMapper.selectCacheTopic()) { + // + CacheTopicObject obj = new CacheTopicObject(); + obj.setTopic(cacheTopic.getTopicName()); + obj.setPartitionNum(cacheTopic.getPartitionNum()); + // + String setName = cacheTopic.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + setObj.getCacheClusterSet().getTopics().add(obj); + } + } + + /** + * + * reloadProxyCluster + * + * @param newClusterSets + */ + private void reloadProxyCluster(Map newClusterSets) { + for (ProxyCluster proxyCluster : clusterSetMapper.selectProxyCluster()) { + String setName = proxyCluster.getSetName(); + // + ProxyClusterObject obj = new ProxyClusterObject(); + obj.setName(proxyCluster.getClusterName()); + obj.setSetName(setName); + obj.setZone(proxyCluster.getZone()); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + setObj.getProxyClusterList().add(obj); + this.proxyClusterMap.put(obj.getName(), obj); + } + } + + /** + * + * reloadFlumeChannel + * + * @param newClusterSets + */ + private void reloadFlumeChannel(Map newClusterSets) { + for (FlumeChannel flumeChannel : clusterSetMapper.selectFlumeChannel()) { + // + ProxyChannel obj = new ProxyChannel(); + obj.setName(flumeChannel.getChannelName()); + obj.setType(flumeChannel.getType()); + String setName = flumeChannel.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + setObj.getProxyChannelMap().put(obj.getName(), obj); + } + } + + /** + * + * reloadFlumeChannelExt + * + * @param newClusterSets + */ + private void reloadFlumeChannelExt(Map newClusterSets) { + for (FlumeChannelExt ext : clusterSetMapper.selectFlumeChannelExt()) { + String setName = ext.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + // + ProxyChannel obj = setObj.getProxyChannelMap().get(ext.getParentName()); + if (obj != null) { + obj.getParams().put(ext.getKeyName(), ext.getKeyValue()); + } + } + } + + /** + * + * reloadFlumeSource + * + * @param newClusterSets + */ + private void reloadFlumeSource(Map newClusterSets) { + for (FlumeSource flumeSource : clusterSetMapper.selectFlumeSource()) { + // + ProxySource obj = new ProxySource(); + obj.setName(flumeSource.getSourceName()); + obj.setSelectorType(flumeSource.getSelectorType()); + obj.setType(flumeSource.getType()); + // + String channels = flumeSource.getChannels(); + obj.getChannels().addAll(Arrays.asList(channels.split("\\s+"))); + // + String setName = flumeSource.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + setObj.getProxySourceMap().put(obj.getName(), obj); + } + } + + /** + * + * reloadFlumeSourceExt + * + * @param newClusterSets + */ + private void reloadFlumeSourceExt(Map newClusterSets) { + for (FlumeSourceExt ext : clusterSetMapper.selectFlumeSourceExt()) { + String setName = ext.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + // + ProxySource obj = setObj.getProxySourceMap().get(ext.getParentName()); + if (obj != null) { + obj.getParams().put(ext.getKeyName(), ext.getKeyValue()); + } + } + } + + /** + * + * reloadFlumeSink + * + * @param newClusterSets + */ + private void reloadFlumeSink(Map newClusterSets) { + for (FlumeSink flumeSink : clusterSetMapper.selectFlumeSink()) { + // + ProxySink obj = new ProxySink(); + obj.setName(flumeSink.getSinkName()); + obj.setType(flumeSink.getType()); + obj.setChannel(flumeSink.getChannel()); + // + String setName = flumeSink.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + setObj.getProxySinkMap().put(obj.getName(), obj); + } + } + + /** + * + * reloadFlumeSinkExt + * + * @param newClusterSets + */ + private void reloadFlumeSinkExt(Map newClusterSets) { + for (FlumeSinkExt ext : clusterSetMapper.selectFlumeSinkExt()) { + String setName = ext.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + // + ProxySink obj = setObj.getProxySinkMap().get(ext.getParentName()); + if (obj != null) { + obj.getParams().put(ext.getKeyName(), ext.getKeyValue()); + } + } + } + + /** + * + * reloadInlongId + * + * @param newClusterSets + */ + private void reloadInlongId(Map newClusterSets) { + for (InLongId inlongId : clusterSetMapper.selectInlongId()) { + // + InLongIdObject obj = new InLongIdObject(); + obj.setInlongId(inlongId.getInlongId()); + obj.setTopic(inlongId.getTopic()); + if (inlongId.getParams() != null) { + Map params = MAP_SPLITTER.split(inlongId.getParams()); + obj.getParams().putAll(params); + } + String setName = inlongId.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + setObj.getInlongIds().add(obj); + } + } + + /** + * + * reloadInlongId + * + * @param newClusterSets + */ + private void reloadProxy2Cache(Map newClusterSets) { + for (ProxyClusterToCacheCluster proxy2Cache : clusterSetMapper.selectProxyClusterToCacheCluster()) { + String proxyClusterName = proxy2Cache.getProxyClusterName(); + String cacheClusterName = proxy2Cache.getCacheClusterName(); + ProxyClusterObject proxyObj = this.proxyClusterMap.get(proxyClusterName); + if (proxyObj == null) { + continue; + } + CacheClusterObject cacheObj = this.cacheClusterMap.get(cacheClusterName); + if (cacheObj == null) { + continue; + } + // + String setName = proxyObj.getSetName(); + DataProxyClusterSet setObj = this.getOrCreateDataProxyClusterSet(newClusterSets, setName); + // + setObj.addProxy2Cache(proxyClusterName, cacheClusterName); + } + } + + /** + * + * generateClusterJson + * + * @param newClusterSets + */ + private void generateClusterJson(Map newClusterSets) { + for (Entry entry : newClusterSets.entrySet()) { + for (ProxyClusterObject proxyObj : entry.getValue().getProxyClusterList()) { + // proxy + DataProxyCluster clusterObj = new DataProxyCluster(); + clusterObj.setProxyCluster(proxyObj); + // cache + CacheClusterSetObject allCacheCluster = entry.getValue().getCacheClusterSet(); + CacheClusterSetObject proxyCacheClusterSet = clusterObj.getCacheClusterSet(); + proxyCacheClusterSet.setSetName(allCacheCluster.getSetName()); + proxyCacheClusterSet.setType(allCacheCluster.getType()); + proxyCacheClusterSet.setTopics(allCacheCluster.getTopics()); + // cacheCluster + Set cacheClusterNameSet = entry.getValue().getProxy2Cache().get(proxyObj.getName()); + if (cacheClusterNameSet != null) { + for (String cacheClusterName : cacheClusterNameSet) { + CacheClusterObject cacheObj = this.cacheClusterMap.get(cacheClusterName); + if (cacheObj == null) { + continue; + } + proxyCacheClusterSet.getCacheClusters().add(cacheObj); + } + } + // + String jsonDataProxyCluster = gson.toJson(clusterObj); + String md5 = DigestUtils.md5Hex(jsonDataProxyCluster); + DataProxyConfigResponse response = new DataProxyConfigResponse(); + response.setResult(true); + response.setErrCode(DataProxyConfigResponse.SUCC); + response.setMd5(md5); + response.setData(clusterObj); + String jsonResponse = gson.toJson(clusterObj); + entry.getValue().getProxyConfigJson().put(proxyObj.getName(), jsonResponse); + entry.getValue().getMd5Map().put(proxyObj.getName(), md5); + entry.getValue().setDefaultConfigJson(jsonResponse); + } + } + } + + /** + * + * getDataProxyClusterSet + * + * @param setName + * @return + */ + public DataProxyClusterSet getDataProxyClusterSet(String setName) { + DataProxyClusterSet setObj = this.clusterSets.get(setName); + return setObj; + } +} diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java index 63d87a379bc..fd8cfc7dc6e 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java @@ -17,21 +17,24 @@ package org.apache.inlong.manager.web.controller.openapi; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; import java.util.List; + import org.apache.inlong.manager.common.beans.Response; -import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyConfig; import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest; import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse; +import org.apache.inlong.manager.dao.entity.DataProxyConfig; import org.apache.inlong.manager.service.core.DataProxyClusterService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; + @RestController @RequestMapping("/openapi/dataproxy") @Api(tags = "DataProxy Config") @@ -51,4 +54,11 @@ public Response> getIpList(@RequestBody DataProxyIpReq public Response> getConfig() { return Response.success(dataProxyClusterService.getConfig()); } + + @GetMapping("/getAllConfig") + @ApiOperation(value = "get data proxy config") + public String getAllConfig(@RequestParam("clusterName") String clusterName, @RequestParam("setName") String setName, + @RequestParam("md5") String md5) { + return dataProxyClusterService.getAllConfig(clusterName, setName, md5); + } } diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml index 2e7fb58b1e7..395ab6bbe36 100644 --- a/inlong-manager/pom.xml +++ b/inlong-manager/pom.xml @@ -66,6 +66,14 @@ 2.0.5 3.0.0 1.6.2 + + 1.8 + 1.8 + 6.19 + UTF-8 + 3.8.1 + 1.4.13 + inlong @@ -342,4 +350,25 @@ + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${project.build.encoding} + ${maven.compiler.source} + ${maven.compiler.target} + + + org.projectlombok + lombok + ${lombok.version} + + + + ${plugin.compile.version} + + +