Skip to content

Commit

Permalink
NIFI-14129 Added Database Dialect Service
Browse files Browse the repository at this point in the history
- Added database-dialect-service-api
- Added Standard Database Dialect Service implementation
- Added Database Adapter implementation
- Added Database Dialect Service property descriptor to Database Processors
- Refactored Database Processors with optional Database Dialect Service
- Adjusted Derby implementation to throw an exception when constructing an ALTER TABLE statement with more than one column

This closes apache#9640.

Signed-off-by: Tamas Palfy <[email protected]>
  • Loading branch information
exceptionfactory authored and tpalfy committed Jan 22, 2025
1 parent 0aa100a commit 253a076
Show file tree
Hide file tree
Showing 57 changed files with 2,201 additions and 888 deletions.
6 changes: 6 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,12 @@ language governing permissions and limitations under the License. -->
<version>2.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-nar</artifactId>
<version>2.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api-nar</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StandardQueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -38,37 +45,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Optional;
import java.util.stream.Collectors;

@Tags({"database", "dbcp", "sql"})
@CapabilityDescription("Fetches parameters from database tables")

public class DatabaseParameterProvider extends AbstractParameterProvider implements VerifiableParameterProvider {

protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();

public static final PropertyDescriptor DB_TYPE;

static {
// Load the DatabaseAdapters
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
dbAdapterLoader.forEach(it -> {
dbAdapters.put(it.getName(), it);
dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription()));
});

DB_TYPE = new PropertyDescriptor.Builder()
.name("db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
.defaultValue("Generic")
.required(true)
.build();
}
public static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor("db-type");

public static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

static AllowableValue GROUPING_BY_COLUMN = new AllowableValue("grouping-by-column", "Column",
"A single table is partitioned by the 'Parameter Group Name Column'. All rows with the same value in this column will " +
Expand Down Expand Up @@ -149,6 +136,7 @@ public class DatabaseParameterProvider extends AbstractParameterProvider impleme
protected void init(final ParameterProviderInitializationContext config) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DB_TYPE);
properties.add(DATABASE_DIALECT_SERVICE);
properties.add(DBCP_SERVICE);
properties.add(PARAMETER_GROUPING_STRATEGY);
properties.add(TABLE_NAME);
Expand Down Expand Up @@ -178,7 +166,7 @@ public List<ParameterGroup> fetchParameters(final ConfigurationContext context)

final List<String> tableNames = groupByColumn
? Collections.singletonList(context.getProperty(TABLE_NAME).getValue())
: Arrays.stream(context.getProperty(TABLE_NAMES).getValue().split(",")).map(String::trim).collect(Collectors.toList());
: Arrays.stream(context.getProperty(TABLE_NAMES).getValue().split(",")).map(String::trim).toList();

final Map<String, List<Parameter>> parameterMap = new HashMap<>();
for (final String tableName : tableNames) {
Expand Down Expand Up @@ -233,8 +221,24 @@ private void validateValueNotNull(final String value, final String columnName) {
}

/**
 * Builds the SELECT statement used to read the requested columns from a single table.
 *
 * @param context configuration context from which the Database Type property is read and
 *                which is passed on when resolving the Database Dialect Service
 * @param tableName name of the table placed in the table definition
 * @param columns names of the columns to select; each becomes one column definition
 * @param whereClause filter for the statement; may be null, in which case no where clause is supplied
 * @return SQL text of the generated statement
 */
String getQuery(final ConfigurationContext context, final String tableName, final List<String> columns, final String whereClause) {
// NOTE(review): the adapter-based lookup and return on the next two lines coexist in this view with the
// dialect-service path below; they look like the pre-change side of the diff - confirm which one applies.
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
return dbAdapter.getSelectStatement(tableName, StringUtils.join(columns, ", "), whereClause, null, null, null);
// Resolve the dialect service from the configured Database Type value and the Database Dialect Service property
final String databaseType = context.getProperty(DB_TYPE).getValue();
final DatabaseDialectService databaseDialectService = DatabaseAdapterDescriptor.getDatabaseDialectService(context, DATABASE_DIALECT_SERVICE, databaseType);

// Wrap each column name in a column definition; the cast widens the stream element type to the interface
final List<ColumnDefinition> columnDefinitions = columns.stream()
.map(StandardColumnDefinition::new)
.map(ColumnDefinition.class::cast)
.toList();
// The first two (Optional) arguments of the table definition are left empty; only the table name and columns are set
final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, columnDefinitions);
// Only the where clause is optionally populated; the remaining optional request arguments stay empty
final QueryStatementRequest queryStatementRequest = new StandardQueryStatementRequest(
StatementType.SELECT,
tableDefinition,
Optional.empty(),
Optional.ofNullable(whereClause),
Optional.empty(),
Optional.empty()
);
final StatementResponse statementResponse = databaseDialectService.getStatement(queryStatementRequest);
return statementResponse.sql();
}

@Override
Expand All @@ -243,8 +247,8 @@ public List<ConfigVerificationResult> verify(final ConfigurationContext context,
try {
final List<ParameterGroup> parameterGroups = fetchParameters(context);
final long parameterCount = parameterGroups.stream()
.flatMap(group -> group.getParameters().stream())
.count();
.mapToLong(group -> group.getParameters().size())
.sum();
results.add(new ConfigVerificationResult.Builder()
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
.verificationStepName("Fetch Parameters")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@
<artifactId>nifi-dbcp-service-api</artifactId>
<version>2.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@
*/
package org.apache.nifi.processors.standard;

import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StandardQueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
Expand All @@ -29,7 +37,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;
import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
import org.apache.nifi.util.StringUtils;

Expand All @@ -50,13 +58,14 @@
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -183,9 +192,9 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
// The delimiter to use when referencing qualified names (such as table@!@column in the state map)
protected static final String NAMESPACE_DELIMITER = "@!@";

public static final PropertyDescriptor DB_TYPE;
public static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor("db-fetch-db-type");
static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();
protected final Map<String, Integer> columnTypeMap = new HashMap<>();

// This value is set when the processor is scheduled and indicates whether the Table Name property contains Expression Language.
Expand All @@ -204,29 +213,11 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact

private static final DateTimeFormatter TIME_TYPE_FORMAT = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static final String ZERO_RESULT_WHERE_CLAUSE = "1 = 0";

// A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
protected Map<String, String> maxValueProperties;

static {
// Load the DatabaseAdapters
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
dbAdapterLoader.forEach(it -> {
dbAdapters.put(it.getName(), it);
dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription()));
});

DB_TYPE = new PropertyDescriptor.Builder()
.name("db-fetch-db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
.defaultValue("Generic")
.required(true)
.build();
}

// A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
// For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language)
Expand All @@ -241,6 +232,8 @@ public void setup(final ProcessContext context) {
}

public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) {
final DatabaseDialectService databaseDialectService = getDatabaseDialectService(context);

synchronized (setupComplete) {
setupComplete.set(false);
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
Expand All @@ -256,23 +249,15 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();

final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
try (final Connection con = dbcpService.getConnection(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes());
final Statement st = con.createStatement()) {

// Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
// to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
// approach as in Apache Drill
String query;

if (StringUtils.isEmpty(sqlQuery)) {
query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
} else {
StringBuilder sbQuery = getWrappedQuery(dbAdapter, sqlQuery, tableName);
sbQuery.append(" WHERE 1=0");

query = sbQuery.toString();
}
final QueryStatementRequest statementRequest = getMaxValueStatementRequest(tableName, maxValueColumnNames, sqlQuery);
final StatementResponse statementResponse = databaseDialectService.getStatement(statementRequest);
final String query = statementResponse.sql();

ResultSet resultSet = st.executeQuery(query);
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
Expand All @@ -286,13 +271,13 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
final List<String> maxValueQualifiedColumnNameList = new ArrayList<>();

for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim(), dbAdapter);
String colKey = getStateKey(tableName, maxValueColumn.trim());
maxValueQualifiedColumnNameList.add(colKey);
}

for (int i = 1; i <= numCols; i++) {
String colName = resultSetMetaData.getColumnName(i).toLowerCase();
String colKey = getStateKey(tableName, colName, dbAdapter);
String colKey = getStateKey(tableName, colName);

//only include columns that are part of the maximum value tracking column list
if (!maxValueQualifiedColumnNameList.contains(colKey)) {
Expand All @@ -304,7 +289,7 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
}

for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase(), dbAdapter);
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase());
if (!columnTypeMap.containsKey(colKey)) {
throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
}
Expand All @@ -320,15 +305,32 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
}
}

protected static StringBuilder getWrappedQuery(DatabaseAdapter dbAdapter, String sqlQuery, String tableName) {
return new StringBuilder("SELECT * FROM (" + sqlQuery + ") " + dbAdapter.getTableAliasClause(tableName));
/**
 * Resolves the Database Dialect Service for the supplied context, handing the configured
 * Database Type value and the Database Dialect Service property descriptor to
 * {@code DatabaseAdapterDescriptor}.
 *
 * @param context property context from which the Database Type property is read
 * @return the service returned by {@code DatabaseAdapterDescriptor.getDatabaseDialectService}
 */
protected DatabaseDialectService getDatabaseDialectService(final PropertyContext context) {
    return DatabaseAdapterDescriptor.getDatabaseDialectService(
            context,
            DATABASE_DIALECT_SERVICE,
            context.getProperty(DB_TYPE).getValue()
    );
}

/**
 * Builds the zero-row SELECT request used to obtain result set metadata for the maximum-value columns.
 *
 * @param tableName table name placed in the table definition
 * @param maxValueColumnNames comma-separated names of the maximum-value columns; surrounding whitespace
 *                            around each name is removed
 * @param derivedTableQuery custom query supplied as the derived table of the request; {@code null} or an
 *                          empty string means no derived table
 * @return SELECT statement request carrying the {@code 1 = 0} where clause so that no rows are returned
 */
private QueryStatementRequest getMaxValueStatementRequest(final String tableName, final String maxValueColumnNames, final String derivedTableQuery) {
    // Trim each name so that "a, b" yields the columns "a" and "b" rather than "a" and " b",
    // matching the trimmed names used for the state keys
    final List<ColumnDefinition> maxValueColumns = Arrays.stream(maxValueColumnNames.split(","))
            .map(String::trim)
            .map(StandardColumnDefinition::new)
            .map(ColumnDefinition.class::cast)
            .toList();

    final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, maxValueColumns);

    // Treat an empty custom query the same as a missing one so that an empty value is never
    // presented to the dialect service as a derived table
    final Optional<String> derivedTable = Optional.ofNullable(derivedTableQuery)
            .filter(query -> !query.isEmpty());

    return new StandardQueryStatementRequest(
            StatementType.SELECT,
            tableDefinition,
            derivedTable,
            Optional.of(ZERO_RESULT_WHERE_CLAUSE),
            Optional.empty(),
            Optional.empty()
    );
}

protected static String getMaxValueFromRow(ResultSet resultSet,
int columnIndex,
Integer type,
String maxValueString,
String databaseType)
String maxValueString)
throws ParseException, IOException, SQLException {

// Skip any columns we're not keeping track of or whose value is null
Expand Down Expand Up @@ -520,17 +522,18 @@ protected static String getLiteralByType(int type, String value, String database
* Construct a key string for a corresponding state value.
* @param prefix A prefix may contain database and table name, or just table name, this can be null
* @param columnName A column name
* @param adapter DatabaseAdapter is used to unwrap identifiers
* @return a state key string
*/
// NOTE(review): two signatures and two variants of each append appear in this view (adapter-based and
// regex-based); they look like the two sides of a diff - confirm which variant is current.
protected static String getStateKey(String prefix, String columnName, DatabaseAdapter adapter) {
protected static String getStateKey(String prefix, String columnName) {
StringBuilder sb = new StringBuilder();
// The prefix and the namespace delimiter are appended only when a prefix is given
if (prefix != null) {
sb.append(adapter.unwrapIdentifier(prefix.toLowerCase()));
// Lower-case the prefix and remove double quotes, backticks and square brackets
final String prefixUnwrapped = prefix.toLowerCase().replaceAll("[\"`\\[\\]]", "");
sb.append(prefixUnwrapped);
sb.append(NAMESPACE_DELIMITER);
}
// A null column name contributes nothing to the key
if (columnName != null) {
sb.append(adapter.unwrapIdentifier(columnName.toLowerCase()));
// Same normalization as the prefix: lower-case, then remove double quotes, backticks and square brackets
final String columnNameUnwrapped = columnName.toLowerCase().replaceAll("[\"`\\[\\]]", "");
sb.append(columnNameUnwrapped);
}
return sb.toString();
}
Expand Down
Loading

0 comments on commit 253a076

Please sign in to comment.