Skip to content

Commit

Permalink
NIFI-14109 Refactored remaining processors and control services to be…
Browse files Browse the repository at this point in the history
… uniform when creating properties and relationships.
  • Loading branch information
dan-s1 committed Dec 27, 2024
1 parent 2f73167 commit 6b945e4
Show file tree
Hide file tree
Showing 90 changed files with 800 additions and 870 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,16 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

private static final List<PropertyDescriptor> propertyDescriptors;

static {
propertyDescriptors = List.of(
BROKERS,
HOST, PORT,
V_HOST,
USER,
PASSWORD,
AMQP_VERSION,
SSL_CONTEXT_SERVICE,
USE_CERT_AUTHENTICATION);
}

protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
return propertyDescriptors;
}
protected static final List<PropertyDescriptor> PARENT_PROPERTIES = List.of(
BROKERS,
HOST, PORT,
V_HOST,
USER,
PASSWORD,
AMQP_VERSION,
SSL_CONTEXT_SERVICE,
USE_CERT_AUTHENTICATION
);

private BlockingQueue<AMQPResource<T>> resourceQueue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
Expand Down Expand Up @@ -163,28 +162,24 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.description("All FlowFiles that are received from the AMQP queue are routed to this relationship")
.build();

private static final List<PropertyDescriptor> propertyDescriptors;
private static final Set<Relationship> relationships;

private static final ObjectMapper objectMapper;

static {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QUEUE);
properties.add(AUTO_ACKNOWLEDGE);
properties.add(BATCH_SIZE);
properties.add(PREFETCH_COUNT);
properties.add(HEADER_FORMAT);
properties.add(HEADER_KEY_PREFIX);
properties.add(HEADER_SEPARATOR);
properties.add(REMOVE_CURLY_BRACES);
properties.addAll(getCommonPropertyDescriptors());
propertyDescriptors = Collections.unmodifiableList(properties);

relationships = Set.of(REL_SUCCESS);

objectMapper = new ObjectMapper();
}
private static final List<PropertyDescriptor> PROPERTIES = Stream.concat(
Stream.of(
QUEUE,
AUTO_ACKNOWLEDGE,
BATCH_SIZE,
PREFETCH_COUNT,
HEADER_FORMAT,
HEADER_KEY_PREFIX,
HEADER_SEPARATOR,
REMOVE_CURLY_BRACES
), PARENT_PROPERTIES.stream()
).toList();

private static final Set<Relationship> RELATIONSHIPS = Set.of(
REL_SUCCESS
);

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* Will construct a {@link FlowFile} containing the body of the consumed AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
Expand Down Expand Up @@ -303,7 +298,7 @@ private static String convertMapToString(Map<String, Object> headers, String val
}

private static String convertMapToJSONString(Map<String, Object> headers) throws JsonProcessingException {
return objectMapper.writeValueAsString(headers);
return OBJECT_MAPPER.writeValueAsString(headers);
}

@Override
Expand All @@ -320,12 +315,12 @@ protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext contex

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
return PROPERTIES;
}

@Override
public Set<Relationship> getRelationships() {
return relationships;
return RELATIONSHIPS;
}

public enum OutputHeaderFormat implements DescribedValue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
HEADERS_PATTERN,
HEADER_SEPARATOR
),
getCommonPropertyDescriptors().stream()
PARENT_PROPERTIES.stream()
).toList();

private final static Set<Relationship> RELATIONSHIPS = Set.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class StandardAsanaClientProviderService extends AbstractControllerServic
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();

protected static final List<PropertyDescriptor> DESCRIPTORS = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
PROP_ASANA_API_BASE_URL,
PROP_ASANA_PERSONAL_ACCESS_TOKEN,
PROP_ASANA_WORKSPACE_NAME
Expand All @@ -83,7 +83,7 @@ public class StandardAsanaClientProviderService extends AbstractControllerServic

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
return PROPERTIES;
}

@OnEnabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand Down Expand Up @@ -110,6 +109,12 @@ STRATEGY_NAME_CSE_C, new ClientSideCEncryptionStrategy()
.defaultValue(RegionUtilV1.createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
ENCRYPTION_STRATEGY,
ENCRYPTION_VALUE,
KMS_REGION
);

private String keyValue = "";
private String kmsRegion = "";
private S3EncryptionStrategy encryptionStrategy = new NoOpEncryptionStrategy();
Expand Down Expand Up @@ -189,11 +194,7 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ENCRYPTION_STRATEGY);
properties.add(ENCRYPTION_VALUE);
properties.add(KMS_REGION);
return Collections.unmodifiableList(properties);
return PROPERTIES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -137,7 +136,7 @@ public class AmazonGlueSchemaRegistry extends AbstractControllerService implemen

private static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(PROXY_SPECS);

private static final List<PropertyDescriptor> PROPERTIES = new ArrayList<>(Arrays.asList(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
SCHEMA_REGISTRY_NAME,
REGION,
COMMUNICATIONS_TIMEOUT,
Expand All @@ -146,7 +145,7 @@ public class AmazonGlueSchemaRegistry extends AbstractControllerService implemen
AWS_CREDENTIALS_PROVIDER_SERVICE,
PROXY_CONFIGURATION_SERVICE,
SSL_CONTEXT_SERVICE
));
);


@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public class StandardAzureCredentialsControllerService extends AbstractControlle
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
CREDENTIAL_CONFIGURATION_STRATEGY, MANAGED_IDENTITY_CLIENT_ID
CREDENTIAL_CONFIGURATION_STRATEGY,
MANAGED_IDENTITY_CLIENT_ID
);

private TokenCredential credentials;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ protected void createCosmosClient(final String uri, final String accessKey, fina
.buildClient();
}

static List<PropertyDescriptor> descriptors = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
AzureCosmosDBUtils.URI,
AzureCosmosDBUtils.DB_ACCESS_KEY,
AzureCosmosDBUtils.CONSISTENCY
);

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
return PROPERTIES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class StandardKustoIngestService extends AbstractControllerService implem
.addValidator(StandardValidators.URL_VALIDATOR)
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
AUTHENTICATION_STRATEGY,
APPLICATION_CLIENT_ID,
APPLICATION_KEY,
Expand All @@ -127,7 +127,7 @@ public class StandardKustoIngestService extends AbstractControllerService implem

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
return PROPERTIES;
}

@OnEnabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class StandardKustoQueryService extends AbstractControllerService impleme
.dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS)
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
CLUSTER_URI,
AUTHENTICATION_STRATEGY,
APPLICATION_CLIENT_ID,
Expand All @@ -95,7 +95,7 @@ public class StandardKustoQueryService extends AbstractControllerService impleme

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
return PROPERTIES;
}

public static final Pair<String, String> NIFI_SOURCE = Pair.of("processor", "nifi-source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement
.required(false)
.build();

private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
SERVICE_BUS_ENDPOINT,
EVENT_HUB_NAMESPACE,
EVENT_HUB_NAME,
Expand All @@ -143,7 +143,7 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
return PROPERTIES;
}

protected EventHubProducerClient createEventHubClient(final String namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,21 +148,23 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
.sensitive(true)
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(
SCHEMA_REGISTRY_URLS,
SSL_CONTEXT,
TIMEOUT,
CACHE_SIZE,
CACHE_EXPIRATION,
AUTHENTICATION_TYPE,
USERNAME,
PASSWORD
);

private volatile SchemaRegistryClient client;


@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SCHEMA_REGISTRY_URLS);
properties.add(SSL_CONTEXT);
properties.add(TIMEOUT);
properties.add(CACHE_SIZE);
properties.add(CACHE_EXPIRATION);
properties.add(AUTHENTICATION_TYPE);
properties.add(USERNAME);
properties.add(PASSWORD);
return properties;
return PROPERTIES;
}

private static final Validator REQUEST_HEADER_VALIDATOR = new Validator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.nifi.services.dropbox;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
Expand Down Expand Up @@ -77,16 +75,12 @@ public class StandardDropboxCredentialService extends AbstractControllerService
.required(true)
.build();

private static final List<PropertyDescriptor> PROPERTIES;

static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(APP_KEY);
props.add(APP_SECRET);
props.add(ACCESS_TOKEN);
props.add(REFRESH_TOKEN);
PROPERTIES = Collections.unmodifiableList(props);
}
private static final List<PropertyDescriptor> PROPERTIES = List.of(
APP_KEY,
APP_SECRET,
ACCESS_TOKEN,
REFRESH_TOKEN
);

private DropboxCredentialDetails credential;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,30 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im

private ObjectMapper mapper;

private static final List<PropertyDescriptor> properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD, API_KEY_ID, API_KEY,
PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE, CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET,
SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR, SNIFF_CLUSTER_NODES,
SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY);
private static final List<PropertyDescriptor> PROPERTIES = List.of(
HTTP_HOSTS,
PATH_PREFIX,
AUTHORIZATION_SCHEME,
USERNAME,
PASSWORD,
API_KEY_ID,
API_KEY,
PROP_SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE,
CONNECT_TIMEOUT,
SOCKET_TIMEOUT,
CHARSET,
SUPPRESS_NULLS,
COMPRESSION,
SEND_META_HEADER,
STRICT_DEPRECATION,
NODE_SELECTOR,
SNIFF_CLUSTER_NODES,
SNIFFER_INTERVAL,
SNIFFER_REQUEST_TIMEOUT,
SNIFF_ON_FAILURE,
SNIFFER_FAILURE_DELAY
);

private RestClient client;

Expand All @@ -117,7 +137,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
return PROPERTIES;
}

@Override
Expand Down
Loading

0 comments on commit 6b945e4

Please sign in to comment.