diff --git a/project/SparkRedshiftBuild.scala b/project/SparkRedshiftBuild.scala
index 1a5301f9..9f4ce429 100644
--- a/project/SparkRedshiftBuild.scala
+++ b/project/SparkRedshiftBuild.scala
@@ -51,7 +51,7 @@ object SparkRedshiftBuild extends Build {
     testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
     testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("3.0.0"),
     testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"),
-    testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.10.22"),
+    testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.11.166"),
     spName := "databricks/spark-redshift",
     sparkComponents ++= Seq("sql", "hive"),
     spIgnoreProvided := true,
diff --git a/src/main/scala/com/databricks/spark/redshift/Utils.scala b/src/main/scala/com/databricks/spark/redshift/Utils.scala
index 82c48c3a..5fabb0d1 100644
--- a/src/main/scala/com/databricks/spark/redshift/Utils.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Utils.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
 
 import com.amazonaws.services.s3.{AmazonS3URI, AmazonS3Client}
 import com.amazonaws.services.s3.model.BucketLifecycleConfiguration
+import com.amazonaws.services.s3.model.lifecycle.{LifecycleAndOperator, LifecyclePredicateVisitor, LifecyclePrefixPredicate, LifecycleTagPredicate}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.slf4j.LoggerFactory
@@ -133,11 +134,21 @@ private[redshift] object Utils {
         val rules = Option(s3Client.getBucketLifecycleConfiguration(bucket))
           .map(_.getRules.asScala)
           .getOrElse(Seq.empty)
         rules.exists { rule =>
           // Note: this only checks that there is an active rule which matches the temp directory;
           // it does not actually check that the rule will delete the files. This check is still
           // better than nothing, though, and we can always improve it later.
-          rule.getStatus == BucketLifecycleConfiguration.ENABLED && key.startsWith(rule.getPrefix)
+          // Use a fresh visitor per rule so that a match on one rule cannot leak into the next.
+          val keyPrefixMatchingVisitor = new KeyPrefixMatchingVisitor(key)
+          val matchesPrefix = Option(rule.getFilter).flatMap(f => Option(f.getPredicate)) match {
+            case Some(predicate) =>
+              predicate.accept(keyPrefixMatchingVisitor)
+              keyPrefixMatchingVisitor.matchFound
+            case None =>
+              // Rules created through the legacy API carry a top-level prefix instead of a filter.
+              Option(rule.getPrefix).exists(key.startsWith)
+          }
+          rule.getStatus == BucketLifecycleConfiguration.ENABLED && matchesPrefix
         }
       }
       if (!hasMatchingBucketLifecycleRule) {
@@ -205,3 +216,22 @@ private[redshift] object Utils {
     }
   }
 }
+
+private class KeyPrefixMatchingVisitor(key: String) extends LifecyclePredicateVisitor {
+  var matchFound = false
+
+  override def visit(lifecyclePrefixPredicate: LifecyclePrefixPredicate): Unit = {
+    if (!matchFound && key.startsWith(lifecyclePrefixPredicate.getPrefix)) {
+      matchFound = true
+    }
+  }
+
+  // Tag-only predicates can never match a key prefix, so they are ignored.
+  override def visit(lifecycleTagPredicate: LifecycleTagPredicate): Unit = {}
+
+  // An And operator can combine a prefix predicate with tag predicates, so visit its operands
+  // to recognize such rules as well.
+  override def visit(lifecycleAndOperator: LifecycleAndOperator): Unit = {
+    lifecycleAndOperator.getOperands.asScala.foreach(_.accept(this))
+  }
+}
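
For reviewers, here is a minimal sketch of how the new visitor behaves against a filter-based lifecycle rule. It is illustrative only, assuming AWS Java SDK 1.11.x: the rule id and prefixes are made up, and the object is placed in the `com.databricks.spark.redshift` package so it can see the package-private `KeyPrefixMatchingVisitor` added above.

```scala
package com.databricks.spark.redshift

import com.amazonaws.services.s3.model.BucketLifecycleConfiguration
import com.amazonaws.services.s3.model.lifecycle.{LifecycleFilter, LifecyclePrefixPredicate}

object KeyPrefixMatchingVisitorSketch {
  def main(args: Array[String]): Unit = {
    // A filter-based rule, as returned by getBucketLifecycleConfiguration on SDK 1.11.x
    // when the rule was created through the newer lifecycle-filter API.
    val rule = new BucketLifecycleConfiguration.Rule()
      .withId("expire-temp")
      .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate("temp/")))
      .withStatus(BucketLifecycleConfiguration.ENABLED)

    // The visitor records whether the temp-dir key falls under the rule's prefix.
    val visitor = new KeyPrefixMatchingVisitor("temp/spark-redshift/")
    rule.getFilter.getPredicate.accept(visitor)
    assert(visitor.matchFound) // "temp/spark-redshift/" starts with "temp/"
  }
}
```

A rule created through the legacy API instead comes back with a null `getFilter` and a top-level `getPrefix`, which is why the `exists` check falls back to `rule.getPrefix` rather than calling `getFilter.getPredicate` unconditionally.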