From 291edf80fe2a5cc99d2fe8b55807c5648b26f531 Mon Sep 17 00:00:00 2001
From: Francesco Martini
Date: Thu, 28 Mar 2024 23:11:52 +0100
Subject: [PATCH 1/3] support for py 3.9 + partitioning optional + improved readme

---
 tools/python/copy-table/README.md | 67 ++++++++++++++++---------------
 tools/python/copy-table/unload.py | 35 ++++++++--------
 2 files changed, 53 insertions(+), 49 deletions(-)

diff --git a/tools/python/copy-table/README.md b/tools/python/copy-table/README.md
index a951b40f..ffebf227 100644
--- a/tools/python/copy-table/README.md
+++ b/tools/python/copy-table/README.md
@@ -2,18 +2,41 @@
 
 ## When to use this automation?
 
-This automation can be used for migrating Timestream for LiveAnalytics table to new table. Automation is divided into two parts #1 for unloading the data from Timestream using different paritioning choices #2 batchloading the data into Timestream, it also covers S3 copy functionality if unload was run on different account or same account with different region.Data modelling changes can be applied as part of batchload. You can use this automation in following use-cases.
+This automation can be used for migrating a **Timestream for LiveAnalytics** table to a new table. The automation is divided into two parts:
+1. unloading the data from Timestream using different partitioning choices
+2. batch loading the data into Timestream; it also covers S3 copy functionality if the unload was run in a different account, or in the same account but in a different region. Data modelling changes can be applied as part of the batch load.
 
+You can use this automation in the following use cases.
 - Migrating Timestream for LiveAnalytics table to different AWS Organization.
-- Migrating Timestream for LiveAnalytics table to different region or different account and need data model changes in destination account/region. If data model changes are not required (and accounts belong to same AWS Organization) try to make use of AWS Backups for Timestream https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-backup.html and https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-restore.html.
-- Migrating Timestream for LiveAnalytics table to new table with customer defined partition key https://docs.aws.amazon.com/timestream/latest/developerguide/customer-defined-partition-keys.html
+- Migrating a Timestream for LiveAnalytics table to a different region or a different account where data model changes are needed in the destination account/region. If data model changes are not required (and the accounts belong to the same AWS Organization), consider using AWS Backup for Timestream (ref. [Amazon Timestream backups](https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-backup.html) and [Restore an Amazon Timestream table](https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-restore.html).
+- Migrating a Timestream for LiveAnalytics table to a new table with a [customer defined partition key](https://docs.aws.amazon.com/timestream/latest/developerguide/customer-defined-partition-keys.html)
 
+## Usage and Requirements
+
+These are the full steps to execute the scripts in your AWS Account.
+
+1. Log into your AWS account and select the AWS Region in which your Timestream table is stored
+
+2. Launch [AWS CloudShell](https://console.aws.amazon.com/cloudshell/home) or your local shell (Python 3.9 or newer is **required**)
+
+3. Clone this source code project using [git](https://git-scm.com/) or download it manually
+
+4. Make sure you have the latest pip package installed
+   ```bash
+   python3 -m ensurepip --upgrade
+   ```
+5. Install the Python [boto3](https://pypi.org/project/boto3/), [backoff](https://pypi.org/project/backoff/) and [tqdm](https://pypi.org/project/tqdm/) packages
+   ```bash
+   python3 -m pip install boto3 backoff tqdm
+   ```
+6. Run unload.py or batch_load.py as described below.
+
 ## Getting started with UNLOAD
 
 ### Key considerations/limits and best practices
 
-- **Recommendation is to have your UNLOAD not exceed 80 GB**, consider to split the process into multiple UNLOADS leveraging on the time start/end parameters (e.g. if you have 1 TB, run 13 UNLOADS) to avoid any interruptions.
-- **Queries containing UNLOAD statement can export at most 100 partitions per query.**, hence consider to split the process into multiple UNLOADS leveraging on the time start/end parameters.
+- **Recommendation is to have your UNLOAD not exceed 80 GB**: consider splitting the process into multiple UNLOADs leveraging the script's start/end time parameters (e.g. if you have 1 TB, run 13 UNLOADs) to avoid any interruptions.
+- **Queries containing the UNLOAD statement can export at most 100 partitions per query**, hence consider splitting the process into multiple UNLOADs leveraging the script's start/end time and partition parameters.
 - **Maximum file size in a batch load task cannot exceed 5 GB**.Unload files as part of this automation will not exceed that size.
 - **Concurrency for queries using the UNLOAD statement is 1 query per second (QPS)**. Exceeding the query rate will result in throttling.
 - **Queries containing UNLOAD statement time out after 60 minutes.**
@@ -30,14 +53,14 @@ Check the following guide to learn more: [Limits for UNLOAD from Timestream for
 - **from_time** [OPTIONAL]: Timestamp (extreme included) from which you want to select data to unload (e.g.: *2024-02-26 17:24:38.270000000*)
 - **end_time** [OPTIONAL]: Timestamp (extreme excluded) to which you want to select data to unload (e.g.: *2024-03-15 19:26:31.304000000*)
 - **partition** [OPTIONAL]: Time partition you want to use (possible values: *day, month, year*)
-- **iam_role_bucket_policy** [OPTIONAL]: {Applies for cross account migrations} Grants destination IAM Role access to S3 Bucket (e.g.: *arn:aws:iam::123456789123:role/BatchLoadRole*)
+- **iam_role_arn_bucket_policy** [OPTIONAL]: {Applies for cross account migrations} Grants destination IAM Role access to S3 Bucket (e.g.: *arn:aws:iam::123456789123:role/BatchLoadRole*)
 
 ### Examples
 
 Example to unload the Timestream table *myTable* in the database *sourcedb* to the folder *unload* in the *timestream-unload-sourcedb-mytable* S3 bucket.
-Also, it applies an S3 bucket policy to allow the IAM Role *BatchLoadRole* of account *123456789123* to allow the copy. Does day level partitions.
+Also, it applies an S3 bucket policy to allow the IAM Role *BatchLoadRole* of account *123456789123* to perform the copy. It does day level partitions.
```bash -python3.10 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb-mytable/unload --database sourcedb --table myTable --iam_role_bucket_policy arn:aws:iam::123456789123:role/BatchLoadRole --partition day +python3 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb-mytable/unload --database sourcedb --table myTable --iam_role_arn_bucket_policy arn:aws:iam::123456789123:role/BatchLoadRole --partition day ``` ## Getting started with BATCH LOAD @@ -76,7 +99,7 @@ python3.10 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb ### Examples -**With S3 Copy** +#### With S3 Copy Example to execute a batch load to the target Timestream table *myTable* with partition key *city* in the database *targetdb* with *us-west-2* region. Timestream objects are created by this script as per *create_timestream_resource* parameter. Source data are located in the S3 bucket *timestream-unload-sourcedb-mytable* with prefix *unload/results/*. @@ -85,36 +108,16 @@ Target bucket and error bucket names are given by *s3_target_bucket* and *s3_tar Destination prefix will be created with prefix dest/ given by *destination_s3_prefix*. Desired data model file is chosen as *data_model_sample.json* in the current location of the script. ```bash -python3.10 batch_load.py --region us-west-2 --create_timestream_resource --database=targetdb --table=myTable --partition_key city --copy_s3_bucket --s3_source_bucket_location timestream-unload-sourcedb-mytable --source_s3_prefix unload/results/ --create_destination_bucket --s3_target_bucket timestream-batchload-targetdb-mytable --destination_s3_prefix dest/ --create_error_logging_bucket --s3_target_error_bucket timestream-batchload-error-logs --data_model_file "data_model_sample.json" +python3 batch_load.py --region us-west-2 --create_timestream_resource --database=targetdb --table=myTable --partition_key city --copy_s3_bucket --s3_source_bucket_location timestream-unload-sourcedb-mytable --source_s3_prefix unload/results/ --create_destination_bucket --s3_target_bucket timestream-batchload-targetdb-mytable --destination_s3_prefix dest/ --create_error_logging_bucket --s3_target_error_bucket timestream-batchload-error-logs --data_model_file "data_model_sample.json" ``` -**Without S3 Copy** +#### Without S3 Copy Example to execute a batch load to the target Timestream table *myTable* with partition key *city* in the database *targetdb* with *eu-west-1* region. Timestream objects are created by this script as per *create_timestream_resource* parameter. Source data are located in the S3 bucket *timestream-unload-sourcedb-mytable* with prefix *unload/results/*. Error logs are stored into S3 bucket *timestream-batchload-error-logs*. If you need error log buckets to be created specify --create_error_logging_bucket. 
```bash -python3.10 batch_load.py --region eu-west-1 --database=targetdb --table=myTable --s3_target_bucket timestream-unload-sourcedb-mytable --destination_s3_prefix unload/results/ --data_model_file "data_model_sample.json" --create_timestream_resource --partition_key city --s3_target_error_bucket timestream-batchload-error-logs +python3 batch_load.py --region eu-west-1 --database=targetdb --table=myTable --s3_target_bucket timestream-unload-sourcedb-mytable --destination_s3_prefix unload/results/ --data_model_file "data_model_sample.json" --create_timestream_resource --partition_key city --s3_target_error_bucket timestream-batchload-error-logs ``` -## Usage and Requirements - -These are the full steps to execute the script in your AWS Account. - -1. Log into your AWS account and select the AWS Region in which your Timestream table is stored - -2. Launch [AWS CloudShell](https://console.aws.amazon.com/cloudshell/home) or your local shell (Python 3.10 or newer is **required**) - -3. Clone this source code project using [git](https://git-scm.com/) or download it manually -4. Make sure you have latest pip package installed - ```bash - python3 -m ensurepip --upgrade - ``` -5. Install Python [boto3](https://pypi.org/project/boto3/), [backoff](https://pypi.org/project/backoff/) and [tqdm](https://pypi.org/project/tqdm/) packages - ```bash - python3 -m pip install boto3 - python3 -m pip install backoff - python3 -m pip install tqdm - ``` -6. Run the unload.py or the batch_load.py as described above. diff --git a/tools/python/copy-table/unload.py b/tools/python/copy-table/unload.py index 6721b964..aeca0efa 100644 --- a/tools/python/copy-table/unload.py +++ b/tools/python/copy-table/unload.py @@ -7,7 +7,7 @@ from utils.logger_utils import create_logger from utils.s3_utils import s3Utility -def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, partition, iam_role_bucket_policy): +def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, partition, iam_role_arn_bucket_policy): session = boto3.Session() if (region is None or len(region) == 0): @@ -29,22 +29,22 @@ def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, pa bucket_s3_uri = s3_utility.create_s3_bucket(bucket_name) # Create bucket policy for accessing data if IAM Role is provided - if (iam_role_bucket_policy): + if (iam_role_arn_bucket_policy): bucket_name = bucket_s3_uri.split('s3://')[1] bucket_name = bucket_name.split('/')[0] bucket_policy = { 'Version': '2012-10-17', 'Statement': [{ - 'Sid': 'PermissionS3Copy1', + 'Sid': 'PermissionS3CopyGetObj', 'Effect': 'Allow', - 'Principal': {'AWS': f'{iam_role_bucket_policy}'}, + 'Principal': {'AWS': f'{iam_role_arn_bucket_policy}'}, 'Action': ['s3:GetObject'], 'Resource': f'arn:aws:s3:::{bucket_name}/*' },{ - 'Sid': 'PermissionS3Copy2', + 'Sid': 'PermissionS3CopyListBucket', 'Effect': 'Allow', - 'Principal': {'AWS': f'{iam_role_bucket_policy}'}, + 'Principal': {'AWS': f'{iam_role_arn_bucket_policy}'}, 'Action': ['s3:ListBucket'], 'Resource': f'arn:aws:s3:::{bucket_name}' } @@ -62,13 +62,12 @@ def build_query(database, table, bucket_s3_uri, from_time, end_time, partition): unload_query = "UNLOAD(" unload_query += " SELECT *, to_nanoseconds(time) as nanoseconds" if (partition): - match partition: - case "day": - unload_query += ", DATE_FORMAT(time,'%Y-%m-%d') as partition_date" - case "month": - unload_query += ", DATE_FORMAT(time,'%Y-%m') as partition_date" - case "year": - unload_query += ", DATE_FORMAT(time,'%Y') as 
partition_date"
+        if (partition == "day"):
+            unload_query += ", DATE_FORMAT(time,'%Y-%m-%d') as partition_date"
+        elif (partition == "month"):
+            unload_query += ", DATE_FORMAT(time,'%Y-%m') as partition_date"
+        elif (partition == "year"):
+            unload_query += ", DATE_FORMAT(time,'%Y') as partition_date"
     unload_query += " FROM " + database + "." + table
     if (from_time and end_time):
         unload_query += " WHERE time >= '" + from_time + "' AND time < '" + end_time + "'"
@@ -80,7 +79,9 @@ def build_query(database, table, bucket_s3_uri, from_time, end_time, partition):
         unload_query += " time asc )"
 
     unload_query += " TO '" + bucket_s3_uri + "'"
-    unload_query += " WITH (partitioned_by = ARRAY['partition_date'],"
+    unload_query += " WITH ("
+    if (partition):
+        unload_query += " partitioned_by = ARRAY['partition_date'],"
     unload_query += " format='csv',"
     unload_query += " compression='none',"
     unload_query += " max_file_size = '4.9GB',"
@@ -111,8 +112,8 @@ def run_query(logger, client, query):
     parser.add_argument("-s", "--s3_uri", help="S3 Bucket URI to store unload data", required=False)
     parser.add_argument("-f", "--from_time", help="Timestamp from which you want to unload data (included)", required=False)
     parser.add_argument("-e", "--end_time", help="Timestamp to which you want to unload data (not included)", required=False)
-    parser.add_argument("-p", "--partition", help="Partition data by 'day', 'month' or 'year'", required=True, choices=['day', 'month', 'year'])
-    parser.add_argument("-i", "--iam_role_bucket_policy", help="S3 Bucket policy to apply to the S3 Bucket where unload data is stored", required=False)
+    parser.add_argument("-p", "--partition", help="Partition data by 'day', 'month' or 'year'", required=False, choices=['day', 'month', 'year'])
+    parser.add_argument("-i", "--iam_role_arn_bucket_policy", help="IAM Role ARN used in the S3 Bucket policy that is applied to the S3 Bucket where unload data is stored", required=False)
 
     #assign arguments to args variable
     args = parser.parse_args()
@@ -120,7 +121,7 @@ def run_query(logger, client, query):
     #create logger
     logger = create_logger("Unload Logger")
 
-    main(logger, args.region, args.database, args.table, args.s3_uri, args.from_time, args.end_time, args.partition, args.iam_role_bucket_policy)
+    main(logger, args.region, args.database, args.table, args.s3_uri, args.from_time, args.end_time, args.partition, args.iam_role_arn_bucket_policy)
 
     logger.info("COMPLETED SUCCESSFULLY")
 

From 7f98795a74cc3ba335e75302a928bce3799f70dd Mon Sep 17 00:00:00 2001
From: Francesco Martini
Date: Thu, 28 Mar 2024 23:26:37 +0100
Subject: [PATCH 2/3] improved readme

---
 tools/python/copy-table/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/python/copy-table/README.md b/tools/python/copy-table/README.md
index ffebf227..893fdd0f 100644
--- a/tools/python/copy-table/README.md
+++ b/tools/python/copy-table/README.md
@@ -6,9 +6,9 @@ This automation can be used for migrating a **Timestream for LiveAnalytics** table
 1. unloading the data from Timestream using different partitioning choices
 2. batch loading the data into Timestream; it also covers S3 copy functionality if the unload was run in a different account, or in the same account but in a different region. Data modelling changes can be applied as part of the batch load.
 
-You can use this automation in the following use cases.
+You can use this automation in the following use cases:
 - Migrating Timestream for LiveAnalytics table to different AWS Organization.
-- Migrating a Timestream for LiveAnalytics table to a different region or a different account where data model changes are needed in the destination account/region. If data model changes are not required (and the accounts belong to the same AWS Organization), consider using AWS Backup for Timestream (ref. [Amazon Timestream backups](https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-backup.html) and [Restore an Amazon Timestream table](https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-restore.html).
+- Migrating a Timestream for LiveAnalytics table to a different region or a different account where data model changes are needed in the destination account/region. If data model changes are not required (and the accounts belong to the same AWS Organization), consider using AWS Backup for Timestream (ref. [Amazon Timestream backups](https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-backup.html) and [Restore an Amazon Timestream table](https://docs.aws.amazon.com/aws-backup/latest/devguide/timestream-restore.html)).
 - Migrating a Timestream for LiveAnalytics table to a new table with a [customer defined partition key](https://docs.aws.amazon.com/timestream/latest/developerguide/customer-defined-partition-keys.html)
 

From 770f9f204a12cd2734efe043039c6c521e0e03bf Mon Sep 17 00:00:00 2001
From: Francesco Martini
Date: Thu, 28 Mar 2024 23:37:50 +0100
Subject: [PATCH 3/3] improved docs

---
 tools/python/copy-table/README.md |  4 ++--
 tools/python/copy-table/unload.py | 12 ++++++------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/tools/python/copy-table/README.md b/tools/python/copy-table/README.md
index 893fdd0f..c1b44f57 100644
--- a/tools/python/copy-table/README.md
+++ b/tools/python/copy-table/README.md
@@ -53,14 +53,14 @@ Check the following guide to learn more: [Limits for UNLOAD from Timestream for
 - **from_time** [OPTIONAL]: Timestamp (extreme included) from which you want to select data to unload (e.g.: *2024-02-26 17:24:38.270000000*)
 - **end_time** [OPTIONAL]: Timestamp (extreme excluded) to which you want to select data to unload (e.g.: *2024-03-15 19:26:31.304000000*)
 - **partition** [OPTIONAL]: Time partition you want to use (possible values: *day, month, year*)
-- **iam_role_arn_bucket_policy** [OPTIONAL]: {Applies for cross account migrations} Grants destination IAM Role access to S3 Bucket (e.g.: *arn:aws:iam::123456789123:role/BatchLoadRole*)
+- **iam_role_arn** [OPTIONAL]: {Applies for cross account migrations} Grants destination IAM Role access to S3 Bucket (e.g.: *arn:aws:iam::123456789123:role/BatchLoadRole*)
 
 ### Examples
 
 Example to unload the Timestream table *myTable* in the database *sourcedb* to the folder *unload* in the *timestream-unload-sourcedb-mytable* S3 bucket.
 Also, it applies an S3 bucket policy to allow the IAM Role *BatchLoadRole* of account *123456789123* to perform the copy. It does day level partitions.
```bash -python3 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb-mytable/unload --database sourcedb --table myTable --iam_role_arn_bucket_policy arn:aws:iam::123456789123:role/BatchLoadRole --partition day +python3 unload.py --region eu-west-1 --s3_uri s3://timestream-unload-sourcedb-mytable/unload --database sourcedb --table myTable --iam_role_arn arn:aws:iam::123456789123:role/BatchLoadRole --partition day ``` ## Getting started with BATCH LOAD diff --git a/tools/python/copy-table/unload.py b/tools/python/copy-table/unload.py index aeca0efa..2996a269 100644 --- a/tools/python/copy-table/unload.py +++ b/tools/python/copy-table/unload.py @@ -7,7 +7,7 @@ from utils.logger_utils import create_logger from utils.s3_utils import s3Utility -def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, partition, iam_role_arn_bucket_policy): +def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, partition, iam_role_arn): session = boto3.Session() if (region is None or len(region) == 0): @@ -29,7 +29,7 @@ def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, pa bucket_s3_uri = s3_utility.create_s3_bucket(bucket_name) # Create bucket policy for accessing data if IAM Role is provided - if (iam_role_arn_bucket_policy): + if (iam_role_arn): bucket_name = bucket_s3_uri.split('s3://')[1] bucket_name = bucket_name.split('/')[0] @@ -38,13 +38,13 @@ def main(logger, region, database, table, bucket_s3_uri, from_time, end_time, pa 'Statement': [{ 'Sid': 'PermissionS3CopyGetObj', 'Effect': 'Allow', - 'Principal': {'AWS': f'{iam_role_arn_bucket_policy}'}, + 'Principal': {'AWS': f'{iam_role_arn}'}, 'Action': ['s3:GetObject'], 'Resource': f'arn:aws:s3:::{bucket_name}/*' },{ 'Sid': 'PermissionS3CopyListBucket', 'Effect': 'Allow', - 'Principal': {'AWS': f'{iam_role_arn_bucket_policy}'}, + 'Principal': {'AWS': f'{iam_role_arn}'}, 'Action': ['s3:ListBucket'], 'Resource': f'arn:aws:s3:::{bucket_name}' } @@ -113,7 +113,7 @@ def run_query(logger, client, query): parser.add_argument("-f", "--from_time", help="Timestamp from which you want to unload data (included)", required=False) parser.add_argument("-e", "--end_time", help="Timestamp to which you want to unload data (not included)", required=False) parser.add_argument("-p", "--partition", help="Partition data by 'day', 'month' or 'year'", required=False, choices=['day', 'month', 'year']) - parser.add_argument("-i", "--iam_role_arn_bucket_policy", help="IAM Role ARN used in the S3 Bucket policy that is applied to the S3 Bucket where unload data is stored", required=False) + parser.add_argument("-i", "--iam_role_arn", help="IAM Role ARN used in the S3 Bucket policy that is applied to the S3 Bucket where unload data is stored", required=False) #assign arguments to args variable args = parser.parse_args() @@ -121,7 +121,7 @@ def run_query(logger, client, query): #create logger logger = create_logger("Unload Logger") - main(logger, args.region, args.database, args.table, args.s3_uri, args.from_time, args.end_time, args.partition, args.iam_role_arn_bucket_policy) + main(logger, args.region, args.database, args.table, args.s3_uri, args.from_time, args.end_time, args.partition, args.iam_role_arn) logger.info("COMPLETED SUCCESSFULLY")
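
The UNLOAD considerations in the README above recommend splitting large exports into several runs bounded by `--from_time`/`--end_time`. Below is a minimal sketch of that pattern, reusing the *sourcedb*/*myTable* example from the README; the month list, the per-month S3 prefixes and the use of GNU `date` (available in AWS CloudShell) are illustrative assumptions rather than part of the tool.

```bash
# Sketch: one UNLOAD run per month to stay within the documented guidance
# (keep each UNLOAD under ~80 GB, at most 100 partitions per query, 60-minute query timeout).
for month in 2024-01 2024-02 2024-03; do
  next=$(date -d "${month}-01 +1 month" +%Y-%m)   # first day of the following month (GNU date)
  python3 unload.py \
    --region eu-west-1 \
    --database sourcedb \
    --table myTable \
    --s3_uri "s3://timestream-unload-sourcedb-mytable/unload-${month}" \
    --from_time "${month}-01 00:00:00.000000000" \
    --end_time "${next}-01 00:00:00.000000000" \
    --partition day
done
```

Because `--end_time` is exclusive, consecutive windows do not overlap, and running the invocations sequentially (one UNLOAD query at a time) should keep the workload within the 1 query-per-second limit without extra throttling logic.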