From 11d48cbad5deb96ba933df8ebad7536638de3141 Mon Sep 17 00:00:00 2001 From: XiaoYou201 <58425449+XiaoYou201@users.noreply.github.com> Date: Wed, 8 May 2024 18:36:58 +0800 Subject: [PATCH] [INLONG-10144][Sort] Redis connectors support audit ID (#10146) Co-authored-by: vinnerzhang --- .../redis/source/RedisDynamicTableSource.java | 22 +++++++++++++++---- .../source/RedisRowDataLookupFunction.java | 18 ++++++++++++++- .../redis/table/RedisDynamicTableFactory.java | 7 +++++- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java index 7f28fccdb52..277045eac22 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java @@ -37,7 +37,11 @@ import java.util.Map; -import static org.apache.flink.table.types.logical.LogicalTypeRoot.*; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; + +//import static org.apache.flink.table.types.logical.LogicalTypeRoot.*; /** * Redis dynamic table source @@ -52,8 +56,13 @@ public class RedisDynamicTableSource implements LookupTableSource { private final RedisLookupOptions redisLookupOptions; private final Map properties; + private final String inlongMetric; + private final String auditHostAndPorts; + private final String auditKeys; + public RedisDynamicTableSource(Map properties, ResolvedSchema tableSchema, - ReadableConfig config, RedisLookupOptions redisLookupOptions) { + ReadableConfig config, RedisLookupOptions redisLookupOptions, String inlongMetric, String auditHostAndPorts, + String auditKeys) { this.properties = properties; Preconditions.checkNotNull(properties, "properties should not be null"); this.tableSchema = tableSchema; @@ -73,11 +82,15 @@ public RedisDynamicTableSource(Map properties, ResolvedSchema ta flinkJedisConfigBase = RedisHandlerServices .findRedisHandler(InlongJedisConfigHandler.class, properties).createFlinkJedisConfig(config); this.redisLookupOptions = redisLookupOptions; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + this.auditKeys = auditKeys; } @Override public DynamicTableSource copy() { - return new RedisDynamicTableSource(properties, tableSchema, config, redisLookupOptions); + return new RedisDynamicTableSource(properties, tableSchema, config, redisLookupOptions, inlongMetric, + auditHostAndPorts, auditKeys); } @Override @@ -88,6 +101,7 @@ public String asSummaryString() { @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { return TableFunctionProvider.of(new RedisRowDataLookupFunction( - redisMapper.getCommandDescription(), flinkJedisConfigBase, this.redisLookupOptions)); + redisMapper.getCommandDescription(), flinkJedisConfigBase, this.redisLookupOptions, inlongMetric, + auditHostAndPorts, auditKeys)); } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java index 95c9acd41fd..7186d5a603d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.redis.source; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.redis.common.config.RedisLookupOptions; import org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer; import org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder; @@ -55,14 +57,25 @@ public class RedisRowDataLookupFunction extends TableFunction { private transient Cache cache; private InlongRedisCommandsContainer redisCommandsContainer; + private SourceMetricData sourceMetricData; + RedisRowDataLookupFunction(RedisCommandDescription redisCommandDescription, - FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions redisLookupOptions) { + FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions redisLookupOptions, String inlongMetric, + String auditHostAndPorts, String auditKeys) { this.flinkJedisConfigBase = flinkJedisConfigBase; this.redisCommand = redisCommandDescription.getCommand(); this.additionalKey = redisCommandDescription.getAdditionalKey(); this.cacheMaxSize = redisLookupOptions.getCacheMaxSize(); this.cacheExpireMs = redisLookupOptions.getCacheExpireMs(); this.maxRetryTimes = redisLookupOptions.getMaxRetryTimes(); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption); + } } /** @@ -107,6 +120,9 @@ public void eval(Object... keys) { throw new UnsupportedOperationException( String.format("Unsupported for redisCommand: %s", redisCommand)); } + if (sourceMetricData != null) { + sourceMetricData.outputMetricsWithEstimate(rowData); + } if (cache == null) { collect(rowData); } else { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java index ef6c778a982..54f907ece93 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java @@ -95,10 +95,15 @@ public class RedisDynamicTableFactory implements DynamicTableSourceFactory, Dyna public DynamicTableSource createDynamicTableSource(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); ReadableConfig config = helper.getOptions(); + helper.validate(); validateConfigOptions(config, SUPPORT_SOURCE_COMMANDS); + String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = config.get(INLONG_AUDIT); + String auditKeys = config.get(AUDIT_KEYS); return new RedisDynamicTableSource(context.getCatalogTable().getOptions(), - context.getCatalogTable().getResolvedSchema(), config, getJdbcLookupOptions(config)); + context.getCatalogTable().getResolvedSchema(), config, getJdbcLookupOptions(config), inlongMetric, + auditHostAndPorts, auditKeys); } @Override