diff --git a/spark-jobs/brazilian-finder/brazilian-finder_spark.py b/spark-jobs/brazilian-finder/brazilian-finder_spark.py
index f91d7f5..7433d5c 100644
--- a/spark-jobs/brazilian-finder/brazilian-finder_spark.py
+++ b/spark-jobs/brazilian-finder/brazilian-finder_spark.py
@@ -1,8 +1,8 @@
 from urllib.parse import quote
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, lower
+from pyspark.sql.functions import col, lower, lit, array
 from spark_session import execute_spark
-import re
+from functools import reduce

 def spark_job(spark: SparkSession, params, *args, **kwargs):
     # Extract parameters
@@ -13,18 +13,22 @@ def spark_job(spark: SparkSession, params, *args, **kwargs):

     # Load words and collect to a list
     words_df = spark.read.text(s3_brazilian_words_path).select(lower("value").alias("word"))
-    words_list = [row.word for row in words_df.collect()]
-
-    # Create regex pattern
-    pattern = '|'.join([re.escape(word) for word in words_list])
+    words_list = [row.word for row in words_df.collect() if row.word]

     # Read emails and select necessary columns
     emails_df = spark.read.format('delta').load(s3_input_combo_path) \
-        .select(lower(col('email_tel')).alias('email_lower'), 'email_tel').distinct()
+        .select(lower(col('email_tel')).alias('email_lower'), 'email_tel')
+
+    # Repartition to increase parallelism
+    num_partitions = 100  # Adjust based on your cluster configuration
+    emails_df = emails_df.repartition(num_partitions)
+
+    # Create a combined condition using 'contains' and 'reduce'
+    conditions = [col('email_lower').contains(word) for word in words_list]
+    combined_condition = reduce(lambda x, y: x | y, conditions)

-    # Filter emails using rlike
-    matching_emails_df = emails_df.filter(col('email_lower').rlike(pattern)) \
-        .select('email_tel').distinct()
+    # Filter emails using the combined condition
+    matching_emails_df = emails_df.filter(combined_condition).select('email_tel').distinct()

     # Write matching emails to output path
     matching_emails_df.write.mode("overwrite").text(s3_output_path)
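
Below is a minimal, self-contained sketch of the reduce-over-`contains` filter pattern this diff introduces, not part of the change itself. The word list, sample rows, and app name are hypothetical; the real job reads them from S3/Delta. Note that the diff's `reduce(lambda x, y: x | y, conditions)` would raise a TypeError if `words_list` were empty, so the sketch passes `lit(False)` as an initial value as a guard.

    # sketch.py -- illustrative only; names and data are assumptions
    from functools import reduce

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lower, lit

    spark = SparkSession.builder.appName("contains-filter-sketch").getOrCreate()

    # Hypothetical stand-ins for the S3 word list and the Delta email table
    words_list = ["brasil", "joao", "silva"]
    emails_df = spark.createDataFrame(
        [("Joao.Silva@example.com",), ("jane.doe@example.org",)],
        ["email_tel"],
    ).select(lower(col("email_tel")).alias("email_lower"), "email_tel")

    # One boolean Column per word, OR-ed together with reduce;
    # lit(False) makes the empty-word-list case a no-match instead of an error
    conditions = [col("email_lower").contains(word) for word in words_list]
    combined_condition = reduce(lambda x, y: x | y, conditions, lit(False))

    matching = emails_df.filter(combined_condition).select("email_tel").distinct()
    matching.show(truncate=False)

Whether this substring approach or the previous rlike pattern is faster depends on the word list size and data volume; the combined condition avoids building one very large regex but still evaluates every `contains` per row.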