Ocn 027 rw0 #292

Open · wants to merge 23 commits into master
Changes from 12 commits
14 changes: 14 additions & 0 deletions ocn_027a_rw0_nitrogen_plumes/README.md
@@ -0,0 +1,14 @@
## Wastewater Plumes in Coastal Areas Dataset Pre-processing
This file describes the data pre-processing that was done to the [Global Inputs and Impacts from Human Sewage in Coastal Ecosystems](https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0258898) dataset for [display on Resource Watch](https://resourcewatch.org/data/explore/11804f04-d9c7-47b9-8d27-27ce6ed6c042).

This dataset is provided as a series of GeoTIFF files from the data provider to the Resource Watch data team.

To display these data on Resource Watch, each GeoTIFF was reprojected to WGS84 (EPSG:4326) for web display and uploaded to Google Earth Engine.
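
As a rough illustration of that reprojection step (a minimal sketch, assuming GDAL's `gdalwarp` is available on the system path; the file names below are examples, not the exact names used in the script):

```python
import shlex
import subprocess

# example input/output names -- the real names are built in the processing script
src = 'global_effluent_2015_tot_N.tif'
dst = 'ocn_027a_rw0_nitrogen_plumes_total.tif'

# reproject the GeoTIFF to WGS84 (EPSG:4326) for web display
cmd = f'gdalwarp -t_srs EPSG:4326 {src} {dst}'
subprocess.call(shlex.split(cmd))
```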

Please see the [Python script](https://github.com/resource-watch/data-pre-processing/blob/master/ocn_027a_rw0_nitrogen_plumes/ocn_027a_rw0_nitrogen_plumes_processing.py) for more details on this processing.

You can view the processed Wastewater Plumes in Coastal Areas dataset [on Resource Watch](https://resourcewatch.org/data/explore/11804f04-d9c7-47b9-8d27-27ce6ed6c042).

You can also download the original dataset [directly through Resource Watch](http://wri-public-data.s3.amazonaws.com/resourcewatch/raster/ocn_027_rw0_nitrogen_plumes.zip), or [from the source website](https://knb.ecoinformatics.org/view/urn%3Auuid%3Ac7bdc77e-6c7d-46b6-8bfc-a66491119d07).
Member
Since this is a raster file we don't link to our S3 version. You can just link to the source.
Side note: do you know why the zip file on our S3 is so large? My browser says it is 11 GB, but the source file looks to be 250 MB.

Collaborator Author
@clairehemmerly Jul 12, 2022
I compressed the zip file and uploaded it to the correct folder. I deleted the one that was 11 GB.
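
For reference, a minimal sketch of how the re-compression could be done (the directory and archive names here are illustrative, not taken from this PR):

```python
import os
import zipfile

src_dir = 'data/Global_N_Coastal_Plumes_tifs'   # illustrative path to the raw tifs
archive = 'ocn_027_rw0_nitrogen_plumes.zip'     # illustrative archive name

# write the GeoTIFFs with DEFLATE compression instead of storing them uncompressed
with zipfile.ZipFile(archive, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
    for name in os.listdir(src_dir):
        if name.endswith('.tif'):
            zf.write(os.path.join(src_dir, name), arcname=name)
```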


###### Note: This dataset processing was done by Claire Hemmerly, and QC'd by [Chris Rowe](https://www.wri.org/profile/chris-rowe).
Member
You can add any link here if you want.

187 changes: 187 additions & 0 deletions ocn_027a_rw0_nitrogen_plumes/ocn_027a_rw0_nitrogen_plumes_processing.py
@@ -0,0 +1,187 @@
import os
import sys
utils_path = os.path.join(os.path.abspath(os.getenv('PROCESSING_DIR')),'utils')
if utils_path not in sys.path:
    sys.path.append(utils_path)
import util_files
import util_cloud
import zipfile
from zipfile import ZipFile
import ee
from google.cloud import storage
import logging
import urllib.request
from collections import OrderedDict
import shlex
import subprocess

# Set up logging
# Get the top-level logger object
logger = logging.getLogger()
for handler in logger.handlers[:]: logger.removeHandler(handler)
logger.setLevel(logging.INFO)
# make it print to the console.
console = logging.StreamHandler()
logger.addHandler(console)
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# name of asset on GEE where you want to upload data
# this should be an asset name that is not currently in use
dataset_name = 'ocn_027a_rw0_nitrogen_plumes'

# create a new sub-directory within your specified dir called 'data'
# within this directory, create files to store raw and processed data
data_dir = util_files.prep_dirs(dataset_name)

# create a dictionary to store information about the dataset
data_dict = OrderedDict([
    ('url', 'https://knb.ecoinformatics.org/knb/d1/mn/v2/object/urn%3Auuid%3Aefef18ef-416e-4d4d-9190-f17485c02c15'),
    ('unzipped folder', 'Global_N_Coastal_Plumes_tifs'),
    ('tifs', ['global_effluent_2015_open_N.tif', 'global_effluent_2015_septic_N.tif', 'global_effluent_2015_treated_N.tif', 'global_effluent_2015_tot_N.tif']),
    ('raw_data_file', []),
    ('processed_data_file', []),
    ('sds', ['classification']),
    ('pyramiding_policy', 'MEAN'),
    ('band_ids', ['b1']),
])

'''
Download data and save to your data directory - this may take a few minutes
'''
logger.info('Downloading raw data')

#download the data from the source
raw_data_file = os.path.join(data_dir, 'Global_N_Coastal_Plumes_tifs.zip')
urllib.request.urlretrieve(data_dict['url'], raw_data_file)

# unzip source data
raw_data_file_unzipped = raw_data_file.split('.')[0]
zip_ref = ZipFile(raw_data_file, 'r')
zip_ref.extractall(raw_data_file_unzipped)
zip_ref.close()

# set name of raw data files
for tif in data_dict['tifs']:
    data_dict['raw_data_file'].append(os.path.join(data_dir, data_dict['unzipped folder'], tif))


'''
Process data
'''
# Project and compress each tif
for i in range(len(data_dict['tifs'])):
    # set a new file name to represent processed data
    plume_type = ['open', 'septic', 'treated', 'total']
    data_dict['processed_data_file'].append(os.path.join(data_dir, dataset_name + '_' + plume_type[i] + '.tif'))

    logger.info('Processing data for ' + data_dict['processed_data_file'][i])

    raw_data_path = os.path.join(os.getenv('PROCESSING_DIR'), dataset_name, data_dict['raw_data_file'][i])
    logger.info(raw_data_path)

    # project the data into WGS84 (EPSG:4326) using gdalwarp on the command line
    cmd = 'gdalwarp -t_srs EPSG:4326 {} {}'
    # format to command line and run
    posix_cmd = shlex.split(cmd.format(raw_data_path, data_dict['processed_data_file'][i]), posix=True)
    logger.info(posix_cmd)
    #completed_process = subprocess.check_output(posix_cmd)
    completed_process = subprocess.call(posix_cmd)
    #logging.debug(str(completed_process))

'''
Upload processed data to Google Earth Engine
'''

# set up Google Cloud Storage project and bucket objects
gcsClient = storage.Client(os.environ.get("CLOUDSDK_CORE_PROJECT"))
gcsBucket = gcsClient.bucket(os.environ.get("GEE_STAGING_BUCKET"))

# initialize ee and eeUtil modules for uploading to Google Earth Engine
auth = ee.ServiceAccountCredentials(os.getenv('GEE_SERVICE_ACCOUNT'), os.getenv('GOOGLE_APPLICATION_CREDENTIALS'))
ee.Initialize(auth)

# set pyramiding policy for GEE upload
pyramiding_policy = data_dict['pyramiding_policy'] #check

# Create an image collection where we will put the processed data files in GEE
image_collection = f'projects/resource-watch-gee/{dataset_name}'
ee.data.createAsset({'type': 'ImageCollection'}, image_collection)

# set image collection's privacy to public
acl = {"all_users_can_read": True}
ee.data.setAssetAcl(image_collection, acl)
print('Privacy set to public.')

# list the bands in each image
band_ids = data_dict['band_ids']

task_id = []

# Upload processed data files to GEE

# if upload is timing out, uncomment the following lines
# storage.blob._DEFAULT_CHUNKSIZE = 10 * 1024* 1024 # 10 MB
# storage.blob._MAX_MULTIPART_SIZE = 10 * 1024* 1024 # 10 MB

# loop through the processed data files to upload to Google Cloud Storage and Google Earth Engine
for i in range(len(data_dict['tifs'])):
    logger.info('Uploading ' + data_dict['processed_data_file'][i] + ' to Google Cloud Storage.')
    # upload files to Google Cloud Storage
    gcs_uri = util_cloud.gcs_upload(data_dict['processed_data_file'][i], dataset_name, gcs_bucket=gcsBucket)

    logger.info('Uploading ' + data_dict['processed_data_file'][i] + ' to Google Earth Engine.')
    # generate an asset name for the current file by using the filename (minus the file type extension)
    file_name = data_dict['processed_data_file'][i].split('.')[0].split('/')[1]
    asset_name = f'projects/resource-watch-gee/{dataset_name}/{file_name}'

    # create the band manifest for this asset
    #tileset_id = data_dict['processed_data_file'][i].split('.')[0]
    mf_bands = [{'id': band_id, 'tileset_band_index': band_ids.index(band_id), 'tileset_id': file_name, 'pyramidingPolicy': pyramiding_policy} for band_id in band_ids]

    # create complete manifest for asset upload
    manifest = util_cloud.gee_manifest_complete(asset_name, gcs_uri[0], mf_bands)

    # upload the file from Google Cloud Storage to Google Earth Engine
    task = util_cloud.gee_ingest(manifest)
    print(asset_name + ' uploaded to GEE')
    task_id.append(task)

    # remove files from Google Cloud Storage
    util_cloud.gcs_remove(gcs_uri[0], gcs_bucket=gcsBucket)

logger.info('Files deleted from Google Cloud Storage.')

'''
Upload original data and processed data to Amazon S3 storage
'''
# initialize AWS variables
aws_bucket = 'wri-public-data'
Member
This should actually be aws_bucket = 'wri-projects' for raster datasets.

Collaborator Author
fixed

s3_prefix = 'resourcewatch/raster/'

print('Uploading original data to S3.')
# Copy the raw data into a zipped file to upload to S3
raw_data_dir = os.path.join(data_dir, dataset_name + '.zip')
with ZipFile(raw_data_dir, 'w') as zip:
    raw_data_files = data_dict['raw_data_file']
    for raw_data_file in raw_data_files:
        # use DEFLATE compression so the large GeoTIFFs are not stored uncompressed
        zip.write(raw_data_file, os.path.basename(raw_data_file), compress_type=zipfile.ZIP_DEFLATED)

# Upload raw data file to S3
uploaded = util_cloud.aws_upload(raw_data_dir, aws_bucket, s3_prefix + os.path.basename(raw_data_dir))

logger.info('Uploading processed data to S3.')
# Copy the processed data into a zipped file to upload to S3
processed_data_dir = os.path.join(data_dir, dataset_name + '_edit.zip')
with ZipFile(processed_data_dir, 'w') as zip:
    processed_data_files = data_dict['processed_data_file']
    for processed_data_file in processed_data_files:
        zip.write(processed_data_file, os.path.basename(processed_data_file), compress_type=zipfile.ZIP_DEFLATED)

# Upload processed data file to S3
uploaded = util_cloud.aws_upload(processed_data_dir, aws_bucket, s3_prefix + os.path.basename(processed_data_dir))