diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index cfaaee3f651..57d8d16d7a7 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -51,6 +51,11 @@
org.testcontainers
kafka
+
+ org.testcontainers
+ pulsar
+ test
+
org.testcontainers
mongodb
@@ -157,6 +162,13 @@
jedis
test
+
+
+ org.apache.pulsar
+ pulsar-client
+ ${pulsar.version}
+
+
@@ -246,6 +258,14 @@
jar
${project.build.directory}/dependencies
+
+ org.apache.inlong
+ sort-connector-pulsar-v1.15
+ ${project.version}
+ sort-connector-pulsar.jar
+ jar
+ ${project.build.directory}/dependencies
+
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
new file mode 100644
index 00000000000..e5252d0b4a2
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests;
+
+import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
+import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.StarRocksContainer;
+import org.apache.inlong.sort.tests.utils.TestUtils;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
+import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
+
+public class Pulsar2StarRocksTest extends FlinkContainerTestEnvJRE8 {
+
+ private static final String PULSAR_TEST_FIRST_MESSAGE =
+ "{\"id\": 1, \"name\": \"Alice\", \"description\": \"Hello, Pulsar\"}";
+ private static final String PULSAR_TEST_SECOND_MESSAGE =
+ "{\"id\": 2, \"name\": \"Bob\", \"description\": \"Goodbye, Pulsar\"}";
+
+ private static final Logger LOG = LoggerFactory.getLogger(Pulsar2StarRocksTest.class);
+
+ public static final Logger PULSAR_LOG = LoggerFactory.getLogger(PulsarContainer.class);
+ private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
+ private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
+
+ private static final Path pulsarJar = TestUtils.getResource("sort-connector-pulsar.jar");
+
+ private static final String sqlFile;
+
+ static {
+ try {
+ URI pulsarSqlFile = Objects
+ .requireNonNull(Pulsar2StarRocksTest.class.getResource("/flinkSql/pulsar_test.sql")).toURI();
+ sqlFile = Paths.get(pulsarSqlFile).toString();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @ClassRule
+ public static final PulsarContainer PULSAR = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.8.2"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("pulsar")
+ .withLogConsumer(new Slf4jLogConsumer(PULSAR_LOG));
+
+ @ClassRule
+ public static final StarRocksContainer STAR_ROCKS = (StarRocksContainer) new StarRocksContainer(
+ getNewStarRocksImageName())
+ .withExposedPorts(9030, 8030, 8040)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
+
+ @Before
+ public void setup() {
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ initializePulsarTopic();
+ initializeStarRocksTable(STAR_ROCKS);
+ }
+
+ private void initializePulsarTopic() {
+ try {
+ Container.ExecResult result = PULSAR.execInContainer("bin/pulsar-admin", "topics", "create",
+ "persistent://public/default/test-topic");
+ LOG.info("Create Pulsar topic: test-topic, std: {}", result.getStdout());
+ if (result.getExitCode() != 0) {
+ throw new RuntimeException("Init Pulsar topic failed. Exit code:" + result.getExitCode());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (PULSAR != null) {
+ PULSAR.stop();
+ }
+ if (STAR_ROCKS != null) {
+ STAR_ROCKS.stop();
+ }
+ }
+
+ @Test
+ public void testPulsarToStarRocks() throws Exception {
+ submitSQLJob(sqlFile, pulsarJar, jdbcJar, mysqlJdbcJar);
+ waitUntilJobRunning(Duration.ofSeconds(10));
+
+ try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PULSAR.getPulsarBrokerUrl()).build()) {
+ Producer producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://public/default/test-topic")
+ .create();
+
+ producer.send(PULSAR_TEST_FIRST_MESSAGE);
+ producer.send(PULSAR_TEST_SECOND_MESSAGE);
+
+ producer.close();
+ }
+
+ JdbcProxy proxy = new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
+ STAR_ROCKS.getPassword(), STAR_ROCKS.getDriverClassName());
+
+ List expectedResult = Arrays.asList(
+ "1,Alice,Hello, Pulsar",
+ "2,Bob,Goodbye, Pulsar");
+ proxy.checkResultWithTimeout(expectedResult, "test_output1", 3, 60000L);
+ }
+
+}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/pulsar_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/pulsar_test.sql
new file mode 100644
index 00000000000..1e9ac0e1c24
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/pulsar_test.sql
@@ -0,0 +1,39 @@
+CREATE TABLE test_input1
+(
+ `id` INT,
+ name STRING,
+ description STRING
+)
+WITH (
+ 'connector' = 'pulsar-inlong',
+ 'topic' = 'persistent://public/default/test-topic',
+ 'service-url' = 'pulsar://pulsar:6650',
+ 'admin-url' = 'http://pulsar:8080',
+ 'format' = 'json',
+ 'scan.startup.mode' = 'earliest'
+);
+
+
+CREATE TABLE test_output1
+(
+ id INT,
+ name STRING,
+ description STRING
+)
+ WITH (
+ 'connector' = 'starrocks-inlong',
+ 'jdbc-url' = 'jdbc:mysql://starrocks:9030',
+ 'load-url'='starrocks:8030',
+ 'database-name'='test',
+ 'table-name' = 'test_output1',
+ 'username' = 'inlong',
+ 'password' = 'inlong',
+ 'sink.properties.format' = 'json',
+ 'sink.properties.strip_outer_array' = 'true',
+ 'sink.buffer-flush.interval-ms' = '1000'
+);
+
+
+INSERT INTO test_output1
+SELECT id, name, description
+FROM test_input1;
diff --git a/pom.xml b/pom.xml
index a313eb73eee..21f4f746135 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1200,6 +1200,12 @@
${testcontainers.version}
test
+
+ org.testcontainers
+ pulsar
+ ${testcontainers.version}
+ test
+
org.hamcrest