Skip to content

Commit

Permalink
[INLONG-10144][Sort] Redis connectors support audit ID (apache#10146)
Browse files Browse the repository at this point in the history
Co-authored-by: vinnerzhang <[email protected]>
  • Loading branch information
2 people authored and herywang committed May 9, 2024
1 parent 4c64519 commit 11d48cb
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@

import java.util.Map;

import static org.apache.flink.table.types.logical.LogicalTypeRoot.*;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;

//import static org.apache.flink.table.types.logical.LogicalTypeRoot.*;

/**
* Redis dynamic table source
Expand All @@ -52,8 +56,13 @@ public class RedisDynamicTableSource implements LookupTableSource {
private final RedisLookupOptions redisLookupOptions;
private final Map<String, String> properties;

private final String inlongMetric;
private final String auditHostAndPorts;
private final String auditKeys;

public RedisDynamicTableSource(Map<String, String> properties, ResolvedSchema tableSchema,
ReadableConfig config, RedisLookupOptions redisLookupOptions) {
ReadableConfig config, RedisLookupOptions redisLookupOptions, String inlongMetric, String auditHostAndPorts,
String auditKeys) {
this.properties = properties;
Preconditions.checkNotNull(properties, "properties should not be null");
this.tableSchema = tableSchema;
Expand All @@ -73,11 +82,15 @@ public RedisDynamicTableSource(Map<String, String> properties, ResolvedSchema ta
flinkJedisConfigBase = RedisHandlerServices
.findRedisHandler(InlongJedisConfigHandler.class, properties).createFlinkJedisConfig(config);
this.redisLookupOptions = redisLookupOptions;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.auditKeys = auditKeys;
}

@Override
public DynamicTableSource copy() {
return new RedisDynamicTableSource(properties, tableSchema, config, redisLookupOptions);
return new RedisDynamicTableSource(properties, tableSchema, config, redisLookupOptions, inlongMetric,
auditHostAndPorts, auditKeys);
}

@Override
Expand All @@ -88,6 +101,7 @@ public String asSummaryString() {
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
return TableFunctionProvider.of(new RedisRowDataLookupFunction(
redisMapper.getCommandDescription(), flinkJedisConfigBase, this.redisLookupOptions));
redisMapper.getCommandDescription(), flinkJedisConfigBase, this.redisLookupOptions, inlongMetric,
auditHostAndPorts, auditKeys));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.redis.source;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.redis.common.config.RedisLookupOptions;
import org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer;
import org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder;
Expand Down Expand Up @@ -55,14 +57,25 @@ public class RedisRowDataLookupFunction extends TableFunction<RowData> {
private transient Cache<RowData, RowData> cache;
private InlongRedisCommandsContainer redisCommandsContainer;

private SourceMetricData sourceMetricData;

RedisRowDataLookupFunction(RedisCommandDescription redisCommandDescription,
FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions redisLookupOptions) {
FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions redisLookupOptions, String inlongMetric,
String auditHostAndPorts, String auditKeys) {
this.flinkJedisConfigBase = flinkJedisConfigBase;
this.redisCommand = redisCommandDescription.getCommand();
this.additionalKey = redisCommandDescription.getAdditionalKey();
this.cacheMaxSize = redisLookupOptions.getCacheMaxSize();
this.cacheExpireMs = redisLookupOptions.getCacheExpireMs();
this.maxRetryTimes = redisLookupOptions.getMaxRetryTimes();
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
.withAuditAddress(auditHostAndPorts)
.withAuditKeys(auditKeys)
.build();
if (metricOption != null) {
sourceMetricData = new SourceMetricData(metricOption);
}
}

/**
Expand Down Expand Up @@ -107,6 +120,9 @@ public void eval(Object... keys) {
throw new UnsupportedOperationException(
String.format("Unsupported for redisCommand: %s", redisCommand));
}
if (sourceMetricData != null) {
sourceMetricData.outputMetricsWithEstimate(rowData);
}
if (cache == null) {
collect(rowData);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,15 @@ public class RedisDynamicTableFactory implements DynamicTableSourceFactory, Dyna
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
ReadableConfig config = helper.getOptions();

helper.validate();
validateConfigOptions(config, SUPPORT_SOURCE_COMMANDS);
String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = config.get(INLONG_AUDIT);
String auditKeys = config.get(AUDIT_KEYS);
return new RedisDynamicTableSource(context.getCatalogTable().getOptions(),
context.getCatalogTable().getResolvedSchema(), config, getJdbcLookupOptions(config));
context.getCatalogTable().getResolvedSchema(), config, getJdbcLookupOptions(config), inlongMetric,
auditHostAndPorts, auditKeys);
}

@Override
Expand Down

0 comments on commit 11d48cb

Please sign in to comment.