Skip to content

Commit

Permalink
[INLONG-10287][Agent] Update the Redis Source (#11084)
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO authored Oct 8, 2024
1 parent 624c63a commit cc82864
Show file tree
Hide file tree
Showing 8 changed files with 989 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,22 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_SQLSERVER_UNIX_TIMESTAMP_FORMAT_ENABLE =
"task.sqlserverTask.unixTimestampFormatEnable";

public static final String TASK_REDIS_PORT = "task.redisTask.port";
public static final String TASK_REDIS_HOSTNAME = "task.redisTask.hostname";
public static final String TASK_REDIS_SSL = "task.redisTask.ssl";
public static final String TASK_REDIS_AUTHUSER = "task.redisTask.authUser";
public static final String TASK_REDIS_AUTHPASSWORD = "task.redisTask.authPassword";
public static final String TASK_REDIS_READTIMEOUT = "task.redisTask.readTimeout";
public static final String TASK_REDIS_REPLID = "task.redisTask.replId";
public static final String TASK_REDIS_OFFSET = "task.redisTask.offset";
public static final String TASK_REDIS_DB_NUMBER = "task.redisTask.dbNumber";
public static final String TASK_REDIS_COMMAND = "task.redisTask.command";
public static final String TASK_REDIS_KEYS = "task.redisTask.keys";
public static final String TASK_REDIS_FIELD_OR_MEMBER = "task.redisTask.fieldOrMember";
public static final String TASK_REDIS_IS_SUBSCRIBE = "task.redisTask.isSubscribe";
public static final String TASK_REDIS_SUBOPERATION = "task.redisTask.subOperation";
public static final String TASK_REDIS_SYNC_FREQ = "task.redisTask.syncFreq";

public static final String TASK_STATE = "task.state";

public static final String INSTANCE_STATE = "instance.state";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.inlong.agent.pojo;

import lombok.Data;

@Data
public class RedisTask {

Expand All @@ -30,6 +29,13 @@ public class RedisTask {
private String readTimeout;
private String queueSize;
private String replId;
private String dbNumber;
private String command;
private String keys;
private String fieldOrMember;
private Boolean isSubscribe;
private String syncFreq;
private String subOperations;

@Data
public static class RedisTaskConfig {
Expand All @@ -42,5 +48,12 @@ public static class RedisTaskConfig {
private String timeout;
private String queueSize;
private String replId;
private String dbNumber;
private String command;
private String keys;
private String fieldOrMember;
private Boolean isSubscribe;
private String syncFreq;
private String subOperations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class TaskProfileDto {
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_ORACLE_TASK = "org.apache.inlong.agent.plugin.task.OracleTask";
public static final String DEFAULT_REDIS_TASK = "org.apache.inlong.agent.plugin.task.RedisTask";
public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask";
public static final String DEFAULT_SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask";
Expand Down Expand Up @@ -274,8 +275,14 @@ private static RedisTask getRedisTask(DataConfig dataConfig) {
redisTask.setPort(config.getPort());
redisTask.setSsl(config.getSsl());
redisTask.setReadTimeout(config.getTimeout());
redisTask.setQueueSize(config.getQueueSize());
redisTask.setReplId(config.getReplId());
redisTask.setCommand(config.getCommand());
redisTask.setDbNumber(config.getDbNumber());
redisTask.setKeys(config.getKeys());
redisTask.setFieldOrMember(config.getFieldOrMember());
redisTask.setIsSubscribe(config.getIsSubscribe());
redisTask.setSyncFreq(config.getSyncFreq());
redisTask.setSubOperations(config.getSubOperations());

return redisTask;
}
Expand Down Expand Up @@ -521,6 +528,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
profileDto.setTask(task);
break;
case REDIS:
task.setTaskClass(DEFAULT_REDIS_TASK);
RedisTask redisTask = getRedisTask(dataConfig);
task.setRedisTask(redisTask);
task.setSource(REDIS_SOURCE);
Expand Down
4 changes: 4 additions & 0 deletions inlong-agent/agent-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>agent-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.instance;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;

public class RedisInstance extends CommonInstance {

@Override
public void setInodeInfo(InstanceProfile profile) {
profile.set(TaskConstants.INODE_INFO, "");
}
}
Loading

0 comments on commit cc82864

Please sign in to comment.