forked from GeoSoftII2020-21/TestRepo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathe2e_sst.py
60 lines (53 loc) · 1.37 KB
/
e2e_sst.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#@author Judith Becka, https://github.com/a2beckj
#@author Jonas Raabe, https://github.com/jona159
import requests
import json
import time
import sys
import re
import urllib.request
import os
import xarray as xr
import netCDF4
import scipy.io.netcdf
import pytest
# Test Data
testjob = {
"title": "Example Title",
"description": "Example Description",
"process": {
"process_graph": {
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"timeframe" : ["01-12-1981 00:00:00","30-12-1981 00:00:00","%d-%m-%Y %H:%M:%S"],
"DataType": "SST"
}
},
"SST": {
"process_id": "mean_sst",
"arguments": {
"data":{
"from_node": "loadcollection1"
},
"timeframe":["1981-12-01","1981-12-17"],
"bbox":[-999,-999,-999,-999]
}
},
"save":{
"process_id": "save_result",
"arguments":{
"SaveData":{
"from_node":"SST"
},
"Format": "netcdf"
}
}
}
}
}
# This function executes a POST HTTP Request to the job endpoint of the frontend microservice
def e2e_sst():
requests.post("http://localhost:80/api/v1/jobs", json=testjob, headers={"Content-Type": "application/json"})
time.sleep(300)
e2e_sst()