Commit
Merge pull request #260 from tigergraph/cjin_add_data_loading
Update pyTigerGraphLoading.py to add support for direct data loading
parkererickson-tg authored Oct 30, 2024
2 parents a477e31 + 7cb8aed commit dfc312d
Showing 3 changed files with 219 additions and 24 deletions.
14 changes: 3 additions & 11 deletions pyTigerGraph/common/loading.py
@@ -8,22 +8,14 @@

logger = logging.getLogger(__name__)

-def _prep_run_loading_job_with_file(filePath, jobName, fileTag, sep, eol):
+def _prep_run_loading_job_with_file(filePath):
'''read file contents for runLoadingJobWithFile()'''
try:
data = open(filePath, 'rb').read()
-        params = {
-            "tag": jobName,
-            "filename": fileTag,
-        }
-        if sep is not None:
-            params["sep"] = sep
-        if eol is not None:
-            params["eol"] = eol
-        return data, params
+        return data
except OSError as ose:
logger.error(ose.strerror)
logger.info("exit: runLoadingJobWithFile")

-        return None, None
+        return None
# TODO Should throw exception instead?
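The TODO above could be addressed by raising instead of returning None; a minimal sketch of that alternative (not part of this commit, shown only to illustrate the trade-off):

    def _prep_run_loading_job_with_file(filePath):
        '''Read file contents for runLoadingJobWithFile(), raising on failure.'''
        try:
            with open(filePath, 'rb') as f:
                return f.read()
        except OSError as ose:
            logger.error(ose.strerror)
            # hypothetical alternative: propagate the failure to the caller
            raise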
115 changes: 108 additions & 7 deletions pyTigerGraph/pyTigerGraphLoading.py
@@ -6,17 +6,67 @@
import logging
import warnings

-from typing import Union
+from typing import TYPE_CHECKING, Union
+if TYPE_CHECKING:
+    import pandas as pd

from pyTigerGraph.common.loading import _prep_run_loading_job_with_file

from pyTigerGraph.pyTigerGraphBase import pyTigerGraphBase

logger = logging.getLogger(__name__)


class pyTigerGraphLoading(pyTigerGraphBase):

def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None,
eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]:
"""Execute a loading job with the given pandas DataFrame with optional column list.
The data string will be posted to the TigerGraph server and the value of the appropriate
FILENAME definition will be updated to point to the data received.
NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to
load the file correctly. Remove the header from the data file before using this function.
Args:
df:
                The pandas DataFrame to be loaded.
fileTag:
                The name of the file variable in the loading job (DEFINE FILENAME <fileTag>).
jobName:
The name of the loading job.
sep:
Data value separator. If your data is JSON, you do not need to specify this
parameter. The default separator is a comma `,`.
eol:
End-of-line character. Only one or two characters are allowed, except for the
                special case `\\r\\n`. The default value is `\\n`.
timeout:
Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting.
sizeLimit:
Maximum size for input file in bytes.
columns:
The ordered pandas DataFrame columns to be uploaded.
Endpoint:
- `POST /ddl/{graph_name}`
See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job]
"""
logger.info("entry: runLoadingJobWithDataFrame")
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

        # pandas to_csv() requires a string separator, so fall back to the default
        # comma when sep is None (to_csv(sep=None) raises a TypeError)
        if columns is None:
            data = df.to_csv(sep=sep or ",", header=False)
        else:
            data = df.to_csv(columns=columns, sep=sep or ",", header=False)

res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit)

logger.info("exit: runLoadingJobWithDataFrame")

return res
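A minimal usage sketch for the new DataFrame path (the host, graph, job, and file-tag names below are hypothetical placeholders, not part of this commit):

    import pandas as pd
    from pyTigerGraph import TigerGraphConnection

    conn = TigerGraphConnection(host="https://localhost", graphname="MyGraph")  # hypothetical connection
    df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
    # "file1" must match a DEFINE FILENAME file1 in the loading job "load_people"
    res = conn.runLoadingJobWithDataFrame(df, "file1", "load_people", sep=",")

Note that to_csv() is otherwise called with its defaults, so the DataFrame index is serialized as the first column; drop or reset the index first if the loading job does not expect it.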

def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None,
eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]:
"""Execute a loading job with the referenced file.
@@ -53,19 +103,70 @@ def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep:
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

-        data, params = _prep_run_loading_job_with_file(
-            filePath, jobName, fileTag, sep, eol)
+        data = _prep_run_loading_job_with_file(filePath)
+        res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit)

-        if not data and not params:
-            # failed to read file
logger.info("exit: runLoadingJobWithFile")

return res
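The file-based wrapper now reduces to reading the file and delegating, so its call shape is unchanged; a usage sketch (path and names hypothetical):

    res = conn.runLoadingJobWithFile("/tmp/people.csv", "file1", "load_people", sep=",", eol="\n")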

def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None,
eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]:
"""Execute a loading job with the given data string.
The data string will be posted to the TigerGraph server and the value of the appropriate
FILENAME definition will be updated to point to the data received.
NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to
load the file correctly. Remove the header from the data file before using this function.
Args:
data:
The data string to be loaded.
fileTag:
                The name of the file variable in the loading job (DEFINE FILENAME <fileTag>).
jobName:
The name of the loading job.
sep:
Data value separator. If your data is JSON, you do not need to specify this
parameter. The default separator is a comma `,`.
eol:
End-of-line character. Only one or two characters are allowed, except for the
                special case `\\r\\n`. The default value is `\\n`.
timeout:
Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting.
sizeLimit:
Maximum size for input file in bytes.
Endpoint:
- `POST /ddl/{graph_name}`
See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job]
"""
logger.info("entry: runLoadingJobWithData")
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

if not data or not jobName or not fileTag:
# invalid inputs
logger.error("Invalid data or params")
logger.info("exit: runLoadingJobWithData")
return None

params = {
"tag": jobName,
"filename": fileTag,
}
if sep is not None:
params["sep"] = sep
if eol is not None:
params["eol"] = eol

res = self._req("POST", self.restppUrl + "/ddl/" + self.graphname, params=params, data=data,
headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)})

if logger.level == logging.DEBUG:
logger.debug("return: " + str(res))
logger.info("exit: runLoadingJobWithFile")
logger.info("exit: runLoadingJobWithData")

return res
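Since runLoadingJobWithData is the primitive both wrappers now delegate to, it can also be called directly with an in-memory string; a sketch (names hypothetical):

    csv_rows = "1,Alice\n2,Bob\n"
    res = conn.runLoadingJobWithData(csv_rows, "file1", "load_people", sep=",", eol="\n")
    # roughly equivalent to POST {restppUrl}/ddl/MyGraph?tag=load_people&filename=file1&sep=,&eol=\n
    # with the RESPONSE-LIMIT and GSQL-TIMEOUT headers derived from sizeLimit and timeout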

114 changes: 108 additions & 6 deletions pyTigerGraph/pytgasync/pyTigerGraphLoading.py
@@ -6,7 +6,9 @@
import logging
import warnings

-from typing import Union
+from typing import TYPE_CHECKING, Union
+if TYPE_CHECKING:
+    import pandas as pd

from pyTigerGraph.common.loading import _prep_run_loading_job_with_file
from pyTigerGraph.pytgasync.pyTigerGraphBase import AsyncPyTigerGraphBase
@@ -16,6 +18,55 @@

class AsyncPyTigerGraphLoading(AsyncPyTigerGraphBase):

async def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None,
eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]:
"""Execute a loading job with the given pandas DataFrame with optional column list.
The data string will be posted to the TigerGraph server and the value of the appropriate
FILENAME definition will be updated to point to the data received.
NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to
load the file correctly. Remove the header from the data file before using this function.
Args:
df:
                The pandas DataFrame to be loaded.
fileTag:
                The name of the file variable in the loading job (DEFINE FILENAME <fileTag>).
jobName:
The name of the loading job.
sep:
Data value separator. If your data is JSON, you do not need to specify this
parameter. The default separator is a comma `,`.
eol:
End-of-line character. Only one or two characters are allowed, except for the
                special case `\\r\\n`. The default value is `\\n`.
timeout:
Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting.
sizeLimit:
Maximum size for input file in bytes.
columns:
The ordered pandas DataFrame columns to be uploaded.
Endpoint:
- `POST /ddl/{graph_name}`
See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job]
"""
logger.info("entry: runLoadingJobWithDataFrame")
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

        # pandas to_csv() requires a string separator, so fall back to the default
        # comma when sep is None (to_csv(sep=None) raises a TypeError)
        if columns is None:
            data = df.to_csv(sep=sep or ",", header=False)
        else:
            data = df.to_csv(columns=columns, sep=sep or ",", header=False)

res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit)

logger.info("exit: runLoadingJobWithDataFrame")

return res
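The async variant is awaited inside an event loop; a sketch assuming the asynchronous connection class exported by pyTigerGraph as AsyncTigerGraphConnection (host and job names hypothetical):

    import asyncio
    import pandas as pd
    from pyTigerGraph import AsyncTigerGraphConnection

    async def main():
        conn = AsyncTigerGraphConnection(host="https://localhost", graphname="MyGraph")  # hypothetical
        df = pd.DataFrame({"id": [1], "name": ["Alice"]})
        res = await conn.runLoadingJobWithDataFrame(df, "file1", "load_people", sep=",")
        print(res)

    asyncio.run(main())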

async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None,
eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]:
"""Execute a loading job with the referenced file.
@@ -52,19 +103,70 @@ async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str,
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

-        data, params = _prep_run_loading_job_with_file(
-            filePath, jobName, fileTag, sep, eol)
+        data = _prep_run_loading_job_with_file(filePath)
+        res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit)

logger.info("exit: runLoadingJobWithFile")

return res

async def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None,
eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]:
"""Execute a loading job with the given data string.
The data string will be posted to the TigerGraph server and the value of the appropriate
FILENAME definition will be updated to point to the data received.
NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to
load the file correctly. Remove the header from the data file before using this function.
Args:
data:
The data string to be loaded.
fileTag:
                The name of the file variable in the loading job (DEFINE FILENAME <fileTag>).
jobName:
The name of the loading job.
sep:
Data value separator. If your data is JSON, you do not need to specify this
parameter. The default separator is a comma `,`.
eol:
End-of-line character. Only one or two characters are allowed, except for the
                special case `\\r\\n`. The default value is `\\n`.
timeout:
Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting.
sizeLimit:
Maximum size for input file in bytes.
-        if not data and not params:
-            # failed to read file
Endpoint:
- `POST /ddl/{graph_name}`
See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job]
"""
logger.info("entry: runLoadingJobWithData")
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

if not data or not jobName or not fileTag:
# invalid inputs
logger.error("Invalid data or params")
logger.info("exit: runLoadingJobWithData")
return None

params = {
"tag": jobName,
"filename": fileTag,
}
if sep is not None:
params["sep"] = sep
if eol is not None:
params["eol"] = eol

res = await self._req("POST", self.restppUrl + "/ddl/" + self.graphname, params=params, data=data,
headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)})

if logger.level == logging.DEBUG:
logger.debug("return: " + str(res))
logger.info("exit: runLoadingJobWithFile")
logger.info("exit: runLoadingJobWithData")

return res
