Skip to content

Commit

Permalink
jobs: apply code upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
ntarocco committed Oct 14, 2024
1 parent 0903904 commit a24c5ea
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 40 deletions.
43 changes: 4 additions & 39 deletions invenio_vocabularies/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,17 @@
"""Jobs module."""

import datetime
from datetime import timezone

from invenio_i18n import gettext as _
from invenio_jobs.jobs import JobType
from marshmallow import Schema, fields
from marshmallow_utils.fields import TZDateTime

from invenio_vocabularies.services.tasks import process_datastream


class ArgsSchema(Schema):
"""Schema of task input arguments."""

since = TZDateTime(
timezone=timezone.utc,
format="iso",
metadata={
"description": _(
"YYYY-MM-DD HH:mm format. "
"Leave field empty if it should continue since last successful run."
)
},
)
job_arg_schema = fields.String(
metadata={"type": "hidden"},
dump_default="ArgsSchema",
load_default="ArgsSchema",
)


class ProcessDataStreamJob(JobType):
"""Generic process data stream job type."""

arguments_schema = ArgsSchema
task = process_datastream
id = None


class ProcessRORAffiliationsJob(ProcessDataStreamJob):
Expand All @@ -55,13 +30,8 @@ class ProcessRORAffiliationsJob(ProcessDataStreamJob):
id = "process_ror_affiliations"

@classmethod
def default_args(cls, job_obj, since=None, **kwargs):
"""Generate default job arguments here."""
if since is None and job_obj.last_runs["success"]:
since = job_obj.last_runs["success"].started_at
else:
since = since or datetime.datetime.now()

def build_task_arguments(cls, job_obj, since=None, **kwargs):
"""Process ROR affiliations."""
# NOTE: Update is set to False for now given we don't have the logic to re-index dependent records yet.
# Since jobs support custom args, update true can be passed via that.
return {
Expand Down Expand Up @@ -98,13 +68,8 @@ class ProcessRORFundersJob(ProcessDataStreamJob):
id = "process_ror_funders"

@classmethod
def default_args(cls, job_obj, since=None, **kwargs):
"""Generate default job arguments here."""
if since is None and job_obj.last_runs["success"]:
since = job_obj.last_runs["success"].started_at
else:
since = since or datetime.datetime.now()

def build_task_arguments(cls, job_obj, since=None, **kwargs):
"""Process ROR funders."""
# NOTE: Update is set to False for now given we don't have the logic to re-index dependent records yet.
# Since jobs support custom args, update true can be passed via that.
return {
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ install_requires =
invenio-i18n>=2.0.0,<3.0.0
invenio-records-resources>=6.0.0,<7.0.0
invenio-administration>=2.0.0,<3.0.0
invenio-jobs>=1.0.0,<2.0.0
invenio-jobs>=2.0.0,<3.0.0
lxml>=4.5.0
PyYAML>=5.4.1
regex>=2024.7.24
Expand Down

0 comments on commit a24c5ea

Please sign in to comment.