Skip to content

Commit

Permalink
fix: tune brazilian spark partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-the-nardo committed Nov 11, 2024
1 parent d942816 commit 18812d0
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions spark-jobs/brazilian-finder/brazilian-finder_spark.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from urllib.parse import quote
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower
from pyspark.sql.functions import col, lower, lit, array
from spark_session import execute_spark
import re
from functools import reduce

def spark_job(spark: SparkSession, params, *args, **kwargs):
# Extract parameters
Expand All @@ -13,18 +13,22 @@ def spark_job(spark: SparkSession, params, *args, **kwargs):

# Load words and collect to a list
words_df = spark.read.text(s3_brazilian_words_path).select(lower("value").alias("word"))
words_list = [row.word for row in words_df.collect()]

# Create regex pattern
pattern = '|'.join([re.escape(word) for word in words_list])
words_list = [row.word for row in words_df.collect() if row.word]

# Read emails and select necessary columns
emails_df = spark.read.format('delta').load(s3_input_combo_path) \
.select(lower(col('email_tel')).alias('email_lower'), 'email_tel').distinct()
.select(lower(col('email_tel')).alias('email_lower'), 'email_tel')

# Repartition to increase parallelism
num_partitions = 100 # Adjust based on your cluster configuration
emails_df = emails_df.repartition(num_partitions)

# Create a combined condition using 'contains' and 'reduce'
conditions = [col('email_lower').contains(word) for word in words_list]
combined_condition = reduce(lambda x, y: x | y, conditions)

# Filter emails using rlike
matching_emails_df = emails_df.filter(col('email_lower').rlike(pattern)) \
.select('email_tel').distinct()
# Filter emails using the combined condition
matching_emails_df = emails_df.filter(combined_condition).select('email_tel').distinct()

# Write matching emails to output path
matching_emails_df.write.mode("overwrite").text(s3_output_path)
Expand Down

0 comments on commit 18812d0

Please sign in to comment.