Update bucket lifecycle check
The `getPrefix` method on `Rule` [got deprecated](https://github.com/aws/aws-sdk-java/blob/355424771b951ef0066b19c3eab4b4356e270cf4/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/BucketLifecycleConfiguration.java#L145-L153).
It seems the response on the wire also changed, so this method no
longer returns the prefix even on older versions of the AWS SDK (such as
the one used by this project).

I've bumped the AWS SDK dependency version and implemented the check
using the new visitor pattern. I am not sure it is the nicest Scala
code, but I think it works. Tests still pass.

I believe this fixes databricks#346.
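
For context, a minimal sketch of what the replacement API looks like when defining a rule against aws-java-sdk-s3 1.11.x (the rule id, prefix, and expiration below are made up for illustration):

    import com.amazonaws.services.s3.model.BucketLifecycleConfiguration
    import com.amazonaws.services.s3.model.lifecycle.{LifecycleFilter, LifecyclePrefixPredicate}

    // The prefix now lives on a LifecycleFilter predicate attached to the rule,
    // rather than as a plain string on the Rule itself.
    val rule = new BucketLifecycleConfiguration.Rule()
      .withId("expire-temp-files") // hypothetical rule id
      .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate("temp/"))) // hypothetical prefix
      .withStatus(BucketLifecycleConfiguration.ENABLED)
      .withExpirationInDays(1)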
BorePlusPlus committed Jul 21, 2017
1 parent 8adfe95 commit a8f3bae
Showing 2 changed files with 23 additions and 2 deletions.
project/SparkRedshiftBuild.scala (1 addition, 1 deletion)

@@ -51,7 +51,7 @@ object SparkRedshiftBuild extends Build {
     testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
     testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("3.0.0"),
     testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"),
-    testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.10.22"),
+    testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.11.166"),
     spName := "databricks/spark-redshift",
     sparkComponents ++= Seq("sql", "hive"),
     spIgnoreProvided := true,
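Side note: because the setting reads aws.testVersion from system properties, the SDK version used by the tests can presumably be overridden without editing the build, e.g. sbt -Daws.testVersion=1.11.166 test (assuming the sbt launcher forwards -D flags to the JVM as usual).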
src/main/scala/com/databricks/spark/redshift/Utils.scala (22 additions, 1 deletion)

@@ -24,6 +24,7 @@ import scala.util.control.NonFatal

 import com.amazonaws.services.s3.{AmazonS3URI, AmazonS3Client}
 import com.amazonaws.services.s3.model.BucketLifecycleConfiguration
+import com.amazonaws.services.s3.model.lifecycle.{LifecycleAndOperator, LifecyclePredicateVisitor, LifecyclePrefixPredicate, LifecycleTagPredicate}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.slf4j.LoggerFactory
@@ -133,11 +134,17 @@ private[redshift] object Utils {
       val rules = Option(s3Client.getBucketLifecycleConfiguration(bucket))
         .map(_.getRules.asScala)
         .getOrElse(Seq.empty)
+      val keyPrefixMatchingVisitor = new KeyPrefixMatchingVisitor(key)
       rules.exists { rule =>
         // Note: this only checks that there is an active rule which matches the temp directory;
         // it does not actually check that the rule will delete the files. This check is still
         // better than nothing, though, and we can always improve it later.
-        rule.getStatus == BucketLifecycleConfiguration.ENABLED && key.startsWith(rule.getPrefix)
+        rule.getFilter.getPredicate.accept(keyPrefixMatchingVisitor)
+        if (rule.getStatus == BucketLifecycleConfiguration.ENABLED) {
+          keyPrefixMatchingVisitor.matchFound
+        } else {
+          false
+        }
       }
     }
     if (!hasMatchingBucketLifecycleRule) {
@@ -205,3 +212,17 @@ private[redshift] object Utils {
     }
   }
 }
+
+private class KeyPrefixMatchingVisitor(key: String) extends LifecyclePredicateVisitor {
+  var matchFound = false
+
+  override def visit(lifecyclePrefixPredicate: LifecyclePrefixPredicate): Unit = {
+    if (!matchFound && key.startsWith(lifecyclePrefixPredicate.getPrefix)) {
+      matchFound = true
+    }
+  }
+
+  override def visit(lifecycleTagPredicate: LifecycleTagPredicate): Unit = {}
+
+  override def visit(lifecycleAndOperator: LifecycleAndOperator): Unit = {}
+}

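To illustrate how the check is driven, a rough sketch of exercising the visitor by hand; the key and prefix are made up, and in the real check the predicate comes from rule.getFilter.getPredicate rather than being constructed directly:

    val visitor = new KeyPrefixMatchingVisitor("temp_dir/part-00000") // hypothetical S3 key
    // accept dispatches to the visit overload matching the predicate's concrete type.
    new LifecyclePrefixPredicate("temp_dir/").accept(visitor)
    assert(visitor.matchFound)

Note that the tag and and-operator visits are deliberate no-ops, so a prefix nested inside a LifecycleAndOperator (a combined filter) will not register a match here; in that case the check errs on the side of emitting the warning.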