-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate_crawl_job.py
59 lines (49 loc) · 1.33 KB
/
create_crawl_job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import requests
from datetime import datetime
from airflow.decorators import task
from airflow import DAG
import sqlite3
from settings import OXY_USERNAME, OXY_PASSWORD, DB_FILE_PATH
# JSON body sent to the Oxylabs jobs endpoint by `create_crawl_job`.
OXYLABS_CRAWLER_PAYLOAD = {
    # Site the crawl starts from.
    "url": "https://books.toscrape.com",
    "filters": {
        # Regex matching paginated listing pages such as "page-2.html".
        # NOTE(review): presumably selects which links the crawler follows;
        # confirm against the Oxylabs crawler documentation.
        "crawl": [
            "page-\\d+\\.html",
        ],
        # Regex matching ".../catalogue/<slug>_<digits>/index.html" URLs; the
        # negative lookahead rejects any path containing "category".
        # NOTE(review): presumably selects which pages end up in the output;
        # confirm against the Oxylabs crawler documentation.
        "process": [
            "https://books\\.toscrape\\.com/catalogue/((?!category).)*\\d+/index\\.html",
        ],
        "max_depth": 1,
    },
    "output": {
        "type_": "sitemap",
        # 1024 ** 3 == 1073741824 bytes (1 GiB).
        "aggregate_chunk_size_bytes": 1024 ** 3,
    },
}
@task(task_id="create_crawl_job")
def create_crawl_job():
    """Submit the crawl job to Oxylabs and record it as pending in SQLite.

    POSTs ``OXYLABS_CRAWLER_PAYLOAD`` to the Oxylabs jobs endpoint using HTTP
    basic auth, reads the ``id`` field from the JSON response, and inserts a
    ``(job_id, 'pending')`` row into the ``crawl_jobs`` table of the database
    at ``DB_FILE_PATH``.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            API answers with a 4xx/5xx status.
        ValueError: If the response JSON carries no ``id``.
        sqlite3.Error: If the insert fails; the transaction is rolled back.
    """
    response = requests.post(
        url="https://ect.oxylabs.io/v1/jobs",
        auth=(OXY_USERNAME, OXY_PASSWORD),
        json=OXYLABS_CRAWLER_PAYLOAD,
        # Without a timeout, requests waits forever on a stalled connection
        # and the task would hang instead of failing.
        timeout=60,
    )
    # Fail the task on an error response instead of storing a bogus job id.
    response.raise_for_status()

    job_id = response.json().get("id")
    if job_id is None:
        raise ValueError("Oxylabs response did not contain a job id")

    conn = sqlite3.connect(DB_FILE_PATH)
    try:
        # `with conn` commits on success and rolls back on error; it does not
        # close the connection, hence the surrounding try/finally.
        with conn:
            conn.execute(
                "INSERT INTO crawl_jobs (job_id, status) VALUES (?, 'pending');",
                # Bound as text, matching the quoted literal the row used to
                # be built with, but without interpolating API data into SQL.
                (str(job_id),),
            )
    finally:
        conn.close()
# DAG "create_crawl_job": runs the single `create_crawl_job` task on the cron
# schedule "0 7 * * *" (07:00 every day), with catchup disabled.
with DAG(
    dag_id="create_crawl_job",
    description="Crawl target website.",
    schedule_interval="0 7 * * *",
    start_date=datetime(2022, 6, 6, 13),
    catchup=False,
    tags=["webinar"],
    default_args={"owner": "airflow"},
) as dag:
    # Calling the decorated function inside the DAG context registers the task.
    create_crawl_job()