Skip to content

Commit

Permalink
[INLONG-11266][Sort] Add end-to-end test case for sort-connector-puls…
Browse files Browse the repository at this point in the history
…ar-v1.15
  • Loading branch information
PeterZh6 committed Oct 2, 2024
1 parent b709fc2 commit 586ba73
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
<flink.version>1.15.4</flink.version>
<mongodb-driver-sync.version>4.10.2</mongodb-driver-sync.version>
<pulsar-client.version>2.8.2</pulsar-client.version>
</properties>

<dependencies>
Expand All @@ -51,6 +52,11 @@
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
Expand Down Expand Up @@ -157,6 +163,24 @@
<artifactId>jedis</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar-client.version}</version>
</dependency>

</dependencies>

<build>
Expand Down Expand Up @@ -246,6 +270,14 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-pulsar-v1.15</artifactId>
<version>${project.version}</version>
<destFileName>sort-connector-pulsar.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;

public class Pulsar2StarRocksTest extends FlinkContainerTestEnvJRE8 {

private static final String PULSAR_TEST_FIRST_MESSAGE =
"{\"id\": 1, \"name\": \"Alice\", \"description\": \"Hello, Pulsar\"}";
private static final String PULSAR_TEST_SECOND_MESSAGE =
"{\"id\": 2, \"name\": \"Bob\", \"description\": \"Goodbye, Pulsar\"}";

private static final Logger LOG = LoggerFactory.getLogger(Pulsar2StarRocksTest.class);

public static final Logger PULSAR_LOG = LoggerFactory.getLogger(PulsarContainer.class);
private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");

private static final Path pulsarJar = TestUtils.getResource("sort-connector-pulsar.jar");

private static final String sqlFile;

static {
try {
URI pulsarSqlFile = Objects
.requireNonNull(Pulsar2StarRocksTest.class.getResource("/flinkSql/pulsar_test.sql")).toURI();
sqlFile = Paths.get(pulsarSqlFile).toString();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

@ClassRule
public static final PulsarContainer PULSAR = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.8.2"))
.withNetwork(NETWORK)
.withNetworkAliases("pulsar")
.withLogConsumer(new Slf4jLogConsumer(PULSAR_LOG));

@ClassRule
public static final StarRocksContainer STAR_ROCKS = (StarRocksContainer) new StarRocksContainer(
getNewStarRocksImageName())
.withExposedPorts(9030, 8030, 8040)
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
initializePulsarTopic();
initializeStarRocksTable(STAR_ROCKS);
}

private void initializePulsarTopic() {
try {
Container.ExecResult result = PULSAR.execInContainer("bin/pulsar-admin", "topics", "create",
"persistent://public/default/test-topic");
LOG.info("Create Pulsar topic: test-topic, std: {}", result.getStdout());
if (result.getExitCode() != 0) {
throw new RuntimeException("Init Pulsar topic failed. Exit code:" + result.getExitCode());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@AfterClass
public static void teardown() {
if (PULSAR != null) {
PULSAR.stop();
}
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
}
}

@Test
public void testPulsarToStarRocks() throws Exception {
submitSQLJob(sqlFile, pulsarJar, jdbcJar, mysqlJdbcJar);
waitUntilJobRunning(Duration.ofSeconds(10));

try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PULSAR.getPulsarBrokerUrl()).build()) {
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://public/default/test-topic")
.create();

producer.send(PULSAR_TEST_FIRST_MESSAGE);
producer.send(PULSAR_TEST_SECOND_MESSAGE);

producer.close();
}

JdbcProxy proxy = new JdbcProxy(STAR_ROCKS.getJdbcUrl(), STAR_ROCKS.getUsername(),
STAR_ROCKS.getPassword(), STAR_ROCKS.getDriverClassName());

List<String> expectedResult = Arrays.asList(
"1,Alice,Hello, Pulsar",
"2,Bob,Goodbye, Pulsar");
proxy.checkResultWithTimeout(expectedResult, "test_output1", 3, 60000L);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
CREATE TABLE test_input1
(
`id` INT,
name STRING,
description STRING
)
WITH (
'connector' = 'pulsar-inlong',
'topic' = 'persistent://public/default/test-topic',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'format' = 'json',
'scan.startup.mode' = 'earliest'
);


CREATE TABLE test_output1
(
id INT,
name STRING,
description STRING
)
WITH (
'connector' = 'starrocks-inlong',
'jdbc-url' = 'jdbc:mysql://starrocks:9030',
'load-url'='starrocks:8030',
'database-name'='test',
'table-name' = 'test_output1',
'username' = 'inlong',
'password' = 'inlong',
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true',
'sink.buffer-flush.interval-ms' = '1000'
);


INSERT INTO test_output1
SELECT id, name, description
FROM test_input1;
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
Expand Down

0 comments on commit 586ba73

Please sign in to comment.