diff --git a/spark-jobs/brazilian-finder/brazilian-finder_spark.py b/spark-jobs/brazilian-finder/brazilian-finder_spark.py index e5907c5..300b0ae 100644 --- a/spark-jobs/brazilian-finder/brazilian-finder_spark.py +++ b/spark-jobs/brazilian-finder/brazilian-finder_spark.py @@ -8,46 +8,45 @@ def spark_job(spark: SparkSession, params, *args, **kwargs): s3_brazilian_words_path = params.get("brazilian_words_bucket") s3_output_path = params.get("output_bucket") s3_master_combo_path = params.get("master_bucket") + # Load and broadcast the words table + words_df = spark.read.text(s3_brazilian_words_path).select(lower("value").alias("word")) + broadcasted_words_df = broadcast(words_df) + broadcasted_words_df.createOrReplaceTempView("words") - # Register paths as variables for SQL + # Register paths as SQL variables spark.sql(f""" CREATE OR REPLACE TEMP VIEW bronze_table AS SELECT * FROM delta.`{s3_input_combo_path}` """) - spark.sql(f""" - CREATE OR REPLACE TEMP VIEW words AS - SELECT LOWER(value) AS word - FROM text.`{s3_brazilian_words_path}` - """) - spark.sql(""" CREATE OR REPLACE TEMP VIEW emails AS SELECT DISTINCT LOWER(email_tel) AS email_lower, email_tel FROM bronze_table """) + # Use SQL with broadcast applied to words spark.sql(""" CREATE OR REPLACE TEMP VIEW matching_emails AS SELECT DISTINCT e.email_tel FROM emails e - JOIN words w /*+ BROADCAST */ - ON e.email_lower LIKE CONCAT('%', w.word, '%') + JOIN words w ON e.email_lower LIKE CONCAT('%', w.word, '%') """) - # Write to silver delta table as a .txt file with or igin al emails + # Write to silver delta table as a .txt file with original emails spark.sql(f""" INSERT OVERWRITE TEXT.`{s3_output_path}` SELECT email_tel FROM matching_emails """) - # Append the matching emails to the ma ster delt ta table + # Append the matching emails to the master delta table spark.sql(f""" INSERT INTO delta.`{s3_master_combo_path}` SELECT email_tel FROM matching_emails """) + if __name__ == "__main__": execute_spark(method=spark_job)