Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save job status tables option #311

Merged
merged 18 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ final case class DistributedFilesystemWriteConfig(jdbcConfig: JDBCConfig,
failedRowPercentTolerance: Float,
filePermissions: ValidFilePermissions,
createExternalTable: Option[CreateExternalTableOption],
saveMetadataTables: Boolean = false,
jonathanl-bq marked this conversation as resolved.
Show resolved Hide resolved
mergeKey: Option[ValidColumnList] = None
) extends WriteConfig {
private var overwrite: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ object DSConfigSetupUtils {
}
}

def getSaveMetadataTables(config: Map[String, String]): ValidationResult[Boolean] = {
config.get("save_metadata_tables") match {
case Some(str) =>
str match {
case "true" => true.validNec
case "false" => false.validNec
case _ => InvalidSaveMetadataTablesOption().invalidNec
}
case None => false.validNec
}
}

def getStagingFsUrl(config: Map[String, String]): ValidationResult[String] = {
config.get("staging_fs_url") match {
case Some(address) => address.validNec
Expand Down Expand Up @@ -597,6 +609,7 @@ class DSWriteConfigSetup(val schema: Option[StructType], val pipeFactory: Vertic
DSConfigSetupUtils.getFailedRowsPercentTolerance(config),
DSConfigSetupUtils.getFilePermissions(config),
DSConfigSetupUtils.getCreateExternalTable(config),
DSConfigSetupUtils.getSaveMetadataTables(config),
DSConfigSetupUtils.getMergeKey(config)
).mapN(DistributedFilesystemWriteConfig)
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri
_ <- if(existingData) Right(()) else fileStoreLayer.createDir(getAddress(), perm.toString)

// Create job status table / entry
_ <- if(config.createExternalTable.isDefined) {
Right(())
} else {
_ <- if(config.saveMetadataTables) {
tableUtils.createAndInitJobStatusTable(config.tablename, config.jdbcConfig.auth.user, config.sessionId, if(config.isOverwrite) "OVERWRITE" else "APPEND")
} else {
Right(())
}
} yield ()
}
Expand Down Expand Up @@ -451,7 +451,11 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri
faultToleranceResults <- testFaultTolerance(rowsCopied, rejectsTableName)
.left.map(err => CommitError(err).context("commit: JDBC Error when trying to determine fault tolerance"))

_ <- tableUtils.updateJobStatusTable(config.tablename, config.jdbcConfig.auth.user, faultToleranceResults.failedRowsPercent, config.sessionId, faultToleranceResults.success)
_ <- if (config.saveMetadataTables) {
tableUtils.updateJobStatusTable(config.tablename, config.jdbcConfig.auth.user, faultToleranceResults.failedRowsPercent, config.sessionId, faultToleranceResults.success)
} else {
Right(())
}

_ <- if (faultToleranceResults.success) Right(()) else Left(FaultToleranceTestFail())

Expand Down Expand Up @@ -499,6 +503,11 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri

_ <- tableUtils.validateExternalTable(config.tablename)

_ <- if (config.saveMetadataTables) {
tableUtils.updateJobStatusTable(config.tablename, config.jdbcConfig.auth.user, 0.0, config.sessionId, success = true)
} else {
Right(())
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we should create the status table for external tables, even if the save_metadata_tables option is true. Since there is no fault tolerance information saved, the only benefit is to track the number of times an external table is created and the session id it was created under.

@alexey-temnikov, thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. We should have it documented and highlighted in the changelog, that this behaviour will be changed.
@jeremyp-bq, @Aryex, minor nit - should we consider another name instead of save_metadata_tables since it is specific to the rejected rows? Perhaps save_rejected_rows?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexey-temnikov This PR is to turn off the status table by default and add an option to enable it, but I was thinking we could lump together the creation of the rejected rows table in the future under the same option, which is why it has the generic name save_metadata_tables.

Unless you think it is better to have an option for creation of each (e.g. save_status_table and later save_rejected_rows_table)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we should create the status table for external tables, even if the save_metadata_tables option is true. Since there is no fault tolerance information saved, the only benefit is to track the number of times an external table is created and the session id it was created under.

This was purely to mimic the old behavior. That said, it could be extended to provide an option to not record jobs of external tables. Something like: save_metadata_tables: all | none | vertica-only.

This PR is to turn off the status table by default and add an option to enable it, but I was thinking we could lump together the creation of the rejected rows table in the future under the same option.

I think using specific options would be better in the long run?

Copy link
Collaborator

@jeremyprime jeremyprime Jan 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the logic would be like this, unless we see value in creating the status table for external tables:

if(config.saveMetadataTables && !config.createExternalTable.isDefined) {
  // create table
} else {
  Right(())
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm not following the condition. What do you mean by createExternalTable is defined AND not defined?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo, fixed it.

To summarize my questions:

  1. Should we create the status table for the external table case, or not since it doesn't provide much value for the external tables case (even if the option is set to true)?
  2. Should we have separate options for saving the status table and rejected rows table, or use the same option for both?

} yield ()

// External table creation always commits. So, if an error was detected, drop the table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ case class InvalidCreateExternalTableOption() extends ConnectorError {
override def getFullContext: String = "The 'create_external_table' param is invalid. Please specify " +
"'new-data' or 'existing-data'."
}
case class InvalidSaveMetadataTablesOption() extends ConnectorError {
override def getFullContext: String = "The 'save_metadata_tables' param is invalid. Please specify " +
"'true' or 'false'. This option will default to false if no value is specified."
}
case class InvalidPreventCleanupOption() extends ConnectorError {
override def getFullContext: String = "The 'prevent_cleanup' param is invalid. Please specify " +
"'true' or 'false'. This option will default to false if no value is specified."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class DSConfigSetupUtilsTest extends AnyFlatSpec with BeforeAndAfterAll with Moc
val opts = Map("create_external_table" -> "new-data")
val v = getResultOrAssert[Option[CreateExternalTableOption]](DSConfigSetupUtils.getCreateExternalTable(opts))
v match {
case Some(value) => assert (value.toString == "new-data")
case Some(value) => assert(value.toString == "new-data")
case _ => fail
}
}
Expand All @@ -357,7 +357,7 @@ class DSConfigSetupUtilsTest extends AnyFlatSpec with BeforeAndAfterAll with Moc
val opts = Map("create_external_table" -> "existing-data")
val v = getResultOrAssert[Option[CreateExternalTableOption]](DSConfigSetupUtils.getCreateExternalTable(opts))
v match {
case Some(value) => assert (value.toString == "existing-data")
case Some(value) => assert(value.toString == "existing-data")
case _ => fail
}
}
Expand Down Expand Up @@ -393,4 +393,21 @@ class DSConfigSetupUtilsTest extends AnyFlatSpec with BeforeAndAfterAll with Moc
assert(serverName.get == server)
}

it should "parse save_metadata_tables" in {
val opts = Map[String, String]("save_metadata_tables" -> "true")
val save_metadata_tables = getResultOrAssert[Boolean](DSConfigSetupUtils.getSaveMetadataTables(opts))
assert(save_metadata_tables)
}

it should "defaults save_metadata_tables to false" in {
val opts = Map[String, String]()
val save_metadata_tables = getResultOrAssert[Boolean](DSConfigSetupUtils.getSaveMetadataTables(opts))
assert(!save_metadata_tables)
}

it should "error on invalid input to save_metadata_tables" in {
val opts = Map[String, String]("save_metadata_tables" -> "asdf")
val error = getErrorOrAssert[ConnectorError](DSConfigSetupUtils.getSaveMetadataTables(opts))
assert(error.toNonEmptyList.head.isInstanceOf[InvalidSaveMetadataTablesOption])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ class DSWriterTest extends AnyFlatSpec with BeforeAndAfterAll with MockFactory {
strlen = 1024,
copyColumnList = None,
sessionId = "id",
failedRowPercentTolerance = 0.0f,
failedRowPercentTolerance = 0.0f,
filePermissions = ValidFilePermissions("777").getOrElse(throw new Exception("File perm error")),
createExternalTable = None
createExternalTable = None,
saveMetadataTables = false
)

val uniqueId = "unique-id"

private def checkResult(result: ConnectorResult[Unit]): Unit= {
private def checkResult(result: ConnectorResult[Unit]): Unit = {
result match {
case Left(err) => fail(err.getFullContext)
case Right(_) => ()
Expand All @@ -58,7 +59,7 @@ class DSWriterTest extends AnyFlatSpec with BeforeAndAfterAll with MockFactory {
val v1: Int = 1
val v2: Float = 2.0f
val v3: Float = 3.0f
val dataBlock = DataBlock(List(InternalRow(v1,v2), InternalRow(v1,v3)))
val dataBlock = DataBlock(List(InternalRow(v1, v2), InternalRow(v1, v3)))

val pipe = mock[DummyWritePipe]
(pipe.getDataBlockSize _).expects().returning(Right(2))
Expand All @@ -83,7 +84,7 @@ class DSWriterTest extends AnyFlatSpec with BeforeAndAfterAll with MockFactory {
val v1: Int = 1
val v2: Float = 2.0f
val v3: Float = 3.0f
val dataBlock = DataBlock(List(InternalRow(v1,v2), InternalRow(v1,v3)))
val dataBlock = DataBlock(List(InternalRow(v1, v2), InternalRow(v1, v3)))

val pipe = mock[DummyWritePipe]
(pipe.getDataBlockSize _).expects().returning(Right(3))
Expand All @@ -107,8 +108,8 @@ class DSWriterTest extends AnyFlatSpec with BeforeAndAfterAll with MockFactory {
val v1: Int = 1
val v2: Float = 2.0f
val v3: Float = 3.0f
val dataBlock1 = DataBlock(List(InternalRow(v1,v2), InternalRow(v1,v3)))
val dataBlock2 = DataBlock(List(InternalRow(v1,v3), InternalRow(v1,v2)))
val dataBlock1 = DataBlock(List(InternalRow(v1, v2), InternalRow(v1, v3)))
val dataBlock2 = DataBlock(List(InternalRow(v1, v3), InternalRow(v1, v2)))

val pipe = mock[DummyWritePipe]
(pipe.getDataBlockSize _).expects().returning(Right(2))
Expand Down
Loading