Atomic retryable unit of work with both request & spider.process
Yomguithereal committed Oct 2, 2024
1 parent 61c3653 commit cc0ac92
Showing 4 changed files with 62 additions and 37 deletions.
5 changes: 4 additions & 1 deletion ftest/crawlers/echojs.py
@@ -63,7 +63,10 @@ def process(self, job: CrawlJob, response: Response) -> SpiderResult:
 
 class EchoJSCrawler(Crawler):
     def __init__(self, **kwargs):
-        super().__init__(EchoJSSpider(), **kwargs)
+        super().__init__(
+            EchoJSSpider(),
+            **kwargs,
+        )
 
 
 def factory(**crawler_kwargs):
89 changes: 55 additions & 34 deletions minet/crawl/crawler.py
@@ -26,6 +26,7 @@
 if TYPE_CHECKING:
     from playwright.async_api import BrowserContext
     from minet.browser import ThreadsafeBrowser
+    from minet.web import Response
 
 from os import makedirs
 from os.path import join
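
Aside: the added Response import sits under TYPE_CHECKING, so it exists for static analysis only; at runtime the name is absent, which is why the next hunk annotates retryable_work with the string "Response". The idiom in isolation, as a minimal sketch:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only imported while type checking: no runtime cost, no import cycle.
    from minet.web import Response

def handle(response: "Response") -> None:
    # The string annotation is resolved lazily by type checkers.
    ...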
@@ -189,53 +190,73 @@ def __call__(
         if cancel_event.is_set():
             return
 
-        try:
-            retryer = getattr(self.local_context, "retryer", None)
-            request_fn = (
-                request
-                if self.crawler.browser is None
-                else self.crawler.browser.request
-            )
+        request_fn = (
+            request
+            if self.crawler.browser is None
+            else self.crawler.browser.request
+        )
+
+        # NOTE: we create an atomic unit of work that will retry both the request
+        # and the subsequent spider processing
+        response = None
+
+        # NOTE: the function takes "url" and "raise_on_statuses" because of RequestRetrying quirks
+        def retryable_work(
+            url: str, raise_on_statuses=None
+        ) -> Optional[Tuple["Response", Any, Any]]:
+            nonlocal response
+
+            try:
+                response = request_fn(
+                    url, raise_on_statuses=raise_on_statuses, **kwargs
+                )
+
+            except CancelledRequestError:
+                return
 
-            if retryer is not None:
-                response = retryer(request_fn, job.url, **kwargs)
+            if cancel_event.is_set():
+                return
+
+            spider_result = spider.process(job, response)
+
+            if spider_result is not None:
+                try:
+                    data, next_jobs = spider_result
+                except (ValueError, TypeError):
+                    raise TypeError(
+                        'Spider.process is expected to return either None or a 2-tuple containing data and next targets to enqueue. Got a "%s" instead.'
+                        % spider_result.__class__.__name__
+                    )
             else:
-                response = request_fn(job.url, **kwargs)
+                data = None
+                next_jobs = None
 
+            # If end url is different from job we add the url to visited cache
+            # NOTE: this is somewhat subject to race conditions but it should
+            # be benign and still be useful in some cases.
+            if self.crawler.url_cache is not None and job.url != response.end_url:
+                with self.crawler.enqueue_lock:
+                    self.crawler.url_cache.add(response.end_url)
+            return response, data, next_jobs
 
-        except CancelledRequestError:
-            return
+        retryer = getattr(self.local_context, "retryer", None)
+
+        try:
+            if retryer is None:
+                output = retryable_work(job.url)
+            else:
+                output = retryer(retryable_work, job.url)
 
         except EXPECTED_WEB_ERRORS as error:
             return ErroredCrawlResult(job, error), None
 
-        if cancel_event.is_set():
-            return
-
-        try:
-            spider_result = spider.process(job, response)
-        except Exception as reason:
-            if response is None:
-                raise
-
-            raise CrawlerSpiderProcessError(
-                reason=reason, job=job, response=response
-            )
-
-        if spider_result is not None:
-            try:
-                data, next_jobs = spider_result
-            except (ValueError, TypeError):
-                raise TypeError(
-                    'Spider.process is expected to return either None or a 2-tuple containing data and next targets to enqueue. Got a "%s" instead.'
-                    % spider_result.__class__.__name__
-                )
-        else:
-            data = None
-            next_jobs = None
+        # Was cancelled?
+        if output is None:
+            return
+
+        response, data, next_jobs = output
 
         if cancel_event.is_set():
             return
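
The gist of the change: spider.process now runs inside the retried callable, so a transient failure during processing triggers a fresh request instead of failing the job, and the response is captured with nonlocal so the surrounding error handling can still reach the last attempt. A toy sketch of the pattern; retry, fetch and parse are hypothetical stand-ins for minet's retryer, request function and Spider.process:

from typing import Any, Callable, Optional, Tuple

def retry(fn: Callable[[str], Any], url: str, attempts: int = 3) -> Any:
    # Toy retryer: re-invokes fn(url) on any exception, up to `attempts` times.
    for attempt in range(attempts):
        try:
            return fn(url)
        except Exception:
            if attempt == attempts - 1:
                raise

def fetch(url: str) -> str:
    return "<html>hello</html>"  # stand-in for request() / browser.request()

def parse(html: str) -> dict:
    return {"length": len(html)}  # stand-in for Spider.process

def crawl(url: str) -> Optional[Tuple[str, dict]]:
    response: Optional[str] = None

    def retryable_work(url: str) -> Tuple[str, dict]:
        # Both the request AND the processing happen inside the retried
        # callable, so a transient failure in either step re-runs the pair.
        nonlocal response
        response = fetch(url)
        return response, parse(response)

    try:
        return retry(retryable_work, url)
    except Exception:
        # `response` still holds the last attempt's payload here, which is
        # what the nonlocal capture in the real code is for.
        return None

print(crawl("https://example.com"))  # -> ('<html>hello</html>', {'length': 18})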
3 changes: 2 additions & 1 deletion minet/executors.py
@@ -535,7 +535,8 @@ def cancel(self) -> None:
 
     def shutdown(self, wait=True) -> None:
         self.cancel()
-        self.pool_manager.clear()
+        if hasattr(self, "pool_manager"):
+            self.pool_manager.clear()
         return super().shutdown(wait=wait)
 
     @overload
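
The hasattr guard presumably protects shutdown when it runs against a partially constructed executor, i.e. when __init__ raised before pool_manager was assigned. A hedged illustration of that failure mode; Exec is a made-up class, not minet's:

class Exec:
    def __init__(self, fail: bool = False):
        if fail:
            raise RuntimeError("boom")  # pool_manager is never assigned
        self.pool_manager = object()

    def shutdown(self) -> None:
        # Without the guard, this raises AttributeError on a partially
        # built instance, masking the original construction error.
        if hasattr(self, "pool_manager"):
            print("clearing pool")

broken = object.__new__(Exec)  # simulate an instance whose __init__ aborted
broken.shutdown()              # guard makes this a no-op, no AttributeError
Exec().shutdown()              # prints "clearing pool"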
2 changes: 1 addition & 1 deletion minet/web.py
@@ -1236,7 +1236,7 @@ def __init__(
         super().__init__(*args, **kwargs)
 
     def __call__(self, fn, *args, **kwargs):
-        if self._invalid_statuses is not None and fn in (request, resolve):
+        if self._invalid_statuses is not None:
            kwargs["raise_on_statuses"] = self._invalid_statuses
 
        return super().__call__(fn, *args, **kwargs)
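
After this change the retryer injects raise_on_statuses into any callable it wraps, not only request or resolve. That is why retryable_work in crawler.py above accepts a raise_on_statuses parameter. A reduced sketch of this contract, where Retrying is a hypothetical stand-in for minet's retryer class:

class Retrying:
    def __init__(self, invalid_statuses=None):
        self._invalid_statuses = invalid_statuses

    def __call__(self, fn, *args, **kwargs):
        # Now unconditional: every retried callable must accept the kwarg.
        if self._invalid_statuses is not None:
            kwargs["raise_on_statuses"] = self._invalid_statuses
        return fn(*args, **kwargs)

def retryable_work(url, raise_on_statuses=None):
    return url, raise_on_statuses

retryer = Retrying(invalid_statuses=(429, 503))
print(retryer(retryable_work, "https://example.com"))
# -> ('https://example.com', (429, 503))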
