Skip to content

Commit

Permalink
renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 25, 2024
1 parent 5e9cf33 commit 1a9fe6b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def _get_pipeline_dag(project_id: ProjectID, db_engine: Engine) -> nx.DiGr
get_redis_client_from_app,
lock_key=get_redis_lock_key(MODULE_NAME_SCHEDULER, unique_lock_key_builder=None),
)
async def schedule_pipelines(app: FastAPI) -> None:
async def schedule_all_pipelines(app: FastAPI) -> None:
with log_context(_logger, logging.DEBUG, msg="scheduling pipelines"):
db_engine = get_db_engine(app)
runs_to_schedule = await CompRunsRepository.instance(db_engine).list(
Expand All @@ -142,7 +142,7 @@ async def schedule_pipelines(app: FastAPI) -> None:

async def setup_manager(app: FastAPI) -> None:
app.state.scheduler_manager = start_periodic_task(
schedule_pipelines,
schedule_all_pipelines,
interval=SCHEDULER_INTERVAL,
task_name=MODULE_NAME_SCHEDULER,
app=app,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from simcore_service_director_v2.modules.comp_scheduler._manager import (
SCHEDULER_INTERVAL,
run_new_pipeline,
schedule_pipelines,
schedule_all_pipelines,
stop_pipeline,
)
from simcore_service_director_v2.modules.comp_scheduler._models import (
Expand Down Expand Up @@ -66,25 +66,25 @@ def with_fast_scheduling(mocker: MockerFixture) -> None:


@pytest.fixture
def mocked_schedule_pipelines(mocker: MockerFixture) -> mock.Mock:
def mocked_schedule_all_pipelines(mocker: MockerFixture) -> mock.Mock:
return mocker.patch(
"simcore_service_director_v2.modules.comp_scheduler._manager.schedule_pipelines",
"simcore_service_director_v2.modules.comp_scheduler._manager.schedule_all_pipelines",
autospec=True,
)


async def test_manager_starts_and_auto_schedules_pipelines(
with_fast_scheduling: None,
with_disabled_scheduler_worker: mock.Mock,
mocked_schedule_pipelines: mock.Mock,
mocked_schedule_all_pipelines: mock.Mock,
initialized_app: FastAPI,
sqlalchemy_async_engine: AsyncEngine,
):
await assert_comp_runs_empty(sqlalchemy_async_engine)
mocked_schedule_pipelines.assert_called()
mocked_schedule_all_pipelines.assert_called()


async def test_schedule_pipelines_empty_db(
async def test_schedule_all_pipelines_empty_db(
with_disabled_auto_scheduling: mock.Mock,
with_disabled_scheduler_worker: mock.Mock,
initialized_app: FastAPI,
Expand All @@ -93,7 +93,7 @@ async def test_schedule_pipelines_empty_db(
):
await assert_comp_runs_empty(sqlalchemy_async_engine)

await schedule_pipelines(initialized_app)
await schedule_all_pipelines(initialized_app)

# check nothing was distributed
scheduler_rabbit_client_parser.assert_not_called()
Expand All @@ -102,7 +102,7 @@ async def test_schedule_pipelines_empty_db(
await assert_comp_runs_empty(sqlalchemy_async_engine)


async def test_schedule_pipelines_concurently_runs_exclusively_and_raises(
async def test_schedule_all_pipelines_concurently_runs_exclusively_and_raises(
with_disabled_auto_scheduling: mock.Mock,
initialized_app: FastAPI,
mocker: MockerFixture,
Expand All @@ -124,7 +124,7 @@ async def slow_limited_gather(*args, **kwargs):
)

results = await asyncio.gather(
*(schedule_pipelines(initialized_app) for _ in range(CONCURRENCY)),
*(schedule_all_pipelines(initialized_app) for _ in range(CONCURRENCY)),
return_exceptions=True,
)

Expand All @@ -135,7 +135,7 @@ async def slow_limited_gather(*args, **kwargs):
mock_function.assert_called_once()


async def test_schedule_pipelines(
async def test_schedule_all_pipelines(
with_disabled_auto_scheduling: mock.Mock,
with_disabled_scheduler_worker: mock.Mock,
initialized_app: FastAPI,
Expand Down Expand Up @@ -178,7 +178,7 @@ async def test_schedule_pipelines(
start_modified_time = comp_run.modified

# this will now not schedule the pipeline since it was last scheduled
await schedule_pipelines(initialized_app)
await schedule_all_pipelines(initialized_app)
scheduler_rabbit_client_parser.assert_not_called()
comp_runs = await assert_comp_runs(sqlalchemy_async_engine, expected_total=1)
comp_run = comp_runs[0]
Expand All @@ -188,7 +188,7 @@ async def test_schedule_pipelines(

# this will now schedule the pipeline since the time passed
await asyncio.sleep(SCHEDULER_INTERVAL.total_seconds() + 1)
await schedule_pipelines(initialized_app)
await schedule_all_pipelines(initialized_app)
scheduler_rabbit_client_parser.assert_called_once_with(
SchedulePipelineRabbitMessage(
user_id=published_project.project.prj_owner,
Expand All @@ -211,7 +211,7 @@ async def test_schedule_pipelines(
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
)
await schedule_pipelines(initialized_app)
await schedule_all_pipelines(initialized_app)
scheduler_rabbit_client_parser.assert_called_once_with(
SchedulePipelineRabbitMessage(
user_id=published_project.project.prj_owner,
Expand Down

0 comments on commit 1a9fe6b

Please sign in to comment.