
Add support for Spark calculations #493

Closed
Avinash-1394 opened this issue Nov 23, 2023 · 8 comments · Fixed by #497
Comments

@Avinash-1394

Description

Add support for running Spark calculations via a cursor.

Related docs

  1. Implementation in awswrangler - https://github.com/aws/aws-sdk-pandas/blob/main/awswrangler/athena/_spark.py
  2. Official AWS docs - https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html
  3. PR in dbt-athena-community - feat: support python submissions (dbt-labs/dbt-athena#248)


I am currently working on adding support for running Python models using the dbt-athena-community adapter, and it would be much easier to accomplish if the pyathena library supported this first. I don't think mock_athena supports these yet, so testing is actually much more difficult than I thought.

@Avinash-1394
Author

Avinash-1394 commented Nov 24, 2023

> If you pass a flag that uses Spark in some way, I think it would be possible to implement the query to be executed by Spark.

The way dbt has handled that is by treating the query as generic compiled_code and using the file extension to route it to either the query engine or the Spark engine. I have just started looking into this library, but what would you say is the entry point that receives the query? Is it the Cursor class?

> Refactoring of the cursor class may be necessary, though.

Definitely. It seems like we need to add calls to the new Spark API endpoints to the BaseCursor and create something similar to AthenaQueryExecution, like an AthenaSparkExecution. I don't think that will be enough, but those seem like the first steps.
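To make the idea concrete, a result object along the lines of AthenaQueryExecution might wrap the response of Athena's GetCalculationExecution API. This is a hypothetical sketch, not PyAthena's actual API; the class and attribute names here are illustrative only:

```python
# Hypothetical sketch of an "AthenaCalculationExecution" result object,
# modeled on AthenaQueryExecution and wrapping the response shape of
# Athena's GetCalculationExecution API. Names are assumptions.


class AthenaCalculationExecution:
    STATE_COMPLETED = "COMPLETED"
    STATE_FAILED = "FAILED"
    STATE_CANCELED = "CANCELED"

    def __init__(self, response: dict) -> None:
        self.calculation_execution_id = response.get("CalculationExecutionId")
        status = response.get("Status", {})
        self.state = status.get("State")
        self.state_change_reason = status.get("StateChangeReason")

    @property
    def is_terminal(self) -> bool:
        # COMPLETED, FAILED, and CANCELED are the terminal calculation states
        return self.state in (
            self.STATE_COMPLETED,
            self.STATE_FAILED,
            self.STATE_CANCELED,
        )
```

A cursor executing a calculation would then hold one of these instead of an AthenaQueryExecution.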

@laughingman7743
Owner

laughingman7743 commented Jan 5, 2024

I am trying to implement a cursor class that executes Spark calculations in the following branch.
#497

It looks like the PySpark code can be executed as follows.

import textwrap

from pyathena import connect

# CalcCursor is the calculation cursor class implemented in the #497 branch
# (it was later released as SparkCursor)
conn = connect(work_group="spark-primary", cursor_class=CalcCursor)
with conn.cursor() as cursor:
    cursor.execute(
        textwrap.dedent(
            """
            spark.sql("create database if not exists spark_demo_database")
            """
        )
    )

Since it would be difficult to add features to a regular cursor, I have implemented a different cursor class. If you have any ideas, please feel free to suggest them.
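Presumably such a cursor polls the calculation's state until it reaches a terminal state, much as the regular cursor polls query execution. A minimal, hypothetical sketch of that loop, where the fetch function stands in for a call to Athena's GetCalculationExecution API:

```python
import time
from typing import Callable

# Terminal states of an Athena Spark calculation
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELED"}


def wait_for_calculation(
    fetch_state: Callable[[], str], interval: float = 0.5
) -> str:
    """Poll fetch_state() until it returns a terminal state.

    fetch_state is a stand-in for calling GetCalculationExecution
    and reading Status.State from the response.
    """
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(interval)
```

The real implementation would also need to surface StateChangeReason on failure and honor cancellation, but the core loop is this simple.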

@Avinash-1394
Author

Avinash-1394 commented Jan 6, 2024

@laughingman7743 Thank you so much for that. I have reviewed the PR.

There are a couple of additional models you can test to check whether they cause issues.

Pandas dataframe

import pandas as pd
return pd.DataFrame({"A": [1, 2, 3, 4]})

Spark dataframe

return spark.createDataFrame(data, ["A"])

I think you can also import pyspark and return a PySpark DataFrame, but I haven't tested that one out.
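For reference, the pandas model body above can be sanity-checked locally before submitting it to Athena. The function wrapper below is only illustrative of how a model body returning a DataFrame would be invoked (dbt's actual wrapper differs):

```python
import pandas as pd


def model():
    # The pandas model body from the comment above
    return pd.DataFrame({"A": [1, 2, 3, 4]})


df = model()
print(len(df), df["A"].sum())  # 4 10
```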

@laughingman7743
Owner

The code for the Athena Example notebook is as follows:

Spark Dataframes:

file_name = "s3://athena-examples-us-east-1/notebooks/yellow_tripdata_2016-01.parquet"

taxi_df = (spark.read.format("parquet")
     .option("header", "true")
     .option("inferSchema", "true")
     .load(file_name))

print("Read parquet file" + " complete")

taxi1_df = taxi_df.groupBy("VendorID", "passenger_count").count()
taxi1_df.show()

var1 = taxi1_df.collect()
%table var1

taxi1_df.coalesce(1).write.mode('overwrite').csv("s3://aws-athena-query-results-****-us-west-2-hl3rhzkk/select_taxi")
print("Write to s3 " + "complete")

Spark SQL:

spark.sql("create database if not exists spark_demo_database")
spark.sql("show databases").show()

spark.sql("use spark_demo_database")
taxi1_df.write.mode("overwrite").format("parquet").option("path","s3://aws-athena-query-results-****-us-west-2-hl3rhzkk/select_taxi").saveAsTable("select_taxi_table")
print("Create new table" + " complete")

spark.sql("show tables").show()

spark.sql("select * from select_taxi_table").show()

spark.sql("DROP TABLE if exists select_taxi_table")
spark.sql("DROP DATABASE if exists spark_demo_database")
print("Clean resources" + " complete")

@laughingman7743
Owner

The DataFrame would be a Spark DataFrame, not pandas. I am not sure of the use case for returning values; you will probably be running code that writes data out to S3.

@Avinash-1394
Author

That was mainly to check whether the import causes any issues, but I think we can skip that feedback 👍🏽

laughingman7743 added a commit that referenced this issue Jan 9, 2024
Implement SparkCursor to support Spark calculations (fix #493)
@laughingman7743
Owner

I have just released v3.1.0. 🎉
https://pypi.org/project/PyAthena/3.1.0/
https://github.com/laughingman7743/PyAthena/releases/tag/v3.1.0
