Merge pull request #184 from awslabs/copy-table-ffmarti
support for py 3.9 + partitioning optional + improved readme
bobigbal authored Mar 28, 2024
2 parents d7f59a3 + 770f9f2 commit 98d68a1
Showing 2 changed files with 53 additions and 49 deletions.
67 changes: 35 additions & 32 deletions tools/python/copy-table/README.md
@@ -2,18 +2,41 @@

## When to use this automation?

This automation can be used for migrating Timestream for LiveAnalytics table to new table. Automation is divided into two parts: #1 for unloading the data from Timestream using different partitioning choices, #2 batch loading the data into Timestream; it also covers S3 copy functionality if unload was run on different account or same account with different region. Data modelling changes can be applied as part of batch load. You can use this automation in following use-cases.
This automation can be used for migrating a **Timestream for LiveAnalytics** table to a new table. The automation is divided into two parts:
1. unloading the data from Timestream using different partitioning choices
2. batch loading the data into Timestream; this also covers S3 copy functionality if the unload was run in a different account, or in the same account but a different region. Data model changes can be applied as part of the batch load.

You can use this automation in the following use cases:
- Migrating a Timestream for LiveAnalytics table to a different AWS Organization.
- Migrating Timestream for LiveAnalytics table to different region or different account and need data model changes in destination account/region. If data model changes are not required (and accounts belong to same AWS Organization) try to make use of AWS Backups for Timestream https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-backup.html and https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-restore.html.
- Migrating Timestream for LiveAnalytics table to new table with customer defined partition key https://docs.aws.amazon.com/timestream/latest/developerguide/customer-defined-partition-keys.html
- Migrating a Timestream for LiveAnalytics table to a different region or account when data model changes are needed in the destination account/region. If data model changes are not required (and the accounts belong to the same AWS Organization), consider using AWS Backup for Timestream (ref. [Amazon Timestream backups](https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-backup.html) and [Restore an Amazon Timestream table](https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-restore.html)).
- Migrating a Timestream for LiveAnalytics table to a new table with a [customer-defined partition key](https://docs.aws.amazon.com/timestream/latest/developerguide/customer-defined-partition-keys.html)


## Usage and Requirements

These are the full steps to execute the script in your AWS Account.

1. Log into your AWS account and select the AWS Region in which your Timestream table is stored

2. Launch [AWS CloudShell](https://console.aws.amazon.com/cloudshell/home) or your local shell (Python 3.9 or newer is **required**)
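A quick way to confirm the interpreter version before proceeding (a minimal check; the binary may be named differently on your system):
```bash
# Confirm the Python version (3.9 or newer is required)
python3 --version
```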

3. Clone this source code project using [git](https://git-scm.com/) or download it manually
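For example (the clone URL below is an assumption; substitute the actual URL of this repository):
```bash
# Clone the repository and change into the copy-table tool directory
# (the URL is an assumption; use this repository's actual clone URL)
git clone https://github.com/awslabs/amazon-timestream-tools.git
cd amazon-timestream-tools/tools/python/copy-table
```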

4. Make sure you have the latest pip package installed
```bash
python3 -m ensurepip --upgrade
```
5. Install Python [boto3](https://pypi.org/project/boto3/), [backoff](https://pypi.org/project/backoff/) and [tqdm](https://pypi.org/project/tqdm/) packages
```bash
python3 -m pip install boto3 backoff tqdm
```
6. Run unload.py or batch_load.py as described below.

## Getting started with UNLOAD

### Key considerations/limits and best practices
- **Recommendation is to have your UNLOAD not exceed 80 GB**, consider to split the process into multiple UNLOADS leveraging on the time start/end parameters (e.g. if you have 1 TB, run 13 UNLOADS) to avoid any interruptions.
- **Queries containing UNLOAD statement can export at most 100 partitions per query.**, hence consider to split the process into multiple UNLOADS leveraging on the time start/end parameters.
- **It is recommended that each UNLOAD not exceed 80 GB**; consider splitting the process into multiple UNLOADs using the script's start/end time parameters (e.g. if you have 1 TB, run 13 UNLOADs) to avoid any interruptions (see the time-window sketch under Examples below).
- **Queries containing an UNLOAD statement can export at most 100 partitions per query**, so consider splitting the process into multiple UNLOADs using the script's start/end time and partition parameters.
- **Maximum file size in a batch load task cannot exceed 5 GB**. Unload files produced by this automation will not exceed that size.
- **Concurrency for queries using the UNLOAD statement is 1 query per second (QPS)**. Exceeding the query rate will result in throttling.
- **Queries containing UNLOAD statement time out after 60 minutes.**
@@ -30,14 +53,14 @@ Check the following guide to learn more: [Limits for UNLOAD from Timestream for
- **from_time** [OPTIONAL]: Timestamp (inclusive) from which you want to select data to unload (e.g.: *2024-02-26 17:24:38.270000000*)
- **end_time** [OPTIONAL]: Timestamp (exclusive) up to which you want to select data to unload (e.g.: *2024-03-15 19:26:31.304000000*)
- **partition** [OPTIONAL]: Time partition you want to use (possible values: *day, month, year*)
- **iam_role_bucket_policy** [OPTIONAL]: {Applies for cross account migrations} Grants destination IAM Role access to S3 Bucket (e.g.: *arn:aws:iam::123456789123:role/BatchLoadRole*)
- **iam_role_arn** [OPTIONAL]: {Applies to cross-account migrations} ARN of the destination IAM Role that is granted access to the S3 Bucket (e.g.: *arn:aws:iam::123456789123:role/BatchLoadRole*)
### Examples
Example to unload the Timestream table *myTable* in the database *sourcedb* to the folder *unload* in the *timestream-unload-sourcedb-mytable* S3 bucket.
Also, it applies an S3 bucket policy to allow the IAM Role *BatchLoadRole* of account *123456789123* to allow the copy. Does day level partitions.
Also, it applies an S3 bucket policy that allows the IAM Role *BatchLoadRole* of account *123456789123* to perform the copy. It uses day-level partitions.
```bash
python3.10 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb-mytable/unload --database sourcedb --table myTable --iam_role_bucket_policy arn:aws:iam::123456789123:role/BatchLoadRole --partition day
python3 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb-mytable/unload --database sourcedb --table myTable --iam_role_arn arn:aws:iam::123456789123:role/BatchLoadRole --partition day
```
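To stay within the size and partition limits listed above, the same table can be exported in several smaller time windows using the optional *from_time*/*end_time* parameters. A minimal sketch, reusing the illustrative bucket, database and table names from the example above:
```bash
# Sketch: split a large export into month-sized UNLOAD runs
# using the optional --from_time / --end_time parameters
python3 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb-mytable/unload \
  --database sourcedb --table myTable --partition day \
  --from_time "2024-01-01 00:00:00.000000000" --end_time "2024-02-01 00:00:00.000000000"

python3 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb-mytable/unload \
  --database sourcedb --table myTable --partition day \
  --from_time "2024-02-01 00:00:00.000000000" --end_time "2024-03-01 00:00:00.000000000"
```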
## Getting started with BATCH LOAD
@@ -76,7 +99,7 @@ python3.10 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb
### Examples
**With S3 Copy**
#### With S3 Copy
Example to execute a batch load to the target Timestream table *myTable* with partition key *city* in the database *targetdb* in the *us-west-2* region.
Timestream objects are created by this script as per the *create_timestream_resource* parameter.
Source data is located in the S3 bucket *timestream-unload-sourcedb-mytable* with prefix *unload/results/*.
@@ -85,36 +108,16 @@ Target bucket and error bucket names are given by *s3_target_bucket* and *s3_tar
The destination prefix *dest/* is created as specified by *destination_s3_prefix*. The desired data model file is *data_model_sample.json* in the script's current location.
```bash
python3.10 batch_load.py --region us-west-2 --create_timestream_resource --database=targetdb --table=myTable --partition_key city --copy_s3_bucket --s3_source_bucket_location timestream-unload-sourcedb-mytable --source_s3_prefix unload/results/ --create_destination_bucket --s3_target_bucket timestream-batchload-targetdb-mytable --destination_s3_prefix dest/ --create_error_logging_bucket --s3_target_error_bucket timestream-batchload-error-logs --data_model_file "data_model_sample.json"
python3 batch_load.py --region us-west-2 --create_timestream_resource --database=targetdb --table=myTable --partition_key city --copy_s3_bucket --s3_source_bucket_location timestream-unload-sourcedb-mytable --source_s3_prefix unload/results/ --create_destination_bucket --s3_target_bucket timestream-batchload-targetdb-mytable --destination_s3_prefix dest/ --create_error_logging_bucket --s3_target_error_bucket timestream-batchload-error-logs --data_model_file "data_model_sample.json"
```
**Without S3 Copy**
#### Without S3 Copy
Example to execute a batch load to the target Timestream table *myTable* with partition key *city* in the database *targetdb* in the *eu-west-1* region.
Timestream objects are created by this script as per the *create_timestream_resource* parameter. Source data is located in the S3 bucket *timestream-unload-sourcedb-mytable* with prefix *unload/results/*.
Error logs are stored in the S3 bucket *timestream-batchload-error-logs*. If you need the error logging bucket to be created, specify --create_error_logging_bucket.
```bash
python3.10 batch_load.py --region eu-west-1 --database=targetdb --table=myTable --s3_target_bucket timestream-unload-sourcedb-mytable --destination_s3_prefix unload/results/ --data_model_file "data_model_sample.json" --create_timestream_resource --partition_key city --s3_target_error_bucket timestream-batchload-error-logs
python3 batch_load.py --region eu-west-1 --database=targetdb --table=myTable --s3_target_bucket timestream-unload-sourcedb-mytable --destination_s3_prefix unload/results/ --data_model_file "data_model_sample.json" --create_timestream_resource --partition_key city --s3_target_error_bucket timestream-batchload-error-logs
```
## Usage and Requirements

These are the full steps to execute the script in your AWS Account.

1. Log into your AWS account and select the AWS Region in which your Timestream table is stored

2. Launch [AWS CloudShell](https://console.aws.amazon.com/cloudshell/home) or your local shell (Python 3.10 or newer is **required**)

3. Clone this source code project using [git](https://git-scm.com/) or download it manually
4. Make sure you have latest pip package installed
```bash
python3 -m ensurepip --upgrade
```
5. Install Python [boto3](https://pypi.org/project/boto3/), [backoff](https://pypi.org/project/backoff/) and [tqdm](https://pypi.org/project/tqdm/) packages
```bash
python3 -m pip install boto3
python3 -m pip install backoff
python3 -m pip install tqdm
```
6. Run the unload.py or the batch_load.py as described above.
35 changes: 18 additions & 17 deletions tools/python/copy-table/unload.py
@@ -7,7 +7,7 @@
from utils.logger_utils import create_logger
from utils.s3_utils import s3Utility

def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, partition, iam_role_bucket_policy):
def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, partition, iam_role_arn):

session = boto3.Session()
if (region is None or len(region) == 0):
@@ -29,22 +29,22 @@ def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, pa
bucket_s3_uri = s3_utility.create_s3_bucket(bucket_name)

# Create bucket policy for accessing data if IAM Role is provided
if (iam_role_bucket_policy):
if (iam_role_arn):
bucket_name = bucket_s3_uri.split('s3://')[1]
bucket_name = bucket_name.split('/')[0]

bucket_policy = {
'Version': '2012-10-17',
'Statement': [{
'Sid': 'PermissionS3Copy1',
'Sid': 'PermissionS3CopyGetObj',
'Effect': 'Allow',
'Principal': {'AWS': f'{iam_role_bucket_policy}'},
'Principal': {'AWS': f'{iam_role_arn}'},
'Action': ['s3:GetObject'],
'Resource': f'arn:aws:s3:::{bucket_name}/*'
},{
'Sid': 'PermissionS3Copy2',
'Sid': 'PermissionS3CopyListBucket',
'Effect': 'Allow',
'Principal': {'AWS': f'{iam_role_bucket_policy}'},
'Principal': {'AWS': f'{iam_role_arn}'},
'Action': ['s3:ListBucket'],
'Resource': f'arn:aws:s3:::{bucket_name}'
}
@@ -62,13 +62,12 @@ def build_query(database, table, bucket_s3_uri, from_time, end_time, partition):
unload_query = "UNLOAD("
unload_query += " SELECT *, to_nanoseconds(time) as nanoseconds"
if (partition):
match partition:
case "day":
unload_query += ", DATE_FORMAT(time,'%Y-%m-%d') as partition_date"
case "month":
unload_query += ", DATE_FORMAT(time,'%Y-%m') as partition_date"
case "year":
unload_query += ", DATE_FORMAT(time,'%Y') as partition_date"
if (partition == "day"):
unload_query += ", DATE_FORMAT(time,'%Y-%m-%d') as partition_date"
elif (partition == "month"):
unload_query += ", DATE_FORMAT(time,'%Y-%m') as partition_date"
elif (partition == "year"):
unload_query += ", DATE_FORMAT(time,'%Y') as partition_date"
unload_query += " FROM " + database + "." + table
if (from_time and end_time):
unload_query += " WHERE time >= '" + from_time + "' AND time < '" + end_time + "'"
Expand All @@ -80,7 +79,9 @@ def build_query(database, table, bucket_s3_uri, from_time, end_time, partition):
unload_query += " time asc )"

unload_query += " TO '" + bucket_s3_uri + "'"
unload_query += " WITH (partitioned_by = ARRAY['partition_date'],"
unload_query += " WITH ("
if (partition):
unload_query += " partitioned_by = ARRAY['partition_date'],"
unload_query += " format='csv',"
unload_query += " compression='none',"
unload_query += " max_file_size = '4.9GB',"
@@ -111,16 +112,16 @@ def run_query(logger, client, query):
parser.add_argument("-s", "--s3_uri", help="S3 Bucket URI to store unload data", required=False)
parser.add_argument("-f", "--from_time", help="Timestamp from which you want to unload data (included)", required=False)
parser.add_argument("-e", "--end_time", help="Timestamp to which you want to unload data (not included)", required=False)
parser.add_argument("-p", "--partition", help="Partition data by 'day', 'month' or 'year'", required=True, choices=['day', 'month', 'year'])
parser.add_argument("-i", "--iam_role_bucket_policy", help="S3 Bucket policy to apply to the S3 Bucket where unload data is stored", required=False)
parser.add_argument("-p", "--partition", help="Partition data by 'day', 'month' or 'year'", required=False, choices=['day', 'month', 'year'])
parser.add_argument("-i", "--iam_role_arn", help="IAM Role ARN used in the S3 Bucket policy that is applied to the S3 Bucket where unload data is stored", required=False)

#assign arguments to args variable
args = parser.parse_args()

#create logger
logger = create_logger("Unload Logger")

main(logger, args.region, args.database, args.table, args.s3_uri, args.from_time, args.end_time, args.partition, args.iam_role_bucket_policy)
main(logger, args.region, args.database, args.table, args.s3_uri, args.from_time, args.end_time, args.partition, args.iam_role_arn)

logger.info("COMPLETED SUCCESSFULLY")
