Skip to content

Commit

Permalink
Fix after merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Apr 20, 2024
1 parent fc707f1 commit 803e5de
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
}

case SaveMode.Append =>
if (options.isUpsert && !dialect.supportsUpsert) {
if (options.isUpsert && !dialect.supportsUpsert()) {
throw QueryCompilationErrors.tableDoesNotSupportUpsertsError(options.table)
}
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,35 +143,34 @@ private case class MsSqlServerDialect() extends JdbcDialect {

override def getUpsertStatement(
tableName: String,
columns: Array[String],
types: Array[DataType],
columns: Array[StructField],
isCaseSensitive: Boolean,
options: JDBCOptions): String = {
val insertColumns = columns.mkString(", ")
val inputs = types
val insertColumns = columns.map(_.name).map(quoteIdentifier)
val inputs = columns
.map(_.dataType)
.map(t => JdbcUtils.getJdbcType(t, this).databaseTypeDefinition)
.zipWithIndex.map {
case (t, idx) => s"DECLARE @param$idx $t; SET @param$idx = ?;"
}.mkString("\n")
val values = columns.indices.map(i => s"@param$i").mkString(", ")
val quotedUpsertKeyColumns = options.upsertKeyColumns.map(quoteIdentifier)
val keyColumns = columns.zipWithIndex.filter {
case (col, _) => quotedUpsertKeyColumns.contains(col)
case (col, _) => options.upsertKeyColumns.contains(col.name)
}
val updateColumns = columns.zipWithIndex.filterNot {
case (col, _) => quotedUpsertKeyColumns.contains(col)
case (col, _) => options.upsertKeyColumns.contains(col.name)
}
val whereClause = keyColumns.map {
case (key, idx) => s"$key = @param$idx"
case (key, idx) => s"${quoteIdentifier(key.name)} = @param$idx"
}.mkString(" AND ")
val updateClause = updateColumns.map {
case (col, idx) => s"$col = @param$idx"
case (col, idx) => s"${quoteIdentifier(col.name)} = @param$idx"
}.mkString(", ")

s"""
|$inputs
|
|INSERT $tableName ($insertColumns)
|INSERT $tableName (${insertColumns.mkString(", ")})
|SELECT $values
|WHERE NOT EXISTS (
| SELECT 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,15 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
columns: Array[StructField],
isCaseSensitive: Boolean,
options: JDBCOptions): String = {
val insertColumns = columns.mkString(", ")
val insertColumns = columns.map(_.name).map(quoteIdentifier)
val placeholders = columns.map(_ => "?").mkString(",")
val upsertKeyColumns = options.upsertKeyColumns.map(quoteIdentifier)
val updateColumns = columns.filterNot(c => upsertKeyColumns.contains(c.name))
val updateColumns = insertColumns.filterNot(upsertKeyColumns.contains)
val updateClause =
updateColumns.map(x => s"$x = VALUES($x)").mkString(", ")

s"""
|INSERT INTO $tableName ($insertColumns)
|INSERT INTO $tableName (${insertColumns.mkString(", ")})
|VALUES ( $placeholders )
|ON DUPLICATE KEY UPDATE $updateClause
|""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,18 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {

override def getUpsertStatement(
tableName: String,
columns: Array[String],
types: Array[DataType],
columns: Array[StructField],
isCaseSensitive: Boolean,
options: JDBCOptions): String = {
val insertColumns = columns.mkString(", ")
val insertColumns = columns.map(_.name).map(quoteIdentifier)
val placeholders = columns.map(_ => "?").mkString(",")
val upsertKeyColumns = options.upsertKeyColumns.map(quoteIdentifier)
val updateColumns = columns.filterNot(upsertKeyColumns.contains)
val updateColumns = insertColumns.filterNot(upsertKeyColumns.contains)
val updateClause =
updateColumns.map(x => s"$x = EXCLUDED.$x").mkString(", ")

s"""
|INSERT INTO $tableName ($insertColumns)
|INSERT INTO $tableName (${insertColumns.mkString(", ")})
|VALUES ( $placeholders )
|ON CONFLICT (${upsertKeyColumns.mkString(", ")})
|DO UPDATE SET $updateClause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1206,11 +1206,14 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
}

val table = "table"
val columns = Array("id", "time", "value", "comment")
val quotedColumns = columns.map(dialect.quoteIdentifier)
val types: Array[DataType] = Array(LongType, TimestampType, DoubleType, StringType)
val columns = Array(
StructField("id", LongType),
StructField("time", TimestampType),
StructField("value", DoubleType),
StructField("comment", StringType)
)
val isCaseSensitive = false
val stmt = dialect.getUpsertStatement(table, quotedColumns, types, isCaseSensitive, options)
val stmt = dialect.getUpsertStatement(table, columns, isCaseSensitive, options)

assert(stmt === expected)
}
Expand Down

0 comments on commit 803e5de

Please sign in to comment.