restClusterClient;
-
- static GenericContainer> jobManager;
- static GenericContainer> taskManager;
-
- @AfterClass
- public static void after() {
- if (restClusterClient != null) {
- restClusterClient.close();
- }
- if (jobManager != null) {
- jobManager.stop();
- }
- if (taskManager != null) {
- taskManager.stop();
- }
- }
-
- /**
- * Submits a SQL job to the running cluster.
- *
- * NOTE: You should not use {@code '\t'}.
- */
- public void submitSQLJob(String sqlFile, Path... jars)
- throws IOException, InterruptedException {
- final List commands = new ArrayList<>();
- String containerSqlFile = copyToContainerTmpPath(jobManager, sqlFile);
- commands.add(FLINK_BIN + "/flink run -d");
- commands.add("-c org.apache.inlong.sort.Entrance");
- commands.add(copyToContainerTmpPath(jobManager, constructDistJar(jars)));
- commands.add("--sql.script.file");
- commands.add(containerSqlFile);
-
- ExecResult execResult =
- jobManager.execInContainer("bash", "-c", String.join(" ", commands));
- LOG.info(execResult.getStdout());
- if (execResult.getExitCode() != 0) {
- LOG.error(execResult.getStderr());
- throw new AssertionError("Failed when submitting the SQL job.");
- }
- }
-
- /**
- * Get {@link RestClusterClient} connected to this FlinkContainer.
- *
- * This method lazily initializes the REST client on-demand.
- */
- public RestClusterClient getRestClusterClient() {
- checkState(
- jobManager.isRunning(),
- "Cluster client should only be retrieved for a running cluster");
- try {
- final Configuration clientConfiguration = new Configuration();
- clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
- clientConfiguration.set(
- RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
- this.restClusterClient =
- new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance());
- } catch (Exception e) {
- throw new IllegalStateException(
- "Failed to create client for Flink container cluster", e);
- }
- return restClusterClient;
- }
-
- /**
- * Polling to detect task status until the task successfully into {@link JobStatus.RUNNING}
- *
- * @param timeout
- */
- public void waitUntilJobRunning(Duration timeout) {
- RestClusterClient> clusterClient = getRestClusterClient();
- Deadline deadline = Deadline.fromNow(timeout);
- while (deadline.hasTimeLeft()) {
- Collection jobStatusMessages;
- try {
- jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.warn("Error when fetching job status.", e);
- continue;
- }
- if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
- JobStatusMessage message = jobStatusMessages.iterator().next();
- JobStatus jobStatus = message.getJobState();
- if (jobStatus.isTerminalState()) {
- throw new ValidationException(
- String.format(
- "Job has been terminated! JobName: %s, JobID: %s, Status: %s",
- message.getJobName(),
- message.getJobId(),
- message.getJobState()));
- } else if (jobStatus == JobStatus.RUNNING) {
- return;
- }
- }
- }
- }
-
- /**
- * Copy all other dependencies into user jar 'lib/' entry.
- * Flink per-job mode only support upload one jar to cluster.
- */
- private String constructDistJar(Path... jars) throws IOException {
-
- File newJar = temporaryFolder.newFile("sort-dist.jar");
- try (
- JarFile jarFile = new JarFile(SORT_DIST_JAR.toFile());
- JarOutputStream jos = new JarOutputStream(new FileOutputStream(newJar))) {
- jarFile.stream().forEach(entry -> {
- try (InputStream is = jarFile.getInputStream(entry)) {
- jos.putNextEntry(entry);
- jos.write(IOUtils.toByteArray(is));
- jos.closeEntry();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
-
- for (Path jar : jars) {
- try (InputStream is = new FileInputStream(jar.toFile())) {
- jos.putNextEntry(new JarEntry("lib/" + jar.getFileName().toString()));
- jos.write(IOUtils.toByteArray(is));
- jos.closeEntry();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- }
- return newJar.getAbsolutePath();
- }
-
- // Should not a big file, all file data will load into memory, then copy to container.
- private String copyToContainerTmpPath(GenericContainer> container, String filePath) throws IOException {
- Path path = Paths.get(filePath);
- byte[] fileData = Files.readAllBytes(path);
- String containerPath = "/tmp/" + path.getFileName();
- container.copyFileToContainer(Transferable.of(fileData), containerPath);
- return containerPath;
- }
-}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java
deleted file mode 100644
index 9033740822f..00000000000
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE11.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.tests.utils;
-
-import org.junit.BeforeClass;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-
-import java.util.stream.Stream;
-
-public abstract class FlinkContainerTestEnvJRE11 extends FlinkContainerTestEnv {
-
- @BeforeClass
- public static void before() {
- LOG.info("Starting containers...");
- jobManager =
- new GenericContainer<>("flink:1.18.1-scala_2.12")
- .withCommand("jobmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
- .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .withExposedPorts(JOB_MANAGER_REST_PORT)
- .withLogConsumer(new Slf4jLogConsumer(JM_LOG));
- taskManager =
- new GenericContainer<>("flink:1.18.1-scala_2.12")
- .withCommand("taskmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
- .withExposedPorts(DEBUG_PORT)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .dependsOn(jobManager)
- .withLogConsumer(new Slf4jLogConsumer(TM_LOG));
-
- Startables.deepStart(Stream.of(jobManager)).join();
- Startables.deepStart(Stream.of(taskManager)).join();
- LOG.info("Containers are started.");
- }
-}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java
deleted file mode 100644
index de982da4ba0..00000000000
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.tests.utils;
-
-import org.junit.BeforeClass;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-
-import java.util.stream.Stream;
-
-public abstract class FlinkContainerTestEnvJRE8 extends FlinkContainerTestEnv {
-
- @BeforeClass
- public static void before() {
- LOG.info("Starting containers...");
- jobManager =
- new GenericContainer<>("flink:1.18.1-scala_2.12-java8")
- .withCommand("jobmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
- .withExposedPorts(JOB_MANAGER_REST_PORT, DEBUG_PORT)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .withExposedPorts(JOB_MANAGER_REST_PORT)
- .withLogConsumer(new Slf4jLogConsumer(JM_LOG));
- taskManager =
- new GenericContainer<>("flink:1.18.1-scala_2.12-java8")
- .withCommand("taskmanager")
- .withNetwork(NETWORK)
- .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
- .withExposedPorts(DEBUG_PORT)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
- .dependsOn(jobManager)
- .withLogConsumer(new Slf4jLogConsumer(TM_LOG));
-
- Startables.deepStart(Stream.of(jobManager)).join();
- Startables.deepStart(Stream.of(taskManager)).join();
- LOG.info("Containers are started.");
- }
-}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java
deleted file mode 100644
index 0c283336999..00000000000
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.tests.utils;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * A file placeholder replacement tool.
- */
-public class PlaceholderResolver {
-
- /**
- * Default placeholder prefix
- */
- public static final String DEFAULT_PLACEHOLDER_PREFIX = "${";
-
- /**
- * Default placeholder suffix
- */
- public static final String DEFAULT_PLACEHOLDER_SUFFIX = "}";
-
- /**
- * Default singleton resolver
- */
- private static PlaceholderResolver defaultResolver = new PlaceholderResolver();
-
- /**
- * Placeholder prefix
- */
- private String placeholderPrefix = DEFAULT_PLACEHOLDER_PREFIX;
-
- /**
- * Placeholder suffix
- */
- private String placeholderSuffix = DEFAULT_PLACEHOLDER_SUFFIX;
-
- private PlaceholderResolver() {
-
- }
-
- private PlaceholderResolver(String placeholderPrefix, String placeholderSuffix) {
- this.placeholderPrefix = placeholderPrefix;
- this.placeholderSuffix = placeholderSuffix;
- }
-
- public static PlaceholderResolver getDefaultResolver() {
- return defaultResolver;
- }
-
- public static PlaceholderResolver getResolver(String placeholderPrefix, String placeholderSuffix) {
- return new PlaceholderResolver(placeholderPrefix, placeholderSuffix);
- }
-
- /**
- * Replace template string with special placeholder according to replace function.
- * @param content template string with special placeholder
- * @param rule placeholder replacement rule
- * @return new replaced string
- */
- public String resolveByRule(String content, Function rule) {
- int start = content.indexOf(this.placeholderPrefix);
- if (start == -1) {
- return content;
- }
- StringBuilder result = new StringBuilder(content);
- while (start != -1) {
- int end = result.indexOf(this.placeholderSuffix, start);
- // get placeholder actual value (e.g. ${id}, get the value represent id)
- String placeholder = result.substring(start + this.placeholderPrefix.length(), end);
- // replace placeholder value
- String replaceContent = placeholder.trim().isEmpty() ? "" : rule.apply(placeholder);
- result.replace(start, end + this.placeholderSuffix.length(), replaceContent);
- start = result.indexOf(this.placeholderPrefix, start + replaceContent.length());
- }
- return result.toString();
- }
-
- /**
- * Replace template string with special placeholder according to replace function.
- * @param file template file with special placeholder
- * @param rule placeholder replacement rule
- * @return new replaced string
- */
- public Path resolveByRule(Path file, Function rule) {
- try {
- List newContents = Files.readAllLines(file, StandardCharsets.UTF_8)
- .stream()
- .map(content -> resolveByRule(content, rule))
- .collect(Collectors.toList());
- Path newPath = Paths.get(file.getParent().toString(), file.getFileName() + "$");
- Files.write(newPath, String.join(System.lineSeparator(), newContents).getBytes(StandardCharsets.UTF_8));
- return newPath;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Replace template string with special placeholder according to properties file.
- * Key is the content of the placeholder
- * e.g: content = product:${id}:detail:${did}
- * valueMap = id -> 1; pid -> 2
- * return: product:1:detail:2
- *
- * @param content template string with special placeholder
- * @param valueMap placeholder replacement map
- * @return new replaced string
- */
- public String resolveByMap(String content, final Map valueMap) {
- return resolveByRule(content, placeholderValue -> String.valueOf(valueMap.get(placeholderValue)));
- }
-
- /**
- * Replace template string with special placeholder according to properties file.
- * Key is the content of the placeholder
- * e.g: content = product:${id}:detail:${did}
- * valueMap = id -> 1; pid -> 2
- * return: product:1:detail:2
- *
- * @param file template string with special placeholder
- * @param valueMap placeholder replacement map
- * @return new replaced string
- */
- public Path resolveByMap(Path file, final Map valueMap) {
- return resolveByRule(file, placeholderValue -> String.valueOf(valueMap.get(placeholderValue)));
- }
-}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/TestUtils.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/TestUtils.java
deleted file mode 100644
index 8daff533da2..00000000000
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/java/org/apache/inlong/sort/tests/utils/TestUtils.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.tests.utils;
-
-import org.junit.Test;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test util for test container.
- */
-public class TestUtils {
-
- private static final ParameterProperty MODULE_DIRECTORY =
- new ParameterProperty<>("moduleDir", Paths::get);
-
- /**
- * Searches for a resource file matching the given regex in the given directory. This method is
- * primarily intended to be used for the initialization of static {@link Path} fields for
- * resource file(i.e. jar, config file) that reside in the modules {@code target} directory.
- *
- * @param resourceNameRegex regex pattern to match against
- * @return Path pointing to the matching jar
- * @throws RuntimeException if none or multiple resource files could be found
- */
- public static Path getResource(final String resourceNameRegex) {
- // if the property is not set then we are most likely running in the IDE, where the working
- // directory is the
- // module of the test that is currently running, which is exactly what we want
- Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
-
- try (Stream dependencyResources = Files.walk(moduleDirectory)) {
- final List matchingResources =
- dependencyResources
- .filter(
- jar -> Pattern.compile(resourceNameRegex)
- .matcher(jar.toAbsolutePath().toString())
- .find())
- .collect(Collectors.toList());
- switch (matchingResources.size()) {
- case 0:
- throw new RuntimeException(
- new FileNotFoundException(
- String.format(
- "No resource file could be found that matches the pattern %s. "
- + "This could mean that the test module must be rebuilt via maven.",
- resourceNameRegex)));
- case 1:
- return matchingResources.get(0);
- default:
- throw new RuntimeException(
- new IOException(
- String.format(
- "Multiple resource files were found matching the pattern %s. Matches=%s",
- resourceNameRegex, matchingResources)));
- }
- } catch (final IOException ioe) {
- throw new RuntimeException("Could not search for resource resource files.", ioe);
- }
- }
-
- /**
- * A simple system properties value getter with default value when could not find the system property.
- * @param
- */
- static class ParameterProperty {
-
- private final String propertyName;
- private final Function converter;
-
- public ParameterProperty(final String propertyName, final Function converter) {
- this.propertyName = propertyName;
- this.converter = converter;
- }
-
- /**
- * Retrieves the value of this property, or the given default if no value was set.
- *
- * @return the value of this property, or the given default if no value was set
- */
- public V get(final V defaultValue) {
- final String value = System.getProperty(propertyName);
- return value == null ? defaultValue : converter.apply(value);
- }
- }
-
- @Test
- public void testReplaceholder() {
- String before = "today is ${date}, today weather is ${weather}";
- Map maps = new HashMap<>();
- maps.put("date", "2024.07.15");
- maps.put("weather", "song");
- String after = PlaceholderResolver.getDefaultResolver().resolveByMap(before, maps);
- assertEquals(after, "today is 2024.07.15, today weather is song");
- }
-}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/resources/log4j2-test.properties b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/resources/log4j2-test.properties
deleted file mode 100644
index 8b0c6558317..00000000000
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/main/resources/log4j2-test.properties
+++ /dev/null
@@ -1,82 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-rootLogger=INFO, STDOUT
-
-appender.console.type=Console
-appender.console.name=STDOUT
-appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
-
-appender.jm.type = File
-appender.jm.name = jobmanager
-appender.jm.fileName = target/logs/jobmanager.log
-appender.jm.layout.type = PatternLayout
-appender.jm.layout.pattern = - %m%n
-
-appender.tm.type = File
-appender.tm.name = taskmanager
-appender.tm.fileName = target/logs/taskmanager.log
-appender.tm.layout.type = PatternLayout
-appender.tm.layout.pattern = - %m%n
-
-appender.kafka.type = File
-appender.kafka.name = kafkaserver
-appender.kafka.fileName = target/logs/kafka.log
-appender.kafka.layout.type = PatternLayout
-appender.kafka.layout.pattern = - %m%n
-
-appender.starrocks.type = File
-appender.starrocks.name = starrocks
-appender.starrocks.fileName = target/logs/starrocks.log
-appender.starrocks.layout.type = PatternLayout
-appender.starrocks.layout.pattern = - %m%n
-
-appender.postgres.type = File
-appender.postgres.name = postgres
-appender.postgres.fileName = target/logs/postgres.log
-appender.postgres.layout.type = PatternLayout
-appender.postgres.layout.pattern = - %m%n
-
-appender.redis.type = File
-appender.redis.name = redis
-appender.redis.fileName = target/logs/redis.log
-appender.redis.layout.type = PatternLayout
-appender.redis.layout.pattern = - %m%n
-
-logger.jm=INFO, jobmanager
-logger.jm.name=org.apache.flink.runtime.jobmaster.JobMaster
-logger.jm.additivity=false
-
-logger.tm=INFO, taskmanager
-logger.tm.name=org.apache.flink.runtime.taskexecutor.TaskExecutor
-logger.tm.additivity=false
-
-logger.starrocks=INFO, starrocks
-logger.starrocks.name=org.apache.inlong.sort.tests.utils.StarRocksContainer
-logger.starrocks.additivity=false
-
-logger.postgres=INFO, postgres
-logger.postgres.name=org.testcontainers.containers.PostgreSQLContainer
-logger.postgres.additivity=false
-
-logger.redis=INFO, redis
-logger.redis.name=org.apache.inlong.sort.tests.utils.RedisContainer
-logger.redis.additivity=false
-
-
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/pom.xml
new file mode 100644
index 00000000000..e7bce101659
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/pom.xml
@@ -0,0 +1,127 @@
+
+
+
+ 4.0.0
+
+ org.apache.inlong
+ sort-connectors-v1.18
+ 1.14.0-SNAPSHOT
+
+
+ sort-connector-elasticsearch6-v1.18
+ jar
+ Apache InLong - Sort-connector-elasticsearch6
+
+
+ ${project.parent.parent.parent.parent.parent.basedir}
+ 6.8.17
+ 3.0.1-1.17
+
+
+
+
+ org.apache.inlong
+ sort-flink-dependencies-v1.18
+ ${project.version}
+ provided
+
+
+ org.apache.flink
+ flink-connector-elasticsearch6
+ ${elasticsearch.connector.version}
+
+
+ org.apache.flink
+ flink-json
+ ${flink.version}
+ provided
+
+
+ org.apache.inlong
+ sort-connector-elasticsearch-base-v1.18
+ ${project.version}
+
+
+ org.elasticsearch
+ elasticsearch
+ ${elasticsearch.version}
+
+
+
+ org.ow2.asm
+ *
+
+
+
+
+ org.apache.logging.log4j
+ log4j-api
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ shade-flink
+
+ shade
+
+ package
+
+ true
+
+
+ org.apache.inlong:sort-connector-*
+
+ org/apache/inlong/**
+ META-INF/services/org.apache.flink.table.factories.Factory
+
+
+
+ *:*
+
+ log4j.properties
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java
new file mode 100644
index 00000000000..8e0080f7349
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6;
+
+import org.apache.inlong.sort.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
+ * Modify from {@link org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge}
+ * */
+@Internal
+public class Elasticsearch6ApiCallBridge
+ implements
+ ElasticsearchApiCallBridge {
+
+ private static final long serialVersionUID = -5222683870097809633L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+ /** User-provided HTTP Host. */
+ private final List httpHosts;
+
+ /** The factory to configure the rest client. */
+ private final RestClientFactory restClientFactory;
+
+ Elasticsearch6ApiCallBridge(List httpHosts, RestClientFactory restClientFactory) {
+ Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
+ this.httpHosts = httpHosts;
+ this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+ }
+
+ @Override
+ public RestHighLevelClient createClient() {
+ RestClientBuilder builder =
+ RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+ restClientFactory.configureRestClientBuilder(builder);
+
+ RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
+
+ return rhlClient;
+ }
+
+ @Override
+ public BulkProcessor.Builder createBulkProcessorBuilder(
+ RestHighLevelClient client, BulkProcessor.Listener listener) {
+ return BulkProcessor.builder(client::bulkAsync, listener);
+ }
+
+ @Override
+ public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+ if (!bulkItemResponse.isFailed()) {
+ return null;
+ } else {
+ return bulkItemResponse.getFailure().getCause();
+ }
+ }
+
+ @Override
+ public void configureBulkProcessorFlushInterval(
+ BulkProcessor.Builder builder, long flushIntervalMillis) {
+ builder.setFlushInterval(TimeValue.timeValueMillis(flushIntervalMillis));
+ }
+
+ @Override
+ public void configureBulkProcessorBackoff(
+ BulkProcessor.Builder builder,
+ @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+ BackoffPolicy backoffPolicy;
+ if (flushBackoffPolicy != null) {
+ switch (flushBackoffPolicy.getBackoffType()) {
+ case CONSTANT:
+ backoffPolicy =
+ BackoffPolicy.constantBackoff(
+ new TimeValue(flushBackoffPolicy.getDelayMillis()),
+ flushBackoffPolicy.getMaxRetryCount());
+ break;
+ case EXPONENTIAL:
+ default:
+ backoffPolicy =
+ BackoffPolicy.exponentialBackoff(
+ new TimeValue(flushBackoffPolicy.getDelayMillis()),
+ flushBackoffPolicy.getMaxRetryCount());
+ }
+ } else {
+ backoffPolicy = BackoffPolicy.noBackoff();
+ }
+
+ builder.setBackoffPolicy(backoffPolicy);
+ }
+
+ @Override
+ public RequestIndexer createBulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ return new Elasticsearch6BulkProcessorIndexer(
+ bulkProcessor, flushOnCheckpoint, numPendingRequestsRef);
+ }
+
+ @Override
+ public void verifyClientConnection(RestHighLevelClient client) throws IOException {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
+ }
+
+ if (!client.ping()) {
+ throw new RuntimeException("There are no reachable Elasticsearch nodes!");
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Elasticsearch RestHighLevelClient is connected to {}", httpHosts.toString());
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 00000000000..ac91481ef93
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6;
+
+import org.apache.inlong.sort.elasticsearch.RequestIndexer;
+
+import org.apache.flink.annotation.Internal;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. {@link ActionRequest
+ * ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * Note: This class is binary compatible to Elasticsearch 6.
+ * Modify from {@link org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer}
+ */
+@Internal
+class Elasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+ private final BulkProcessor bulkProcessor;
+ private final boolean flushOnCheckpoint;
+ private final AtomicLong numPendingRequestsRef;
+
+ Elasticsearch6BulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ this.bulkProcessor = checkNotNull(bulkProcessor);
+ this.flushOnCheckpoint = flushOnCheckpoint;
+ this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+ }
+
+ @Override
+ public void add(DeleteRequest... deleteRequests) {
+ for (DeleteRequest deleteRequest : deleteRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(deleteRequest);
+ }
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ for (IndexRequest indexRequest : indexRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(indexRequest);
+ }
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ for (UpdateRequest updateRequest : updateRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(updateRequest);
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
new file mode 100644
index 00000000000..165bb51933a
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6;
+
+import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests} against a
+ * cluster for each incoming element.
+ *
+ *
The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch
+ * cluster. The sink will fail if no cluster can be connected to using the provided transport
+ * addresses passed to the constructor.
+ *
+ *
Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest
+ * ActionRequests}. This will buffer elements before sending a request to the cluster. The behaviour
+ * of the {@code BulkProcessor} can be configured using these config keys:
+ *
+ *
+ * - {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *
- {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *
- {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ * settings in milliseconds
+ *
+ *
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation
+ * of {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param Type of the elements handled by this sink
+ *
+ * Modify from {@link org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink}
+ */
+@PublicEvolving
+public class ElasticsearchSink extends ElasticsearchSinkBase {
+
+ private static final long serialVersionUID = 1L;
+
+ private ElasticsearchSink(
+ Map bulkRequestsConfig,
+ List httpHosts,
+ ElasticsearchSinkFunction elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler,
+ RestClientFactory restClientFactory) {
+
+ super(
+ new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
+ bulkRequestsConfig,
+ elasticsearchSinkFunction,
+ failureHandler);
+ }
+
+ /**
+ * A builder for creating an {@link ElasticsearchSink}.
+ *
+ * @param Type of the elements handled by the sink this builder creates.
+ * @deprecated This has been deprecated, please use {@link
+ * org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder}.
+ */
+ @Deprecated
+ @PublicEvolving
+ public static class Builder {
+
+ private final List httpHosts;
+ private final ElasticsearchSinkFunction elasticsearchSinkFunction;
+
+ private Map bulkRequestsConfig = new HashMap<>();
+ private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
+ private RestClientFactory restClientFactory = restClientBuilder -> {
+ };
+
+ /**
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link
+ * RestHighLevelClient}.
+ *
+ * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient}
+ * connects to.
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest}
+ * from the incoming element.
+ */
+ public Builder(
+ List httpHosts, ElasticsearchSinkFunction elasticsearchSinkFunction) {
+ this.httpHosts = Preconditions.checkNotNull(httpHosts);
+ this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
+ }
+
+ /**
+ * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
+ * disable it.
+ *
+ * @param numMaxActions the maximum number of actions to buffer per bulk request.
+ */
+ public void setBulkFlushMaxActions(int numMaxActions) {
+ Preconditions.checkArgument(
+ numMaxActions == -1 || numMaxActions > 0,
+ "Max number of buffered actions must be larger than 0.");
+
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
+ }
+
+ /**
+ * Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to
+ * disable it.
+ *
+ * @param maxSizeMb the maximum size of buffered actions, in mb.
+ */
+ public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+ Preconditions.checkArgument(
+ maxSizeMb == -1 || maxSizeMb > 0,
+ "Max size of buffered actions must be larger than 0.");
+
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
+ }
+
+ /**
+ * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
+ *
+ * @param intervalMillis the bulk flush interval, in milliseconds.
+ */
+ public void setBulkFlushInterval(long intervalMillis) {
+ Preconditions.checkArgument(
+ intervalMillis == -1 || intervalMillis >= 0,
+ "Interval (in milliseconds) between each flush must be larger than or equal to 0.");
+
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
+ }
+
+ /**
+ * Sets whether or not to enable bulk flush backoff behaviour.
+ *
+ * @param enabled whether or not to enable backoffs.
+ */
+ public void setBulkFlushBackoff(boolean enabled) {
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
+ }
+
+ /**
+ * Sets the type of back of to use when flushing bulk requests.
+ *
+ * @param flushBackoffType the backoff type to use.
+ */
+ public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
+ Preconditions.checkNotNull(flushBackoffType).toString());
+ }
+
+ /**
+ * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+ *
+ * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk
+ * requests
+ */
+ public void setBulkFlushBackoffRetries(int maxRetries) {
+ Preconditions.checkArgument(
+ maxRetries > 0, "Max number of backoff attempts must be larger than 0.");
+
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
+ }
+
+ /**
+ * Sets the amount of delay between each backoff attempt when flushing bulk requests, in
+ * milliseconds.
+ *
+ * @param delayMillis the amount of delay between each backoff attempt when flushing bulk
+ * requests, in milliseconds.
+ */
+ public void setBulkFlushBackoffDelay(long delayMillis) {
+ Preconditions.checkArgument(
+ delayMillis >= 0,
+ "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
+ }
+
+ /**
+ * Sets a failure handler for action requests.
+ *
+ * @param failureHandler This is used to handle failed {@link ActionRequest}.
+ */
+ public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
+ this.failureHandler = Preconditions.checkNotNull(failureHandler);
+ }
+
+ /**
+ * Sets a REST client factory for custom client configuration.
+ *
+ * @param restClientFactory the factory that configures the rest client.
+ */
+ public void setRestClientFactory(RestClientFactory restClientFactory) {
+ this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+ }
+
+ /**
+ * Creates the Elasticsearch sink.
+ *
+ * @return the created Elasticsearch sink.
+ */
+ public ElasticsearchSink build() {
+ return new ElasticsearchSink<>(
+ bulkRequestsConfig,
+ httpHosts,
+ elasticsearchSinkFunction,
+ failureHandler,
+ restClientFactory);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Builder> builder = (Builder>) o;
+ return Objects.equals(httpHosts, builder.httpHosts)
+ && Objects.equals(elasticsearchSinkFunction, builder.elasticsearchSinkFunction)
+ && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig)
+ && Objects.equals(failureHandler, builder.failureHandler)
+ && Objects.equals(restClientFactory, builder.restClientFactory);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ httpHosts,
+ elasticsearchSinkFunction,
+ bulkRequestsConfig,
+ failureHandler,
+ restClientFactory);
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
new file mode 100644
index 00000000000..94d24ad2963
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchConfiguration;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+
+/** Elasticsearch 6 specific configuration.
+ * Modify from {@link org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6Configuration}
+ * */
+@Internal
+final class Elasticsearch6Configuration extends ElasticsearchConfiguration {
+
+ Elasticsearch6Configuration(ReadableConfig config, ClassLoader classLoader) {
+ super(config, classLoader);
+ }
+
+ public List getHosts() {
+ return config.get(HOSTS_OPTION).stream()
+ .map(Elasticsearch6Configuration::validateAndParseHostsString)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Parse Hosts String to list.
+ *
+ * Hosts String format was given as following:
+ *
+ *
+ * connector.hosts = http://host_name:9092;http://host_name:9093
+ *
+ */
+ private static HttpHost validateAndParseHostsString(String host) {
+ try {
+ HttpHost httpHost = HttpHost.create(host);
+ if (httpHost.getPort() < 0) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+ host, HOSTS_OPTION.key()));
+ }
+
+ if (httpHost.getSchemeName() == null) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+ host, HOSTS_OPTION.key()));
+ }
+ return httpHost;
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+ host, HOSTS_OPTION.key()),
+ e);
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
new file mode 100644
index 00000000000..2137151620e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
+import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
+import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
+import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch6.ElasticsearchSink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.StringUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a
+ * logical description.
+ *
+ * Modify from {@link org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSink}
+ */
+@PublicEvolving
+final class Elasticsearch6DynamicSink implements DynamicTableSink {
+
+ @VisibleForTesting
+ static final Elasticsearch6RequestFactory REQUEST_FACTORY = new Elasticsearch6RequestFactory();
+
+ private final EncodingFormat> format;
+ private final TableSchema schema;
+ private final Elasticsearch6Configuration config;
+ private final ZoneId localTimeZoneId;
+ private final boolean isDynamicIndexWithSystemTime;
+
+ public Elasticsearch6DynamicSink(
+ EncodingFormat> format,
+ Elasticsearch6Configuration config,
+ TableSchema schema,
+ ZoneId localTimeZoneId) {
+ this(format, config, schema, localTimeZoneId, (ElasticsearchSink.Builder::new));
+ }
+
+ // --------------------------------------------------------------
+ // Hack to make configuration testing possible.
+ //
+ // The code in this block should never be used outside of tests.
+ // Having a way to inject a builder we can assert the builder in
+ // the test. We can not assert everything though, e.g. it is not
+ // possible to assert flushing on checkpoint, as it is configured
+ // on the sink itself.
+ // --------------------------------------------------------------
+
+ private final ElasticSearchBuilderProvider builderProvider;
+
+ @FunctionalInterface
+ interface ElasticSearchBuilderProvider {
+
+ ElasticsearchSink.Builder createBuilder(
+ List httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+ }
+
+ Elasticsearch6DynamicSink(
+ EncodingFormat> format,
+ Elasticsearch6Configuration config,
+ TableSchema schema,
+ ZoneId localTimeZoneId,
+ ElasticSearchBuilderProvider builderProvider) {
+ this.format = format;
+ this.schema = schema;
+ this.config = config;
+ this.localTimeZoneId = localTimeZoneId;
+ this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime();
+ this.builderProvider = builderProvider;
+ }
+
+ // --------------------------------------------------------------
+ // End of hack to make configuration testing possible
+ // --------------------------------------------------------------
+
+ public boolean isDynamicIndexWithSystemTime() {
+ IndexGeneratorFactory.IndexHelper indexHelper = new IndexGeneratorFactory.IndexHelper();
+ return indexHelper.checkIsDynamicIndexWithSystemTimeFormat(config.getIndex());
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+ for (RowKind kind : requestedMode.getContainedKinds()) {
+ if (kind != RowKind.UPDATE_BEFORE) {
+ builder.addContainedKind(kind);
+ }
+ }
+ if (isDynamicIndexWithSystemTime && !requestedMode.containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(
+ "Dynamic indexing based on system time only works on append only stream.");
+ }
+ return builder.build();
+ }
+
+ @Override
+ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+ return () -> {
+ SerializationSchema format =
+ this.format.createRuntimeEncoder(context, schema.toRowDataType());
+
+ final RowElasticsearchSinkFunction upsertFunction =
+ new RowElasticsearchSinkFunction(
+ IndexGeneratorFactory.createIndexGenerator(
+ config.getIndex(), schema, localTimeZoneId),
+ config.getDocumentType(),
+ format,
+ XContentType.JSON,
+ REQUEST_FACTORY,
+ KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
+
+ final ElasticsearchSink.Builder builder =
+ builderProvider.createBuilder(config.getHosts(), upsertFunction);
+
+ builder.setFailureHandler(config.getFailureHandler());
+ builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+ builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+ builder.setBulkFlushInterval(config.getBulkFlushInterval());
+ builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+ config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+ config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+ config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+ // we must overwrite the default factory which is defined with a lambda because of a bug
+ // in shading lambda serialization shading see FLINK-18006
+ if (config.getUsername().isPresent()
+ && config.getPassword().isPresent()
+ && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
+ && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
+ builder.setRestClientFactory(
+ new AuthRestClientFactory(
+ config.getPathPrefix().orElse(null),
+ config.getUsername().get(),
+ config.getPassword().get()));
+ } else {
+ builder.setRestClientFactory(
+ new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
+ }
+
+ final ElasticsearchSink sink = builder.build();
+
+ if (config.isDisableFlushOnCheckpoint()) {
+ sink.disableFlushOnCheckpoint();
+ }
+
+ return sink;
+ };
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return this;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Elasticsearch6";
+ }
+
+ /** Serializable {@link RestClientFactory} used by the sink. */
+ @VisibleForTesting
+ static class DefaultRestClientFactory implements RestClientFactory {
+
+ private final String pathPrefix;
+
+ public DefaultRestClientFactory(@Nullable String pathPrefix) {
+ this.pathPrefix = pathPrefix;
+ }
+
+ @Override
+ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+ if (pathPrefix != null) {
+ restClientBuilder.setPathPrefix(pathPrefix);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+ return Objects.equals(pathPrefix, that.pathPrefix);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pathPrefix);
+ }
+ }
+
+ /** Serializable {@link RestClientFactory} used by the sink which enable authentication. */
+ @VisibleForTesting
+ static class AuthRestClientFactory implements RestClientFactory {
+
+ private final String pathPrefix;
+ private final String username;
+ private final String password;
+ private transient CredentialsProvider credentialsProvider;
+
+ public AuthRestClientFactory(
+ @Nullable String pathPrefix, String username, String password) {
+ this.pathPrefix = pathPrefix;
+ this.password = password;
+ this.username = username;
+ }
+
+ @Override
+ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+ if (pathPrefix != null) {
+ restClientBuilder.setPathPrefix(pathPrefix);
+ }
+ if (credentialsProvider == null) {
+ credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ }
+ restClientBuilder.setHttpClientConfigCallback(
+ httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(
+ credentialsProvider));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AuthRestClientFactory that = (AuthRestClientFactory) o;
+ return Objects.equals(pathPrefix, that.pathPrefix)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pathPrefix, username, password);
+ }
+ }
+
+ /**
+ * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the
+ * sink.
+ */
+ private static class Elasticsearch6RequestFactory implements RequestFactory {
+
+ @Override
+ public UpdateRequest createUpdateRequest(
+ String index,
+ String docType,
+ String key,
+ XContentType contentType,
+ byte[] document) {
+ return new UpdateRequest(index, docType, key)
+ .doc(document, contentType)
+ .upsert(document, contentType);
+ }
+
+ @Override
+ public IndexRequest createIndexRequest(
+ String index,
+ String docType,
+ String key,
+ XContentType contentType,
+ byte[] document) {
+ return new IndexRequest(index, docType, key).source(document, contentType);
+ }
+
+ @Override
+ public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+ return new DeleteRequest(index, docType, key);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
+ return Objects.equals(format, that.format)
+ && Objects.equals(schema, that.schema)
+ && Objects.equals(config, that.config)
+ && Objects.equals(builderProvider, that.builderProvider);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(format, schema, config, builderProvider);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
new file mode 100644
index 00000000000..dbf2a0badd9
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.StringUtils;
+
+import java.time.ZoneId;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
+
+/** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ * Modify from {@link org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory}
+ * */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+
+ private static final String IDENTIFIER = "elasticsearch6-inlong";
+
+ private static final Set> requiredOptions =
+ Stream.of(HOSTS_OPTION, INDEX_OPTION, DOCUMENT_TYPE_OPTION).collect(Collectors.toSet());
+ private static final Set> optionalOptions =
+ Stream.of(
+ KEY_DELIMITER_OPTION,
+ FAILURE_HANDLER_OPTION,
+ FLUSH_ON_CHECKPOINT_OPTION,
+ BULK_FLASH_MAX_SIZE_OPTION,
+ BULK_FLUSH_MAX_ACTIONS_OPTION,
+ BULK_FLUSH_INTERVAL_OPTION,
+ BULK_FLUSH_BACKOFF_TYPE_OPTION,
+ BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+ BULK_FLUSH_BACKOFF_DELAY_OPTION,
+ CONNECTION_PATH_PREFIX,
+ FORMAT_OPTION,
+ PASSWORD_OPTION,
+ USERNAME_OPTION)
+ .collect(Collectors.toSet());
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ TableSchema tableSchema = context.getCatalogTable().getSchema();
+ ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+
+ final EncodingFormat> format =
+ helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
+
+ helper.validate();
+ Configuration configuration = new Configuration();
+ context.getCatalogTable().getOptions().forEach(configuration::setString);
+ Elasticsearch6Configuration config =
+ new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+ validate(config, configuration);
+
+ return new Elasticsearch6DynamicSink(
+ format,
+ config,
+ TableSchemaUtils.getPhysicalSchema(tableSchema),
+ getLocalTimeZoneId(context.getConfiguration()));
+ }
+
+ ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) {
+ final String zone = readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
+ final ZoneId zoneId =
+ TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+ ? ZoneId.systemDefault()
+ : ZoneId.of(zone);
+
+ return zoneId;
+ }
+
+ private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
+ config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+ config.getHosts(); // validate hosts
+ validate(
+ config.getIndex().length() >= 1,
+ () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+ int maxActions = config.getBulkFlushMaxActions();
+ validate(
+ maxActions == -1 || maxActions >= 1,
+ () -> String.format(
+ "'%s' must be at least 1. Got: %s",
+ BULK_FLUSH_MAX_ACTIONS_OPTION.key(), maxActions));
+ long maxSize = config.getBulkFlushMaxByteSize();
+ long mb1 = 1024 * 1024;
+ validate(
+ maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
+ () -> String.format(
+ "'%s' must be in MB granularity. Got: %s",
+ BULK_FLASH_MAX_SIZE_OPTION.key(),
+ originalConfiguration
+ .get(BULK_FLASH_MAX_SIZE_OPTION)
+ .toHumanReadableString()));
+ validate(
+ config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+ () -> String.format(
+ "'%s' must be at least 1. Got: %s",
+ BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+ config.getBulkFlushBackoffRetries().get()));
+ if (config.getUsername().isPresent()
+ && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
+ validate(
+ config.getPassword().isPresent()
+ && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
+ () -> String.format(
+ "'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
+ USERNAME_OPTION.key(),
+ PASSWORD_OPTION.key(),
+ config.getUsername().get(),
+ config.getPassword().orElse("")));
+ }
+ }
+
+ private static void validate(boolean condition, Supplier message) {
+ if (!condition) {
+ throw new ValidationException(message.get());
+ }
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ return requiredOptions;
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ return optionalOptions;
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000000..0ea039a71e0
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.inlong.sort.elasticsearch6.table.Elasticsearch6DynamicSinkFactory
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
index f21472326a9..e4cb21591ee 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
@@ -34,6 +34,7 @@
pulsar
jdbc
elasticsearch-base
+ elasticsearch6
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index d5b47b354e5..419af16967e 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -959,6 +959,15 @@ License : https://github.com/apache/flink/blob/master/LICENSE
Source : org.apache.flink:flink-connector-elasticsearch-base-3.0.1-1.17.jar (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
+ inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
+Source : org.apache.flink:flink-connector-elasticsearch6-3.0.1-1.17.jar (Please note that the software have been modified.)
+License : https://github.com/apache/flink/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents: