diff --git a/asyncwhois/__init__.py b/asyncwhois/__init__.py index 8607dff..8be353c 100644 --- a/asyncwhois/__init__.py +++ b/asyncwhois/__init__.py @@ -42,12 +42,13 @@ "GeneralError", "QueryError", ] -__version__ = "1.1.7" +__version__ = "1.1.8" def whois( search_term: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address], - authoritative_only: bool = False, + authoritative_only: bool = False, # todo: deprecate and remove this argument + find_authoritative_server: bool = True, ignore_not_found: bool = False, proxy_url: str = None, timeout: int = 10, @@ -59,8 +60,11 @@ def whois( otherwise a DNS search is performed. :param search_term: Any domain, URL, IPv4, or IPv6 - :param authoritative_only: If False (default), asyncwhois returns the entire WHOIS query chain, + :param authoritative_only: DEPRECATED - If False (default), asyncwhois returns the entire WHOIS query chain, otherwise if True, only the authoritative response is returned. + :param find_authoritative_server: This parameter only applies to domain queries. If True (default), asyncwhois + will attempt to find the authoritative response, otherwise if False, asyncwhois will only query the whois server + associated with the given TLD as specified in the IANA root db (`asyncwhois/servers/domains.py`). :param ignore_not_found: If False (default), the `NotFoundError` exception is raised if the query output contains "no such domain" language. If True, asyncwhois will not raise `NotFoundError` exceptions. :param proxy_url: Optional SOCKS4 or SOCKS5 proxy url (e.g. 'socks5://host:port') @@ -85,6 +89,7 @@ def whois( except (ipaddress.AddressValueError, ValueError): return DomainClient( authoritative_only=authoritative_only, + find_authoritative_server=find_authoritative_server, ignore_not_found=ignore_not_found, proxy_url=proxy_url, timeout=timeout, @@ -140,7 +145,8 @@ def rdap( async def aio_whois( search_term: str, - authoritative_only: bool = False, + authoritative_only: bool = False, # todo: deprecate and remove this argument + find_authoritative_server: bool = True, ignore_not_found: bool = False, proxy_url: str = None, timeout: int = 10, @@ -152,8 +158,11 @@ async def aio_whois( otherwise a DNS search is performed. :param search_term: Any domain, URL, IPv4, or IPv6 - :param authoritative_only: If False (default), asyncwhois returns the entire WHOIS query chain, + :param authoritative_only: DEPRECATED - If False (default), asyncwhois returns the entire WHOIS query chain, otherwise if True, only the authoritative response is returned. + :param find_authoritative_server: This parameter only applies to domain queries. If True (default), asyncwhois + will attempt to find the authoritative response, otherwise if False, asyncwhois will only query the whois server + associated with the given TLD as specified in the IANA root db (`asyncwhois/servers/domains.py`). :param ignore_not_found: If False (default), the `NotFoundError` exception is raised if the query output contains "no such domain" language. If True, asyncwhois will not raise `NotFoundError` exceptions. :param proxy_url: Optional SOCKS4 or SOCKS5 proxy url (e.g. 'socks5://host:port') @@ -182,6 +191,7 @@ async def aio_whois( proxy_url=proxy_url, timeout=timeout, tldextract_obj=tldextract_obj, + find_authoritative_server=find_authoritative_server, ).aio_whois(search_term) else: return "", {} diff --git a/asyncwhois/client.py b/asyncwhois/client.py index 5750df7..1b8998d 100644 --- a/asyncwhois/client.py +++ b/asyncwhois/client.py @@ -49,6 +49,7 @@ class DomainClient(Client): def __init__( self, authoritative_only: bool = False, + find_authoritative_server: bool = True, ignore_not_found: bool = False, proxy_url: str = None, whodap_client: whodap.DNSClient = None, @@ -61,7 +62,11 @@ def __init__( self.proxy_url = proxy_url self.timeout = timeout self.tldextract_obj = tldextract_obj - self.query_obj = DomainQuery(proxy_url=proxy_url, timeout=timeout) + self.query_obj = DomainQuery( + proxy_url=proxy_url, + timeout=timeout, + find_authoritative_server=find_authoritative_server, + ) self.parse_obj = DomainParser(ignore_not_found=ignore_not_found) def _get_domain_components(self, domain: str) -> tuple[str, str, str]: diff --git a/asyncwhois/query.py b/asyncwhois/query.py index e204e63..7e2c2da 100644 --- a/asyncwhois/query.py +++ b/asyncwhois/query.py @@ -19,9 +19,15 @@ class Query: refer_regex = r"refer: *(.+)" whois_server_regex = r".+ whois server: *(.+)" - def __init__(self, proxy_url: str = None, timeout: int = 10): + def __init__( + self, + proxy_url: str = None, + timeout: int = 10, + find_authoritative_server: bool = True, + ): self.proxy_url = proxy_url self.timeout = timeout + self.find_authoritative_server = find_authoritative_server @staticmethod def _find_match(regex: str, blob: str) -> str: @@ -117,6 +123,16 @@ def run(self, search_term: str, server: str = None) -> list[str]: server_regex = self.whois_server_regex return self._do_query(server, data, server_regex, []) + @staticmethod + def _continue_querying(current_server: str, next_server: str) -> bool: + next_server = next_server.lower() + return ( + next_server + and next_server != current_server + and not next_server.startswith("http") + and not next_server.startswith("www.") + ) + async def aio_run(self, search_term: str, server: str = None) -> list[str]: data = search_term + "\r\n" if not server: @@ -141,19 +157,16 @@ def _do_query( query_output = self._send_and_recv(conn, data) # save query chain chain.append(query_output) - # parse response for the referred WHOIS server name - whois_server = self._find_match(regex, query_output) - whois_server = whois_server.lower() - if ( - whois_server - and whois_server != server - and not whois_server.startswith("http") - and not whois_server.startswith("www.") - ): - # recursive call to find more authoritative server - chain = self._do_query( - whois_server, data, self.whois_server_regex, chain - ) + # if we should find the authoritative response, + # then parse the response for the next server + if self.find_authoritative_server: + # parse response for the referred WHOIS server name + whois_server = self._find_match(regex, query_output) + if self._continue_querying(server, whois_server): + # recursive call to find more authoritative server + chain = self._do_query( + whois_server, data, self.whois_server_regex, chain + ) # return the WHOIS query chain return chain @@ -171,20 +184,16 @@ async def _aio_do_query( self._aio_send_and_recv(reader, writer, data), self.timeout ) chain.append(query_output) - # parse response for the referred WHOIS server name - whois_server = self._find_match(regex, query_output) - whois_server = whois_server.lower() - # check for another legitimate server name - if ( - whois_server - and whois_server != server - and not whois_server.startswith("http") - and not whois_server.startswith("www.") - ): - # recursive call to find the authoritative server - chain = await self._aio_do_query( - whois_server, data, self.whois_server_regex, chain - ) + # if we should find the authoritative response, + # then parse the response for the next server + if self.find_authoritative_server: + # parse response for the referred WHOIS server name + whois_server = self._find_match(regex, query_output) + if self._continue_querying(server, whois_server): + # recursive call to find more authoritative server + chain = self._do_query( + whois_server, data, self.whois_server_regex, chain + ) # return the WHOIS query chain return chain @@ -195,8 +204,9 @@ def __init__( server: str = None, proxy_url: str = None, timeout: int = 10, + find_authoritative_server: bool = True, ): - super().__init__(proxy_url, timeout) + super().__init__(proxy_url, timeout, find_authoritative_server) self.server = server @staticmethod