diff --git a/.github/workflows/build-connectors-action.yaml b/.github/workflows/build-connectors-action.yaml
new file mode 100644
index 0000000..fdf162a
--- /dev/null
+++ b/.github/workflows/build-connectors-action.yaml
@@ -0,0 +1,100 @@
+name: Build and Release Database Connector
+
+on:
+  push:
+    branches:
+      - "snowflake/*"
+      - "mysql/*"
+      - "oracle/*"
+      - "phoenix/*"
+
+jobs:
+  docker-build:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        database: [snowflake, mysql, oracle, phoenix]
+
+    outputs:
+      release_tag: ${{ steps.extract_tag.outputs.tag }}
+      docker_tag: ${{ steps.extract_tag.outputs.docker_tag }}
+
+    permissions:
+      contents: read
+      packages: write # Allows pushing to GHCR
+      id-token: write # Required for authenticating with GHCR
+
+    steps:
+      - name: Checkout repository
+        if: contains(github.ref, matrix.database)
+        uses: actions/checkout@v3
+
+      - name: Extract version from branch name
+        if: contains(github.ref, matrix.database)
+        id: extract_tag
+        run: |
+          # Get the database type from the matrix
+          DB_TYPE=${{ matrix.database }}
+          # Use sed to remove the database prefix and get only the version part
+          VERSION=$(echo "${GITHUB_REF#refs/heads/}" | sed "s|^${DB_TYPE}/||")
+          # Construct the release tag and the full Docker tag
+          echo "tag=${DB_TYPE}/${VERSION}" >> $GITHUB_OUTPUT
+          echo "docker_tag=ghcr.io/hasura/ndc-jvm-${DB_TYPE}:${VERSION}" >> $GITHUB_OUTPUT
+
+      - name: Log in to the Container registry
+        if: contains(github.ref, matrix.database)
+        uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Set up Docker Buildx
+        if: contains(github.ref, matrix.database)
+        uses: docker/setup-buildx-action@v2
+
+      - name: Cross-platform Docker build and push
+        if: contains(github.ref, matrix.database)
+        uses: docker/build-push-action@v5
+        with:
+          context: .
+          file: ndc-connector-${{ matrix.database }}.dockerfile
+          push: true
+          tags: ${{ steps.extract_tag.outputs.docker_tag }}
+          platforms: linux/amd64,linux/arm64
+          build-args: |
+            JOOQ_PRO_EMAIL=${{ secrets.JOOQ_PRO_EMAIL }}
+            JOOQ_PRO_LICENSE=${{ secrets.JOOQ_PRO_LICENSE }}
+
+  create-release:
+    needs: docker-build
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        database: [snowflake, mysql, oracle, phoenix]
+
+    permissions:
+      contents: write # Allows creating releases and uploading assets
+
+    steps:
+      - name: Checkout repository
+        if: contains(github.ref, matrix.database)
+        uses: actions/checkout@v3
+
+      - name: Update dockerImage in connector-metadata.yaml
+        if: contains(github.ref, matrix.database)
+        run: |
+          # Use the full Docker tag from the docker-build job output
+          sed -i "s|^  dockerImage:.*|  dockerImage: \"${{ needs.docker-build.outputs.docker_tag }}\"|" ndc-connector-${{ matrix.database }}/.hasura-connector/connector-metadata.yaml
+
+      - name: Compress Hasura Connector Metadata
+        if: contains(github.ref, matrix.database)
+        run: |
+          cd ndc-connector-${{ matrix.database }}
+          tar -czf package.tar.gz ./.hasura-connector
+
+      - name: Create GitHub Release
+        if: contains(github.ref, matrix.database)
+        id: release
+        uses: actions/create-release@v1
+        with:
+          tag_name: ${{ needs.docker-build.outputs.release_tag }}
+          release_name: Release ${{ needs.docker-build.outputs.release_tag }}
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Upload package.tar.gz to GitHub Release
+        if: contains(github.ref, matrix.database)
+        uses: actions/upload-release-asset@v1
+        with:
+          upload_url: ${{ steps.release.outputs.upload_url }}
+          asset_path: ndc-connector-${{ matrix.database }}/package.tar.gz
+          asset_name: package-${{ matrix.database }}.tar.gz
+          asset_content_type: application/gzip
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
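The branch name doubles as the release coordinate here: `<db>/<version>` branches drive both the git tag and the GHCR image tag. A minimal Kotlin sketch of the same derivation the workflow performs with `sed` (the `refs/heads/` prefix is how GitHub Actions exposes push refs; the function itself is illustrative, not part of the patch):

```kotlin
// Derives the GHCR image tag from a push ref such as "refs/heads/mysql/v1.0.3",
// mirroring the sed expression in the "Extract version from branch name" step.
fun dockerTag(ref: String, dbType: String): String {
    val version = ref.removePrefix("refs/heads/").removePrefix("$dbType/")
    return "ghcr.io/hasura/ndc-jvm-$dbType:$version"
}

fun main() {
    // "mysql/v1.0.3" -> ghcr.io/hasura/ndc-jvm-mysql:v1.0.3
    check(dockerTag("refs/heads/mysql/v1.0.3", "mysql") == "ghcr.io/hasura/ndc-jvm-mysql:v1.0.3")
}
```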
"Usage: $0 " exit 1 fi # Variables OWNER="hasura" REPO="ndc-jvm-mono" -VERSION="$1" # Version passed as the first argument -SUBDIR="$2" # Subdirectory passed as the second argument +SUBDIR="$1" # Subdirectory passed as the second argument +VERSION="$2" # Version passed as the first argument # Create tag, release name, and description TAG="${SUBDIR#ndc-connector-}/${VERSION}" # Create tag like oracle/v1.0.0 @@ -36,4 +36,4 @@ if [ $? -eq 0 ]; then echo "Release ${RELEASE_NAME} created and file uploaded successfully for ${SUBDIR}." else echo "Failed to create release ${RELEASE_NAME} or upload file for ${SUBDIR}." -fi \ No newline at end of file +fi diff --git a/gradle.properties b/gradle.properties index 3e69b01..2f4752a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=0.1.0 +version=1.0.1 #Gradle properties quarkusPluginId=io.quarkus diff --git a/ndc-app/src/main/kotlin/io/hasura/ndc/app/application/Filters.kt b/ndc-app/src/main/kotlin/io/hasura/ndc/app/application/Filters.kt index 0b69c94..cd9658a 100644 --- a/ndc-app/src/main/kotlin/io/hasura/ndc/app/application/Filters.kt +++ b/ndc-app/src/main/kotlin/io/hasura/ndc/app/application/Filters.kt @@ -15,7 +15,8 @@ class Filters { @ServerRequestFilter(priority = 0) fun logBodyFilter(info: UriInfo, request: HttpServerRequest, ctx: ContainerRequestContext) { request.body { b -> - logger.debug("INCOMING IR: ${b.result()}") + logger.debug("INCOMING IR:") + logger.debug(b.result()) } } } diff --git a/ndc-app/src/main/kotlin/io/hasura/ndc/app/controllers/DataConnectorResource.kt b/ndc-app/src/main/kotlin/io/hasura/ndc/app/controllers/DataConnectorResource.kt index 52605fc..8cdd890 100644 --- a/ndc-app/src/main/kotlin/io/hasura/ndc/app/controllers/DataConnectorResource.kt +++ b/ndc-app/src/main/kotlin/io/hasura/ndc/app/controllers/DataConnectorResource.kt @@ -22,7 +22,7 @@ class DataConnectorResource @Inject constructor( val canConnectToDB = dataConnectorService.runHealthCheckQuery() if (canConnectToDB) { return Response - .status(Response.Status.NO_CONTENT) + .status(Response.Status.OK) .build() } else { throw RuntimeException("Unable to connect to DB") diff --git a/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/AgroalDataSourceService.kt b/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/AgroalDataSourceService.kt index 7109ee4..24a1f62 100644 --- a/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/AgroalDataSourceService.kt +++ b/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/AgroalDataSourceService.kt @@ -9,6 +9,7 @@ import io.agroal.api.security.SimplePassword import io.hasura.ndc.common.ConnectorConfiguration import io.opentelemetry.instrumentation.annotations.WithSpan import io.opentelemetry.instrumentation.jdbc.datasource.OpenTelemetryDataSource +import io.quarkus.agroal.runtime.AgroalOpenTelemetryWrapper import io.quarkus.agroal.runtime.OpenTelemetryAgroalDataSource import io.smallrye.config.ConfigMapping import io.smallrye.config.WithDefault diff --git a/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/ConnectorConfigurationLoader.kt b/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/ConnectorConfigurationLoader.kt deleted file mode 100644 index 9149449..0000000 --- a/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/ConnectorConfigurationLoader.kt +++ /dev/null @@ -1,31 +0,0 @@ -package io.hasura.ndc.app.services - -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import io.hasura.ndc.common.ConnectorConfiguration -import java.io.File -import java.nio.file.Path - -object 
ConnectorConfigurationLoader { - private val mapper = jacksonObjectMapper() - - private val DEFAULT_CONFIG_DIRECTORY = "/etc/connector" - private val ENV_SUPPLIED_CONFIG_DIRECTORY = System.getenv("HASURA_CONFIGURATION_DIRECTORY") - private val CONFIG_FILE_NAME = "configuration.json" - - val config: ConnectorConfiguration - - init { - val configPath = getConfigPath() - println("Loading configuration from $configPath") - config = loadConfigFile(configPath) - } - - private fun getConfigPath(): Path { - val configDirectory = ENV_SUPPLIED_CONFIG_DIRECTORY ?: DEFAULT_CONFIG_DIRECTORY - return Path.of(configDirectory, CONFIG_FILE_NAME) - } - - private fun loadConfigFile(path: Path): ConnectorConfiguration { - return mapper.readValue(File(path.toString()), ConnectorConfiguration::class.java) - } -} diff --git a/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/dataConnectors/BaseDataConnectorService.kt b/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/dataConnectors/BaseDataConnectorService.kt index b072360..0dd3e88 100644 --- a/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/dataConnectors/BaseDataConnectorService.kt +++ b/ndc-app/src/main/kotlin/io/hasura/ndc/app/services/dataConnectors/BaseDataConnectorService.kt @@ -9,7 +9,6 @@ import io.hasura.ndc.app.interfaces.ISchemaGenerator import io.hasura.ndc.common.ConnectorConfiguration import io.hasura.ndc.app.models.ExplainResponse import io.hasura.ndc.app.services.AgroalDataSourceService -import io.hasura.ndc.app.services.ConnectorConfigurationLoader import io.opentelemetry.api.trace.Tracer import io.opentelemetry.instrumentation.annotations.WithSpan import jakarta.enterprise.inject.Produces @@ -106,14 +105,14 @@ abstract class BaseDataConnectorService( @WithSpan open fun mkDSLCtx(): DSLContext { - val config = ConnectorConfigurationLoader.config + val config = ConnectorConfiguration.Loader.config val ds = dataSourceProvider.getDataSource(config) return DSL.using(ds, jooqDialect, jooqSettings) } @WithSpan override fun getSchema(): SchemaResponse { - return schemaGenerator.getSchema(ConnectorConfigurationLoader.config) + return schemaGenerator.getSchema(ConnectorConfiguration.Loader.config) } diff --git a/ndc-cli/build.gradle.kts b/ndc-cli/build.gradle.kts index 93ac493..9e3332e 100644 --- a/ndc-cli/build.gradle.kts +++ b/ndc-cli/build.gradle.kts @@ -28,6 +28,7 @@ dependencies { implementation("net.snowflake:snowflake-jdbc:3.16.1") implementation("org.apache.phoenix:phoenix-client-hbase-2.4:5.1.1") + implementation("org.apache.phoenix:phoenix-queryserver-client:5.0.0-HBase-2.0") } tasks.withType { diff --git a/ndc-cli/src/main/kotlin/io/hasura/cli/PhoenixConfigGenerator.kt b/ndc-cli/src/main/kotlin/io/hasura/cli/PhoenixConfigGenerator.kt index 5babfb8..da782d9 100644 --- a/ndc-cli/src/main/kotlin/io/hasura/cli/PhoenixConfigGenerator.kt +++ b/ndc-cli/src/main/kotlin/io/hasura/cli/PhoenixConfigGenerator.kt @@ -6,18 +6,67 @@ import io.hasura.ndc.common.TableSchemaRow import io.hasura.ndc.common.TableType import org.jooq.impl.DSL import java.sql.JDBCType +import java.sql.Types object PhoenixConfigGenerator : IConfigGenerator { + + + private fun translatePhoenixDataTypeToSqlType(phoenixDataType: Int?, isThinClient: Boolean = true): String { + + val sqlType = if (!isThinClient) { + phoenixDataType ?: Types.OTHER + } else { + when (phoenixDataType) { + null -> Types.OTHER // Handle null data_type + -6 -> Types.TINYINT // TINYINT + -5 -> Types.BIGINT // BIGINT + -3 -> Types.VARBINARY // VARBINARY + -2 -> Types.BINARY // BINARY + 1 -> Types.CHAR // 
CHAR
+                3 -> Types.DECIMAL    // DECIMAL
+                4 -> Types.INTEGER    // INTEGER
+                5 -> Types.SMALLINT   // SMALLINT
+                6 -> Types.FLOAT      // FLOAT
+                8 -> Types.DOUBLE     // DOUBLE
+                9 -> Types.VARCHAR    // VARCHAR
+                10 -> Types.SMALLINT  // UNSIGNED_SMALLINT (maps to SMALLINT)
+                11 -> Types.FLOAT     // UNSIGNED_FLOAT (maps to FLOAT)
+                12 -> Types.VARCHAR   // VARCHAR (Phoenix specific)
+                13 -> Types.VARCHAR   // (Custom/Unsupported, mapped to VARCHAR)
+                14 -> Types.VARCHAR   // (Custom/Unsupported, mapped to VARCHAR)
+                15 -> Types.VARCHAR   // (Custom/Unsupported, mapped to VARCHAR)
+                16 -> Types.BOOLEAN   // BOOLEAN
+                18 -> Types.ARRAY     // ARRAY
+                19 -> Types.VARBINARY // VARBINARY (Phoenix specific)
+                20 -> Types.VARBINARY // VARBINARY (Phoenix specific)
+                91 -> Types.DATE      // DATE
+                92 -> Types.TIME      // TIME
+                93 -> Types.TIMESTAMP // TIMESTAMP
+                else ->
+                    // JDBCType.valueOf(Int) never returns null; it throws
+                    // IllegalArgumentException for codes it does not know.
+                    try {
+                        JDBCType.valueOf(phoenixDataType)
+                        phoenixDataType
+                    } catch (e: IllegalArgumentException) {
+                        throw IllegalArgumentException("Unknown Phoenix data type: $phoenixDataType")
+                    }
+            }
+        }
+        return JDBCType.valueOf(sqlType).name
+    }
+
     override fun getConfig(
         jdbcUrl: String,
         schemas: List<String>
     ): ConnectorConfiguration {
         val ctx = DSL.using(jdbcUrl)
-        val result = ctx.fetch("""
+        val isThinClient = jdbcUrl.contains("phoenix:thin", ignoreCase = true)
+
+        val result = ctx.fetch(
+            """
             SELECT * FROM SYSTEM.CATALOG
             WHERE TABLE_SCHEM != 'SYSTEM' OR TABLE_SCHEM IS NULL
-            """)
+            """
+        )
 
         val groupedBySchema = result.groupBy { it["TABLE_SCHEM"] as String? }
@@ -28,10 +77,12 @@ object PhoenixConfigGenerator : IConfigGenerator {
             val columns = records.filter { it["COLUMN_NAME"] != null }.map {
                 val columnFamily = it["COLUMN_FAMILY"] as String?
                 val columnName = it["COLUMN_NAME"] as String
+
+
                 ColumnSchemaRow(
                     name = if (columnFamily != null && columnFamily != "0") "$columnFamily.$columnName" else columnName,
                     description = null,
-                    type = JDBCType.valueOf(it["DATA_TYPE"] as Int).name,
+                    type = translatePhoenixDataTypeToSqlType(it["DATA_TYPE"] as? Int, isThinClient),
                     numeric_scale = null,
                     nullable = it["NULLABLE"] == 1,
                     auto_increment = it["IS_AUTOINCREMENT"] == "YES",
                 )
             }
 
@@ -40,7 +91,7 @@
             TableSchemaRow(
-                tableName = if (schema != null) "$schema.$tableName" else tableName,
+                tableName = if (schema != null) "$schema.$tableName" else tableName,
                 tableType = if (records.any { it["TABLE_TYPE"] == "u" }) TableType.TABLE else TableType.VIEW,
                 description = null,
                 columns = columns,
@@ -59,4 +110,4 @@
             functions = emptyList()
         )
     }
-}
\ No newline at end of file
+}
diff --git a/ndc-cli/src/main/kotlin/io/hasura/cli/main.kt b/ndc-cli/src/main/kotlin/io/hasura/cli/main.kt
index 65571a0..bcdeacb 100644
--- a/ndc-cli/src/main/kotlin/io/hasura/cli/main.kt
+++ b/ndc-cli/src/main/kotlin/io/hasura/cli/main.kt
@@ -1,6 +1,7 @@
 package io.hasura.cli
 
 import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import io.hasura.ndc.common.ConnectorConfiguration
 import picocli.CommandLine
 import picocli.CommandLine.*
 import java.io.File
@@ -54,11 +55,22 @@ class CLI {
             names = ["-s", "--schemas"],
             arity = "0..*",
             split = ",",
-            defaultValue = "",
             description = ["Comma-separated list of schemas to introspect"]
         )
-        schemas: List<String> = emptyList()
+        schemas: List<String>?
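// The body below (after the parameter list closes) re-reads any existing
// configuration.json before introspecting, purely so that user-authored
// `nativeQueries` survive a re-run of `update`. Illustrative sketch — the
// names `old`, `fresh`, and `merged` are hypothetical, not part of the patch:
//
//   val old = mapper.readValue(file, ConnectorConfiguration::class.java)
//   val fresh = configGenerator.getConfig(jdbcUrl, schemas ?: emptyList())
//   val merged = fresh.copy(nativeQueries = old.nativeQueries)
//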
) { + val file = File(outfile) + + println("Checking for configuration file at ${file.absolutePath}") + val existingConfig = file.let { + if (it.exists()) { + println("Existing configuration file detected") + mapper.readValue(it, ConnectorConfiguration::class.java) + } else { + println("Non-existent or empty configuration file detected") + ConnectorConfiguration() + } + } val configGenerator = when (database) { DatabaseType.ORACLE -> OracleConfigGenerator @@ -67,20 +79,23 @@ class CLI { DatabaseType.PHOENIX -> PhoenixConfigGenerator } - val config = configGenerator.getConfig( + println("Generating configuration for $database database...") + val introspectedConfig = configGenerator.getConfig( jdbcUrl = jdbcUrl, - schemas = schemas + schemas = schemas ?: emptyList() + ) + val mergedConfigWithNativeQueries = introspectedConfig.copy( + nativeQueries = existingConfig.nativeQueries ) - val file = File(outfile) try { - file.createNewFile() - mapper.writerWithDefaultPrettyPrinter().writeValue(file, config) + println("Writing configuration to ${file.absolutePath}") + mapper.writerWithDefaultPrettyPrinter().writeValue(file, mergedConfigWithNativeQueries) } catch (e: Exception) { println("Error writing configuration to file: ${e.message}") val parentDir = file.parentFile - val permissions = Files.getPosixFilePermissions(parentDir.toPath()) + val permissions = Files.getPosixFilePermissions(parentDir.toPath()) val posixPermissions = PosixFilePermissions.toString(permissions) println("Current user: ${System.getProperty("user.name")}") @@ -104,4 +119,3 @@ class CLI { } } } - diff --git a/ndc-connector-mysql.dockerfile b/ndc-connector-mysql.dockerfile index 3087166..fc910a3 100644 --- a/ndc-connector-mysql.dockerfile +++ b/ndc-connector-mysql.dockerfile @@ -1,7 +1,13 @@ # Build stage FROM registry.access.redhat.com/ubi9/openjdk-21:1.20-2 AS build +ARG JOOQ_PRO_EMAIL +ARG JOOQ_PRO_LICENSE + ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' +ENV JOOQ_PRO_EMAIL=${JOOQ_PRO_EMAIL} +ENV JOOQ_PRO_LICENSE=${JOOQ_PRO_LICENSE} + WORKDIR /build COPY . 
/build diff --git a/ndc-connector-mysql/.hasura-connector/connector-metadata.yaml b/ndc-connector-mysql/.hasura-connector/connector-metadata.yaml index 12edee2..f06bff7 100644 --- a/ndc-connector-mysql/.hasura-connector/connector-metadata.yaml +++ b/ndc-connector-mysql/.hasura-connector/connector-metadata.yaml @@ -1,6 +1,6 @@ packagingDefinition: type: PrebuiltDockerImage - dockerImage: "ghcr.io/hasura/ndc-jvm-mysql:v0.1.0" + dockerImage: "ghcr.io/hasura/ndc-jvm-mysql:v1.0.3" supportedEnvironmentVariables: - name: JDBC_URL description: "The JDBC URL to connect to the database" @@ -9,7 +9,7 @@ commands: docker run \ -e HASURA_PLUGIN_CONNECTOR_CONTEXT_PATH \ -v ${HASURA_PLUGIN_CONNECTOR_CONTEXT_PATH}:/app/output \ - ghcr.io/hasura/ndc-jvm-cli:v0.1.0 update $JDBC_URL \ + ghcr.io/hasura/ndc-jvm-cli:v0.1.3 update $JDBC_URL \ --database MYSQL \ --schemas $JDBC_SCHEMAS \ --outfile /app/output/configuration.json diff --git a/ndc-connector-mysql/src/main/kotlin/io/hasura/mysql/JSONGenerator.kt b/ndc-connector-mysql/src/main/kotlin/io/hasura/mysql/JSONGenerator.kt index 3821791..db1f61f 100644 --- a/ndc-connector-mysql/src/main/kotlin/io/hasura/mysql/JSONGenerator.kt +++ b/ndc-connector-mysql/src/main/kotlin/io/hasura/mysql/JSONGenerator.kt @@ -1,7 +1,9 @@ package io.hasura.mysql -import io.hasura.ndc.app.services.ConnectorConfigurationLoader +import io.hasura.ndc.common.ConnectorConfiguration import io.hasura.ndc.common.NDCScalar +import io.hasura.ndc.common.NativeQueryInfo +import io.hasura.ndc.common.NativeQueryPart import io.hasura.ndc.ir.* import io.hasura.ndc.ir.Field.ColumnField import io.hasura.ndc.ir.Field as IRField @@ -14,25 +16,11 @@ import org.jooq.impl.SQLDataType object JsonQueryGenerator : BaseQueryGenerator() { - override fun buildComparison( - col: Field, - operator: ApplyBinaryComparisonOperator, - value: Field - ): Condition { - return when (operator) { - ApplyBinaryComparisonOperator.EQ -> col.eq(value) - ApplyBinaryComparisonOperator.GT -> col.gt(value) - ApplyBinaryComparisonOperator.GTE -> col.ge(value) - ApplyBinaryComparisonOperator.LT -> col.lt(value) - ApplyBinaryComparisonOperator.LTE -> col.le(value) - ApplyBinaryComparisonOperator.IN -> col.`in`(value) - ApplyBinaryComparisonOperator.IS_NULL -> col.isNull - ApplyBinaryComparisonOperator.LIKE -> col.like(value as Field) - } - } override fun forEachQueryRequestToSQL(request: QueryRequest): Select<*> { - TODO("Not yet implemented") + return DSL.with(buildVarsCTE(request)) + .select() + .from(queryRequestToSQLInternal(request), DSL.table(DSL.name("vars"))) } override fun queryRequestToSQL(request: QueryRequest): Select<*> { @@ -41,144 +29,196 @@ object JsonQueryGenerator : BaseQueryGenerator() { fun queryRequestToSQLInternal( request: QueryRequest, + ): SelectSelectStep<*> { + // If the QueryRequest "collection" references the name of a Native Query defined in the configuration.json, + // we need to prefix the generated query with a CTE named identically to the Native Query, containing the Native Query itself + val isNativeQuery = ConnectorConfiguration.Loader.config.nativeQueries.containsKey(request.collection) + + return if (isNativeQuery) { + mkNativeQueryCTE(request).select( + jsonArrayAgg( + buildJSONSelectionForQueryRequest(request) + ) + ) + } else { + DSL.select( + jsonArrayAgg( + buildJSONSelectionForQueryRequest(request) + ) + ) + } + } + + fun buildJSONSelectionForQueryRequest( + request: QueryRequest, parentTable: String? = null, - parentRelationship: Relationship? 
= null, - ): SelectHavingStep> { - return DSL.select( - DSL.jsonObject( - DSL.jsonEntry( - "rows", - DSL.select( - jsonArrayAgg( - DSL.jsonObject( - (request.query.fields ?: emptyMap()).map { (alias, field) -> - when (field) { - is ColumnField -> { - DSL.jsonEntry( - alias, - DSL.field(DSL.name(field.column)) - ) - } + parentRelationship: Relationship? = null + ): JSONObjectNullStep<*> { - is IRField.RelationshipField -> { - val relationship = - request.collection_relationships[field.relationship] - ?: error("Relationship ${field.relationship} not found") - - val subQuery = queryRequestToSQLInternal( - parentTable = request.collection, - parentRelationship = relationship, - request = QueryRequest( - collection = relationship.target_collection, - collection_relationships = request.collection_relationships, - query = field.query, - arguments = field.arguments, - variables = null - ) - ) - - DSL.jsonEntry( - alias, - DSL.select( - subQuery.asField(alias) - ) - ) - } - } - } - ) + val baseSelection = DSL.select( + DSL.table(DSL.name(request.collection)).asterisk() + ).from( + if (request.query.predicate == null) { + DSL.table(DSL.name(request.collection)) + } else { + val table = DSL.table(DSL.name(request.collection)) + val requiredJoinTables = collectRequiredJoinTablesForWhereClause( + where = request.query.predicate!!, + collectionRelationships = request.collection_relationships + ) + requiredJoinTables.foldIndexed(table) { index, acc, relationship -> + val parentTable = if (index == 0) { + request.collection + } else { + requiredJoinTables.elementAt(index - 1).target_collection + } + + val joinTable = DSL.table(DSL.name(relationship.target_collection)) + acc.join(joinTable).on( + mkJoinWhereClause( + sourceTable = parentTable, + parentRelationship = relationship ) - ).from( - DSL.select( - (request.query.fields ?: emptyMap()) - .filter { (_, field) -> field is ColumnField } - .map { (alias, field) -> - field as ColumnField - DSL.field(DSL.name(request.collection, field.column)).`as`(alias) - } + - collectColumnsReferencedByRelationships( - fields = request.query.fields ?: emptyMap(), - collectionRelationships = request.collection_relationships - ).map { - DSL.field(DSL.name(request.collection, it)) - } - ).from( - run> { - val table = DSL.table(DSL.name(request.collection)) - if (request.query.predicate == null) { - table - } else { - val requiredJoinTables = collectRequiredJoinTablesForWhereClause( - where = request.query.predicate!!, - collectionRelationships = request.collection_relationships - ) + ) + } + } + ).apply { + if (request.query.predicate != null) { + where(getWhereConditions(request)) + } + if (parentRelationship != null) { + where( + mkJoinWhereClause( + sourceTable = parentTable ?: error("parentTable is null"), + parentRelationship = parentRelationship + ) + ) + } + if (request.query.order_by != null) { + orderBy( + translateIROrderByField( + orderBy = request.query.order_by, + currentCollection = getTableName(request.collection), + relationships = request.collection_relationships + ) + ) + } + if (request.query.limit != null) { + limit(request.query.limit) + } + if (request.query.offset != null) { + offset(request.query.offset) + } + }.asTable( + DSL.name(getTableName(request.collection)) + ) - requiredJoinTables.foldIndexed(table) { index, acc, relationship -> - val parentTable = if (index == 0) { - request.collection - } else { - requiredJoinTables.elementAt(index - 1).target_collection - } + return DSL.jsonObject( + buildList { + if 
(!request.query.fields.isNullOrEmpty()) { + add( + DSL.jsonEntry( + "rows", + DSL.select( + jsonArrayAgg( + DSL.jsonObject( + (request.query.fields ?: emptyMap()).map { (alias, field) -> + when (field) { + is ColumnField -> { + DSL.jsonEntry( + alias, + DSL.field( + DSL.name(field.column), + // columnTypeTojOOQType(request.collection, field) + ) + ) + } - val joinTable = DSL.table(DSL.name(relationship.target_collection)) - acc.join(joinTable).on( - mkJoinWhereClause( - sourceTable = parentTable, - parentRelationship = relationship - ) - ) - } - } - } - ).apply { - if (request.query.predicate != null) { - where(getWhereConditions(request)) - } - if (parentRelationship != null) { - where( - mkJoinWhereClause( - sourceTable = parentTable ?: error("parentTable is null"), - parentRelationship = parentRelationship + is IRField.RelationshipField -> { + val relationship = + request.collection_relationships[field.relationship] + ?: error("Relationship ${field.relationship} not found") + + val subQuery = buildJSONSelectionForQueryRequest( + parentTable = request.collection, + parentRelationship = relationship, + request = QueryRequest( + collection = relationship.target_collection, + collection_relationships = request.collection_relationships, + query = field.query, + arguments = field.arguments, + variables = null + ) + ) + + DSL.jsonEntry( + alias, + DSL.coalesce( + DSL.select(subQuery), + DSL.jsonObject( + DSL.jsonEntry( + "rows", + DSL.jsonArray() + ) + ) + ) + ) + } + } + } ) ) - } - if (request.query.fields != null) { - val selectedColumns = request.query.fields!!.values.filterIsInstance().map { - DSL.field(DSL.name(request.collection, it.column)) - } - val relFieldColumns = collectColumnsReferencedByRelationships( - fields = request.query.fields ?: emptyMap(), - collectionRelationships = request.collection_relationships - ).map { - DSL.field(DSL.name(request.collection, it)) - } - groupBy(selectedColumns + relFieldColumns) - } - if (request.query.order_by != null) { - orderBy( - translateIROrderByField( - orderBy = request.query.order_by, - currentCollection = getTableName(request.collection), - relationships = request.collection_relationships - ) + ).from( + baseSelection + ) + ) + ) + } + if (!request.query.aggregates.isNullOrEmpty()) { + add( + DSL.jsonEntry( + "aggregates", + DSL.select( + DSL.jsonObject( + (request.query.aggregates ?: emptyMap()).map { (alias, aggregate) -> + DSL.jsonEntry( + alias, + getAggregatejOOQFunction(aggregate) + ) + } ) - } - if (request.query.limit != null) { - limit(request.query.limit) - } - if (request.query.offset != null) { - offset(request.query.offset) - } - }.asTable( - DSL.name(getTableName(request.collection)) + ).from( + baseSelection + ) ) ) - ) - ) + } + } ) } - fun jsonArrayAgg(field: JSONObjectNullStep) = CustomField.of("mysql_json_arrayagg", SQLDataType.JSON) { + fun renderNativeQuerySQL( + nativeQuery: NativeQueryInfo, + arguments: Map + ): String { + val sql = nativeQuery.sql + val parts = sql.parts + + return parts.joinToString("") { part -> + when (part) { + is NativeQueryPart.Text -> part.value + is NativeQueryPart.Parameter -> { + val argument = arguments[part.value] ?: error("Argument ${part.value} not found") + when (argument) { + is Argument.Literal -> argument.value.toString() + else -> error("Only literals are supported in Native Queries in this version") + } + } + } + } + } + + fun jsonArrayAgg(field: JSONObjectNullStep<*>) = CustomField.of("mysql_json_arrayagg", SQLDataType.JSON) { it.visit(DSL.field("json_arrayagg({0})", field)) 
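// The CustomField above pins MySQL's native JSON_ARRAYAGG rendering. The
// queries built here therefore have roughly this shape (illustrative, not a
// verbatim render):
//
//   SELECT json_arrayagg(json_object('rows', json_arrayagg(json_object(...))))
//   FROM <collection>
//
// so an entire NDC row set comes back as a single JSON cell, which
// handleQuery deserializes with one objectMapper.readValue call.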
} @@ -215,32 +255,20 @@ object JsonQueryGenerator : BaseQueryGenerator() { } } - // Returns all the columns in a parent table which are referenced - // by fields of type "relationship" and needed to join the two tables - // - // If a join column is already present in the requested fields, we skip it to avoid duplication - fun collectColumnsReferencedByRelationships( - fields: Map, - collectionRelationships: Map - ): Set { - val columnFields = fields.values - .filterIsInstance() - .map { it.column }.toSet() - - return fields.values - .filterIsInstance() - .mapNotNull { field -> - collectionRelationships[field.relationship] - ?.column_mapping - ?.values - ?.filterNot(columnFields::contains) - } - .flatten() - .toSet() + fun ndcScalarTypeToSQLDataType(scalarType: NDCScalar): DataType = when (scalarType) { + NDCScalar.BOOLEAN -> SQLDataType.BOOLEAN + NDCScalar.INT -> SQLDataType.INTEGER + NDCScalar.FLOAT -> SQLDataType.FLOAT + NDCScalar.STRING -> SQLDataType.CLOB + NDCScalar.DATE -> SQLDataType.DATE + NDCScalar.DATETIME -> SQLDataType.TIMESTAMP + NDCScalar.DATETIME_WITH_TIMEZONE -> SQLDataType.TIMESTAMP + NDCScalar.TIME -> SQLDataType.TIME + NDCScalar.TIME_WITH_TIMEZONE -> SQLDataType.TIME } private fun columnTypeTojOOQType(collection: String, field: ColumnField): org.jooq.DataType { - val connectorConfig = ConnectorConfigurationLoader.config + val connectorConfig = ConnectorConfiguration.Loader.config val table = connectorConfig.tables.find { it.tableName == collection } ?: error("Table $collection not found in connector configuration") @@ -249,17 +277,7 @@ object JsonQueryGenerator : BaseQueryGenerator() { ?: error("Column ${field.column} not found in table $collection") val scalarType = MySQLJDBCSchemaGenerator.mapScalarType(column.type, column.numeric_scale) - return when (scalarType) { - NDCScalar.BOOLEAN -> SQLDataType.BOOLEAN - NDCScalar.INT -> SQLDataType.INTEGER - NDCScalar.FLOAT -> SQLDataType.FLOAT - NDCScalar.STRING -> SQLDataType.CLOB - NDCScalar.DATE -> SQLDataType.DATE - NDCScalar.DATETIME -> SQLDataType.TIMESTAMP - NDCScalar.DATETIME_WITH_TIMEZONE -> SQLDataType.TIMESTAMP - NDCScalar.TIME -> SQLDataType.TIME - NDCScalar.TIME_WITH_TIMEZONE -> SQLDataType.TIME - } + return ndcScalarTypeToSQLDataType(scalarType) } private fun getAggregatejOOQFunction(aggregate: Aggregate) = when (aggregate) { diff --git a/ndc-connector-mysql/src/main/kotlin/io/hasura/mysql/MySQLDataConnectorService.kt b/ndc-connector-mysql/src/main/kotlin/io/hasura/mysql/MySQLDataConnectorService.kt index 52ba1f3..7d70b67 100644 --- a/ndc-connector-mysql/src/main/kotlin/io/hasura/mysql/MySQLDataConnectorService.kt +++ b/ndc-connector-mysql/src/main/kotlin/io/hasura/mysql/MySQLDataConnectorService.kt @@ -1,41 +1,17 @@ package io.hasura.mysql -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import io.hasura.ndc.app.interfaces.IDataSourceProvider -import io.hasura.ndc.app.services.ConnectorConfigurationLoader import io.hasura.ndc.app.services.dataConnectors.BaseDataConnectorService -import io.hasura.ndc.common.ConnectorConfiguration -import io.opentelemetry.api.trace.Tracer -import io.vertx.core.http.HttpServerRequest -import jakarta.inject.Inject -import jakarta.inject.Singleton -import jakarta.ws.rs.container.ContainerRequestContext -import jakarta.ws.rs.core.UriInfo import io.hasura.ndc.ir.* import io.hasura.ndc.sqlgen.MutationTranslator +import io.opentelemetry.api.trace.Tracer import jakarta.annotation.Priority import 
jakarta.enterprise.inject.Alternative
-import org.jboss.resteasy.reactive.server.ServerRequestFilter
-import org.jooq.DSLContext
+import jakarta.inject.Inject
+import jakarta.inject.Singleton
 import org.jooq.SQLDialect
 import org.jooq.conf.RenderQuotedNames
-import org.jooq.conf.Settings
-import org.jooq.impl.DefaultDSLContext
-
-
-class Filters {
-
-    @ServerRequestFilter(priority = 0)
-    fun logBodyFilter(info: UriInfo, request: HttpServerRequest, ctx: ContainerRequestContext) {
-        request.body {
-            val text = it.result().toString()
-            // Print JSON string formatted with Jackson
-            val json = jacksonObjectMapper().readValue(text)
-            println(jacksonObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(json))
-        }
-    }
-}
 
 @Singleton
 @Alternative
@@ -66,25 +42,19 @@ class MySQLDataConnectorService @Inject constructor(
     )
 
     override fun handleQuery(request: QueryRequest): List<RowSet> {
-        println(ConnectorConfigurationLoader.config)
-
         val dslCtx = mkDSLCtx()
-        val query = JsonQueryGenerator.queryRequestToSQL(request)
-        println(
-            dslCtx
-                .renderInlined(query)
-        )
+        val query = if (!request.variables.isNullOrEmpty()) {
+            JsonQueryGenerator.forEachQueryRequestToSQL(request)
+        } else {
+            JsonQueryGenerator.queryRequestToSQL(request)
+        }
 
         val rows = executeDbQuery(query, dslCtx)
         val json = rows.getValue(0, 0).toString()
-        val rowset = objectMapper.readValue(json)
-        return if (rowset == null) {
-            listOf(RowSet(rows = emptyList(), aggregates = emptyMap()))
-        } else {
-            listOf(rowset)
-        }
+        val rowsets = objectMapper.readValue<List<RowSet>>(json)
+        return rowsets
     }
 
     override val jooqDialect = SQLDialect.MYSQL_8_0
diff --git a/ndc-connector-oracle/.hasura-connector/connector-metadata.yaml b/ndc-connector-oracle/.hasura-connector/connector-metadata.yaml
index f175e4f..5c5a5df 100644
--- a/ndc-connector-oracle/.hasura-connector/connector-metadata.yaml
+++ b/ndc-connector-oracle/.hasura-connector/connector-metadata.yaml
@@ -1,6 +1,6 @@
 packagingDefinition:
   type: PrebuiltDockerImage
-  dockerImage: "ghcr.io/hasura/ndc-jvm-oracle:v0.1.0"
+  dockerImage: "ghcr.io/hasura/ndc-jvm-oracle:v1.0.3"
 supportedEnvironmentVariables:
   - name: JDBC_URL
     description: "The JDBC URL to connect to the database"
@@ -11,8 +11,7 @@ commands:
     docker run \
       -e HASURA_PLUGIN_CONNECTOR_CONTEXT_PATH \
       -v ${HASURA_PLUGIN_CONNECTOR_CONTEXT_PATH}:/app/output \
-      ghcr.io/hasura/ndc-jvm-cli:v0.1.0 update $JDBC_URL \
+      ghcr.io/hasura/ndc-jvm-cli:v0.1.3 update $JDBC_URL \
       --database ORACLE \
       --schemas $JDBC_SCHEMAS \
       --outfile /app/output/configuration.json
-
\ No newline at end of file
diff --git a/ndc-connector-oracle/src/main/kotlin/io/hasura/oracle/JSONGenerator.kt b/ndc-connector-oracle/src/main/kotlin/io/hasura/oracle/JSONGenerator.kt
index 69784c3..03139e3 100644
--- a/ndc-connector-oracle/src/main/kotlin/io/hasura/oracle/JSONGenerator.kt
+++ b/ndc-connector-oracle/src/main/kotlin/io/hasura/oracle/JSONGenerator.kt
@@ -1,6 +1,6 @@
 package io.hasura.oracle
 
-import io.hasura.ndc.app.services.ConnectorConfigurationLoader
+import io.hasura.ndc.common.ConnectorConfiguration
 import io.hasura.ndc.common.NDCScalar
 import io.hasura.ndc.ir.*
 import io.hasura.ndc.ir.Field.ColumnField
@@ -13,22 +13,6 @@
 object JsonQueryGenerator : BaseQueryGenerator() {
-    override fun buildComparison(
-        col: Field<Any>,
-        operator: ApplyBinaryComparisonOperator,
-        value: Field<Any>
-    ): Condition {
-        return when (operator) {
-            ApplyBinaryComparisonOperator.EQ -> col.eq(value)
-            ApplyBinaryComparisonOperator.GT -> col.gt(value)
-
ApplyBinaryComparisonOperator.GTE -> col.ge(value) - ApplyBinaryComparisonOperator.LT -> col.lt(value) - ApplyBinaryComparisonOperator.LTE -> col.le(value) - ApplyBinaryComparisonOperator.IN -> col.`in`(value) - ApplyBinaryComparisonOperator.IS_NULL -> col.isNull - ApplyBinaryComparisonOperator.LIKE -> col.like(value as Field) - } - } override fun forEachQueryRequestToSQL(request: QueryRequest): Select<*> { TODO("Not yet implemented") @@ -85,8 +69,15 @@ object JsonQueryGenerator : BaseQueryGenerator() { DSL.jsonEntry( alias, - DSL.select( - subQuery.asField(alias) + DSL.coalesce( + DSL.select( + subQuery.asField(alias) + ), DSL.jsonObject( + DSL.jsonEntry( + "rows", + DSL.jsonArray().returning(SQLDataType.CLOB) + ) + ).returning(SQLDataType.CLOB) ) ) } @@ -230,7 +221,7 @@ object JsonQueryGenerator : BaseQueryGenerator() { } private fun columnTypeTojOOQType(collection: String, field: ColumnField): org.jooq.DataType { - val connectorConfig = ConnectorConfigurationLoader.config + val connectorConfig = ConnectorConfiguration.Loader.config val table = connectorConfig.tables.find { it.tableName == collection } ?: error("Table $collection not found in connector configuration") @@ -286,7 +277,7 @@ object JsonQueryGenerator : BaseQueryGenerator() { ) private fun getTableName(collection: String): String { - return collection.split('.').last() + return collection.split('.').last() } } diff --git a/ndc-connector-oracle/src/main/kotlin/io/hasura/oracle/OracleDataConnectorService.kt b/ndc-connector-oracle/src/main/kotlin/io/hasura/oracle/OracleDataConnectorService.kt index ae9a1e1..7a2889f 100644 --- a/ndc-connector-oracle/src/main/kotlin/io/hasura/oracle/OracleDataConnectorService.kt +++ b/ndc-connector-oracle/src/main/kotlin/io/hasura/oracle/OracleDataConnectorService.kt @@ -1,39 +1,19 @@ package io.hasura.oracle -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import io.hasura.ndc.app.interfaces.IDataSourceProvider import io.hasura.ndc.app.services.dataConnectors.BaseDataConnectorService -import io.opentelemetry.api.trace.Tracer -import io.vertx.core.http.HttpServerRequest -import jakarta.inject.Inject -import jakarta.inject.Singleton -import jakarta.ws.rs.container.ContainerRequestContext -import jakarta.ws.rs.core.UriInfo import io.hasura.ndc.ir.* import io.hasura.ndc.sqlgen.MutationTranslator +import io.opentelemetry.api.trace.Tracer import jakarta.annotation.Priority import jakarta.enterprise.inject.Alternative -import org.jboss.resteasy.reactive.server.ServerRequestFilter +import jakarta.inject.Inject +import jakarta.inject.Singleton import org.jooq.SQLDialect import org.jooq.conf.RenderQuotedNames -import org.jooq.conf.Settings -import org.jooq.impl.DefaultDSLContext -class Filters { - - @ServerRequestFilter(priority = 0) - fun logBodyFilter(info: UriInfo, request: HttpServerRequest, ctx: ContainerRequestContext) { - request.body { - val text = it.result().toString() - // Print JSON string formatted with Jackson - val json = jacksonObjectMapper().readValue(text) - println(jacksonObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(json)) - } - } -} - @Singleton @Alternative @Priority(1) @@ -80,7 +60,8 @@ class OracleDataConnectorService @Inject constructor( } override val jooqDialect = SQLDialect.ORACLE21C - override val jooqSettings = commonDSLContextSettings.withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_UNQUOTED) + override val jooqSettings = + 
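// EXPLICIT_DEFAULT_UNQUOTED makes jOOQ leave identifiers unquoted unless a
// quoted Name is constructed explicitly, which suits Oracle's upper-cased,
// case-insensitive catalog. Illustrative contrast:
//
//   DSL.unquotedName("ALBUM")  renders as  ALBUM
//   DSL.quotedName("Album")    renders as  "Album"
//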
commonDSLContextSettings.withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_UNQUOTED) override val sqlGenerator = JsonQueryGenerator override val mutationTranslator = MutationTranslator } diff --git a/ndc-connector-phoenix.dockerfile b/ndc-connector-phoenix.dockerfile index 94dace9..ca44646 100644 --- a/ndc-connector-phoenix.dockerfile +++ b/ndc-connector-phoenix.dockerfile @@ -1,7 +1,13 @@ # Build stage FROM registry.access.redhat.com/ubi9/openjdk-21:1.20-2 AS build +ARG JOOQ_PRO_EMAIL +ARG JOOQ_PRO_LICENSE + ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' +ENV JOOQ_PRO_EMAIL=${JOOQ_PRO_EMAIL} +ENV JOOQ_PRO_LICENSE=${JOOQ_PRO_LICENSE} + WORKDIR /build COPY . /build @@ -13,7 +19,7 @@ RUN ./gradlew :ndc-connector-phoenix:build --no-daemon --console=plain -x test FROM registry.access.redhat.com/ubi9/openjdk-21:1.20-2 ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' -ENV JAVA_OPTS_APPEND="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" +ENV JAVA_OPTS_APPEND="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED" ENV JAVA_APP_JAR="/app/quarkus-run.jar" WORKDIR /app @@ -21,4 +27,4 @@ WORKDIR /app COPY --from=build /build/ndc-connector-phoenix/build/quarkus-app /app EXPOSE 8080 5005 -ENTRYPOINT [ "/opt/jboss/container/java/run/run-java.sh" ] \ No newline at end of file +ENTRYPOINT [ "/opt/jboss/container/java/run/run-java.sh" ] diff --git a/ndc-connector-phoenix/build.gradle.kts b/ndc-connector-phoenix/build.gradle.kts index 32466da..9c788a4 100644 --- a/ndc-connector-phoenix/build.gradle.kts +++ b/ndc-connector-phoenix/build.gradle.kts @@ -28,6 +28,7 @@ dependencies { // Phoenix JDBC driver implementation("org.apache.phoenix:phoenix-client-hbase-2.4:5.1.1") + implementation("org.apache.phoenix:phoenix-queryserver-client:5.0.0-HBase-2.0") implementation("org.jooq:jooq:3.19.8") } diff --git a/ndc-connector-phoenix/requests.http b/ndc-connector-phoenix/requests.http new file mode 100644 index 0000000..ed0c44f --- /dev/null +++ b/ndc-connector-phoenix/requests.http @@ -0,0 +1,50 @@ +POST http://localhost:8080/query +Content-Type: application/json + +{ + "collection": "US_POPULATION", + "query": { + "aggregates": null, + "fields": { + "CITY": { + "type": "column", + "column": "CITY" + }, + "STATE": { + "type": "column", + "column": "STATE" + }, + "POPULATION": { + "type": "column", + "column": "POPULATION" + } + }, + "limit": null, + "offset": null, + "order_by": null, + "predicate": { + "type": "binary_comparison_operator", + "operator": "_eq", + "column": { + "type": "column", + "name": "STATE", + "path": [] + }, + "value": { + "type": "variable", + "name": "STATE" + } + } + }, + "arguments": {}, + "collection_relationships": {}, + "variables": [ + { + "STATE": "NY" + }, + { + "STATE": "CA" + } + ], + "root_collection": "US_POPULATION" +} \ No newline at end of file diff --git a/ndc-connector-snowflake/.hasura-connector/connector-metadata.yaml b/ndc-connector-snowflake/.hasura-connector/connector-metadata.yaml index 2453f37..a8b09d5 100644 --- a/ndc-connector-snowflake/.hasura-connector/connector-metadata.yaml +++ b/ndc-connector-snowflake/.hasura-connector/connector-metadata.yaml @@ -1,6 +1,6 @@ packagingDefinition: type: PrebuiltDockerImage - dockerImage: "ghcr.io/hasura/ndc-jvm-snowflake:v0.1.0" + dockerImage: "ghcr.io/hasura/ndc-jvm-snowflake:v1.0.2" supportedEnvironmentVariables: - name: JDBC_URL description: "The JDBC URL to connect to 
the database" diff --git a/ndc-connector-snowflake/rsa_key.p8 b/ndc-connector-snowflake/rsa_key.p8 new file mode 100644 index 0000000..d6ad158 --- /dev/null +++ b/ndc-connector-snowflake/rsa_key.p8 @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIT5DlQO7K/8ACAggA +MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECN52Tor4Zy19BIIEyJ3PRIrNFOrc +UFg8x841OmVPXHZkLO+ABdKcM6kMIcUQHeJ15FAh+yCpTul1dE4GkKoiKNLd4TWW +oJxa7C92Pli4EffIUND+SCL5umoujmfHi+B9huaWhhBo7y0OgbvpLQvLoyux2r9H +gUSq+SC3jKQKgp9XLqlb8ggTFEPr1+zq1z9B06TVwRTMOv2cHLzaV87Etfi23U26 +o7xC88UnPZfBM6O07M9I+28NPoKJd5VFrYZqzqpfkOb5UgySapXyZD5E1tPyg5tN +TPF3PjoxumBwEfP+uKg9zf15bR9/yTtGzyWOdVF8frUE8BgPlUvc4pWA+NNk5Tp7 +I4ACeRmf9LfHM85xWRIli3lXLhHgaoDFePmVX9yuXpzjkoX6Bx3tkH2/RZJGvay4 +32GIm2Q2Z06MNkjvUp4y+kpeGaBhWxqDILUf5/JHg8MKWnCMkj7vNdc42AxKqCyZ +CAhUq4vdIRvzDO6TqCewgqFkSE/mAGibk9WgkyWZKBKHTfOp0BcZWkD2IuUAVYfY +S0pfxRmBjRQEJHHXp3eJP/eOIeiTjlWWqQvPhnd1Utp99RuvaNz+pbVo6kKzr5jN +V7nmAPpx8OreX+kHufyD7cWuJdFFKzYwBS0oE8fMUUhUVnKkHiL330WcvfZwhNM8 +ZI3pCY5IiBJ2p08zZHYQoatH3Q9hLdBUJNbbgZJXbX9nCbInhn06BXh3E3GMOz+c +GXV8spUZHpMuwY03pdZycn82iF7y2ztFDBqH5i730dCLs/lhU+syhnX3vTZZVX6W +lvZYBTOwcx8DfRR3SnvqQbpa0ALUOZEX30NaqwzK5EtK4oPIFASaoD0W65g7gaou +8/1ne9YJwqkNoKfT0+VoLKbrzMT9vsYYcD77X/+ksGo5M/gGZRMeAgVrImGsuTlV +3BcQMRFWLi/voU1P05Zf2dpHQluqBV+dcWA/o8N4eIg7AcodP2NTkA76OaxM9313 +DmiBAi3SaHeH10Eee5hMEqOYXutkAsGDCA+XxBXbrTTi6oIV/CTSPlbyygwmOYtG +Y27dv7fULDc0iMAy5r0KiE0qxM7q9BRoUTN9qa4OdKJaopRIfPNzEyqdk7FbBtOi +THLQHyUwG4yCBT+iurhWb2uSLvbzSZi96JND2/qzvzvORXgFc2sAWuvEx38B5KVL +FX4pCnz2xKfxchTpEMOxLGYY0zVnO4qEUI85YDovNTMGlFg4/XuGodOoxHSZbkDF +u8wguslcbtqqQnASN0ccjcGF9o3IYraa3BkrsN4D9+4sQdFIZlvh1cfq4wUn95RM +Vei9sfRSwdQjne62hneM4UA+U+wHZ8hJcEh4aaScppAmqWTGVAPIGEPQqf7hG1Zy +BEiME0+mSNt8J4uWR/kFda9vBiMoIblvwBNEOdzUf7spKy4h5qB21CkVcumphHcX +Sj8VeT5i1bouvExY3+BxSQSvveWP8tfGJuWyndssmlx3IYzisJoceepjH+vIyzHF +AeF8mdaq3V2OAiXihh88cHqU469HQrymUvdqWdNRTiovBPnMYmUy1rebH9yhGze9 +knD1XXGDku9dVc4Nm4RU6/eBtYXOdPrGTEoFVsCuU1pCe3IbKcE9oQb0IHAj46tG +W9tJsNdts5lH0I53pbGe5w== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/ndc-connector-snowflake/rsa_key.pub b/ndc-connector-snowflake/rsa_key.pub new file mode 100644 index 0000000..a466a4c --- /dev/null +++ b/ndc-connector-snowflake/rsa_key.pub @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxzVyIqmfTXxY8f0zuG/f +07VVF+iDq/4klM7rgMszQoQH3yXReWLB2QcBsTizQ10VIMWFNpgh7B/AChk2rw7z +KwZlFAwIOyYBIgYiyhtbOBP0mM4J7hGXWjV98f72GhS0DPTN6fIU492qAxY9xHIr +pll1XyjLARBejDkghjT+MYrnZRnVmSK6ntF16CGRoOehrsVy/UppMqmFYmkwfT7n +RUoLK4A0lognathg0JFMY82pnpi8RQmSiu9BUA78oefMQtOfklD2iOAlrxtszlaU +II2afigG5gZRxm+qfURiJ5IEOgRG3KEKjeHtmRjv4NS/08ZK3mC08JZ1UZbkutYi +UwIDAQAB +-----END PUBLIC KEY----- diff --git a/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/CTEQueryGenerator.kt b/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/CTEQueryGenerator.kt new file mode 100644 index 0000000..dd2a979 --- /dev/null +++ b/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/CTEQueryGenerator.kt @@ -0,0 +1,354 @@ +package io.hasura + +import io.hasura.ndc.ir.* +import io.hasura.ndc.ir.extensions.isVariablesRequest +import io.hasura.ndc.sqlgen.BaseQueryGenerator +import io.hasura.ndc.sqlgen.BaseQueryGenerator.Companion.INDEX +import io.hasura.ndc.sqlgen.BaseQueryGenerator.Companion.ROWS_AND_AGGREGATES +import org.jooq.* +import org.jooq.impl.DSL +import io.hasura.ndc.ir.Field as IRField + + +object SnowflakeDSL { + fun rlike(col: Field, field: Field): 
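// Snowflake's RLIKE predicate is expressed here as a raw SQL template rather
// than through jOOQ's built-in predicates. Usage sketch (the column name and
// regex literal are illustrative):
//
//   SnowflakeDSL.rlike(DSL.field(DSL.name("NAME"), String::class.java), DSL.inline("^A.*"))
//   // renders roughly as: NAME rlike('^A.*')
//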
Condition = DSL.condition("? rlike(?)", col, field) +} + +object CTEQueryGenerator : BaseQueryGenerator() { + override fun queryRequestToSQL( + request: QueryRequest + ): Select<*> { + return buildCTEs(request) + .select(DSL.jsonArrayAgg(DSL.field(DSL.name(listOf("data", ROWS_AND_AGGREGATES))))) + .from(buildSelections(request).asTable("data")) + } + + + override fun forEachQueryRequestToSQL(request: QueryRequest): Select<*> { + return buildCTEs(request, listOf(buildVarsCTE(request))) + .select( + DSL.jsonArrayAgg( + DSL.coalesce( + DSL.field(DSL.name(listOf("data", ROWS_AND_AGGREGATES))) as Field<*>, + DSL.jsonObject("rows", DSL.jsonArray()) + ) + ) + .orderBy(DSL.field(DSL.name(listOf(VARS, INDEX)))) + ) + .from(buildSelections(request).asTable("data")) + .rightJoin(DSL.name(VARS)) + .on( + DSL.field(DSL.name("data", INDEX)) + .eq(DSL.field(DSL.name(VARS, INDEX))) + ) + } + + private fun buildCTEs(request: QueryRequest, varCTE: List> = emptyList()): WithStep { + return DSL.with( + varCTE + + forEachQueryLevelRecursively(request, CTEQueryGenerator::buildCTE).distinct() + ) + } + + private fun getCollectionAsjOOQName(collection: String): Name { + return DSL.name(collection.split(".")) + } + + private fun buildCTE( + request: QueryRequest, + relationship: Relationship?, + relSource: String? + ): CommonTableExpression<*> { + return DSL.name(genCTEName(request.collection)).`as`( + DSL.select(DSL.asterisk()) + .from( + DSL.select(DSL.table(getCollectionAsjOOQName(request.collection)).asterisk(), + DSL.rowNumber().over( + DSL.partitionBy( + mkJoinKeyFields( + relationship, request.collection + ) + ).orderBy( + run { + val orderByFields = translateIROrderByField(request) + + if (request.isVariablesRequest()) listOf( + DSL.field( + DSL.name( + listOf( + VARS, + INDEX + ) + ) + ) + ) else emptyList() + orderByFields.distinct().ifEmpty { listOf(DSL.trueCondition()) } + } + ) + ).`as`(getRNName(request.collection)) + + ) + .apply { + if (request.isVariablesRequest()) + this.select(DSL.table(DSL.name(VARS)).asterisk()) + } + .apply { + if (relationship != null + && (relationship.column_mapping.isNotEmpty() || relationship.arguments.isNotEmpty()) + ) { + from(DSL.name(genCTEName(relSource ?: request.collection))) + .innerJoin(DSL.name(relationship.target_collection)) + .on( + mkSQLJoin( + relationship, + sourceCollection = genCTEName(relSource ?: request.collection) + ) + ) + } else from(getCollectionAsjOOQName(request.collection)) + } + .apply { + addJoinsRequiredForOrderByFields( + this as SelectJoinStep<*>, + request, + sourceCollection = request.collection + ) + } + .apply {// cross join "vars" if this request contains variables + if (request.isVariablesRequest()) + (this as SelectJoinStep<*>).crossJoin(DSL.name(VARS)) + } + .apply { + addJoinsRequiredForPredicate( + request, + this as SelectJoinStep<*> + ) + } + .where(getWhereConditions(request)) + .asTable(request.collection.split(".").joinToString("_")) + ).where(mkOffsetLimit(request, DSL.field(DSL.name(getRNName(request.collection))))) + ) + } + + private fun forEachQueryLevelRecursively( + request: QueryRequest, + elementFn: (request: QueryRequest, rel: Relationship?, relSource: String?) -> T + ): List { + + fun recur( + request: QueryRequest, + relationship: Relationship?, + relSource: String? = null + ): List = buildList { + add(elementFn(request, relationship, relSource)) + + getQueryRelationFields(request.query.fields ?: emptyMap()).flatMapTo(this) { + val rel = request.collection_relationships[it.value.relationship]!! 
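// forEachQueryLevelRecursively walks the relationship fields depth-first, so a
// request shaped like Album -> { tracks: Track } yields one element per query
// level; with buildCTE as elementFn that means one CTE per level (named via
// genCTEName, e.g. ALBUM_CTE and TRACK_CTE — Album/Track are hypothetical here).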
+ val args = + if (rel.arguments.isEmpty() && rel.column_mapping.isEmpty() && it.value.arguments.isNotEmpty()) { + it.value.arguments + } else rel.arguments + + recur( + request = request.copy( + collection = rel.target_collection, + query = it.value.query + ), + relationship = rel.copy(arguments = args), + request.collection + ) + } + } + + return recur(request, null) + } + + + private fun genCTEName(collection: String) = "${collection}_CTE".split(".").joinToString("_") + private fun getRNName(collection: String) = "${collection}_RN".split(".").joinToString("_") + + private fun buildRows(request: QueryRequest): Field<*> { + val isObjectTarget = isTargetOfObjRel(request) + val agg = if (isObjectTarget) DSL::jsonArrayAggDistinct else DSL::jsonArrayAgg + return DSL.coalesce( + agg(buildRow(request)) + .orderBy( + setOrderBy(request, isObjectTarget) + ), + DSL.jsonArray() + ) + } + + private fun buildVariableRows(request: QueryRequest): Field<*> { + return DSL.arrayAgg(buildRow(request)) + .over(DSL.partitionBy(DSL.field(DSL.name(listOf(genCTEName(request.collection), INDEX))))) + } + + private fun buildRow(request: QueryRequest): Field<*> { + return DSL.jsonObject( + (request.query.fields?.map { (alias, field) -> + when (field) { + is IRField.ColumnField -> + DSL.jsonEntry( + alias, + DSL.field(DSL.name(genCTEName(request.collection), field.column)) + ) + + is IRField.RelationshipField -> { + val relation = request.collection_relationships[field.relationship]!! + + DSL.jsonEntry( + alias, + DSL.coalesce( + DSL.field( + DSL.name( + createAlias( + relation.target_collection, + isAggOnlyRelationField(field) + ), + ROWS_AND_AGGREGATES + ) + ) as Field<*>, + setRelFieldDefaults(field) + ) + ) + } + } + } ?: emptyList>()) + ) + } + + private fun isTargetOfObjRel(request: QueryRequest): Boolean { + return request.collection_relationships.values.find { + it.target_collection == request.collection && it.relationship_type == RelationshipType.Object + } != null + } + + private fun setRelFieldDefaults(field: IRField.RelationshipField): Field<*> { + return if (isAggOnlyRelationField(field)) + DSL.jsonObject("aggregates", setAggregateDefaults(field)) + else if (isAggRelationField(field)) + DSL.jsonObject( + DSL.jsonEntry("rows", DSL.jsonArray()), + DSL.jsonEntry("aggregates", setAggregateDefaults(field)) + ) + else DSL.jsonObject("rows", DSL.jsonArray()) + } + + + private fun isAggRelationField(field: IRField.RelationshipField) = !field.query.aggregates.isNullOrEmpty() + + private fun isAggOnlyRelationField(field: IRField.RelationshipField) = + field.query.fields == null && isAggRelationField(field) + + private fun setAggregateDefaults(field: IRField.RelationshipField): Field<*> = + getDefaultAggregateJsonEntries(field.query.aggregates) + + private fun setOrderBy(request: QueryRequest, isObjectTarget: Boolean): List> { + return if (isObjectTarget /* || request.isNativeQuery() */) emptyList() + else listOf(DSL.field(DSL.name(getRNName(request.collection))) as Field<*>) + } + + private fun buildSelections(request: QueryRequest): Select<*> { + val selects = forEachQueryLevelRecursively(request, CTEQueryGenerator::buildSelect) + + // this is a non-relational query so just return the single select + if (selects.size == 1) return selects.first().second + + + selects.forEachIndexed() { idx, (request, select) -> + val relationships = getQueryRelationFields(request.query.fields).values.map { + val rel = request.collection_relationships[it.relationship]!! 
+ val args = if (rel.arguments.isEmpty() && rel.column_mapping.isEmpty() && it.arguments.isNotEmpty()) { + it.arguments + } else rel.arguments + rel.copy(arguments = args) + } + + relationships.forEach { relationship -> + + val innerSelects = + selects.minus(selects[idx]).filter { it.first.collection == relationship.target_collection } + + innerSelects.forEach { (innerRequest, innerSelect) -> + val innerAlias = createAlias( + innerRequest.collection, isAggregateOnlyRequest(innerRequest) + ) + + run { + select + .leftJoin( + innerSelect.asTable(innerAlias) + ) + .on( + mkSQLJoin( + relationship, + sourceCollection = genCTEName(request.collection), + targetTableNameTransform = { innerAlias } + ) + ) + } + } + } + } + return selects.first().second + } + + private fun getVarCols(request: QueryRequest): List> { + fun getVars(e: Expression): List> { + return when (e) { + is Expression.And -> e.expressions.flatMap { getVars(it) } + is Expression.Or -> e.expressions.flatMap { getVars(it) } + is Expression.Not -> getVars(e.expression) + is Expression.ApplyBinaryArrayComparison -> + if (e.values.filterIsInstance().isNotEmpty()) + listOf(DSL.field(DSL.name(e.column.name))) + else emptyList() + + is Expression.ApplyBinaryComparison -> + if (e.value is ComparisonValue.VariableComp) + listOf(DSL.field(DSL.name(e.column.name))) + else emptyList() + + else -> emptyList() + } + } + + return request.query.predicate?.let { getVars(it) } ?: emptyList() + } + + private fun buildSelect( + request: QueryRequest, + relationship: Relationship? = null, + relSource: String? = null + ): Pair> { + val joinFields = if (relationship != null) + mkJoinKeyFields(relationship, genCTEName(relationship.target_collection)) + else emptyList() + + return Pair( + request, + DSL.selectDistinct( + listOf( + buildOuterStructure( + request, + if (request.isVariablesRequest()) CTEQueryGenerator::buildVariableRows else CTEQueryGenerator::buildRows + ).`as`(ROWS_AND_AGGREGATES) + ) + + if (request.isVariablesRequest()) + (getVarCols(request) + listOf(DSL.field(DSL.name(INDEX)))) + else emptyList() + ) + .apply { + this.select(joinFields) + } + .from(DSL.name(genCTEName(request.collection))) + .apply { + if (joinFields.isNotEmpty()) groupBy(joinFields) + } + ) + } + + private fun createAlias(collection: String, isAggregateOnly: Boolean): String { + return "$collection${if (isAggregateOnly) "_AGG" else ""}".replace(".", "_") + } + +} \ No newline at end of file diff --git a/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/JSONGenerator.kt b/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/JSONGenerator.kt deleted file mode 100644 index 142427c..0000000 --- a/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/JSONGenerator.kt +++ /dev/null @@ -1,319 +0,0 @@ -package io.hasura.snowflake - -import io.hasura.ndc.app.services.ConnectorConfigurationLoader -import io.hasura.ndc.common.NDCScalar -import io.hasura.ndc.ir.* -import io.hasura.ndc.ir.Field.ColumnField -import io.hasura.ndc.ir.Field as IRField -import io.hasura.ndc.sqlgen.BaseQueryGenerator -import org.gradle.internal.impldep.org.junit.platform.commons.util.FunctionUtils.where -import org.jooq.* -import org.jooq.Field -import org.jooq.impl.CustomField -import org.jooq.impl.DSL -import org.jooq.impl.DSL.jsonArrayAgg -import org.jooq.impl.DSL.orderBy -import org.jooq.impl.SQLDataType - - -object JsonQueryGenerator : BaseQueryGenerator() { - override fun buildComparison( - col: Field, - operator: ApplyBinaryComparisonOperator, - value: Field - ): Condition 
{ - return when (operator) { - ApplyBinaryComparisonOperator.EQ -> col.eq(value) - ApplyBinaryComparisonOperator.GT -> col.gt(value) - ApplyBinaryComparisonOperator.GTE -> col.ge(value) - ApplyBinaryComparisonOperator.LT -> col.lt(value) - ApplyBinaryComparisonOperator.LTE -> col.le(value) - ApplyBinaryComparisonOperator.IN -> col.`in`(value) - ApplyBinaryComparisonOperator.IS_NULL -> col.isNull - ApplyBinaryComparisonOperator.LIKE -> col.like(value as Field) - } - } - - override fun forEachQueryRequestToSQL(request: QueryRequest): Select<*> { - TODO("Not yet implemented") - } - - override fun queryRequestToSQL(request: QueryRequest): Select<*> { - return queryRequestToSQLInternal(request) - } - - fun queryRequestToSQLInternal( - request: QueryRequest, - parentTable: String? = null, - parentRelationship: Relationship? = null, - ): SelectHavingStep> { - val requiredJoinTables = if (request.query.predicate != null) { - collectRequiredJoinTablesForWhereClause( - where = request.query.predicate!!, - collectionRelationships = request.collection_relationships - ) - } else { - emptySet() - } - - val referencedColumns = collectColumnsReferencedByRelationships( - fields = request.query.fields ?: emptyMap(), - collectionRelationships = request.collection_relationships - ) - - return DSL.select( - DSL.jsonObject( - DSL.jsonEntry( - "rows", - DSL.select( - DSL.jsonArrayAgg( - DSL.jsonObject( - (request.query.fields ?: emptyMap()).map { (alias, field) -> - when (field) { - is ColumnField -> { - DSL.jsonEntry( - alias, - DSL.field(DSL.name(field.column)) - ) - } - - is IRField.RelationshipField -> { - val relationship = - request.collection_relationships[field.relationship] - ?: error("Relationship ${field.relationship} not found") - - val subQuery = queryRequestToSQLInternal( - parentTable = request.collection, - parentRelationship = relationship, - request = QueryRequest( - collection = relationship.target_collection, - collection_relationships = request.collection_relationships, - query = field.query, - arguments = field.arguments, - variables = null - ) - ) - - DSL.jsonEntry( - alias, - DSL.select( - subQuery.asField(alias) - ) - ) - } - } - } - ) - ).orderBy( - DSL.field(DSL.name("rn")) - ) - ).from( - DSL.select( - buildList> { - if (request.query.fields != null) { - addAll( - request.query.fields!!.filter { (_, field) -> field is ColumnField } - .map { (alias, field) -> - field as ColumnField - DSL.field(DSL.name(request.collection, field.column)).`as`(alias) - } - ) - } - if (referencedColumns.isNotEmpty()) { - addAll( - referencedColumns.map { - DSL.field(DSL.name(request.collection, it)) - } - ) - } - add( - DSL.rowNumber().over( - DSL.partitionBy( - parentRelationship?.column_mapping?.values?.map { - DSL.field(DSL.name(parentRelationship.target_collection, it)) - } ?: emptyList() - ).orderBy( - translateIROrderByField( - orderBy = request.query.order_by, - currentCollection = getTableName(request.collection), - relationships = request.collection_relationships - ) - ) - ).`as`( - DSL.name("rn") - ) - ) - } - ).from( - run> { - val table = DSL.table(DSL.name(request.collection)) - if (request.query.predicate == null) { - table - } else { - requiredJoinTables.foldIndexed(table) { index, acc, relationship -> - val parentTable = if (index == 0) { - request.collection - } else { - requiredJoinTables.elementAt(index - 1).target_collection - } - - val joinTable = DSL.table(DSL.name(relationship.target_collection)) - acc.join(joinTable).on( - mkJoinWhereClause( - sourceTable = parentTable, - 
-                                                parentRelationship = relationship
-                                            )
-                                        )
-                                    }
-                                }
-                            }
-                        ).apply {
-                            if (request.query.predicate != null) {
-                                where(getWhereConditions(request))
-                            }
-                        }.asTable(
-                            DSL.name(getTableName(request.collection))
-                        )
-                    ).apply {
-                        if (parentRelationship != null) {
-                            where(
-                                parentRelationship.column_mapping.map { (from, to) ->
-                                    DSL.field(DSL.name(to)).eq(
-                                        DSL.field(DSL.name(getTableName(parentTable!!), from))
-                                    )
-                                }
-                            )
-                        }
-                        if (request.query.offset != null) {
-                            where(DSL.field(DSL.name("rn")).gt(request.query.offset))
-                        }
-                        if (request.query.limit != null) {
-                            val offset = request.query.offset ?: 0
-                            val upperBound = offset + request.query.limit!!
-                            where(DSL.field(DSL.name("rn")).le(upperBound))
-                        }
-                    }
-                )
-            )
-        )
-    }
-
-    fun collectRequiredJoinTablesForWhereClause(
-        where: Expression,
-        collectionRelationships: Map<String, Relationship>,
-        previousTableName: String? = null
-    ): Set<Relationship> {
-        return when (where) {
-            is ExpressionOnColumn -> when (val column = where.column) {
-                is ComparisonColumn.Column -> {
-                    column.path.fold(emptySet()) { acc, path ->
-                        val relationship = collectionRelationships[path.relationship]
-                            ?: error("Relationship ${path.relationship} not found")
-
-                        acc + relationship
-                    }
-                }
-
-                else -> emptySet()
-            }
-
-            is Expression.And -> where.expressions.fold(emptySet()) { acc, expr ->
-                acc + collectRequiredJoinTablesForWhereClause(expr, collectionRelationships)
-            }
-
-            is Expression.Or -> where.expressions.fold(emptySet()) { acc, expr ->
-                acc + collectRequiredJoinTablesForWhereClause(expr, collectionRelationships)
-            }
-
-            is Expression.Not -> collectRequiredJoinTablesForWhereClause(where.expression, collectionRelationships)
-
-            else -> emptySet()
-        }
-    }
-
-    // Returns all the columns in a parent table which are referenced
-    // by fields of type "relationship" and needed to join the two tables
-    //
-    // If a join column is already present in the requested fields, we skip it to avoid duplication
-    fun collectColumnsReferencedByRelationships(
-        fields: Map<String, IRField>,
-        collectionRelationships: Map<String, Relationship>
-    ): Set<String> {
-        val columnFields = fields.values
-            .filterIsInstance<ColumnField>()
-            .map { it.column }.toSet()
-
-        return fields.values
-            .filterIsInstance<IRField.RelationshipField>()
-            .mapNotNull { field ->
-                collectionRelationships[field.relationship]
-                    ?.column_mapping
-                    ?.values
-                    ?.filterNot(columnFields::contains)
-            }
-            .flatten()
-            .toSet()
-    }
-
-    private fun columnTypeTojOOQType(collection: String, field: ColumnField): org.jooq.DataType<out Any> {
-        val connectorConfig = ConnectorConfigurationLoader.config
-
-        val table = connectorConfig.tables.find { it.tableName == collection }
-            ?: error("Table $collection not found in connector configuration")
-
-        val column = table.columns.find { it.name == field.column }
-            ?: error("Column ${field.column} not found in table $collection")
-
-        val scalarType = SnowflakeJDBCSchemaGenerator.mapScalarType(column.type, column.numeric_scale)
-        return when (scalarType) {
-            NDCScalar.BOOLEAN -> SQLDataType.BOOLEAN
-            NDCScalar.INT -> SQLDataType.INTEGER
-            NDCScalar.FLOAT -> SQLDataType.FLOAT
-            NDCScalar.STRING -> SQLDataType.CLOB
-            NDCScalar.DATE -> SQLDataType.DATE
-            NDCScalar.DATETIME -> SQLDataType.TIMESTAMP
-            NDCScalar.DATETIME_WITH_TIMEZONE -> SQLDataType.TIMESTAMP
-            NDCScalar.TIME -> SQLDataType.TIME
-            NDCScalar.TIME_WITH_TIMEZONE -> SQLDataType.TIME
-        }
-    }
-
-    private fun getAggregatejOOQFunction(aggregate: Aggregate) = when (aggregate) {
-        is Aggregate.StarCount -> DSL.count()
-        is Aggregate.SingleColumn -> {
-            val col = DSL.field(DSL.name(aggregate.column)) as Field<Number>
-            when (aggregate.function) {
-                SingleColumnAggregateFunction.AVG -> DSL.avg(col)
-                SingleColumnAggregateFunction.MAX -> DSL.max(col)
-                SingleColumnAggregateFunction.MIN -> DSL.min(col)
-                SingleColumnAggregateFunction.SUM -> DSL.sum(col)
-                SingleColumnAggregateFunction.STDDEV_POP -> DSL.stddevPop(col)
-                SingleColumnAggregateFunction.STDDEV_SAMP -> DSL.stddevSamp(col)
-                SingleColumnAggregateFunction.VAR_POP -> DSL.varPop(col)
-                SingleColumnAggregateFunction.VAR_SAMP -> DSL.varSamp(col)
-            }
-        }
-
-        is Aggregate.ColumnCount -> {
-            val col = DSL.field(DSL.name(aggregate.column))
-            if (aggregate.distinct) DSL.countDistinct(col) else DSL.count(col)
-        }
-    }
-
-    private fun mkJoinWhereClause(
-        sourceTable: String,
-        parentRelationship: Relationship,
-        parentTableAlias: String? = null
-    ) = DSL.and(
-        parentRelationship.column_mapping.map { (from, to) ->
-            val childField = DSL.field(DSL.name(getTableName(sourceTable), from))
-            val parentField = DSL.field(DSL.name(parentTableAlias ?: parentRelationship.target_collection, to))
-            childField.eq(parentField)
-        }
-    )
-
-    private fun getTableName(collection: String): String {
-        return collection.split('.').last()
-    }
-
-}
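
Aside: the service change below also flips identifier rendering from EXPLICIT_DEFAULT_UNQUOTED to EXPLICIT_DEFAULT_QUOTED. On Snowflake, unquoted identifiers fold to upper case while quoted identifiers match exactly, so quoting preserves mixed-case names. A standalone jOOQ sketch of the difference, using the DEFAULT dialect and illustrative names:

import org.jooq.SQLDialect
import org.jooq.conf.RenderQuotedNames
import org.jooq.conf.Settings
import org.jooq.impl.DSL

fun main() {
    val field = DSL.field(DSL.name("ARTIST", "Name"))

    val unquoted = DSL.using(
        SQLDialect.DEFAULT,
        Settings().withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_UNQUOTED)
    )
    val quoted = DSL.using(
        SQLDialect.DEFAULT,
        Settings().withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_QUOTED)
    )

    println(unquoted.render(field)) // ARTIST.Name
    println(quoted.render(field))   // "ARTIST"."Name"
}
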
diff --git a/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/SnowflakeDataConnectorService.kt b/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/SnowflakeDataConnectorService.kt
index 42fde11..0718383 100644
--- a/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/SnowflakeDataConnectorService.kt
+++ b/ndc-connector-snowflake/src/main/kotlin/io/hasura/snowflake/SnowflakeDataConnectorService.kt
@@ -1,38 +1,20 @@
 package io.hasura.snowflake
 
-import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
 import com.fasterxml.jackson.module.kotlin.readValue
+import io.hasura.CTEQueryGenerator
 import io.hasura.ndc.app.interfaces.IDataSourceProvider
-import io.hasura.ndc.app.services.ConnectorConfigurationLoader
 import io.hasura.ndc.app.services.dataConnectors.BaseDataConnectorService
-import io.opentelemetry.api.trace.Tracer
-import io.vertx.core.http.HttpServerRequest
-import jakarta.inject.Inject
-import jakarta.inject.Singleton
-import jakarta.ws.rs.container.ContainerRequestContext
-import jakarta.ws.rs.core.UriInfo
 import io.hasura.ndc.ir.*
 import io.hasura.ndc.sqlgen.MutationTranslator
+import io.opentelemetry.api.trace.Tracer
 import jakarta.annotation.Priority
 import jakarta.enterprise.inject.Alternative
-import org.jboss.resteasy.reactive.server.ServerRequestFilter
+import jakarta.inject.Inject
+import jakarta.inject.Singleton
 import org.jooq.SQLDialect
 import org.jooq.conf.RenderQuotedNames
 
-class Filters {
-
-    @ServerRequestFilter(priority = 0)
-    fun logBodyFilter(info: UriInfo, request: HttpServerRequest, ctx: ContainerRequestContext) {
-        request.body {
-            val text = it.result().toString()
-            // Print JSON string formatted with Jackson
-            val json = jacksonObjectMapper().readValue<Any>(text)
-            println(jacksonObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(json))
-        }
-    }
-}
-
 @Singleton
 @Alternative
 @Priority(1)
@@ -62,30 +44,24 @@ class SnowflakeDataConnectorService @Inject constructor(
 )
 
     override fun handleQuery(request: QueryRequest): List<RowSet> {
-        println(ConnectorConfigurationLoader.config)
-
         val dslCtx = mkDSLCtx()
 
-        val query = JsonQueryGenerator.queryRequestToSQL(request)
-        println(
-            dslCtx
-                .renderInlined(query)
-        )
+        val query = if (!request.variables.isNullOrEmpty()) {
+            CTEQueryGenerator.forEachQueryRequestToSQL(request)
+        } else {
+            CTEQueryGenerator.queryRequestToSQL(request)
+        }
 
         val rows = executeDbQuery(query, dslCtx)
         val json = rows.getValue(0, 0).toString()
 
-        val rowset = objectMapper.readValue<RowSet?>(json)
-        return if (rowset == null) {
-            listOf(RowSet(rows = emptyList(), aggregates = emptyMap()))
-        } else {
-            listOf(rowset)
-        }
+        val rowsets = objectMapper.readValue<List<RowSet>>(json)
+        return rowsets
     }
 
     override val jooqDialect = SQLDialect.SNOWFLAKE
     override val jooqSettings =
-        commonDSLContextSettings.withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_UNQUOTED)
-    override val sqlGenerator = JsonQueryGenerator
+        commonDSLContextSettings.withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_QUOTED)
+    override val sqlGenerator = CTEQueryGenerator
     override val mutationTranslator = MutationTranslator
 }
diff --git a/ndc-ir/src/main/kotlin/io/hasura/ndc/common/ConnectorConfiguration.kt b/ndc-ir/src/main/kotlin/io/hasura/ndc/common/ConnectorConfiguration.kt
index 72e37b3..4e424a1 100644
--- a/ndc-ir/src/main/kotlin/io/hasura/ndc/common/ConnectorConfiguration.kt
+++ b/ndc-ir/src/main/kotlin/io/hasura/ndc/common/ConnectorConfiguration.kt
@@ -1,10 +1,39 @@
 package io.hasura.ndc.common
 
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.readValue
+import java.io.File
+import java.nio.file.Path
+
 data class ConnectorConfiguration(
-    val jdbcUrl: String,
+    val jdbcUrl: String = "",
     val jdbcProperties: Map<String, Any> = emptyMap(),
     val schemas: List<String> = emptyList(),
     val tables: List<TableSchemaRow> = emptyList(),
     val functions: List<FunctionSchemaRow> = emptyList(),
     val nativeQueries: Map<String, NativeQueryInfo> = emptyMap()
-)
\ No newline at end of file
+) {
+
+    object Loader {
+        private val mapper = jacksonObjectMapper()
+
+        private const val DEFAULT_CONFIG_DIRECTORY = "/etc/connector"
+        private const val CONFIG_FILE_NAME = "configuration.json"
+
+        val CONFIG_DIRECTORY: String = System.getenv("HASURA_CONFIGURATION_DIRECTORY") ?: DEFAULT_CONFIG_DIRECTORY
+        val config: ConnectorConfiguration = loadConfigFile(getConfigFilePath())
+
+        fun getConfigFilePath(): Path {
+            return Path.of(CONFIG_DIRECTORY, CONFIG_FILE_NAME)
+        }
+
+        fun loadConfigFile(path: Path): ConnectorConfiguration {
+            val file = File(path.toString())
+            return if (!file.exists()) {
+                ConnectorConfiguration()
+            } else {
+                mapper.readValue(file)
+            }
+        }
+    }
+}
\ No newline at end of file
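
Aside: ConnectorConfiguration.Loader replaces the deleted ConnectorConfigurationLoader service. Configuration is resolved once from HASURA_CONFIGURATION_DIRECTORY (defaulting to /etc/connector), and an empty ConnectorConfiguration is returned when configuration.json is missing. A short usage sketch against the API added above:

import io.hasura.ndc.common.ConnectorConfiguration

fun main() {
    // Resolved as <HASURA_CONFIGURATION_DIRECTORY or /etc/connector>/configuration.json.
    println(ConnectorConfiguration.Loader.getConfigFilePath())

    // Loaded eagerly on first access; falls back to an empty configuration
    // when the file does not exist.
    val config = ConnectorConfiguration.Loader.config
    println(config.jdbcUrl)
}
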
diff --git a/ndc-sqlgen/src/main/kotlin/io/hasura/ndc/sqlgen/BaseQueryGenerator.kt b/ndc-sqlgen/src/main/kotlin/io/hasura/ndc/sqlgen/BaseQueryGenerator.kt
index cd944b7..270b149 100644
--- a/ndc-sqlgen/src/main/kotlin/io/hasura/ndc/sqlgen/BaseQueryGenerator.kt
+++ b/ndc-sqlgen/src/main/kotlin/io/hasura/ndc/sqlgen/BaseQueryGenerator.kt
@@ -1,5 +1,8 @@
 package io.hasura.ndc.sqlgen
 
+import io.hasura.ndc.common.ConnectorConfiguration
+import io.hasura.ndc.common.NativeQueryInfo
+import io.hasura.ndc.common.NativeQueryPart
 import io.hasura.ndc.ir.*
 import io.hasura.ndc.ir.extensions.isVariablesRequest
 import io.hasura.ndc.ir.Field as IRField
@@ -22,13 +25,49 @@ abstract class BaseQueryGenerator : BaseGenerator {
         throw NotImplementedError("Mutation not supported for this data source")
     }
 
+    fun mkNativeQueryCTE(
+        request: QueryRequest
+    ): org.jooq.WithStep {
+        val config = ConnectorConfiguration.Loader.config
+
+        fun renderNativeQuerySQL(
+            nativeQuery: NativeQueryInfo,
+            arguments: Map<String, Argument>
+        ): String {
+            val sql = nativeQuery.sql
+            val parts = sql.parts
+
+            return parts.joinToString("") { part ->
+                when (part) {
+                    is NativeQueryPart.Text -> part.value
+                    is NativeQueryPart.Parameter -> {
+                        val argument = arguments[part.value] ?: error("Argument ${part.value} not found")
+                        when (argument) {
+                            is Argument.Literal -> argument.value.toString()
+                            else -> error("Only literals are supported in Native Queries in this version")
+                        }
+                    }
+                }
+            }
+        }
+
+        val nativeQuery = config.nativeQueries[request.collection]!!
+        val nativeQuerySQL = renderNativeQuerySQL(nativeQuery, request.arguments)
+
+        return DSL.with(
+            DSL.name(request.collection)
+        ).`as`(
+            DSL.resultQuery(nativeQuerySQL)
+        )
+    }
+
     fun getQueryColumnFields(fields: Map<String, IRField>): Map<String, IRField.ColumnField> {
         return fields
             .filterValues { it is IRField.ColumnField }
             .mapValues { it.value as IRField.ColumnField }
     }
 
-    fun getQueryRelationFields(fields: Map<String, IRField>?): Map<String, IRField.RelationshipField> {
+    protected fun getQueryRelationFields(fields: Map<String, IRField>?): Map<String, IRField.RelationshipField> {
         return fields
             ?.filterValues { it is IRField.RelationshipField }
             ?.mapValues { it.value as IRField.RelationshipField }
@@ -373,7 +412,7 @@ abstract class BaseQueryGenerator : BaseGenerator {
         val fields = request.variables!!.flatMap { it.keys }.toSet()
         return DSL
             .name(VARS + suffix)
-            .fields(*fields.toTypedArray().plus(INDEX))
+            .fields(*fields.plus(INDEX).map { DSL.quotedName(it) }.toTypedArray())
             .`as`(
                 request.variables!!.mapIndexed { idx, variable ->
                     val f = variable.values.map { value ->
@@ -407,7 +446,9 @@ abstract class BaseQueryGenerator : BaseGenerator {
                 e = where,
                 request
             )
-        } ?: DSL.noCondition()))
+        } ?: DSL.noCondition())).also {
+            println("Where conditions: $it")
+        }
     }
 
     protected fun getDefaultAggregateJsonEntries(aggregates: Map<String, Aggregate>?): Field<*> {
@@ -429,7 +470,7 @@ abstract class BaseQueryGenerator : BaseGenerator {
         const val MAX_QUERY_ROWS = 2147483647
         const val FOREACH_ROWS = "foreach_rows"
         const val VARS = "vars"
-        const val INDEX = "index"
+        const val INDEX = "idx"
         const val ROWS_AND_AGGREGATES = "rows_and_aggregates"
     }
 }
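
Aside: mkNativeQueryCTE splices only Argument.Literal values into the native query text, so the rendered SQL contains each argument verbatim rather than a bind parameter. A self-contained sketch of that substitution step, using hypothetical local stand-ins for NativeQueryPart.Text and NativeQueryPart.Parameter:

// Stand-ins mirroring the shape of NativeQueryPart above (illustrative only).
sealed interface Part { val value: String }
data class Text(override val value: String) : Part
data class Parameter(override val value: String) : Part

// Concatenate text parts as-is; replace parameter parts with their argument,
// failing when an argument is missing, as the connector code does.
fun render(parts: List<Part>, arguments: Map<String, Any>): String =
    parts.joinToString("") { part ->
        when (part) {
            is Text -> part.value
            is Parameter -> arguments[part.value]?.toString()
                ?: error("Argument ${part.value} not found")
        }
    }

fun main() {
    val parts = listOf(Text("SELECT * FROM ARTIST WHERE ARTISTID = "), Parameter("id"))
    println(render(parts, mapOf("id" to 1))) // SELECT * FROM ARTIST WHERE ARTISTID = 1
}
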
"type": "column", + "name": "ARTISTID", + "path": [] + }, + "value": { + "type": "variable", + "name": "$ARTISTID" + } + } + }, + "arguments": {}, + "variables": [ + { + "$ARTISTID": 1 + }, + { + "$ARTISTID": 2 + } + ], + "root_collection": "CHINOOK.PUBLIC.ARTIST" +} \ No newline at end of file diff --git a/rsa_key.p8 b/rsa_key.p8 new file mode 100644 index 0000000..d6ad158 --- /dev/null +++ b/rsa_key.p8 @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIT5DlQO7K/8ACAggA +MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECN52Tor4Zy19BIIEyJ3PRIrNFOrc +UFg8x841OmVPXHZkLO+ABdKcM6kMIcUQHeJ15FAh+yCpTul1dE4GkKoiKNLd4TWW +oJxa7C92Pli4EffIUND+SCL5umoujmfHi+B9huaWhhBo7y0OgbvpLQvLoyux2r9H +gUSq+SC3jKQKgp9XLqlb8ggTFEPr1+zq1z9B06TVwRTMOv2cHLzaV87Etfi23U26 +o7xC88UnPZfBM6O07M9I+28NPoKJd5VFrYZqzqpfkOb5UgySapXyZD5E1tPyg5tN +TPF3PjoxumBwEfP+uKg9zf15bR9/yTtGzyWOdVF8frUE8BgPlUvc4pWA+NNk5Tp7 +I4ACeRmf9LfHM85xWRIli3lXLhHgaoDFePmVX9yuXpzjkoX6Bx3tkH2/RZJGvay4 +32GIm2Q2Z06MNkjvUp4y+kpeGaBhWxqDILUf5/JHg8MKWnCMkj7vNdc42AxKqCyZ +CAhUq4vdIRvzDO6TqCewgqFkSE/mAGibk9WgkyWZKBKHTfOp0BcZWkD2IuUAVYfY +S0pfxRmBjRQEJHHXp3eJP/eOIeiTjlWWqQvPhnd1Utp99RuvaNz+pbVo6kKzr5jN +V7nmAPpx8OreX+kHufyD7cWuJdFFKzYwBS0oE8fMUUhUVnKkHiL330WcvfZwhNM8 +ZI3pCY5IiBJ2p08zZHYQoatH3Q9hLdBUJNbbgZJXbX9nCbInhn06BXh3E3GMOz+c +GXV8spUZHpMuwY03pdZycn82iF7y2ztFDBqH5i730dCLs/lhU+syhnX3vTZZVX6W +lvZYBTOwcx8DfRR3SnvqQbpa0ALUOZEX30NaqwzK5EtK4oPIFASaoD0W65g7gaou +8/1ne9YJwqkNoKfT0+VoLKbrzMT9vsYYcD77X/+ksGo5M/gGZRMeAgVrImGsuTlV +3BcQMRFWLi/voU1P05Zf2dpHQluqBV+dcWA/o8N4eIg7AcodP2NTkA76OaxM9313 +DmiBAi3SaHeH10Eee5hMEqOYXutkAsGDCA+XxBXbrTTi6oIV/CTSPlbyygwmOYtG +Y27dv7fULDc0iMAy5r0KiE0qxM7q9BRoUTN9qa4OdKJaopRIfPNzEyqdk7FbBtOi +THLQHyUwG4yCBT+iurhWb2uSLvbzSZi96JND2/qzvzvORXgFc2sAWuvEx38B5KVL +FX4pCnz2xKfxchTpEMOxLGYY0zVnO4qEUI85YDovNTMGlFg4/XuGodOoxHSZbkDF +u8wguslcbtqqQnASN0ccjcGF9o3IYraa3BkrsN4D9+4sQdFIZlvh1cfq4wUn95RM +Vei9sfRSwdQjne62hneM4UA+U+wHZ8hJcEh4aaScppAmqWTGVAPIGEPQqf7hG1Zy +BEiME0+mSNt8J4uWR/kFda9vBiMoIblvwBNEOdzUf7spKy4h5qB21CkVcumphHcX +Sj8VeT5i1bouvExY3+BxSQSvveWP8tfGJuWyndssmlx3IYzisJoceepjH+vIyzHF +AeF8mdaq3V2OAiXihh88cHqU469HQrymUvdqWdNRTiovBPnMYmUy1rebH9yhGze9 +knD1XXGDku9dVc4Nm4RU6/eBtYXOdPrGTEoFVsCuU1pCe3IbKcE9oQb0IHAj46tG +W9tJsNdts5lH0I53pbGe5w== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/rsa_key.pub b/rsa_key.pub new file mode 100644 index 0000000..a466a4c --- /dev/null +++ b/rsa_key.pub @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxzVyIqmfTXxY8f0zuG/f +07VVF+iDq/4klM7rgMszQoQH3yXReWLB2QcBsTizQ10VIMWFNpgh7B/AChk2rw7z +KwZlFAwIOyYBIgYiyhtbOBP0mM4J7hGXWjV98f72GhS0DPTN6fIU492qAxY9xHIr +pll1XyjLARBejDkghjT+MYrnZRnVmSK6ntF16CGRoOehrsVy/UppMqmFYmkwfT7n +RUoLK4A0lognathg0JFMY82pnpi8RQmSiu9BUA78oefMQtOfklD2iOAlrxtszlaU +II2afigG5gZRxm+qfURiJ5IEOgRG3KEKjeHtmRjv4NS/08ZK3mC08JZ1UZbkutYi +UwIDAQAB +-----END PUBLIC KEY-----