From 49829dee20ee611322ef5925f99938dbd8db7e59 Mon Sep 17 00:00:00 2001
From: Aireed
Date: Thu, 2 Jan 2025 19:31:10 +0800
Subject: [PATCH] fix: create table like using mixed_hive

---
 .../analysis/RewriteMixedFormatCommand.scala  | 69 ++++++++++++-------
 1 file changed, 43 insertions(+), 26 deletions(-)

diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatCommand.scala b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatCommand.scala
index aa21253abe..89ce973951 100644
--- a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatCommand.scala
+++ b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/amoro/spark/sql/catalyst/analysis/RewriteMixedFormatCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.TableCatalog
 import org.apache.spark.sql.execution.command.CreateTableLikeCommand
 
-import org.apache.amoro.spark.{MixedFormatSparkCatalog, MixedFormatSparkSessionCatalog}
+import org.apache.amoro.spark.{MixedFormatSparkCatalog, MixedFormatSparkSessionCatalog, SparkUnifiedCatalog, SparkUnifiedSessionCatalog}
 import org.apache.amoro.spark.mixed.MixedSessionCatalogBase
 import org.apache.amoro.spark.sql.MixedFormatExtensionUtils.buildCatalogAndIdentifier
 import org.apache.amoro.spark.sql.catalyst.plans.{AlterMixedFormatTableDropPartition, TruncateMixedFormatTable}
@@ -51,6 +51,10 @@ case class RewriteMixedFormatCommand(sparkSession: SparkSession) extends Rule[Lo
       case _: MixedFormatSparkSessionCatalog[_] =>
         provider.isDefined && MixedSessionCatalogBase.SUPPORTED_PROVIDERS.contains(
           provider.get.toLowerCase)
+      case _: SparkUnifiedCatalog => true
+      case _: SparkUnifiedSessionCatalog[_] =>
+        provider.isDefined && MixedSessionCatalogBase.SUPPORTED_PROVIDERS.contains(
+          provider.get.toLowerCase)
       case _ => false
     }
   }
@@ -82,35 +86,48 @@ case class RewriteMixedFormatCommand(sparkSession: SparkSession) extends Rule[Lo
         optionsMap += (WriteMode.WRITE_MODE_KEY -> WriteMode.OVERWRITE_DYNAMIC.mode)
         val newTableSpec = tableSpec.copy(properties = propertiesMap)
         c.copy(tableSpec = newTableSpec, writeOptions = optionsMap)
-      case CreateTableLikeCommand(targetTable, sourceTable, _, provider, properties, ifNotExists)
-          if isCreateMixedFormatTableLikeCommand(targetTable, provider) =>
-        val (sourceCatalog, sourceIdentifier) = buildCatalogAndIdentifier(sparkSession, sourceTable)
-        val (targetCatalog, targetIdentifier) = buildCatalogAndIdentifier(sparkSession, targetTable)
-        val table = sourceCatalog.loadTable(sourceIdentifier)
-        var targetProperties = properties
-        targetProperties += ("provider" -> "arctic")
-        table match {
-          case keyedTable: MixedSparkTable =>
-            keyedTable.table() match {
-              case table: KeyedTable =>
-                targetProperties += ("primary.keys" -> String.join(
-                  ",",
-                  table.primaryKeySpec().fieldNames()))
+      case CreateTableLikeCommand(targetTable, sourceTable, _, provider, properties, ifNotExists) =>
+        isCreateMixedFormatTableLikeCommand(targetTable, provider) match {
+          case true =>
+            val (sourceCatalog, sourceIdentifier) =
+              buildCatalogAndIdentifier(sparkSession, sourceTable)
+            val (targetCatalog, targetIdentifier) =
+              buildCatalogAndIdentifier(sparkSession, targetTable)
+            val table = sourceCatalog.loadTable(sourceIdentifier)
+            var targetProperties = properties
+            targetProperties += ("provider" -> "mixed_hive")
+            table match {
+              case keyedTable: MixedSparkTable =>
+                keyedTable.table() match {
+                  case table: KeyedTable =>
+                    targetProperties += ("primary.keys" -> String.join(
+                      ",",
+                      table.primaryKeySpec().fieldNames()))
+                  case _ =>
+                }
               case _ =>
             }
+            val tableSpec = TableSpec(
+              properties = targetProperties.toMap,
+              provider = provider,
+              options = Map.empty,
+              location = None,
+              comment = None,
+              serde = None,
+              external = false)
+            val seq: Seq[String] = Seq(
+              targetTable.database.getOrElse(sparkSession.catalog.currentDatabase),
+              targetTable.identifier)
+            val name = ResolvedDBObjectName(targetCatalog, seq)
+            CreateTable(name, table.schema(), table.partitioning(), tableSpec, ifNotExists)
           case _ =>
+            provider.isDefined match {
+              case true =>
+                throw new UnsupportedOperationException(
+                  s"format ${provider.get} does not support create table like command!!!")
+              case false => plan
+            }
         }
-        val tableSpec = TableSpec(
-          properties = targetProperties.toMap,
-          provider = provider,
-          options = Map.empty,
-          location = None,
-          comment = None,
-          serde = None,
-          external = false)
-        val seq: Seq[String] = Seq(targetTable.database.get, targetTable.identifier)
-        val name = ResolvedDBObjectName(targetCatalog, seq)
-        CreateTable(name, table.schema(), table.partitioning(), tableSpec, ifNotExists)
       case _ => plan
     }
   }
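
Usage sketch (not part of the patch): after this change the rewrite also fires
for the unified catalogs, the created table carries provider "mixed_hive"
instead of the hard-coded "arctic", and a target with no explicit database
falls back to the session's current database instead of failing on
targetTable.database.get. The repro below is a minimal sketch: the extension
and session-catalog class names come from the Amoro docs and this patch's
imports, but the uri config key, its placeholder value, and the
db.src_table/db.dst_table identifiers are assumptions, not taken from the
patch.

    import org.apache.spark.sql.SparkSession

    object CreateTableLikeMixedHive {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("create-table-like-mixed-hive")
          // Assumed wiring; the uri key and value are placeholders, check the
          // Amoro docs for your version.
          .config("spark.sql.extensions",
            "org.apache.amoro.spark.MixedFormatSparkExtensions")
          .config("spark.sql.catalog.spark_catalog",
            "org.apache.amoro.spark.SparkUnifiedSessionCatalog")
          .config("spark.sql.catalog.spark_catalog.uri",
            "thrift://<ams-host>:<ams-port>/<catalog-name>")
          .enableHiveSupport()
          .getOrCreate()

        // db.src_table: an existing mixed_hive table, optionally with a
        // primary key (copied into the target as "primary.keys"). The USING
        // clause matters on the session-catalog path: the rule only rewrites
        // the command when the provider is one of the supported providers.
        spark.sql(
          "CREATE TABLE db.dst_table LIKE db.src_table USING mixed_hive")

        spark.stop()
      }
    }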