diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml new file mode 100644 index 0000000..d5129e7 --- /dev/null +++ b/.github/workflows/github-actions.yml @@ -0,0 +1,29 @@ +name: github-actions +on: [ push, pull_request ] +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + + - run: pip install -r requirements.txt + test: + runs-on: ubuntu-latest + needs: build + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + + - run: pip install -r requirements.txt + - run: python -m unittest tests/CitesphereConnectorTest.py + ruff: + runs-on: ubuntu-latest + needs: build + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + - uses: astral-sh/ruff-action@v3 + + - run: ruff check --fix + - run: ruff format diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..6af5269 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,14 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.1.0 + hooks: + - id: check-yaml + - id: check-ast + - id: end-of-file-fixer + - id: trailing-whitespace +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.7.4 + hooks: + - id: ruff + args: [ --fix ] + - id: ruff-format diff --git a/CitesphereConnector.py b/CitesphereConnector.py deleted file mode 100644 index 05bf0fc..0000000 --- a/CitesphereConnector.py +++ /dev/null @@ -1,81 +0,0 @@ -import urllib.request as urllib2 -import json -import base64 - - -class CitesphereConnector: - def __init__(self, api, auth_token_object): - self.api = api - self.auth_token_object = auth_token_object - self.validate() - self.handle_api_params() - - def validate(self): - if not hasattr(self.auth_token_object, 'authType'): - raise AttributeError('Missing authType attribute') - - if not hasattr(self.auth_token_object, 'headers'): - raise AttributeError('Missing headers attribute') - - if not hasattr(self.auth_token_object, 'access_token'): - if not hasattr(self.auth_token_object, 'username') and not hasattr(self.auth_token_object, 'password'): - raise AttributeError('Either username and password or access_token should be present') - - if not self.auth_token_object.authType == 'oauth' and not self.auth_token_object.authType == 'basic': - raise Exception("authType should be either oauth or basic") - - def handle_api_params(self): - if self.auth_token_object.authType == "oauth": - self.auth_token_object.headers = {'Authorization': 'Bearer {}'.format(self.auth_token_object.access_token)} - elif self.auth_token_object.authType == "basic": - auth_str = '{}:{}'.format(self.auth_token_object.username, self.auth_token_object.password) - auth_b64 = base64.b64encode(auth_str.encode('ascii')) - self.auth_token_object.headers = {'Authorization': 'Basic {}'.format(auth_b64)} - - def execute_command(self, url): - try: - response = urllib2.urlopen(urllib2.Request(url, headers=self.auth_token_object.headers)) - data = json.load(response) - - return data - except Exception as exc: - return {"error_message": str(exc)} - - def get_user(self): - url = self.api + "/v1/user" - return self.execute_command(url) - - def check_test(self): - url = self.api + "/v1/test" - return self.execute_command(url) - - # Common method to get data based on endpoint - def get_data_by_endpoint(self, end_point): - url = self.api + "/v1" + end_point - return self.execute_command(url) - - def get_groups(self): - url = self.api + "/v1/groups" - return self.execute_command(url) - - def get_group_info(self, group_id): - url = self.api + "/v1/groups/{}".format(group_id) - return self.execute_command(url) - - def get_group_items(self, zotero_group_id): - url = self.api + "/v1/groups/{}/items".format(zotero_group_id) - return self.execute_command(url) - - def get_collections(self, zotero_group_id): - url = self.api + "/v1/groups/{}/collections".format(zotero_group_id) - return self.execute_command(url) - - def get_collection_items(self, zotero_group_id, collection_id,page_number=0): - url = self.api + "/v1/groups/{}/collections/{}/items".format(zotero_group_id, collection_id) - if page_number: - url = url+"?&page={}".format(page_number) - return self.execute_command(url) - - def get_item_info(self, zotero_group_id, item_id): - url = self.api + "/v1/groups/{}/items/{}".format(zotero_group_id, item_id) - return self.execute_command(url) \ No newline at end of file diff --git a/CitesphereConnectorTest.py b/CitesphereConnectorTest.py deleted file mode 100644 index 850a7fd..0000000 --- a/CitesphereConnectorTest.py +++ /dev/null @@ -1,37 +0,0 @@ -import unittest -from unittest.mock import Mock, patch, MagicMock -from CitesphereConnector import CitesphereConnector -from authObject import AuthObject -from http import HTTPStatus - -class EmptyObject: - pass - - -class CitesphereConnectorTest(unittest.TestCase): - def test_validate_method(self): - auth_object = AuthObject() - with self.assertRaises(Exception): - CitesphereConnector("example.com", auth_object) - pass - - def test_validate_method_attribute_error(self): - auth_object = EmptyObject() - with self.assertRaises(AttributeError): - CitesphereConnector("example.com", auth_object) - pass - - @patch('CitesphereConnector.CitesphereConnector.get_groups') - def test_api_called(self, mock_get_groups): - auth_object = AuthObject() - auth_object.authType = 'oauth' - mock_get_groups.return_value = Mock() - mock_get_groups.return_value.json.return_value = [{'name': "vogon", 'id': 1}] - connector = CitesphereConnector("example.com", auth_object) - print(connector.get_groups()) - self.assertEqual(connector.get_groups()[0]['id'], 1) - pass - - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/README.md b/README.md index 7948d5c..8428e34 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,33 @@ -## Citesphere Connector +# Citesphere Connector -Python library to connect to Citephere using its [API](https://documenter.getpostman.com/view/19365454/UVeMJiyx). \ No newline at end of file +Python library to connect to Citephere using its [API](https://documenter.getpostman.com/view/19365454/UVeMJiyx). + + +## PyPi Package Installation + +`pip install citesphere-connector` + + +## Developer Setup + +Create a python virtual environment outside of this project's root directory `python3 -m venv env` and activate it `source env/bin/activate` + +Navigate to the project root and download package dependencies `pip install -r requirements.txt` + +Install the pre-commit git hooks `pre-commit install` + +For retrieivng the Bearer access token required for endpoint method calls, please see the following OAuth2 [documentation] (https://diging.atlassian.net/wiki/spaces/OAC/pages/3533078792/Getting+OAuth2+Access+Token+in+Postman) for Citesphere + + +## User Guide + +Retrieve Bearer access token required for endpoint method calls: [documentation] (https://diging.atlassian.net/wiki/spaces/OAC/pages/3533078792/Getting+OAuth2+Access+Token+in+Postman) + + +### Downloading Files + +Use the 'Download' Jupyter notebook to download files from Citesphere to your local device. + +### Uploading File + +Use the 'Upload' Jupyter notebook to upload files to Citesphere from your local device. diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/authentication.py b/authentication.py deleted file mode 100644 index 0fe3003..0000000 --- a/authentication.py +++ /dev/null @@ -1,7 +0,0 @@ -class AuthObject: - def __init__(self, authType=None, headers=None, username=None, password=None, access_token=None): - self.authType = authType - self.headers = headers - self.username = username - self.password = password - self.access_token = access_token \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..da00fa6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,24 @@ +[build-system] +requires = ["setuptools >= 61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "citesphere-connector" +version = "1.0.0" +authors = [ + {name = "Digital Innovation Group", email = "diging@asu.edu"}, + {name = "Julia Damerow", email = "jdamerow@asu.edu"}, +] +maintainers = [ + {name = "Julia Damerow", email = "jdamerow@asu.edu"}, +] +description = "Connect to Citesphere, an application that enables superior management of Zotero citations" +readme = "README.md" +license = {file = "LICENSE"} +keywords = ["cite", "diging", "citesphere", "sphere", "zotero"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Research Software Engineers, Researchers, Data Scientists, Developers", + "Topic :: Reserach Software Engineering :: Citation Manager", + "Programming Language :: Python", +] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..fa4851b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +cfgv==3.4.0 +distlib==0.3.9 +pre_commit==4.0.1 +PyYAML==6.0.2 +ruff==0.7.4 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..665e4d2 --- /dev/null +++ b/setup.py @@ -0,0 +1,31 @@ +"""A setuptools based setup module. + +See: +https://packaging.python.org/guides/distributing-packages-using-setuptools/ +https://github.com/pypa/sampleproject +""" + +from setuptools import setup, find_packages +import pathlib + +here = pathlib.Path(__file__).parent.resolve() + +long_description = (here / "README.md").read_text(encoding="utf-8") + +setup( + name="citesphere-connector", + version="1.0.0", + description="Connect to Citesphere, an application that enables superior management of Zotero citations", + url="https://github.com/diging/citesphere-connector", + author="Digital Innovation Group", + author_email="diging@asu.edu", + classifiers=[ + "Development Status :: 3 - Alpha", + "Intended Audience :: Research Software Engineers, Researchers, Data Scientists, Developers", + "Topic :: Reserach Software Engineering :: Citation Manager", + "Programming Language :: Python", + ], + keywords="cite, diging, citesphere, sphere, zotero", + package_dir={"": "src"}, + packages=find_packages(where="src"), +) diff --git a/src/CitesphereConnector.py b/src/CitesphereConnector.py new file mode 100644 index 0000000..4403571 --- /dev/null +++ b/src/CitesphereConnector.py @@ -0,0 +1,132 @@ +import urllib.request as urllib2 +import json +import base64 +import requests +import os + + +class CitesphereConnector: + def __init__(self, api, auth_token_object): + self.api = api + self.auth_token_object = auth_token_object + self.validate() + self.handle_api_params() + + def validate(self): + if not hasattr(self.auth_token_object, "authType"): + raise AttributeError("Missing authType attribute") + + if not hasattr(self.auth_token_object, "headers"): + raise AttributeError("Missing headers attribute") + + if not hasattr(self.auth_token_object, "access_token"): + if not hasattr(self.auth_token_object, "username") and not hasattr( + self.auth_token_object, "password" + ): + raise AttributeError( + "Either username and password or access_token should be present" + ) + + if ( + not self.auth_token_object.authType == "oauth" + and not self.auth_token_object.authType == "basic" + ): + raise Exception("authType should be either oauth or basic") + + def handle_api_params(self): + if self.auth_token_object.authType == "oauth": + self.auth_token_object.headers = { + "Authorization": f"Bearer {self.auth_token_object.access_token}", + } + elif self.auth_token_object.authType == "basic": + auth_str = ( + f"{self.auth_token_object.username}:{self.auth_token_object.password}" + ) + auth_b64 = base64.b64encode(auth_str.encode("ascii")) + self.auth_token_object.headers = {"Authorization": f"Basic {auth_b64}"} + + def execute_command(self, url): + try: + response = urllib2.urlopen( + urllib2.Request(url, headers=self.auth_token_object.headers) + ) + data = json.load(response) + + return data + except Exception as exc: + return {"error_message": str(exc)} + + def execute_post_request(self, url, data, files): + try: + response = requests.post( + url, headers=self.auth_token_object.headers, data=data, files=files + ) + print(response.status_code) + # Uncomment for debugging response from Citesphere + # print(response.text) + return response + except Exception as exc: + return {"error_message": str(exc)} + + def get_user(self): + url = f"{self.api}/v1/user" + return self.execute_command(url) + + def check_test(self): + url = f"{self.api}/v1/test" + return self.execute_command(url) + + def check_access(self, document_id): + url = f"{self.api}/files/giles/{document_id}/access/check" + return self.execute_command(url) + + # Common method to get data based on endpoint + def get_data_by_endpoint(self, end_point): + url = f"{self.api}/v1{end_point}" + return self.execute_command(url) + + def get_groups(self): + url = f"{self.api}/v1/groups" + return self.execute_command(url) + + def get_group_info(self, group_id): + url = f"{self.api}/v1/groups/{group_id}" + return self.execute_command(url) + + def get_group_items(self, zotero_group_id): + url = f"{self.api}/v1/groups/{zotero_group_id}/items" + return self.execute_command(url) + + def get_collections(self, zotero_group_id): + url = f"{self.api}/v1/groups/{zotero_group_id}/collections" + return self.execute_command(url) + + def get_collection_items(self, zotero_group_id, collection_id, page_number=0): + url = ( + f"{self.api}/v1/groups/{zotero_group_id}/collections/{collection_id}/items" + ) + if page_number: + url = f"{url}?&page={page_number}" + return self.execute_command(url) + + def get_item_info(self, zotero_group_id, item_id): + url = f"{self.api}/v1/groups/{zotero_group_id}/items/{item_id}" + return self.execute_command(url) + + def get_collections_by_collection_id(self, zotero_group_id, collection_id): + url = f"{self.api}/groups/{zotero_group_id}/collections/{collection_id}/collections" + return self.execute_command(url) + + def add_item(self, group_id, data, file_path): + try: + with open(file_path, "rb") as file_obj: + files = [(os.path.basename(file_path), file_obj)] + request_files = [ + ("files", (name, file, "application/pdf")) for name, file in files + ] + url = f"{self.api}/v1/groups/{group_id}/items/create" + + return self.execute_post_request(url, data, request_files) + except Exception as e: + print(f"[ERROR] -------- Error during API request with {file_path}: {e}") + return "Error loading/reading file" diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/authentication.py b/src/authentication.py new file mode 100644 index 0000000..f53e0de --- /dev/null +++ b/src/authentication.py @@ -0,0 +1,14 @@ +class AuthObject: + def __init__( + self, + authType=None, + headers=None, + username=None, + password=None, + access_token=None, + ): + self.authType = authType + self.headers = headers + self.username = username + self.password = password + self.access_token = access_token diff --git a/constants.py b/src/constants.py similarity index 51% rename from constants.py rename to src/constants.py index c81f189..a62ba85 100644 --- a/constants.py +++ b/src/constants.py @@ -1,3 +1,3 @@ CITESPHERE_API_URL = "https://diging-dev.asu.edu/citesphere-review/api" MAX_SIZE = 50 -GILES_URL = f"https://diging.asu.edu/geco-giles-staging/api/v2/resources/files/" +GILES_URL = "https://diging.asu.edu/geco-giles-staging/api/v2/resources/files/" diff --git a/csvGenerator.ipynb b/src/csvGenerator.ipynb similarity index 83% rename from csvGenerator.ipynb rename to src/csvGenerator.ipynb index 007c21a..f2f0d46 100644 --- a/csvGenerator.ipynb +++ b/src/csvGenerator.ipynb @@ -10,9 +10,8 @@ "import os\n", "import csv\n", "import math\n", - "import random\n", "import requests\n", - "import constants as const \n", + "import constants as const\n", "from authentication import AuthObject\n", "from CitesphereConnector import CitesphereConnector" ] @@ -25,15 +24,16 @@ "outputs": [], "source": [ "auth_object = AuthObject()\n", - "auth_object.authType = 'oauth'\n", + "auth_object.authType = \"oauth\"\n", "auth_object.access_token = \"f5f7e899-30d3-4531-8b2e-8009e9969ed4\"\n", "citesphere_api_url = const.CITESPHERE_API_URL\n", "connector = CitesphereConnector(citesphere_api_url, auth_object)\n", - "#default max number of items displayed on a collection items page in citesphere\n", - "max_size=const.MAX_SIZE\n", + "# default max number of items displayed on a collection items page in citesphere\n", + "max_size = const.MAX_SIZE\n", "\n", - "def get_file(file_id:str)-> str:\n", - " return const.GILES_URL+\"{}/content\".format(file_id)" + "\n", + "def get_file(file_id: str) -> str:\n", + " return const.GILES_URL + \"{}/content\".format(file_id)" ] }, { @@ -67,7 +67,7 @@ "metadata": {}, "outputs": [], "source": [ - "groups=connector.get_groups()" + "groups = connector.get_groups()" ] }, { @@ -107,35 +107,33 @@ "metadata": {}, "outputs": [], "source": [ - "#download files from the collection items\n", - "def download_files(folder_path:str,ids:set, citesphere_token:str) -> list:\n", - " \n", - " #stores paths to downloaded files\n", + "# download files from the collection items\n", + "def download_files(folder_path: str, ids: set, citesphere_token: str) -> list:\n", + " # stores paths to downloaded files\n", " path_list = []\n", - " \n", - " #iterating through the ids list\n", - " for (file_id, file_name) in ids:\n", - " \n", + "\n", + " # iterating through the ids list\n", + " for file_id, file_name in ids:\n", " # getting the file ur using giles file id\n", " giles_url = get_file(file_id)\n", " os.makedirs(folder_path, exist_ok=True)\n", " filename = os.path.join(folder_path, f\"{file_name}\")\n", - " \n", - " #append the path of the saved file to the folder\n", + "\n", + " # append the path of the saved file to the folder\n", " path_list.append(filename)\n", - " \n", - " #header for get request\n", + "\n", + " # header for get request\n", " headers = {\n", " \"Authorization\": f\"Bearer {citesphere_token}\",\n", - " \"Content-Type\": \"application/pdf;charset=UTF-8\"\n", - " }\n", + " \"Content-Type\": \"application/pdf;charset=UTF-8\",\n", + " }\n", " response = requests.get(giles_url, headers=headers)\n", - " \n", - " #saving the file if retrieved successfully\n", + "\n", + " # saving the file if retrieved successfully\n", " if response.status_code == 200:\n", " with open(filename, \"wb\") as file:\n", " file.write(response.content)\n", - " return path_list " + " return path_list" ] }, { @@ -226,8 +224,7 @@ "source": [ "# Create the CSV file with all the metadata and file path of the downloaded files\n", "def write_to_csv(csv_name: str, item: list, flag: int) -> None:\n", - "\n", - " with open(csv_name, 'a', newline='') as file:\n", + " with open(csv_name, \"a\", newline=\"\") as file:\n", " writer = csv.writer(file)\n", "\n", " # Check if it's the first time writing to the file\n", @@ -236,7 +233,7 @@ " writer.writerow(fields)\n", "\n", " # Write the values to the CSV file\n", - " writer.writerow(list(item.values()))\n" + " writer.writerow(list(item.values()))" ] }, { @@ -275,8 +272,9 @@ "metadata": {}, "outputs": [], "source": [ - "def add_to_csv(csv_name: str, folder_name: str, items: list, csv_dict: dict, flag: int) -> int:\n", - "\n", + "def add_to_csv(\n", + " csv_name: str, folder_name: str, items: list, csv_dict: dict, flag: int\n", + ") -> int:\n", " for item in items[\"items\"]:\n", " if item[\"key\"] in csv_dict:\n", " continue\n", @@ -291,16 +289,28 @@ " for values in items_list:\n", " # Getting the file IDs in uploadedFile, extractedText, pages, image, text, ocr, additionalFiles\n", " if values[\"uploadedFile\"] and values[\"uploadedFile\"] != \"None\":\n", - " giles_ids.add((values[\"uploadedFile\"][\"id\"], values[\"uploadedFile\"][\"filename\"]))\n", + " giles_ids.add(\n", + " (\n", + " values[\"uploadedFile\"][\"id\"],\n", + " values[\"uploadedFile\"][\"filename\"],\n", + " )\n", + " )\n", "\n", " # Check if extractedText is present and not equal to \"None\"\n", " if values[\"extractedText\"] and values[\"extractedText\"] != \"None\":\n", - " giles_ids.add((values[\"extractedText\"][\"id\"], values[\"extractedText\"][\"filename\"]))\n", + " giles_ids.add(\n", + " (\n", + " values[\"extractedText\"][\"id\"],\n", + " values[\"extractedText\"][\"filename\"],\n", + " )\n", + " )\n", "\n", " # Check if pages is present and not equal to \"None\"\n", " if values[\"pages\"] and values[\"pages\"] != \"None\":\n", " for value in values[\"pages\"]:\n", - " giles_ids.add((value[\"image\"][\"id\"], value[\"image\"][\"filename\"]))\n", + " giles_ids.add(\n", + " (value[\"image\"][\"id\"], value[\"image\"][\"filename\"])\n", + " )\n", " giles_ids.add((value[\"text\"][\"id\"], value[\"text\"][\"filename\"]))\n", " giles_ids.add((value[\"ocr\"][\"id\"], value[\"ocr\"][\"filename\"]))\n", "\n", @@ -310,14 +320,16 @@ "\n", " if giles_ids:\n", " # store paths of the downloaded files to the path attribute\n", - " item[\"paths\"] = download_files(folder_name, giles_ids, auth_object.access_token)\n", + " item[\"paths\"] = download_files(\n", + " folder_name, giles_ids, auth_object.access_token\n", + " )\n", "\n", " # Add the item to csv_dict and write it to the CSV file\n", " csv_dict[item[\"key\"]] = item\n", " write_to_csv(csv_name, item, flag)\n", " flag = 1\n", "\n", - " return flag\n" + " return flag" ] }, { @@ -358,26 +370,29 @@ "outputs": [], "source": [ "# Downloads and generates a CSV file containing all the group items information\n", - "def process_groups(csv_name: str, folder_path: str, groups: list, connector, max_size: int) -> dict:\n", - "\n", + "def process_groups(\n", + " csv_name: str, folder_path: str, groups: list, connector, max_size: int\n", + ") -> dict:\n", " csv_dict = {}\n", " flag = 0\n", - " \n", - " #Iterate over the groups\n", + "\n", + " # Iterate over the groups\n", " for group in groups:\n", " group_id = group[\"id\"]\n", " collections = connector.get_collections(group_id)\n", - " \n", - " #Iterate over the collections in the respective group\n", + "\n", + " # Iterate over the collections in the respective group\n", " for collection in collections[\"collections\"]:\n", " num_pages = math.ceil(collection[\"numberOfItems\"] / max_size)\n", - " \n", - " #Iterating over the pages\n", + "\n", + " # Iterating over the pages\n", " for page in range(1, num_pages + 1):\n", - " items = connector.get_collection_items(group_id, collection[\"key\"], page)\n", + " items = connector.get_collection_items(\n", + " group_id, collection[\"key\"], page\n", + " )\n", " flag = add_to_csv(csv_name, folder_path, items, csv_dict, flag)\n", "\n", - " return csv_dict\n" + " return csv_dict" ] }, { @@ -391,9 +406,9 @@ "source": [ "csv_filename = \"citesphere_csv.csv\"\n", "\n", - "folder_path = \"Files\"\n", + "folder_path = \"Files\"\n", "\n", - "process_groups(csv_filename,folder_path,groups,connector,max_size)\n" + "process_groups(csv_filename, folder_path, groups, connector, max_size)" ] }, { diff --git a/tests/CitesphereConnectorTest.py b/tests/CitesphereConnectorTest.py new file mode 100644 index 0000000..0ed2fc2 --- /dev/null +++ b/tests/CitesphereConnectorTest.py @@ -0,0 +1,210 @@ +import unittest +from unittest.mock import patch, mock_open +from src.CitesphereConnector import CitesphereConnector +from src.authentication import AuthObject + + +class EmptyObject: + pass + + +class CitesphereConnectorTest(unittest.TestCase): + def setUp(self): + self.auth_object = AuthObject() + self.auth_object.authType = "oauth" + + def tearDown(self): + del self.auth_object + + def test_validate_method(self): + self.auth_object = AuthObject() + with self.assertRaises(Exception): + CitesphereConnector("example.com", self.auth_object) + pass + + def test_validate_method_attribute_error(self): + self.auth_object = EmptyObject() + with self.assertRaises(AttributeError): + CitesphereConnector("example.com", self.auth_object) + pass + + @patch("src.CitesphereConnector.CitesphereConnector.get_groups") + def test_get_groups(self, mock_get_groups): + mock_get_groups.return_value = [{"name": "vogon", "id": 1}] + c = CitesphereConnector("test_get_groups.com", self.auth_object) + result = c.get_groups() + c.get_groups.assert_called_once() + self.assertEqual(result[0]["id"], 1) + self.assertEqual(result[0]["name"], "vogon") + pass + + @patch("src.CitesphereConnector.CitesphereConnector.check_access") + def test_check_access(self, mock_check_access): + mock_check_access.return_value = "200 OK" + c = CitesphereConnector("test_check_access.com", self.auth_object) + result = c.check_access("example_doc_id") + c.check_access.assert_called_once() + self.assertEqual(result, "200 OK") + pass + + @patch("src.CitesphereConnector.CitesphereConnector.get_user") + def test_get_user(self, mock_get_user): + mock_get_user.return_value = { + "username": "test", + "email": "test@test.com", + "firstName": "get", + "lastName": "user", + } + c = CitesphereConnector("test_get_user.com", self.auth_object) + result = c.get_user() + c.get_user.assert_called_once() + self.assertEqual(result["username"], "test") + self.assertEqual(result["email"], "test@test.com") + self.assertEqual(result["firstName"], "get") + self.assertEqual(result["lastName"], "user") + pass + + @patch("src.CitesphereConnector.CitesphereConnector.get_group_info") + def test_get_group_info(self, mock_get_group_info): + mock_get_group_info.return_value = {"id": 1, "name": "test_get_group_info"} + c = CitesphereConnector("test_get_group_info.com", self.auth_object) + result = c.get_group_info(1) + c.get_group_info.assert_called_once() + self.assertEqual(result["id"], 1) + self.assertEqual(result["name"], "test_get_group_info") + pass + + @patch("src.CitesphereConnector.CitesphereConnector.get_group_items") + def test_get_group_items(self, mock_get_group_items): + mock_get_group_items.return_value = { + "group": {"id": 1, "name": "test_get_group_items_group"}, + "items": [ + {"key": "TEST", "group": 1, "title": "test_get_group_items_item"} + ], + } + c = CitesphereConnector("test_get_group_items.com", self.auth_object) + result = c.get_group_items(1) + c.get_group_items.assert_called_once() + self.assertEqual(result["group"]["id"], 1) + self.assertEqual(result["group"]["name"], "test_get_group_items_group") + self.assertEqual(result["items"][0]["key"], "TEST") + self.assertEqual(result["items"][0]["group"], 1) + self.assertEqual(result["items"][0]["title"], "test_get_group_items_item") + pass + + @patch("src.CitesphereConnector.CitesphereConnector.get_collections") + def test_get_collections(self, mock_get_collections): + mock_get_collections.return_value = { + "group": {"id": 1, "name": "test_get_collections_group"}, + "collections": [ + { + "id": {"timestamp": 1, "date": 1}, + "key": "TEST", + "groupId": 1, + "name": "test_get_collections_collection", + } + ], + } + c = CitesphereConnector("test_get_collections.com", self.auth_object) + result = c.get_collections(1) + c.get_collections.assert_called_once() + self.assertEqual(result["group"]["id"], 1) + self.assertEqual(result["group"]["name"], "test_get_collections_group") + self.assertEqual(result["collections"][0]["id"]["timestamp"], 1) + self.assertEqual(result["collections"][0]["key"], "TEST") + self.assertEqual(result["collections"][0]["groupId"], 1) + self.assertEqual( + result["collections"][0]["name"], "test_get_collections_collection" + ) + pass + + @patch("src.CitesphereConnector.CitesphereConnector.get_collection_items") + def test_get_collection_items(self, mock_get_collection_items): + mock_get_collection_items.return_value = { + "group": {"id": 1, "name": "test_get_collection_items_group"}, + "items": [ + {"key": "TEST", "group": 1, "title": "test_get_collection_items_item"} + ], + } + c = CitesphereConnector("test_get_collection_items.com", self.auth_object) + result = c.get_collection_items(1, 1) + c.get_collection_items.assert_called_once() + self.assertEqual(result["group"]["id"], 1) + self.assertEqual(result["group"]["name"], "test_get_collection_items_group") + self.assertEqual(result["items"][0]["key"], "TEST") + self.assertEqual(result["items"][0]["group"], 1) + self.assertEqual(result["items"][0]["title"], "test_get_collection_items_item") + pass + + @patch("src.CitesphereConnector.CitesphereConnector.get_item_info") + def test_get_item_info(self, mock_get_item_info): + mock_get_item_info.return_value = { + "item": {"key": "TEST", "group": "1", "title": "test_get_item_info_item"}, + } + c = CitesphereConnector("test_get_item_info.com", self.auth_object) + result = c.get_item_info(1, 1) + c.get_item_info.assert_called_once() + self.assertEqual(result["item"]["key"], "TEST") + self.assertEqual(result["item"]["group"], "1") + self.assertEqual(result["item"]["title"], "test_get_item_info_item") + pass + + @patch( + "src.CitesphereConnector.CitesphereConnector.get_collections_by_collection_id" + ) + def test_get_collections_by_collection_id( + self, mock_get_collections_by_collection_id + ): + mock_get_collections_by_collection_id.return_value = { + "group": {"id": 1, "name": "test_get_collections_by_collection_id_group"}, + "items": [ + { + "key": "TEST", + "group": 1, + "title": "test_get_collections_by_collection_id_item", + } + ], + } + c = CitesphereConnector( + "test_get_collections_by_collection_id.com", self.auth_object + ) + result = c.get_collections_by_collection_id(1, 1) + c.get_collections_by_collection_id.assert_called_once() + self.assertEqual(result["group"]["id"], 1) + self.assertEqual( + result["group"]["name"], "test_get_collections_by_collection_id_group" + ) + self.assertEqual(result["items"][0]["key"], "TEST") + self.assertEqual(result["items"][0]["group"], 1) + self.assertEqual( + result["items"][0]["title"], "test_get_collections_by_collection_id_item" + ) + pass + + @patch("src.CitesphereConnector.CitesphereConnector.add_item") + def test_add_item(self, mock_add_item): + mock_add_item.return_value = { + "key": "TEST", + "group": "1", + "title": "test_add_item", + } + c = CitesphereConnector("test_add_item.com", self.auth_object) + result = c.add_item(1, {"data": "test"}, "path/to/file") + c.add_item.assert_called_once() + self.assertEqual(result["key"], "TEST") + self.assertEqual(result["group"], "1") + self.assertEqual(result["title"], "test_add_item") + pass + + @patch("builtins.open", new_callable=mock_open) + def test_add_item_exception(self, mock_open): + mock_open.side_effect = Exception() + c = CitesphereConnector("test_add_item_exception.com", self.auth_object) + result = c.add_item(1, {"data": "test"}, "path/to/file") + mock_open.assert_called_once_with("path/to/file", "rb") + self.assertEqual(result, "Error loading/reading file") + pass + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29