Skip to content

Commit

Permalink
switch to Django 5.1, switch to psycopg3, switch db pool to native Dj…
Browse files Browse the repository at this point in the history
…ango pool
  • Loading branch information
s4ke committed Oct 30, 2024
1 parent 714393b commit 311beb3
Show file tree
Hide file tree
Showing 20 changed files with 460 additions and 442 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ services:

postgres:
#command: "-c log_statement=all -c log_destination=stderr -c fsync=off -c synchronous_commit=off -c full_page_writes=off"
image: postgres:12
image: postgres:17
shm_size: '2gb'
environment:
POSTGRES_USER: ${POSTGRES_USER:-cephalopod}
Expand Down
23 changes: 11 additions & 12 deletions skipper/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ djangorestframework-simplejwt = { version = "==5.3.1" }
# at least required for simplejwt crypto
cryptography = "==43.0.1"
django-extensions = "==3.2.3"
psycopg2 = "==2.9.9"
psycopg = {version = "==3.2.3", extras = ["binary", "pool"]}
pytz = "==2024.1"
django-enumchoicefield = "==1.1.0"
requests = "==2.32.2"
Expand Down Expand Up @@ -45,20 +45,19 @@ single-beat = "==0.6.3"
tornado = "==6.4.1"
redis = "==5.0.1"
async-timeout = "==4.0.3"
django-db-geventpool = "==4.0.7"
django-pg-queue = "==0.8.2"
django-health-check = "==3.18.1"
opentelemetry-api = "==1.23.0"
opentelemetry-sdk = "==1.23.0"
django-health-check = "==3.18.3"
opentelemetry-api = "==1.27.0"
opentelemetry-sdk = "==1.27.0"
opentelemetry-exporter-jaeger = "==1.21.0"
opentelemetry-instrumentation-django = "==0.44b0"
opentelemetry-instrumentation-celery = "==0.44b0"
opentelemetry-instrumentation-requests = "==0.44b0"
opentelemetry-instrumentation-botocore = "==0.44b0"
opentelemetry-instrumentation-psycopg2 = "==0.44b0"
opentelemetry-instrumentation-redis = "==0.44b0"
opentelemetry-instrumentation-django = "==0.48b0"
opentelemetry-instrumentation-celery = "==0.48b0"
opentelemetry-instrumentation-requests = "==0.48b0"
opentelemetry-instrumentation-botocore = "==0.48b0"
opentelemetry-instrumentation-psycopg = "==0.48b0"
opentelemetry-instrumentation-redis = "==0.48b0"
sqlparse = "==0.5.0"
Django = "==5.0.9"
Django = "==5.1.2"
Pillow = "==10.3.0"
FormEncode = "==2.0.1"
SQLAlchemy = "==1.4.51"
Expand Down
695 changes: 384 additions & 311 deletions skipper/Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions skipper/liccheck.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ authorized_licenses:
mit
historical permission notice and disclaimer (hpnd)
gnu library or lesser general public license (lgpl)
GNU Lesser General Public License v3 (LGPLv3)
academic free license (afl)
mozilla public license 2.0 (mpl 2.0)
ISC License (ISCL)
Expand Down
Empty file.
Empty file.
72 changes: 0 additions & 72 deletions skipper/skipper/core/db_backends/gevent/base.py

This file was deleted.

3 changes: 2 additions & 1 deletion skipper/skipper/dataseries/raw_sql/escape.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import cast

from django.db import connections
from psycopg import sql
from psycopg2.extensions import quote_ident # type: ignore
from rest_framework.exceptions import APIException

Expand All @@ -18,4 +19,4 @@ def escape(string: str, connection_name: str = 'default') -> str:
if not validate_sql_string(string):
raise APIException('SQL may only contain a-zA-Z0-9_%\'()-" .')
with connections[connection_name].cursor() as cursor:
return cast(str, quote_ident(string, cursor.cursor))
return sql.Identifier(string).as_string(cursor.cursor)
57 changes: 36 additions & 21 deletions skipper/skipper/dataseries/storage/contract/file_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from django_multitenant.fields import TenantForeignKey # type: ignore
from django_multitenant.mixins import TenantModelMixin # type: ignore
from django_multitenant.models import TenantManager # type: ignore
from typing import List, Optional, Union, Protocol, Any
from typing import List, Optional, Union, Protocol, Any, Dict

from skipper.dataseries.models import FileLookup
from skipper.dataseries.raw_sql import dbtime
Expand Down Expand Up @@ -288,29 +288,44 @@ def delete_all_matching(
fact_id: Union[str, uuid.UUID],
history_data_point_identifiers: List[HistoryDataPointIdentifier],
) -> None:
query_str = f"""
UPDATE "_3_file_lookup"
SET "deleted_at" = %(deleted_at)s
WHERE
"tenant_id" = %(tenant_id)s AND
"data_series_id" = %(data_series_id)s AND
"fact_id" = %(fact_id)s AND
("data_point_id", "point_in_time", "sub_clock") IN %(data_point_identifiers)s AND
"deleted_at" IS NULL
"""

with sql_cursor(DATA_SERIES_DYNAMIC_SQL_DB) as cursor:
data_point_placeholders = []
params: Dict[str, Any] = {
'tenant_id': tenant_id,
'data_series_id': data_series_id,
'fact_id': fact_id,
'deleted_at': dbtime.now()
}

# Dynamically create named placeholders and add each to params
for i, elem in enumerate(history_data_point_identifiers):
dp_id_key = f"dp_id_{i}"
pt_key = f"pt_{i}"
sub_clock_key = f"sub_clock_{i}"

data_point_placeholders.append(f"(%({dp_id_key})s, %({pt_key})s, %({sub_clock_key})s)")

# Add each component to params with unique keys
params[dp_id_key] = elem.data_point_id
params[pt_key] = elem.point_in_time
params[sub_clock_key] = elem.sub_clock

# Join placeholders to create the final SQL IN clause
data_point_in_clause = ', '.join(data_point_placeholders)

query_str = f"""
UPDATE "_3_file_lookup"
SET "deleted_at" = %(deleted_at)s
WHERE
"tenant_id" = %(tenant_id)s AND
"data_series_id" = %(data_series_id)s AND
"fact_id" = %(fact_id)s AND
("data_point_id", "point_in_time", "sub_clock") IN ({data_point_in_clause}) AND
"deleted_at" IS NULL
"""
cursor.execute(
query_str,
{
'tenant_id': tenant_id,
'data_series_id': data_series_id,
'fact_id': fact_id,
'data_point_identifiers': tuple(
(elem.data_point_id, elem.point_in_time, elem.sub_clock) for elem in history_data_point_identifiers
),
'deleted_at': dbtime.now()
}
params
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,18 @@ def check_external_ids(
for chunk in chunks(external_ids, size=250):
if backend == StorageBackendType.DYNAMIC_SQL_NO_HISTORY.value\
or backend == StorageBackendType.DYNAMIC_SQL_MATERIALIZED_FLAT_HISTORY.value:
chunk = list(chunk)
with sql_cursor(DATA_SERIES_DYNAMIC_SQL_DB) as cursor:
placeholders = ', '.join(['%s'] * len(chunk))
sql = f"""
SELECT ds_dp.external_id
FROM {data_series_query_info.schema_name}.{data_series_query_info.main_query_table_name} ds_dp
WHERE ds_dp.external_id IN %s
WHERE ds_dp.external_id IN ({placeholders})
AND ds_dp.deleted_at IS NULL
"""
cursor.execute(
sql,
(tuple(chunk),)
chunk
)
external_ids_in_use.extend([{'external_id': x[0]} for x in cursor.fetchall()])
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import datetime
from django.db import connections, transaction
from psycopg2 import sql # type: ignore

from skipper.core.models.tenant import Tenant
from skipper.dataseries.raw_sql import escape
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# [2019] - [2024] © NeuroForge GmbH & Co. KG


from psycopg2 import sql # type: ignore
from psycopg import sql # type: ignore
from typing import Any, List, Optional

from skipper.dataseries.raw_sql import escape
Expand All @@ -30,7 +30,7 @@ def insert_to_flat_history_query(
:param data_series_external_id: The external id of the dataseries the flat history belongs to
:param user_id: the optional user id that wrote the data.
:param record_source: a string identifying where the data is coming from. Examples "REST PUT", "REST DELETE"
:param cursor: the psycopg2 cursor to use for this query
:param cursor: the psycopg cursor to use for this query
:param source_query: the query to use to fill the history table from. This may be an INSERT ... RETURNING * query or a simple SELECT
as long as the table has all the necessary columns.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import datetime
from django.core.files.uploadedfile import InMemoryUploadedFile
from psycopg2.extras import execute_values # type: ignore
from typing import Iterable, Dict, Any, List, Tuple, Optional, Union, Sequence

from skipper.core.models.fields import default_media_storage
Expand Down Expand Up @@ -273,7 +272,7 @@ def _insert_or_update_data_points_impl(
with_statement = f"""
WITH "values_to_insert" AS (
SELECT {','.join(map(lambda x: f'{x[0]}::{x[1]}', zip(columns, column_types)))} FROM (
VALUES %s
VALUES ({', '.join(['%s'] * len(columns))})
) AS "t" ({','.join(columns)})
)
"""
Expand Down Expand Up @@ -344,9 +343,7 @@ def _insert_or_update_data_points_impl(
{central_insert}
"""

execute_values(
cursor,
cursor.executemany(
final_insert_sql,
all_values
)

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from django.db import connections, transaction
from typing import Any, Dict, List, Mapping, Union
from psycopg2 import errors # type: ignore
from psycopg import errors # type: ignore
from skipper.core.lint import sql_cursor # type: ignore

from skipper.core.models.tenant import Tenant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import datetime
from django.db import transaction
from django_multitenant.utils import set_current_tenant # type: ignore
from psycopg2 import errors # type: ignore
from typing import Dict, Callable

from skipper.core.celery import task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import uuid
from django.db import transaction
from django_multitenant.utils import set_current_tenant # type: ignore
from psycopg2 import errors # type: ignore
from typing import List, Tuple, Dict, Union, Callable, Dict

from skipper.core.celery import task
Expand Down
2 changes: 1 addition & 1 deletion skipper/skipper/dataseries/tasks/metamodel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid
from psycopg2 import errors # type: ignore
from psycopg import errors # type: ignore
from typing import Dict, Callable, Union
from skipper.core.celery import task
from skipper.dataseries.models.task_data import MetaModelTaskData
Expand Down
4 changes: 0 additions & 4 deletions skipper/skipper/gunicorn.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
# [2019] - [2024] © NeuroForge GmbH & Co. KG


from psycogreen.gevent import patch_psycopg # type: ignore
from gevent import monkey # type: ignore
import logging
from typing import Any
Expand All @@ -22,9 +21,6 @@ def post_fork(server: Any, worker: Any) -> None:

monkey.patch_all()
server.log.info('gunicorn post_fork: successfully used gevent patch call')
patch_psycopg()
server.log.info('gunicorn post_fork: successfully patched psycopg2 to be compatible with gevent')

from skipper import telemetry

telemetry.setup_telemetry_django()
20 changes: 15 additions & 5 deletions skipper/skipper/settings_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,17 @@ def task_upstream_dashboard(tenant: Tenant, user: Optional[Union[User, Anonymous
} if environment.SKIPPER_DB_SSL_ENABLE else {}
)
_db_engine = (
'skipper.core.db_backends.gevent'
'django.db.backends.postgresql'
if skipper_container_type in ['DJANGO', 'DJANGO_INTERNAL'] and not environment.SKIPPER_TESTING and not TYPE_CHECKING and not (os.environ.get('MYPY_RUN', 'false') == 'true')
else 'django.db.backends.postgresql_psycopg2'
else 'django.db.backends.postgresql'
)
_db_options = (
{
'MAX_CONNS': max(2, environment.SKIPPER_GUNICORN_WORKER_CONCURRENCY)
'pool': {
'min_size': max(2, environment.SKIPPER_GUNICORN_WORKER_CONCURRENCY),
'max_size': max(2, environment.SKIPPER_GUNICORN_WORKER_CONCURRENCY),
'timeout': 10
}
}
if skipper_container_type in ['DJANGO', 'DJANGO_INTERNAL'] and not environment.SKIPPER_TESTING and not TYPE_CHECKING and not (os.environ.get('MYPY_RUN', 'false') == 'true')
else {}
Expand Down Expand Up @@ -434,13 +438,19 @@ def task_upstream_dashboard(tenant: Tenant, user: Optional[Union[User, Anonymous
AWS_S3_ENDPOINT_URL = environment.SKIPPER_S3_INTERNAL_ENDPOINT_URL
AWS_S3_REGION_NAME = 'eu-west-1'

STATICFILES_STORAGE = 'skipper.core.storage.static.S3Boto3StaticStorage'
STORAGES = {
'staticfiles': {
'BACKEND': 'skipper.core.storage.static.S3Boto3StaticStorage'
},
'default': {
'BACKEND': 'skipper.core.storage.media.S3Boto3MediaStorage'
}
}
# by default use the authenticated one
AWS_STORAGE_BUCKET_NAME = environment.SKIPPER_S3_MEDIA_BUCKET_NAME
NF_AWS_STORAGE_BUCKET_NAME_STATIC = environment.SKIPPER_S3_STATIC_BUCKET_NAME
NF_AWS_QUERYSTRING_AUTH_STATIC = False

DEFAULT_FILE_STORAGE = 'skipper.core.storage.media.S3Boto3MediaStorage'
NF_AWS_STORAGE_BUCKET_NAME_MEDIA = environment.SKIPPER_S3_MEDIA_BUCKET_NAME


Expand Down
1 change: 0 additions & 1 deletion skipper/skipper/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def _instrument() -> None:
CeleryInstrumentor().instrument() # type: ignore
RequestsInstrumentor().instrument()
BotocoreInstrumentor().instrument() # type: ignore
Psycopg2Instrumentor().instrument()
RedisInstrumentor().instrument()


Expand Down

0 comments on commit 311beb3

Please sign in to comment.