Skip to content

Commit

Permalink
Merge pull request #204 from peopledoc/admin-shell-38
Browse files Browse the repository at this point in the history
Implement an administration prompt
  • Loading branch information
Antoine Rozo authored May 29, 2020
2 parents 64472ff + a0e3e22 commit 82e923a
Show file tree
Hide file tree
Showing 15 changed files with 1,043 additions and 30 deletions.
27 changes: 27 additions & 0 deletions docs/howto/shell.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Use the administration shell
----------------------------

The procrastinate shell is a tool to administrate jobs and overview queues and tasks.
It is an interactive shell that you can run with the following command.

*Experimental feature*.

.. code-block:: console
$ procrastinate shell
Welcome to the procrastinate shell. Type help or ? to list commands.
procrastinate> help
Documented commands (type help <topic>):
========================================
EOF cancel exit help list_jobs list_queues list_tasks retry
As usual, you should use ``--app`` argument or ``PROCRASTINATE_APP`` environment
variable to specify the application you want to use (see `command_line`).

There are commands to list all the jobs (``list_jobs``), tasks (``list_tasks``)
& queues (``list_queues``).
And commands to retry (``retry``) & cancel (``cancel``) a specific job.

You can get help for a specific command *cmd* by typing ``help cmd``.
1 change: 1 addition & 0 deletions docs/howto_index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ How to...
howto/defer
howto/worker
howto/command_line
howto/shell
howto/locks
howto/concurrency
howto/schedule
Expand Down
7 changes: 7 additions & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@ Exceptions
.. automodule:: procrastinate.exceptions
:members: ProcrastinateException, PoolAlreadySet, LoadFromPathError,
ConnectorException

Administration
--------------

.. autoclass:: procrastinate.admin.Admin
:members: list_jobs, list_jobs_async, list_queues, list_queues_async,
list_tasks, list_tasks_async, set_job_status, set_job_status_async
181 changes: 181 additions & 0 deletions procrastinate/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
from typing import Any, Dict, Iterable

from procrastinate import connector as connector_module
from procrastinate import sql, utils


@utils.add_sync_api
class Admin:
"""
The Admin is used to overview and administrate procrastinate jobs.
You should never need to instanciate an Admin object as it is
already available as `App.admin`.
"""

def __init__(self, connector: connector_module.BaseConnector):
"""
Parameters
----------
connector :
Instance of a subclass of :py:class:`procrastinate.connector.BaseConnector`,
typically `PostgresConnector`. It will be responsible for all communications
with the database. Mandatory.
"""
self.connector = connector

async def list_jobs_async(
self,
id: int = None,
queue: str = None,
task: str = None,
status: str = None,
lock: str = None,
) -> Iterable[Dict[str, Any]]:
"""
List all procrastinate jobs given query filters.
Parameters
----------
id : ``int``
Filter by job ID
queue : ``str``
Filter by job queue name
task : ``str``
Filter by job task name
status : ``str``
Filter by job status (*todo*/*doing*/*succeeded*/*failed*)
lock : ``str``
Filter by job lock
Returns
-------
``List[Dict[str, Any]]``
A list of dictionnaries representing jobs (``id``, ``queue``, ``task``,
``lock``, ``args``, ``status``, ``scheduled_at``, ``attempts``).
"""
return [
{
"id": row["id"],
"queue": row["queue_name"],
"task": row["task_name"],
"lock": row["lock"],
"args": row["args"],
"status": row["status"],
"scheduled_at": row["scheduled_at"],
"attempts": row["attempts"],
}
for row in await self.connector.execute_query_all(
query=sql.queries["list_jobs"],
id=id,
queue_name=queue,
task_name=task,
status=status,
lock=lock,
)
]

async def list_queues_async(
self, queue: str = None, task: str = None, status: str = None, lock: str = None,
) -> Iterable[Dict[str, Any]]:
"""
List all queues and number of jobs per status for each queue.
Parameters
----------
queue : ``str``
Filter by job queue name
task : ``str``
Filter by job task name
status : ``str``
Filter by job status (*todo*/*doing*/*succeeded*/*failed*)
lock : ``str``
Filter by job lock
Returns
-------
``List[Dict[str, Any]]``
A list of dictionnaries representing queues stats (``name``, ``jobs_count``,
``todo``, ``doing``, ``succeeded``, ``failed``).
"""
return [
{
"name": row["name"],
"jobs_count": row["jobs_count"],
"todo": row["stats"].get("todo", 0),
"doing": row["stats"].get("doing", 0),
"succeeded": row["stats"].get("succeeded", 0),
"failed": row["stats"].get("failed", 0),
}
for row in await self.connector.execute_query_all(
query=sql.queries["list_queues"],
queue_name=queue,
task_name=task,
status=status,
lock=lock,
)
]

async def list_tasks_async(
self, queue: str = None, task: str = None, status: str = None, lock: str = None,
) -> Iterable[Dict[str, Any]]:
"""
List all tasks and number of jobs per status for each task.
Parameters
----------
queue : ``str``
Filter by job queue name
task : ``str``
Filter by job task name
status : ``str``
Filter by job status (*todo*/*doing*/*succeeded*/*failed*)
lock : ``str``
Filter by job lock
Returns
-------
``List[Dict[str, Any]]``
A list of dictionnaries representing tasks stats (``name``, ``jobs_count``,
``todo``, ``doing``, ``succeeded``, ``failed``).
"""
return [
{
"name": row["name"],
"jobs_count": row["jobs_count"],
"todo": row["stats"].get("todo", 0),
"doing": row["stats"].get("doing", 0),
"succeeded": row["stats"].get("succeeded", 0),
"failed": row["stats"].get("failed", 0),
}
for row in await self.connector.execute_query_all(
query=sql.queries["list_tasks"],
queue_name=queue,
task_name=task,
status=status,
lock=lock,
)
]

async def set_job_status_async(self, id: int, status: str) -> Dict[str, Any]:
"""
Set/reset the status of a specific job.
Parameters
----------
id : ``int``
Job ID
status : ``str``
New job status (*todo*/*doing*/*succeeded*/*failed*)
Returns
-------
``Dict[str, Any]``
A dictionnary representing the job (``id``, ``queue``, ``task``,
``lock``, ``args``, ``status``, ``scheduled_at``, ``attempts``).
"""
await self.connector.execute_query(
query=sql.queries["set_job_status"], id=id, status=status,
)
(result,) = await self.list_jobs_async(id=id)
return result
7 changes: 7 additions & 0 deletions procrastinate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Optional, Set

from procrastinate import admin
from procrastinate import connector as connector_module
from procrastinate import healthchecks, jobs
from procrastinate import retry as retry_module
Expand Down Expand Up @@ -31,6 +32,8 @@ class App:
builtin_tasks : ``Dict[str, tasks.Task]``
The mapping of builtin tasks. Use it to programatically access builtin tasks, to
defer them.
admin : ``admin.Admin``
The administration interface linked to the application.
"""

@classmethod
Expand Down Expand Up @@ -259,3 +262,7 @@ def schema_manager(self) -> schema.SchemaManager:
@property
def health_check_runner(self) -> healthchecks.HealthCheckRunner:
return healthchecks.HealthCheckRunner(connector=self.connector)

@property
def admin(self) -> admin.Admin:
return admin.Admin(connector=self.connector)
12 changes: 11 additions & 1 deletion procrastinate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pendulum

import procrastinate
from procrastinate import connector, exceptions, jobs, types, utils, worker
from procrastinate import connector, exceptions, jobs, shell, types, utils, worker

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -327,6 +327,16 @@ def healthchecks(app: procrastinate.App):
click.echo(f"{status.value}: {count}")


@cli.command("shell")
@click.pass_obj
@handle_errors()
def shell_(app: procrastinate.App):
"""
Administration shell for procrastinate.
"""
shell.ProcrastinateShell(app.admin).cmdloop()


def main():
# https://click.palletsprojects.com/en/7.x/python3/
os.environ.setdefault("LC_ALL", "C.UTF-8")
Expand Down
110 changes: 110 additions & 0 deletions procrastinate/shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import cmd

from procrastinate import admin


def parse_argument(arg):
splitted_args = (item.partition("=") for item in arg.split())
return {key: value for key, _, value in splitted_args}


def print_job(job, details=False):
msg = f"#{job['id']} {job['task']} on {job['queue']} - [{job['status']}]"
if details:
msg += (
f" (attempts={job['attempts']}, "
f"scheduled_at={job['scheduled_at']}, args={job['args']}, "
f"lock={job['lock']})"
)
print(msg)


class ProcrastinateShell(cmd.Cmd):
intro = "Welcome to the procrastinate shell. Type help or ? to list commands.\n"
prompt = "procrastinate> "

def __init__(self, admin: admin.Admin):
super().__init__()
self.admin = admin

def do_EOF(self, _):
"Exit procrastinate shell."
return True

do_exit = do_EOF

def do_list_jobs(self, arg):
"""
List procrastinate jobs.
Usage: list_jobs [id=ID] [queue=QUEUE_NAME] [task=TASK_NAME] [status=STATUS]
[lock=LOCK] [details]
Jobs can be filtered by id, queue name, task name, status and lock.
Use the details argument to get more info about jobs.
Example: list_jobs queue=default task=sums status=failed details
"""
kwargs = parse_argument(arg)
details = kwargs.pop("details", None) is not None
for job in self.admin.list_jobs(**kwargs):
print_job(job, details=details)

def do_list_queues(self, arg):
"""
List procrastinate queues: get queues names and number of jobs per queue.
Usage: list_queues [queue=QUEUE_NAME] [task=TASK_NAME] [status=STATUS]
[lock=LOCK]
Jobs can be filtered by queue name, task name, status and lock.
Example: list_queues task=sums status=failed
"""
kwargs = parse_argument(arg)
for queue in self.admin.list_queues(**kwargs):
print(
f"{queue['name']}: {queue['jobs_count']} jobs ("
f"todo: {queue['todo']}, "
f"succeeded: {queue['succeeded']}, "
f"failed: {queue['failed']})"
)

def do_list_tasks(self, arg):
"""
List procrastinate tasks: get tasks names and number of jobs per task.
Usage: list_tasks [queue=QUEUE_NAME] [task=TASK_NAME] [status=STATUS]
[lock=LOCK]
Jobs can be filtered by queue name, task name, status and lock.
Example: list_queues queue=default status=failed
"""
kwargs = parse_argument(arg)
for task in self.admin.list_tasks(**kwargs):
print(
f"{task['name']}: {task['jobs_count']} jobs ("
f"todo: {task['todo']}, "
f"succeeded: {task['succeeded']}, "
f"failed: {task['failed']})"
)

def do_retry(self, arg):
"""
Retry a specific job (reset its status to todo).
Usage: retry JOB_ID
JOB_ID is the id (numeric) of the job.
Example: retry 2
"""
print_job(self.admin.set_job_status(arg, status="todo"))

def do_cancel(self, arg):
"""
Cancel a specific job (set its status to failed).
Usage: cancel JOB_ID
JOB_ID is the id (numeric) of the job.
Example: cancel 3
"""
print_job(self.admin.set_job_status(arg, status="failed"))
Loading

0 comments on commit 82e923a

Please sign in to comment.