Skip to content

Commit

Permalink
Merge pull request #4 from the-sam963/master
Browse files Browse the repository at this point in the history
New Features added
  • Loading branch information
CodingSamrat authored Mar 20, 2023
2 parents 715439a + c7b6d17 commit fb2328a
Show file tree
Hide file tree
Showing 8 changed files with 483 additions and 128 deletions.
2 changes: 1 addition & 1 deletion filexdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
from .database import FileXdb


__version__ = "0.1.0"
__version__ = "1.0.0"

201 changes: 115 additions & 86 deletions filexdb/collection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from typing import Mapping, List

from .document import Document
from .document import Document, JsonArray
from .fileio import FileIO


Expand All @@ -14,36 +14,29 @@


class Collection:
def __init__(self, col_name: str, binary_file: FileIO) -> None:
def __init__(self, col_name: str, file_handler: FileIO) -> None:
self._col_name = col_name
self._binary_file = binary_file
self._file_handler = file_handler

# Get the data of existing Database or empty database.
self._database = self._binary_file.read()
self._database = self._get_database()

self._cursor: int = 0

# Initiate a default Collection.
# Check the Collection is already exists or no.
if self._col_name in self._database.keys():
# Initiating Collecting
self._collection: JsonArray = self._get_collection()

# Get the existing Collection
self._collection: list = self._database[self._col_name]

else:
# Create new Collection
self._database[self._col_name] = []
self._collection: list = self._database[self._col_name]
# Cursor
self._cursor: int = 0

def insert(self, document: Mapping) -> str:
"""
Inserts a single Document into the Database.
Document should be JSON Object.
:param document: Document to insert into database
:return: None
:param document: Document to insert into the database.
:return: Document ID.
"""

# Make sure the document implements the ``Mapping`` interface
if not isinstance(document, Mapping):
raise ValueError('Document is not a Dictionary')
Expand All @@ -52,30 +45,33 @@ def insert(self, document: Mapping) -> str:
if "_id_" in document.keys():
raise KeyError(f"You are not allowed to modify key `_id_`")

# getting Database
_database = self._get_database()

# Create a Document
_document = Document(document)

# id of Document
_doc_id: str = _document.id
# ID of Document
_doc_id: str = str(_document.id)

# check Document is already exist or not
if not self._doc_is_exists(_document.id):
if not self._doc_is_exists(str(_document.id)):

# Append the document into the Collection
self._collection.append(_document)

# Add modified Collection to Database
self._database[self._col_name] = self._collection
_database[self._col_name] = self._collection

# print(self._database)
# Write current state of Database into the Database-file
self._binary_file.write(self._database)
self._file_handler.write(_database)

return _doc_id
else:
raise ValueError(f"Document id `{_document.id}` is already exists")

def insert_all(self, document_list: List[Mapping]) -> List[str]:
def insert_all(self, document_list: List[Mapping]) -> JsonArray:
"""
Inserts a single ``Document`` into the ``Database``.
Expand All @@ -97,43 +93,9 @@ def insert_all(self, document_list: List[Mapping]) -> List[str]:
# insert every single document in Database & increment ``doc_count``.
_doc_id.append(self.insert(document))

return _doc_id

""" FIND_ONE
def __find_one(self, query: Mapping = None) -> Document | None:
"
Finds a single ``Document`` of ``Collection``.
If ``query`` is None then returns all the ``Documents`` of ``Collection``.
If ``query`` is not None then returns only the first occurrence.
return JsonArray(_doc_id)

:param query: Condition to search Document
:return: Document
"
# Default result
_result = {}
# Make sure the query implements the ``Mapping`` interface.
if not isinstance(query, Mapping | None):
raise ValueError('Document is not a Dictionary')
# Check if has ``query`` or not
if query is None:
_result = self._collection[self._cursor]
self._reset_cursor()
else:
print(self._cursor)
_result = self._find_document_by_query(query)
self._reset_cursor()
_result = _result[self._cursor]
return _result
"""

def find(self, query=None, limit=None) -> List[Document]:
def find(self, query=None, limit=None) -> JsonArray:
"""
Finds all ``Document`` of ``Collection``.
Expand All @@ -160,7 +122,7 @@ def find(self, query=None, limit=None) -> List[Document]:
raise ValueError('Document is not a Tuple')

# if limit, Check everything ok
_limit_start = _limit_end = None
_limit_start = _limit_end = 0

if limit and type(limit) == type((1, 3)):
if len(limit) == 2:
Expand Down Expand Up @@ -189,7 +151,7 @@ def find(self, query=None, limit=None) -> List[Document]:
else:
_result = self._collection

return _result
return JsonArray(_result)

elif query is not None and type(query) == type({}):
if limit:
Expand All @@ -208,7 +170,7 @@ def find(self, query=None, limit=None) -> List[Document]:
self._reset_cursor()

# check if lower limit is valid or not
if _limit_start >= len(_result):
if _limit_start >= len(_result) and _limit_start != 0:
raise ValueError(f"lower limit should be smaller than length of result")
else:
# Travers limited result
Expand All @@ -225,9 +187,9 @@ def find(self, query=None, limit=None) -> List[Document]:

self._reset_cursor()

return _result
return JsonArray(_result)

def delete(self, query=None) -> List[str]:
def delete(self, query=None) -> JsonArray:
"""
Delete single or multiple Document when meet the Conditions or ``query``.
Expand All @@ -248,11 +210,11 @@ def delete(self, query=None) -> List[str]:
self._collection.remove(_doc)
_doc_id.append(_doc["_id_"])

self._binary_file.write(self._database)
self._file_handler.write(self._database)

return _doc_id
return JsonArray(_doc_id)

def update(self, document: Mapping, query=None) -> List[str]:
def update(self, document: Mapping, query=None) -> JsonArray:
"""
Fetch all the Documents mathc the conditions and update them.
Expand Down Expand Up @@ -284,30 +246,105 @@ def update(self, document: Mapping, query=None) -> List[str]:
_doc_id.append(_doc["_id_"])

# Write current state of Database
self._binary_file.write(self._database)
self._file_handler.write(self._database)

return _doc_id
return JsonArray(_doc_id)

def count(self, query=None, limit: tuple = None) -> int:
def rename(self, new_name: str) -> int:
"""
Return amount of Document found.
This method used to change the name of collection.
Takes current name & new name to change name of the collection.
:param query: Condition to search Document.
:param limit: If there is any limit.
:return: (int) amount of Document found.
:param new_name: New name for collection.
:return: Amount of affected collection.
"""
count = len(self.find(query=query, limit=limit))

# Initiating counter
count = 0

# Checking the collection is already exist or not
if new_name not in self._database.keys():
# Creating new collection and
# Putting old data into new collection
self._database[new_name] = self._collection

# Writing Current database status into the file
self._file_handler.write(self._database)

# Remove old collection
self.drop()

# Increasing counter
count += 1

return count

def drop(self) -> int:
"""
Deletes the selected collection from the database
:return: Amount of affected collection
"""

# Initiating counter
count = 0

# Getting database
_database = self._file_handler.read()

# Check database has the collection or not
if self._col_name in _database.keys():
# Removing collection from database
_database.pop(self._col_name)

# Writing current status of database into the file system.
self._file_handler.write(_database)

# Increasing counter
count += 1

return count

# ----------------------------------------------------------------#
def _get_database(self) -> Document:
"""
Getting Database
:return: Database
"""
# Get the data of existing Database or empty database.
database = Document(self._file_handler.read(), False)

return database

def _get_collection(self) -> JsonArray:
"""
Getting Collection
:return: Collection
"""
# Initiate a default Collection.
# Check the Collection is already exists or no.
if self._col_name in self._database.keys():

# Get the existing Collection
_collection: JsonArray = self._database[self._col_name]

else:
# Create new Collection
self._database[self._col_name] = JsonArray([])
_collection = self._database[self._col_name]

return JsonArray(_collection)

def _reset_cursor(self) -> None:
"""
Reset Cursor Pointer to 0th index
:return: None
"""
self._cursor = 0

def _find_document_by_query(self, query: Mapping) -> List:
def _find_document_by_query(self, query: Mapping) -> JsonArray | None:
"""
Finds a single ``Document`` of ``Collection``.
Expand Down Expand Up @@ -371,20 +408,12 @@ def _find_document_by_query(self, query: Mapping) -> List:
else:
return None

return result
return JsonArray(result)

# ======================== #
def _doc_is_exists(self, doc_id: str) -> bool:
# Iterate over all Documents of Collection
for doc in self._collection:
if doc["_id_"] == doc_id:
return True

return False

def _find_document_by_id(self, doc_id) -> Document:
for doc in self._collection:
if doc["_id_"] == doc_id:
return doc
else:
return None
Loading

0 comments on commit fb2328a

Please sign in to comment.