Persist rejected data table #294

Closed · wants to merge 4 commits
6 changes: 6 additions & 0 deletions README.md
@@ -147,6 +147,12 @@ Below is a detailed list of connector options that are used in the options map:

Note: If you are using the S3 properties, the connector options have priority over the Spark configuration, which has priority over the environment variables.

### Status and Rejected Data Tables

When data is written to Vertica, the connector may populate metadata tables with information about the transaction. These tables can help troubleshoot issues that occur during write operations.

Both tables are created under the target schema (the `dbschema` option). The status table is updated on every write operation and is named `S2V_JOB_STATUS_USER_<user>` (e.g. `S2V_JOB_STATUS_USER_DBADMIN` when the `user` option is `dbadmin`). A new rejected data table is created only when a transaction has at least one rejected row, and is named `<table>_<session_id>_REJECTS` (e.g. `dftest_f1804318_57cf_4d9d_99d6_044d06db5e22_REJECTS`, where the `table` option is `dftest` and the generated session id is `f1804318_57cf_4d9d_99d6_044d06db5e22`). The `session_id` portion of the rejected data table name is the same UUID used when writing data to HDFS and when adding entries to the job status table, so a specific operation can be traced end to end to check for issues.
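
Below is a minimal sketch of how these tables can be inspected after a write. It is illustrative only: it assumes the connector options documented above (`host`, `db`, `user`, `password`, `dbschema`, `table`, `staging_fs_url`), the `com.vertica.spark.datasource.VerticaSource` format name used in the examples, and a Vertica JDBC driver on the classpath; the host, credentials, staging path, and object names are placeholders.

```scala
import java.sql.DriverManager
import org.apache.spark.sql.{SaveMode, SparkSession}

object RejectedDataExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rejects-example").getOrCreate()
    import spark.implicits._

    // Placeholder connection details; replace with your own environment.
    val opts = Map(
      "host" -> "vertica.example.com",
      "db" -> "testdb",
      "user" -> "dbadmin",
      "password" -> "",
      "dbschema" -> "public",
      "table" -> "dftest",
      "staging_fs_url" -> "hdfs://hdfs.example.com:8020/tmp/staging"
    )

    // Write some rows; any rows rejected by Vertica's COPY end up in a rejected data table.
    Seq((1, "a"), (2, "b")).toDF("col1", "col2")
      .write.format("com.vertica.spark.datasource.VerticaSource")
      .options(opts)
      .mode(SaveMode.Append)
      .save()

    // Inspect the metadata tables over plain JDBC (requires the Vertica JDBC driver).
    val conn = DriverManager.getConnection(
      s"jdbc:vertica://${opts("host")}:5433/${opts("db")}", opts("user"), opts("password"))
    try {
      val stmt = conn.createStatement()

      // The status table holds one entry per write operation for this user.
      val status = stmt.executeQuery("SELECT * FROM public.S2V_JOB_STATUS_USER_DBADMIN")
      val meta = status.getMetaData
      while (status.next()) {
        val row = (1 to meta.getColumnCount).map(i => s"${meta.getColumnName(i)}=${status.getObject(i)}")
        println(row.mkString(", "))
      }

      // Rejected data tables follow the <table>_<session_id>_REJECTS pattern; list any for this table.
      val rejects = stmt.executeQuery(
        "SELECT table_name FROM v_catalog.tables " +
          "WHERE table_schema = 'public' AND table_name LIKE 'dftest%REJECTS'")
      while (rejects.next()) println(rejects.getString("table_name"))
    } finally {
      conn.close()
    }
  }
}
```

Rejected data tables are left in place after the write completes, so they can be inspected and dropped manually once the underlying issue has been resolved.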

## Examples

If you would like to try out the connector, we have several example applications you can run [in the examples folder](https://github.com/vertica/spark-connector/tree/main/examples).
@@ -192,12 +192,12 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri
}
}

def buildCopyStatement(targetTable: String, columnList: String, url: String, rejectsTableName: String, fileFormat: String): String = {
def buildCopyStatement(targetTable: String, columnList: String, url: String, tempRejectsTableName: String, fileFormat: String): String = {
if (config.mergeKey.isDefined) {
s"COPY $targetTable FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName NO COMMIT"
s"COPY $targetTable FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $tempRejectsTableName NO COMMIT"
}
else {
s"COPY $targetTable $columnList FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $rejectsTableName NO COMMIT"
s"COPY $targetTable $columnList FROM '$url' ON ANY NODE $fileFormat REJECTED DATA AS TABLE $tempRejectsTableName NO COMMIT"
}
}

@@ -316,9 +316,9 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri

private case class FaultToleranceTestResult(success: Boolean, failedRowsPercent: Double)

private def testFaultTolerance(rowsCopied: Int, rejectsTable: String) : ConnectorResult[FaultToleranceTestResult] = {
private def testFaultTolerance(rowsCopied: Int, tempRejectsTable: String, rejectsTable: String) : ConnectorResult[FaultToleranceTestResult] = {
// verify rejects to see if this falls within user tolerance.
val rejectsQuery = "SELECT COUNT(*) as count FROM " + rejectsTable
val rejectsQuery = "SELECT COUNT(*) as count FROM " + tempRejectsTable
logger.info(s"Checking number of rejected rows via statement: " + rejectsQuery)

for {
@@ -362,10 +362,10 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri
}
})

_ <- if (rejectedCount == 0) {
val dropRejectsTableStatement = "DROP TABLE IF EXISTS " + rejectsTable + " CASCADE"
logger.info(s"Dropping Vertica rejects table now: " + dropRejectsTableStatement)
jdbcLayer.execute(dropRejectsTableStatement)
_ <- if (rejectedCount > 0) {
val copyRejectsTableStatement = "CREATE TABLE " + rejectsTable + " AS SELECT * FROM " + tempRejectsTable
logger.info(s"Copying Vertica rejects table now: " + copyRejectsTableStatement)
jdbcLayer.execute(copyRejectsTableStatement)
} else {
Right(())
}
@@ -416,16 +416,22 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri
tempTableExists <- tableUtils.tempTableExists(tempTableName)
_ <- if (config.mergeKey.isDefined && !tempTableExists) Left(CreateTableError(None)) else Right(())

tempRejectsTableName = "\"" +
EscapeUtils.sqlEscape(tableName.substring(0,Math.min(tableNameMaxLength,tableName.length))) +
"_" +
sessionId +
"_REJECTS_TMP" +
"\""
rejectsTableName = "\"" +
EscapeUtils.sqlEscape(tableName.substring(0,Math.min(tableNameMaxLength,tableName.length))) +
"_" +
sessionId +
"_COMMITS" +
"_REJECTS" +
"\""

fullTableName <- if(config.mergeKey.isDefined) Right(tempTableName.getFullTableName) else Right(config.tablename.getFullTableName)

copyStatement = buildCopyStatement(fullTableName, columnList, url, rejectsTableName, "parquet")
copyStatement = buildCopyStatement(fullTableName, columnList, url, tempRejectsTableName, "parquet")

_ = logger.info("The copy statement is: \n" + copyStatement)

@@ -436,7 +442,7 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri
performCopy(copyStatement, config.tablename).left.map(_.context("commit: Failed to copy rows into target table"))
}

faultToleranceResults <- testFaultTolerance(rowsCopied, rejectsTableName)
faultToleranceResults <- testFaultTolerance(rowsCopied, tempRejectsTableName, rejectsTableName)
.left.map(err => CommitError(err).context("commit: JDBC Error when trying to determine fault tolerance"))

_ <- tableUtils.updateJobStatusTable(config.tablename, config.jdbcConfig.auth.user, faultToleranceResults.failedRowsPercent, config.sessionId, faultToleranceResults.success)
@@ -317,16 +317,15 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA

val uniqueId = "unique-id"

val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT"
val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT"

val jdbcLayerInterface = mock[JdbcLayerInterface]
(jdbcLayerInterface.configureSession _).expects(*).returning(Right(()))
(jdbcLayerInterface.executeUpdate _).expects(*, *).returning(Right(1))
(jdbcLayerInterface.query _).expects("EXPLAIN " + expected, *).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.executeUpdate _).expects(expected, *).returning(Right(1))
(jdbcLayerInterface.query _).expects("SELECT COUNT(*) as count FROM \"dummy_id_COMMITS\"", *).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.query _).expects("SELECT COUNT(*) as count FROM \"dummy_id_REJECTS_TMP\"", *).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

val schemaToolsInterface = mock[SchemaToolsInterface]
@@ -347,7 +346,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
it should "call vertica copy upon commit with a custom copy list" in {
val config = createWriteConfig().copy(copyColumnList = ValidColumnList("col1 INTEGER, col2 FLOAT").getOrElse(None))

val expected = "COPY \"dummy\" (col1 INTEGER,col2 FLOAT) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT"
val expected = "COPY \"dummy\" (col1 INTEGER,col2 FLOAT) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT"

val jdbcLayerInterface = mock[JdbcLayerInterface]
(jdbcLayerInterface.configureSession _).expects(*).returning(Right(()))
@@ -356,7 +355,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
(jdbcLayerInterface.executeUpdate _).expects(expected, *).returning(Right(1))
(jdbcLayerInterface.query _).expects(*, *).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.execute _).expects(*, *).returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

val schemaToolsInterface = mock[SchemaToolsInterface]
@@ -412,7 +410,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

@@ -441,9 +438,8 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
(jdbcLayerInterface.configureSession _).expects(*).returning(Right(()))
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (col1,col3,col2) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT", *).returning(Right(1))
(jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (col1,col3,col2) FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT", *).returning(Right(1))
(jdbcLayerInterface.query _).expects(*, *).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

@@ -468,9 +464,8 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
(jdbcLayerInterface.configureSession _).expects(*).returning(Right(()))
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (\"col1\",\"col5\") FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT",*).returning(Right(1))
(jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy\" (\"col1\",\"col5\") FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT",*).returning(Right(1))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

@@ -499,6 +494,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(100))
(jdbcLayerInterface.execute _).expects(*, *).returning(Right(()))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet(6)))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.rollback _).expects().returning(Right(()))
Expand Down Expand Up @@ -532,6 +528,7 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(100))
(jdbcLayerInterface.execute _).expects(*, *).returning(Right(()))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet(4)))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))
@@ -551,39 +548,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
checkResult(pipe.commit())
}

it should "Drop rejects table if there are no rejects" in {
val schema = new StructType(Array(StructField("col1", IntegerType)))
val config = createWriteConfig().copy(
failedRowPercentTolerance = 0.05f
)

val jdbcLayerInterface = mock[JdbcLayerInterface]
(jdbcLayerInterface.configureSession _).expects(*).returning(Right(()))
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(100))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

// Drop statement
(jdbcLayerInterface.execute _).expects("DROP TABLE IF EXISTS \"dummy_id_COMMITS\" CASCADE", *).returning(Right(()))

val schemaToolsInterface = mock[SchemaToolsInterface]
(schemaToolsInterface.getCopyColumnList _).expects(jdbcLayerInterface, tablename, schema).returning(Right(""))

val fileStoreLayerInterface = mock[FileStoreLayerInterface]
(fileStoreLayerInterface.removeDir _).expects(*).returning(Right())

val tableUtils = mock[TableUtilsInterface]
(tableUtils.updateJobStatusTable _).expects(*, *, *, *, *).returning(Right(()))
(tableUtils.tempTableExists _).expects(tempTableName).returning(Right(false))

val pipe = new VerticaDistributedFilesystemWritePipe(config, fileStoreLayerInterface, jdbcLayerInterface, schemaToolsInterface, tableUtils)

checkResult(pipe.commit())
}

it should "create an external table" in {
val tname = TableName("testtable", None)
val config = createWriteConfig().copy(createExternalTable = Some(NewData), tablename = tname)
@@ -708,7 +672,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

@@ -737,12 +700,11 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
val jdbcLayerInterface = mock[JdbcLayerInterface]
(jdbcLayerInterface.configureSession _).expects(*).returning(Right(()))
(jdbcLayerInterface.executeUpdate _).expects(*,*).returning(Right(1))
(jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy_id\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT",*).returning(Right(1))
(jdbcLayerInterface.executeUpdate _).expects("COPY \"dummy_id\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT",*).returning(Right(1))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

@@ -777,7 +739,6 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.query _).expects(*,*).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

@@ -801,16 +762,15 @@ class VerticaDistributedFilesystemWritePipeTest extends AnyFlatSpec with BeforeA
val fsConfig: FileStoreConfig = FileStoreConfig("hdfs://example-hdfs:8020/tmp/", "test", true, AWSOptions(None, None, None, None, None, None))
val config = createWriteConfig().copy(fileStoreConfig = fsConfig)

val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_COMMITS\" NO COMMIT"
val expected = "COPY \"dummy\" FROM 'hdfs://example-hdfs:8020/tmp/test/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE \"dummy_id_REJECTS_TMP\" NO COMMIT"

val jdbcLayerInterface = mock[JdbcLayerInterface]
(jdbcLayerInterface.configureSession _).expects(*).returning(Right(()))
(jdbcLayerInterface.executeUpdate _).expects(*, *).returning(Right(1))
(jdbcLayerInterface.query _).expects("EXPLAIN " + expected, *).returning(Right(getEmptyResultSet))
(jdbcLayerInterface.executeUpdate _).expects(expected, *).returning(Right(1))
(jdbcLayerInterface.query _).expects("SELECT COUNT(*) as count FROM \"dummy_id_COMMITS\"", *).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.query _).expects("SELECT COUNT(*) as count FROM \"dummy_id_REJECTS_TMP\"", *).returning(Right(getCountTableResultSet()))
(jdbcLayerInterface.close _).expects().returning(Right(()))
(jdbcLayerInterface.execute _).expects(*,*).returning(Right(()))
(jdbcLayerInterface.commit _).expects().returning(Right(()))

val schemaToolsInterface = mock[SchemaToolsInterface]