From c6775aa546f0916956c58af929787c69d2f852b1 Mon Sep 17 00:00:00 2001 From: Jeremy Parr-Pearson <94406158+jeremyp-bq@users.noreply.github.com> Date: Wed, 8 Dec 2021 13:09:28 -0800 Subject: [PATCH 1/3] Always commit rejected data table (#275) --- README.md | 6 ++++++ .../core/VerticaDistributedFilesystemWritePipe.scala | 4 ++-- .../VerticaDistributedFilesystemWritePipeTest.scala | 12 ++++++------ 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 9a76070a0..386f462cc 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,12 @@ Below is a detailed list of connector options that are used in the options map: Note: If you are using the S3 properties, the connector options has priority over the Spark configuration, which has priority over the environment variables. +### Status and Rejected Data Tables + +When data is written to Vertica there are some metadata tables that may be populated with information about the transaction. These tables can help troubleshoot issues that occur during write operations. + +Both tables are added under the target schema (`dbschema` option). The status table is updated for every write operation, and is named `S2V_JOB_STATUS_USER_<user>` (e.g. `S2V_JOB_STATUS_USER_DBADMIN`, where the `user` option was `dbadmin`). A new rejected data table is created each time there is at least one rejected row in a transaction, and is named `<table>_<session_id>_COMMITS` (e.g. `dftest_f1804318_57cf_4d9d_99d6_044d06db5e22_COMMITS`, where the `table` option was `dftest` and the generated session id was `f1804318_57cf_4d9d_99d6_044d06db5e22`). The `session_id` part of the rejected data table name is the same UUID that is used when writing data to HDFS and also when adding entries to job status table, so a specific operation can be traced to see if there were any issues.
+ ## Examples If you would like to try out the connector, we have several example applications you can run [in the examples folder](https://github.com/vertica/spark-connector/tree/main/examples). diff --git a/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala b/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala index 93334d316..c79b1c915 100644 --- a/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala +++ b/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala @@ -194,10 +194,10 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri def buildCopyStatement(targetTable: String, columnList: String, url: String, rejectsTableName: String, fileFormat: String): String = { if (config.mergeKey.isDefined) { - s"COPY $targetTable FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName NO COMMIT" + s"COPY $targetTable FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName" } else { - s"COPY $targetTable $columnList FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName NO COMMIT" + s"COPY $targetTable $columnList FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName" } } diff --git a/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala b/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala index c02232a5e..5f7046281 100644 --- a/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala +++ b/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala @@ -317,7 +317,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA val uniqueId 
= "unique-id" - val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT" + val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"" val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) @@ -347,7 +347,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA it should "call vertica copy upon commit with a custom copy list" in { val config = createWriteConfig().copy(copyColumnList = ValidColumnList("col1 INTEGER, col2 FLOAT").getOrElse(None)) - val expected = "COPY \"dummy\" (col1 INTEGER,col2 FLOAT) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT" + val expected = "COPY \"dummy\" (col1 INTEGER,col2 FLOAT) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"" val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) @@ -442,7 +442,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) - (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (col1,col3,col2) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT", *).returning(Right(1)) + (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (col1,col3,col2) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"", *).returning(Right(1)) 
(jdbcLayerInterface.query _).expects(*, *).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) @@ -469,7 +469,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) - (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (\"col1\",\"col5\") FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT",*).returning(Right(1)) + (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (\"col1\",\"col5\") FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"",*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) @@ -738,7 +738,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) - (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy_id\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT",*).returning(Right(1)) + (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy_id\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"",*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) 
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) @@ -802,7 +802,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA val fsConfig: FileStoreConfig = FileStoreConfig("hdfs://example-hdfs:8020/tmp/", "test", true, AWSOptions(None, None, None, None, None, None)) val config = createWriteConfig().copy(fileStoreConfig = fsConfig) - val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT" + val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"" val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) From 441887aea222030a59251529dd037f108aa28671 Mon Sep 17 00:00:00 2001 From: Jeremy Parr-Pearson <94406158+jeremyp-bq@users.noreply.github.com> Date: Mon, 13 Dec 2021 13:35:31 -0800 Subject: [PATCH 2/3] Revert removal of NO COMMIT (#275) --- .../core/VerticaDistributedFilesystemWritePipe.scala | 4 ++-- .../VerticaDistributedFilesystemWritePipeTest.scala | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala b/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala index c79b1c915..93334d316 100644 --- a/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala +++ b/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala @@ -194,10 +194,10 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri def buildCopyStatement(targetTable: String, columnList: String, url: String, rejectsTableName: 
String, fileFormat: String): String = { if (config.mergeKey.isDefined) { - s"COPY $targetTable FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName" + s"COPY $targetTable FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName NO COMMIT" } else { - s"COPY $targetTable $columnList FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName" + s"COPY $targetTable $columnList FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName NO COMMIT" } } diff --git a/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala b/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala index 5f7046281..c02232a5e 100644 --- a/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala +++ b/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala @@ -317,7 +317,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA val uniqueId = "unique-id" - val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"" + val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT" val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) @@ -347,7 +347,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA it should "call vertica copy upon commit with a custom copy list" in { val config = createWriteConfig().copy(copyColumnList = ValidColumnList("col1 INTEGER, col2 FLOAT").getOrElse(None)) - val expected = "COPY \"dummy\" (col1 INTEGER,col2 FLOAT) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet 
REJECTED DATA AS TABLE \"dummy_id_COMMITS\"" + val expected = "COPY \"dummy\" (col1 INTEGER,col2 FLOAT) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT" val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) @@ -442,7 +442,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) - (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (col1,col3,col2) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"", *).returning(Right(1)) + (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (col1,col3,col2) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT", *).returning(Right(1)) (jdbcLayerInterface.query _).expects(*, *).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) @@ -469,7 +469,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) - (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (\"col1\",\"col5\") FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"",*).returning(Right(1)) + (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (\"col1\",\"col5\") FROM 
'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT",*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) @@ -738,7 +738,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) - (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy_id\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"",*).returning(Right(1)) + (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy_id\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT",*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) @@ -802,7 +802,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA val fsConfig: FileStoreConfig = FileStoreConfig("hdfs://example-hdfs:8020/tmp/", "test", true, AWSOptions(None, None, None, None, None, None)) val config = createWriteConfig().copy(fileStoreConfig = fsConfig) - val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\"" + val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT" val jdbcLayerInterface = mock[JdbcLayerInterface] 
(jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) From 2e4d01a9ec54ab933b41d7baa32244788c9845e9 Mon Sep 17 00:00:00 2001 From: Jeremy Parr-Pearson <94406158+jeremyp-bq@users.noreply.github.com> Date: Mon, 13 Dec 2021 15:23:05 -0800 Subject: [PATCH 3/3] Copy rejected data temp table if there is at least one row (#275) --- README.md | 2 +- ...erticaDistributedFilesystemWritePipe.scala | 30 ++++++---- ...caDistributedFilesystemWritePipeTest.scala | 60 ++++--------------- 3 files changed, 29 insertions(+), 63 deletions(-) diff --git a/README.md b/README.md index 386f462cc..d1b187c54 100644 --- a/README.md +++ b/README.md @@ -151,7 +151,7 @@ Note: If you are using the S3 properties, the connector options has priority ove When data is written to Vertica there are some metadata tables that may be populated with information about the transaction. These tables can help troubleshoot issues that occur during write operations. -Both tables are added under the target schema (`dbschema` option). The status table is updated for every write operation, and is named `S2V_JOB_STATUS_USER_<user>` (e.g. `S2V_JOB_STATUS_USER_DBADMIN`, where the `user` option was `dbadmin`). A new rejected data table is created each time there is at least one rejected row in a transaction, and is named `
<table>_<session_id>_COMMITS` (e.g. `dftest_f1804318_57cf_4d9d_99d6_044d06db5e22_COMMITS`, where the `table` option was `dftest` and the generated session id was `f1804318_57cf_4d9d_99d6_044d06db5e22`). The `session_id` part of the rejected data table name is the same UUID that is used when writing data to HDFS and also when adding entries to job status table, so a specific operation can be traced to see if there were any issues. +Both tables are added under the target schema (`dbschema` option). The status table is updated for every write operation, and is named `S2V_JOB_STATUS_USER_<user>` (e.g. `S2V_JOB_STATUS_USER_DBADMIN`, where the `user` option was `dbadmin`). A new rejected data table is created each time there is at least one rejected row in a transaction, and is named `
<table>_<session_id>_REJECTS` (e.g. `dftest_f1804318_57cf_4d9d_99d6_044d06db5e22_REJECTS`, where the `table` option was `dftest` and the generated session id was `f1804318_57cf_4d9d_99d6_044d06db5e22`). The `session_id` part of the rejected data table name is the same UUID that is used when writing data to HDFS and also when adding entries to job status table, so a specific operation can be traced to see if there were any issues. ## Examples diff --git a/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala b/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala index 91f6b5490..53bf407cd 100644 --- a/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala +++ b/connector/src/main/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipe.scala @@ -192,12 +192,12 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri } } - def buildCopyStatement(targetTable: String, columnList: String, url: String, rejectsTableName: String, fileFormat: String): String = { + def buildCopyStatement(targetTable: String, columnList: String, url: String, tempRejectsTableName: String, fileFormat: String): String = { if (config.mergeKey.isDefined) { - s"COPY $targetTable FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName NO COMMIT" + s"COPY $targetTable FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $tempRejectsTableName NO COMMIT" } else { - s"COPY $targetTable $columnList FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName NO COMMIT" + s"COPY $targetTable $columnList FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $tempRejectsTableName NO COMMIT" } } @@ -316,9 +316,9 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri private case class FaultToleranceTestResult(success: Boolean, failedRowsPercent: Double) -
private def testFaultTolerance(rowsCopied: Int, rejectsTable: String) : ConnectorResult[FaultToleranceTestResult] = { + private def testFaultTolerance(rowsCopied: Int, tempRejectsTable: String, rejectsTable: String) : ConnectorResult[FaultToleranceTestResult] = { // verify rejects to see if this falls within user tolerance. - val rejectsQuery = "SELECT COUNT(*) as count FROM " + rejectsTable + val rejectsQuery = "SELECT COUNT(*) as count FROM " + tempRejectsTable logger.info(s"Checking number of rejected rows via statement: " + rejectsQuery) for { @@ -362,10 +362,10 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri } }) - _ <- if (rejectedCount == 0) { - val dropRejectsTableStatement = "DROP TABLE IF EXISTS " + rejectsTable + " CASCADE" - logger.info(s"Dropping Vertica rejects table now: " + dropRejectsTableStatement) - jdbcLayer.execute(dropRejectsTableStatement) + _ <- if (rejectedCount > 0) { + val copyRejectsTableStatement = "CREATE TABLE " + rejectsTable + " AS SELECT * FROM " + tempRejectsTable + logger.info(s"Copying Vertica rejects table now: " + copyRejectsTableStatement) + jdbcLayer.execute(copyRejectsTableStatement) } else { Right(()) } @@ -416,16 +416,22 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri tempTableExists <- tableUtils.tempTableExists(tempTableName) _ <- if (config.mergeKey.isDefined && !tempTableExists) Left(CreateTableError(None)) else Right(()) + tempRejectsTableName = "\"" + + EscapeUtils.sqlEscape(tableName.substring(0,Math.min(tableNameMaxLength,tableName.length))) + + "_" + + sessionId + + "_REJECTS_TMP" + + "\"" rejectsTableName = "\"" + EscapeUtils.sqlEscape(tableName.substring(0,Math.min(tableNameMaxLength,tableName.length))) + "_" + sessionId + - "_COMMITS" + + "_REJECTS" + "\"" fullTableName <- if(config.mergeKey.isDefined) Right(tempTableName.getFullTableName) else Right(config.tablename.getFullTableName) - copyStatement = buildCopyStatement(fullTableName, 
columnList, url, rejectsTableName, "parquet") + copyStatement = buildCopyStatement(fullTableName, columnList, url, tempRejectsTableName, "parquet") _ = logger.info("The copy statement is: \n" + copyStatement) @@ -436,7 +442,7 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri performCopy(copyStatement, config.tablename).left.map(_.context("commit: Failed to copy rows into target table")) } - faultToleranceResults <- testFaultTolerance(rowsCopied, rejectsTableName) + faultToleranceResults <- testFaultTolerance(rowsCopied, tempRejectsTableName, rejectsTableName) .left.map(err => CommitError(err).context("commit: JDBC Error when trying to determine fault tolerance")) _ <- tableUtils.updateJobStatusTable(config.tablename, config.jdbcConfig.auth.user, faultToleranceResults.failedRowsPercent, config.sessionId, faultToleranceResults.success) diff --git a/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala b/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala index bb78e6d0e..557707555 100644 --- a/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala +++ b/connector/src/test/scala/com/vertica/spark/datasource/core/VerticaDistributedFilesystemWritePipeTest.scala @@ -317,16 +317,15 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA val uniqueId = "unique-id" - val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT" + val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT" val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*, 
*).returning(Right(1)) (jdbcLayerInterface.query _).expects("EXPLAIN " + expected, *).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.executeUpdate _).expects(expected, *).returning(Right(1)) - (jdbcLayerInterface.query _).expects("SELECT COUNT(*) as count FROM \"dummy_id_COMMITS\"", *).returning(Right(getCountTableResultSet())) + (jdbcLayerInterface.query _).expects("SELECT COUNT(*) as count FROM \"dummy_id_REJECTS_TMP\"", *).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.close _).expects().returning(Right(())) - (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) val schemaToolsInterface = mock[SchemaToolsInterface] @@ -347,7 +346,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA it should "call vertica copy upon commit with a custom copy list" in { val config = createWriteConfig().copy(copyColumnList = ValidColumnList("col1 INTEGER, col2 FLOAT").getOrElse(None)) - val expected = "COPY \"dummy\" (col1 INTEGER,col2 FLOAT) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT" + val expected = "COPY \"dummy\" (col1 INTEGER,col2 FLOAT) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT" val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) @@ -356,7 +355,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.executeUpdate _).expects(expected, *).returning(Right(1)) (jdbcLayerInterface.query _).expects(*, *).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.close _).expects().returning(Right(())) - (jdbcLayerInterface.execute _).expects(*, *).returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) val schemaToolsInterface = 
mock[SchemaToolsInterface] @@ -412,7 +410,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet())) - (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) @@ -441,9 +438,8 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) - (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (col1,col3,col2) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT", *).returning(Right(1)) + (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (col1,col3,col2) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT", *).returning(Right(1)) (jdbcLayerInterface.query _).expects(*, *).returning(Right(getCountTableResultSet())) - (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) @@ -468,9 +464,8 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) - (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" 
(\"col1\",\"col5\") FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT",*).returning(Right(1)) + (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (\"col1\",\"col5\") FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT",*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet())) - (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) @@ -499,6 +494,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(100)) + (jdbcLayerInterface.execute _).expects(*, *).returning(Right(())) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet(6))) (jdbcLayerInterface.close _).expects().returning(Right(())) (jdbcLayerInterface.rollback _).expects().returning(Right(())) @@ -532,6 +528,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(100)) + (jdbcLayerInterface.execute _).expects(*, *).returning(Right(())) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet(4))) (jdbcLayerInterface.close _).expects().returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) @@ -551,39 +548,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA 
checkResult(pipe.commit()) } - it should "Drop rejects table if there are no rejects" in { - val schema = new StructType(Array(StructField("col1", IntegerType))) - val config = createWriteConfig().copy( - failedRowPercentTolerance = 0.05f - ) - - val jdbcLayerInterface = mock[JdbcLayerInterface] - (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) - (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) - (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) - (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(100)) - (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet())) - (jdbcLayerInterface.close _).expects().returning(Right(())) - (jdbcLayerInterface.commit _).expects().returning(Right(())) - - // Drop statement - (jdbcLayerInterface.execute _).expects("DROP TABLE IF EXISTS \"dummy_id_COMMITS\" CASCADE", *).returning(Right(())) - - val schemaToolsInterface = mock[SchemaToolsInterface] - (schemaToolsInterface.getCopyColumnList _).expects(jdbcLayerInterface, tablename, schema).returning(Right("")) - - val fileStoreLayerInterface = mock[FileStoreLayerInterface] - (fileStoreLayerInterface.removeDir _).expects(*).returning(Right()) - - val tableUtils = mock[TableUtilsInterface] - (tableUtils.updateJobStatusTable _).expects(*, *, *, *, *).returning(Right(())) - (tableUtils.tempTableExists _).expects(tempTableName).returning(Right(false)) - - val pipe = new VerticaDistributedFilesystemWritePipe(config, fileStoreLayerInterface, jdbcLayerInterface, schemaToolsInterface, tableUtils) - - checkResult(pipe.commit()) - } - it should "create an external table" in { val tname = TableName("testtable", None) val config = createWriteConfig().copy(createExternalTable = Some(NewData), tablename = tname) @@ -708,7 +672,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.query 
_).expects(*,*).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) - (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) @@ -737,12 +700,11 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1)) - (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy_id\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT",*).returning(Right(1)) + (jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy_id\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT",*).returning(Right(1)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) - (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) @@ -777,7 +739,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA (jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) - (jdbcLayerInterface.execute 
_).expects(*,*).returning(Right(())) (jdbcLayerInterface.close _).expects().returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) @@ -801,16 +762,15 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA val fsConfig: FileStoreConfig = FileStoreConfig("hdfs://example-hdfs:8020/tmp/", "test", true, AWSOptions(None, None, None, None, None, None)) val config = createWriteConfig().copy(fileStoreConfig = fsConfig) - val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT" + val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT" val jdbcLayerInterface = mock[JdbcLayerInterface] (jdbcLayerInterface.configureSession _).expects(*).returning(Right(())) (jdbcLayerInterface.executeUpdate _).expects(*, *).returning(Right(1)) (jdbcLayerInterface.query _).expects("EXPLAIN " + expected, *).returning(Right(getEmptyResultSet)) (jdbcLayerInterface.executeUpdate _).expects(expected, *).returning(Right(1)) - (jdbcLayerInterface.query _).expects("SELECT COUNT(*) as count FROM \"dummy_id_COMMITS\"", *).returning(Right(getCountTableResultSet())) + (jdbcLayerInterface.query _).expects("SELECT COUNT(*) as count FROM \"dummy_id_REJECTS_TMP\"", *).returning(Right(getCountTableResultSet())) (jdbcLayerInterface.close _).expects().returning(Right(())) - (jdbcLayerInterface.execute _).expects(*,*).returning(Right(())) (jdbcLayerInterface.commit _).expects().returning(Right(())) val schemaToolsInterface = mock[SchemaToolsInterface]