Skip to content

Commit

Permalink
fix: broadcast syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-the-nardo committed Nov 9, 2024
1 parent 9fbb152 commit 3ebda8a
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions spark-jobs/brazilian-finder/brazilian-finder_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,46 +8,45 @@ def spark_job(spark: SparkSession, params, *args, **kwargs):
s3_brazilian_words_path = params.get("brazilian_words_bucket")
s3_output_path = params.get("output_bucket")
s3_master_combo_path = params.get("master_bucket")
# Load and broadcast the words table
words_df = spark.read.text(s3_brazilian_words_path).select(lower("value").alias("word"))
broadcasted_words_df = broadcast(words_df)
broadcasted_words_df.createOrReplaceTempView("words")

# Register paths as variables for SQL
# Register paths as SQL variables
spark.sql(f"""
CREATE OR REPLACE TEMP VIEW bronze_table AS
SELECT *
FROM delta.`{s3_input_combo_path}`
""")

spark.sql(f"""
CREATE OR REPLACE TEMP VIEW words AS
SELECT LOWER(value) AS word
FROM text.`{s3_brazilian_words_path}`
""")

spark.sql("""
CREATE OR REPLACE TEMP VIEW emails AS
SELECT DISTINCT LOWER(email_tel) AS email_lower, email_tel
FROM bronze_table
""")

# Use SQL with broadcast applied to words
spark.sql("""
CREATE OR REPLACE TEMP VIEW matching_emails AS
SELECT DISTINCT e.email_tel
FROM emails e
JOIN words w /*+ BROADCAST */
ON e.email_lower LIKE CONCAT('%', w.word, '%')
JOIN words w ON e.email_lower LIKE CONCAT('%', w.word, '%')
""")

# Write to silver delta table as a .txt file with or igin al emails
# Write to silver delta table as a .txt file with original emails
spark.sql(f"""
INSERT OVERWRITE TEXT.`{s3_output_path}`
SELECT email_tel
FROM matching_emails
""")

# Append the matching emails to the ma ster delt ta table
# Append the matching emails to the master delta table
spark.sql(f"""
INSERT INTO delta.`{s3_master_combo_path}`
SELECT email_tel
FROM matching_emails
""")

if __name__ == "__main__":
execute_spark(method=spark_job)

0 comments on commit 3ebda8a

Please sign in to comment.