This is a short example of how to use Aqueduct to deploy a sentiment analysis model.
You can find and download this notebook on GitHub here.
Note: This example workflow uses HuggingFace's Transformers package, which relies on large models. If you're running on a resource-constrained machine, or on an M1 Mac using Rosetta, you will likely run out of memory when loading these models. If that's the case, we recommend using another example workflow.
Throughout this notebook, you'll see a decorator (`@op`, imported from `aqueduct.decorator`) above functions. This decorator allows Aqueduct to run your functions automatically as part of a workflow.
import aqueduct
from aqueduct.decorator import op, check
# If you're running your notebook on a separate machine from your
# Aqueduct server, change this to the address of your Aqueduct server.
address = "http://localhost:8080"
# If you're running your notebook on a separate machine from your
# Aqueduct server, you will have to copy your API key here rather than
# using `get_apikey()`.
api_key = aqueduct.get_apikey()
client = aqueduct.Client(api_key, address)
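If your notebook runs on a different machine from the Aqueduct server, one option is to keep the address and API key out of the notebook and read them from environment variables instead. The sketch below assumes hypothetical variable names, `AQUEDUCT_ADDRESS` and `AQUEDUCT_API_KEY`, which Aqueduct itself does not define; the resolved values would then be passed to `aqueduct.Client(api_key, address)` exactly as above.

```python
import os
from typing import Mapping, Optional, Tuple

# Hypothetical environment variable names; Aqueduct does not define these.
ADDRESS_VAR = "AQUEDUCT_ADDRESS"
API_KEY_VAR = "AQUEDUCT_API_KEY"


def resolve_connection(
    env: Optional[Mapping[str, str]] = None,
    default_address: str = "http://localhost:8080",
) -> Tuple[str, str]:
    """Return (api_key, address) from `env` (defaults to os.environ).

    Falls back to the local default address, but fails loudly if no API key
    is set, since a missing key would only surface later as an auth error.
    """
    env = os.environ if env is None else env
    address = env.get(ADDRESS_VAR, default_address)
    if API_KEY_VAR not in env:
        raise KeyError(f"Set {API_KEY_VAR} to your Aqueduct API key")
    return env[API_KEY_VAR], address


# Usage (requires the aqueduct package and a reachable server):
# api_key, address = resolve_connection()
# client = aqueduct.Client(api_key, address)
```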
First, we'll load some test data. Here, we'll use a dataset that has reviews of various hotels; our table has the name of the hotel, the date of the review, the nationality of the reviewer, and the text of the review itself. This data is preloaded for us in the Aqueduct demo DB.
warehouse = client.resource("Demo")
# reviews_table is an Aqueduct TableArtifact, which is a wrapper around
# a pandas DataFrame. A TableArtifact can be used as an argument to any operator
# in a workflow; you can also call .get() on a TableArtifact to retrieve
# the underlying DataFrame and interact with it directly.
reviews_table = warehouse.sql("select * from hotel_reviews;")
# This retrieves the full underlying DataFrame; the notebook only
# displays its first and last rows. Note that you can't pass a DataFrame
# as an argument to a workflow; you must use the Aqueduct TableArtifact!
reviews_table.get()
Output
| | hotel_name | review_date | reviewer_nationality | review |
|---|---|---|---|---|
| 0 | H10 Itaca | 2017-08-03 | Australia | Damaged bathroom shower screen sealant and ti... |
| 1 | De Vere Devonport House | 2016-03-28 | United Kingdom | No Negative The location and the hotel was ver... |
| 2 | Ramada Plaza Milano | 2016-05-15 | Kosovo | No Negative Im a frequent traveler i visited m... |
| 3 | Aloft London Excel | 2016-11-05 | Canada | Only tepid water for morning shower They said ... |
| 4 | The Student Hotel Amsterdam City | 2016-07-31 | Australia | No Negative The hotel had free gym table tenni... |
| ... | ... | ... | ... | ... |
| 95 | The Chesterfield Mayfair | 2015-08-25 | Denmark | Bad Reading light And light in bathNo Positive |
| 96 | Hotel V Nesplein | 2015-08-27 | Turkey | Nothing except the construction going on the s... |
| 97 | Le Parisis Paris Tour Eiffel | 2015-10-20 | Australia | When we arrived we had to bring our own baggag... |
| 98 | NH Amsterdam Museum Quarter | 2016-01-26 | Belgium | No stairs even to go the first floor Restaura... |
| 99 | Barcel Raval | 2017-07-07 | United Kingdom | Air conditioning a little zealous Nice atmosp... |
100 rows × 4 columns
Now that we have our data, we'll define an Aqueduct operator called `sentiment_prediction` that takes in our reviews data and appends two columns to the table: a positive or negative `label`, and a `score` giving the model's confidence in that label.
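To make the shape of the data concrete: the HuggingFace `sentiment-analysis` pipeline returns one dictionary per input string, each with a `label` and a `score` key, and the operator below turns that list into new columns aligned row by row with the reviews. The stdlib sketch here mimics that flow with a hypothetical `stub_model` standing in for the real model (it uses a naive keyword rule, not sentiment analysis, and its score is a placeholder), and with each table row represented as a dict rather than a DataFrame row.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def stub_model(texts: List[str]) -> List[Row]:
    """Hypothetical stand-in for pipeline("sentiment-analysis").

    Returns one {"label", "score"} dict per input, the same shape the real
    pipeline returns. Labels come from a keyword rule; the score is a fixed
    placeholder, not a model confidence.
    """
    negative_words = {"damaged", "tepid", "bad", "dirty"}
    results = []
    for text in texts:
        words = set(text.lower().split())
        label = "NEGATIVE" if words & negative_words else "POSITIVE"
        results.append({"label": label, "score": 0.5})
    return results


def append_predictions(rows: List[Row], model: Callable[[List[str]], List[Row]] = stub_model) -> List[Row]:
    """Return new rows with label/score added, pairing rows and predictions by position."""
    predictions = model([str(row["review"]) for row in rows])
    if len(predictions) != len(rows):
        raise ValueError("model must return exactly one prediction per review")
    # Build new dicts so the input rows are left unchanged.
    return [{**row, **pred} for row, pred in zip(rows, predictions)]
```

Positional pairing is what `reviews.join(...)` achieves in the operator below when `reviews` has the default 0..n-1 index, as it does here. `DataFrame.join` aligns on index labels, so if the input had been filtered or re-indexed, calling `reset_index(drop=True)` first would be worth considering.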
from transformers import pipeline
import pandas as pd
import torch # this is needed to ensure that pytorch is installed.
# The @op decorator here allows Aqueduct to run this function as
# a part of the Aqueduct workflow. It tells Aqueduct that when
# we execute this function, we're defining a step in the workflow.
# While the results can be retrieved immediately, nothing is
# published until we call `publish_flow()` below.
@op()
def sentiment_prediction(reviews):
    """
    This function uses the HuggingFace transformers library's sentiment-analysis
    model to predict the positive or negative sentiment of the reviews passed in
    to this function. The reviews argument is expected to have a `review` column
    and may have any number of other columns.
    This function appends the predicted `label` and its `score` as two new
    columns to the original DataFrame.
    """
    model = pipeline("sentiment-analysis")
    # The pipeline returns one {"label": ..., "score": ...} dict per review;
    # joining that list (as a DataFrame) on the index adds the two columns.
    return reviews.join(pd.DataFrame(model(list(reviews["review"]))))
# This tells Aqueduct to execute sentiment_prediction on reviews_table
# as a part of our workflow. However, nothing is published (yet) until we
# call `publish_flow()` below.
sentiment_table = sentiment_prediction(reviews_table)
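The comments above describe two behaviors of `@op`: calling the decorated function gives you a result you can inspect right away with `.get()`, and the call is also recorded as a step of a workflow that is only published later. The toy sketch below imitates that pattern in plain Python so the idea is easy to see. `ToyArtifact`, `toy_op`, and `RECORDED_STEPS` are hypothetical names, and this is not how Aqueduct is implemented.

```python
import functools
from typing import Any, Callable, List, Tuple

# Hypothetical registry of recorded workflow steps: (step name, input names).
# Nothing here talks to an Aqueduct server.
RECORDED_STEPS: List[Tuple[str, Tuple[str, ...]]] = []


class ToyArtifact:
    """Wraps a computed value together with the name of the step that produced it."""

    def __init__(self, value: Any, produced_by: str):
        self._value = value
        self.produced_by = produced_by

    def get(self) -> Any:
        return self._value


def toy_op(fn: Callable[..., Any]) -> Callable[..., ToyArtifact]:
    """Record each call as a workflow step, and also run it eagerly so the
    result can be previewed with .get()."""

    @functools.wraps(fn)
    def wrapper(*artifacts: ToyArtifact) -> ToyArtifact:
        inputs = tuple(a.produced_by for a in artifacts)
        RECORDED_STEPS.append((fn.__name__, inputs))
        # Unwrap the artifacts so the function body sees plain values.
        result = fn(*(a.get() for a in artifacts))
        return ToyArtifact(result, produced_by=fn.__name__)

    return wrapper


@toy_op
def count_reviews(reviews):
    return len(reviews)


reviews = ToyArtifact(["Great stay", "Tepid water"], produced_by="hotel_reviews_query")
count = count_reviews(reviews)
print(count.get())     # 2
print(RECORDED_STEPS)  # [('count_reviews', ('hotel_reviews_query',))]
```

In this toy, passing a raw list instead of a `ToyArtifact` fails with an `AttributeError`, loosely mirroring the requirement that workflow operators receive artifacts rather than plain DataFrames.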
We can see all the positive or negative labels, as well as the numerical scores generated by our sentiment model, by calling `.get()` on `sentiment_table`:
sentiment_table.get()
Output
| | hotel_name | review_date | reviewer_nationality | review | label | score |
|---|---|---|---|---|---|---|
| 0 | H10 Itaca | 2017-08-03 | Australia | Damaged bathroom shower screen sealant and ti... | POSITIVE | 0.715813 |
| 1 | De Vere Devonport House | 2016-03-28 | United Kingdom | No Negative The location and the hotel was ver... | POSITIVE | 0.999741 |
| 2 | Ramada Plaza Milano | 2016-05-15 | Kosovo | No Negative Im a frequent traveler i visited m... | POSITIVE | 0.999773 |
| 3 | Aloft London Excel | 2016-11-05 | Canada | Only tepid water for morning shower They said ... | NEGATIVE | 0.999169 |
| 4 | The Student Hotel Amsterdam City | 2016-07-31 | Australia | No Negative The hotel had free gym table tenni... | NEGATIVE | 0.931378 |
| ... | ... | ... | ... | ... | ... | ... |
| 95 | The Chesterfield Mayfair | 2015-08-25 | Denmark | Bad Reading light And light in bathNo Positive | NEGATIVE | 0.999340 |
| 96 | Hotel V Nesplein | 2015-08-27 | Turkey | Nothing except the construction going on the s... | POSITIVE | 0.999691 |
| 97 | Le Parisis Paris Tour Eiffel | 2015-10-20 | Australia | When we arrived we had to bring our own baggag... | NEGATIVE | 0.999032 |
| 98 | NH Amsterdam Museum Quarter | 2016-01-26 | Belgium | No stairs even to go the first floor Restaura... | POSITIVE | 0.996806 |
| 99 | Barcel Raval | 2017-07-07 | United Kingdom | Air conditioning a little zealous Nice atmosp... | POSITIVE | 0.999748 |
100 rows × 6 columns
It might be helpful to monitor the runtime of the `sentiment_prediction` operator. Aqueduct comes with a set of pre-built system metrics that capture system-level measurements like function runtime and memory usage. To see all available system metrics, call `list_system_metrics()`. Here, we'll use the `runtime` system metric to track how long it takes to compute the `sentiment_table` artifact.
sentiment_table.list_system_metrics()
Output:
['runtime', 'max_memory']
# Use an Aqueduct system metric to capture how long it takes to run
# the sentiment_prediction function that generates sentiment_table.
sentiment_table_runtime = sentiment_table.system_metric("runtime")
Now you can view the runtime (in seconds) by retrieving the contents of the `sentiment_table_runtime` artifact.
sentiment_table_runtime.get()
Output:
8.313335418701172
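Aqueduct records the `runtime` metric for you. If you want a rough local comparison while iterating on an operator before publishing, you can also time a plain function call yourself. The sketch below uses `time.perf_counter`, and `timed` is a hypothetical helper; it measures wall-clock time in the current process, so its numbers will not necessarily match the value Aqueduct reports.

```python
import time
from typing import Any, Callable, Tuple


def timed(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float]:
    """Call fn(*args, **kwargs) and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Usage with a placeholder workload standing in for sentiment prediction.
result, seconds = timed(sorted, [3, 1, 2])
print(result)  # [1, 2, 3]
print(f"took {seconds:.6f} s")
```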
Now that we've defined our predictions, we can save them back to the data warehouse. Here, we'll simply write them back to the same demo DB that we loaded the data from earlier, but the predictions can be written to any system Aqueduct is connected to.
# This tells Aqueduct to save the results in sentiment_table
# back to the demo DB we configured earlier.
# NOTE: At this point, no data is actually saved! This is just
# part of a workflow spec that will be executed once the workflow
# is published in the next cell.
warehouse.save(sentiment_table, table_name="sentiment_pred", update_mode="replace")
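The `update_mode` argument controls what happens when `sentiment_pred` already exists. Assuming it follows the usual convention for such options, `"replace"` overwrites the table's previous contents on every run, whereas an `"append"` mode would add each run's rows after the existing ones. The sketch below illustrates those two semantics with Python's built-in `sqlite3` module and an in-memory database; `save_rows` is a hypothetical helper, the sample rows are made up, and this is not how Aqueduct writes to the demo DB.

```python
import sqlite3
from typing import Iterable, Tuple


def save_rows(conn: sqlite3.Connection, table: str,
              rows: Iterable[Tuple[str, str, float]], update_mode: str) -> None:
    """Write (review, label, score) rows using replace or append semantics.

    The table name is interpolated directly, so it must be trusted input.
    """
    if update_mode not in ("replace", "append"):
        raise ValueError(f"unsupported update_mode: {update_mode!r}")
    if update_mode == "replace":
        # Drop any previous contents so the table holds only this run's rows.
        conn.execute(f'DROP TABLE IF EXISTS "{table}"')
    conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" (review TEXT, label TEXT, score REAL)')
    conn.executemany(f'INSERT INTO "{table}" VALUES (?, ?, ?)', rows)
    conn.commit()


conn = sqlite3.connect(":memory:")
run = [("Great location", "POSITIVE", 0.99), ("Tepid water", "NEGATIVE", 0.98)]

# Two "replace" runs leave only the latest run's rows.
save_rows(conn, "sentiment_pred", run, "replace")
save_rows(conn, "sentiment_pred", run, "replace")
print(conn.execute('SELECT COUNT(*) FROM "sentiment_pred"').fetchone()[0])  # 2

# An "append" run keeps the existing rows and adds the new ones.
save_rows(conn, "sentiment_pred", run, "append")
print(conn.execute('SELECT COUNT(*) FROM "sentiment_pred"').fetchone()[0])  # 4
```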
Finally, we'll publish our workflow to Aqueduct, giving it a name and telling it which artifacts to publish. Optionally, we can also give this workflow a schedule, telling it to run on an hourly basis:
# This publishes all of the logic needed to create sentiment_table
# to Aqueduct. The URL below will take you to the Aqueduct UI, which
# will show you the status of your workflow runs and allow you to
# inspect them.
sentiment_flow = client.publish_flow(
name="Demo Customer Sentiment",
artifacts=[sentiment_table],
    # Uncomment the following line to schedule the workflow on an hourly basis.
# schedule=aqueduct.hourly(),
)
print(sentiment_flow.id())
Clicking on the URL above will take you to the Aqueduct UI, where you can see the workflow we just created! In the UI, you'll be able to see the DAG of operators, click into any of them, and inspect the data and metadata associated with each stage of the pipeline.
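Conceptually, the workflow published here is a small DAG: the SQL query produces `reviews_table`, `sentiment_prediction` turns it into `sentiment_table`, and both the `runtime` metric and the save to `sentiment_pred` depend on `sentiment_table`. The sketch below writes that dependency graph out by hand with Python's `graphlib` module (Python 3.9+) and computes one order in which the steps could run. The node names are illustrative labels, not identifiers Aqueduct uses.

```python
from graphlib import TopologicalSorter

# Each key maps a step to the set of steps it depends on.
# Node names are illustrative labels, not Aqueduct identifiers.
workflow = {
    "reviews_table": {"sql: select * from hotel_reviews"},
    "sentiment_prediction": {"reviews_table"},
    "sentiment_table": {"sentiment_prediction"},
    "runtime_metric": {"sentiment_table"},
    "save: sentiment_pred": {"sentiment_table"},
}

# static_order() yields every node after all of its dependencies.
order = list(TopologicalSorter(workflow).static_order())
print(order)
```

The metric and the save have no dependency on each other, so any valid order may run them in either sequence.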