Skip to content

Commit

Permalink
feat: import product DB every day (#119)
Browse files Browse the repository at this point in the history
  • Loading branch information
raphael0202 authored Jan 5, 2024
1 parent 970ff2c commit b60c3ac
Show file tree
Hide file tree
Showing 15 changed files with 315 additions and 52 deletions.
4 changes: 3 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ POSTGRES_PASSWORD=postgres
POSTGRES_HOST=postgres
POSTGRES_PORT=5432

POSTGRES_EXPOSE=127.0.0.1:5433
POSTGRES_EXPOSE=127.0.0.1:5432

ENVIRONMENT=net
3 changes: 3 additions & 0 deletions .github/workflows/container-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ jobs:
if: matrix.env == 'open-prices-net'
run: |
echo "SSH_HOST=10.1.0.200" >> $GITHUB_ENV
echo "ENVIRONMENT=net" >> $GITHUB_ENV
- name: Set various variable for production deployment
if: matrix.env == 'open-prices-org'
run: |
echo "SSH_HOST=10.1.0.201" >> $GITHUB_ENV
echo "ENVIRONMENT=org" >> $GITHUB_ENV
- name: Wait for docker image container build workflow
uses: tomchv/[email protected]
id: wait-build
Expand Down Expand Up @@ -121,6 +123,7 @@ jobs:
echo "POSTGRES_DB=postgres" >> .env
echo "POSTGRES_USER=postgres" >> .env
echo "POSTGRES_PASSWORD=${{ secrets.POSTGRES_PASSWORD }}" >> .env
echo "ENVIRONMENT=${{ env.ENVIRONMENT }}" >> .env
- name: Create Docker volumes
uses: appleboy/ssh-action@master
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""add product.unique_scans_n column
Revision ID: 3f8d293e669e
Revises: 24d71d56d493
Create Date: 2024-01-05 09:23:21.726450
"""
from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
# Unique id of this migration.
revision: str = "3f8d293e669e"
# Parent migration this one applies on top of.
down_revision: Union[str, None] = "24d71d56d493"
# No branch labels or explicit dependencies for this linear migration.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Apply the migration: add the nullable ``unique_scans_n`` integer
    column to the ``products`` table."""
    op.add_column("products", sa.Column("unique_scans_n", sa.Integer(), nullable=True))


def downgrade() -> None:
    """Revert the migration: drop the ``unique_scans_n`` column from the
    ``products`` table."""
    op.drop_column("products", "unique_scans_n")
4 changes: 4 additions & 0 deletions app/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Entry point for ``python -m app``: delegates to the Typer-based CLI.
from app.cli.main import main

if __name__ == "__main__":
    main()
29 changes: 29 additions & 0 deletions app/cli/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import typer

# Typer application that aggregates all CLI sub-commands.
app = typer.Typer()


@app.command()
def import_product_db() -> None:
    """Import from DB JSONL dump to insert/update product table."""
    # Imports are deferred so that unrelated CLI commands start quickly.
    from app.db import session
    from app.tasks import import_product_db
    from app.utils import get_logger

    get_logger()
    import_product_db(session())


@app.command()
def run_scheduler() -> None:
    """Launch the scheduler."""
    # Imports are deferred so that unrelated CLI commands start quickly.
    from app.utils import get_logger
    from app import scheduler

    get_logger()
    scheduler.run()


def main() -> None:
    """CLI entry point: dispatch to the Typer application."""
    app()
2 changes: 2 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from pathlib import Path

from openfoodfacts import Environment
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

Expand Down Expand Up @@ -42,6 +43,7 @@ class Settings(BaseSettings):
sentry_dns: str | None = None
log_level: LoggingLevel = LoggingLevel.INFO
images_dir: Path = STATIC_DIR / "img"
environment: Environment = Environment.org

model_config = SettingsConfigDict(env_file=".env", extra="ignore")

Expand Down
1 change: 1 addition & 0 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Product(Base):
product_quantity = Column(Integer)
brands = Column(String)
image_url = Column(String)
unique_scans_n = Column(Integer, nullable=True)

prices: Mapped[list["Price"]] = relationship(back_populates="product")

Expand Down
24 changes: 24 additions & 0 deletions app/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from openfoodfacts.utils import get_logger

from app.db import session
from app.tasks import import_product_db

logger = get_logger(__name__)


def import_product_db_job():
    """Scheduled job: refresh the local product table from the daily
    Open Food Facts JSONL dump.

    Opens a fresh DB session for the run and always closes it, so the
    underlying connection is returned to the pool even when the import
    raises. (Previously the session was never closed, leaking one
    connection per scheduled run.)
    """
    db = session()
    try:
        import_product_db(db=db)
    finally:
        db.close()


def run():
    """Configure and start the blocking job scheduler."""
    blocking = BlockingScheduler()
    # A single thread pool and an in-memory job store are sufficient:
    # jobs are re-registered from scratch on every process start.
    blocking.add_executor(ThreadPoolExecutor(20))
    blocking.add_jobstore(MemoryJobStore())
    # Run the product-DB import once a day at 06:00 (scheduler's default
    # timezone); jitter spreads the exact start time by up to 60 s.
    blocking.add_job(
        import_product_db_job,
        "cron",
        max_instances=1,
        hour=6,
        minute=0,
        jitter=60,
    )
    blocking.start()
4 changes: 4 additions & 0 deletions app/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class ProductBase(ProductCreate):
"https://images.openfoodfacts.org/images/products/800/150/500/5707/front_fr.161.400.jpg"
],
)
unique_scans_n: int | None = Field(
description="number of unique scans of the product on Open Food Facts.",
examples=[15],
)
created: datetime.datetime = Field(description="datetime of the creation.")
updated: datetime.datetime | None = Field(
description="datetime of the last update."
Expand Down
124 changes: 124 additions & 0 deletions app/tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
import datetime

import tqdm
from openfoodfacts import DatasetType, Flavor, ProductDataset
from openfoodfacts.images import generate_image_url
from openfoodfacts.types import JSONType
from openfoodfacts.utils import get_logger
from sqlalchemy import or_, select
from sqlalchemy.orm import Session

from app import crud
from app.config import settings
from app.models import Product
from app.schemas import LocationCreate, PriceBase, ProductCreate
from app.utils import (
OFF_FIELDS,
fetch_location_openstreetmap_details,
fetch_product_openfoodfacts_details,
)

logger = get_logger(__name__)


def create_price_product(db: Session, price: PriceBase):
# The price may not have a product code, if it's the price of a
Expand Down Expand Up @@ -46,3 +59,114 @@ def create_price_location(db: Session, price: PriceBase):
crud.update_location(
db, location=db_location, update_dict=location_openstreetmap_details
)


def generate_main_image_url(code: str, images: JSONType, lang: str) -> str | None:
    """Generate the URL of the main image of a product.

    :param code: The code of the product
    :param images: The images of the product
    :param lang: The main language of the product
    :return: The URL of the main image of the product or None if no image is
        available.
    """
    # Prefer the front image in the product's main language; otherwise fall
    # back to the first "front_*" image in any language (dict order).
    preferred_key = f"front_{lang}"
    if preferred_key in images:
        image_key = preferred_key
    else:
        image_key = next((k for k in images if k.startswith("front_")), None)

    if image_key is None:
        return None

    # Image ids follow the "<key>.<revision>.<size>" convention; 400 selects
    # the 400px rendition.
    image_rev = images[image_key]["rev"]
    image_id = f"{image_key}.{image_rev}.400"
    return generate_image_url(
        code, image_id=image_id, flavor=Flavor.off, environment=settings.environment
    )


def import_product_db(db: Session, batch_size: int = 1000):
    """Import from DB JSONL dump to insert/update product table.

    New product codes are inserted; existing rows (matched by code and
    source=off) are updated only if they have not been modified since the
    dump was generated.

    :param db: the session to use
    :param batch_size: the number of products to insert/update in a single
        transaction, defaults to 1000
    """
    logger.info("Launching import_product_db")
    existing_codes = set(db.execute(select(Product.code)).scalars())
    logger.info("Number of existing codes: %d", len(existing_codes))
    dataset = ProductDataset(
        dataset_type=DatasetType.jsonl, force_download=True, download_newer=True
    )

    added_count = 0
    updated_count = 0
    buffer_len = 0
    # the dataset was created after the start of the day, every product updated
    # after should be skipped, as we don't know the exact creation time of the
    # dump. Bug fix: microsecond is zeroed too, so the cutoff is exactly
    # midnight UTC rather than midnight plus the current microsecond.
    start_datetime = datetime.datetime.now(tz=datetime.timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    for product in tqdm.tqdm(dataset):
        if "code" not in product:
            continue

        product_code = product["code"]
        images: JSONType = product.get("images", {})
        last_modified_t = product.get("last_modified_t")
        last_modified = (
            datetime.datetime.fromtimestamp(last_modified_t, tz=datetime.timezone.utc)
            if last_modified_t
            else None
        )

        # Without a last-modified timestamp we cannot compare against the
        # dump cutoff, so skip the product entirely.
        if last_modified is None:
            continue

        # Skip products that have been modified today (more recent updates are
        # possible)
        if last_modified >= start_datetime:
            logger.debug("Skipping %s", product_code)
            continue

        # Bug fix: use .get() so a product lacking "lang" doesn't abort the
        # whole import with a KeyError.
        product_lang = product.get("lang")

        if product_code not in existing_codes:
            item = {"code": product_code, "source": Flavor.off}
            for key in OFF_FIELDS:
                item[key] = product.get(key)
            item["image_url"] = generate_main_image_url(
                product_code, images, product_lang
            )
            db.add(Product(**item))
            added_count += 1
            buffer_len += 1

        else:
            item = {key: product.get(key) for key in OFF_FIELDS}
            item["image_url"] = generate_main_image_url(
                product_code, images, product_lang
            )
            execute_result = db.execute(
                Product.__table__.update()
                .where(Product.code == product_code)
                .where(Product.source == Flavor.off)
                # Update the product only if it has not been updated since
                # the creation of the current dataset
                .where(
                    or_(
                        Product.updated < last_modified,
                        Product.updated == None,  # noqa: E711, E501
                    )
                )
                .values(**item)
            )
            updated_count += execute_result.rowcount
            buffer_len += 1

        if buffer_len % batch_size == 0:
            db.commit()
            logger.info(f"Products: {added_count} added, {updated_count} updated")
            buffer_len = 0

    # Bug fix: commit the trailing partial batch — the in-loop commit only
    # fires on exact multiples of batch_size, so up to batch_size - 1 rows
    # were previously left uncommitted at the end of the import.
    db.commit()
    logger.info(f"Products: {added_count} added, {updated_count} updated")
13 changes: 10 additions & 3 deletions app/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging

import sentry_sdk
from openfoodfacts import API, APIVersion, Country, Environment, Flavor
from openfoodfacts import API, APIVersion, Country, Flavor
from openfoodfacts.utils import get_logger
from OSMPythonTools.nominatim import Nominatim
from sentry_sdk.integrations import Integration
from sentry_sdk.integrations.logging import LoggingIntegration

from app.config import settings
from app.schemas import LocationBase, ProductBase

logger = get_logger(__name__)
Expand All @@ -31,7 +32,13 @@ def init_sentry(sentry_dsn: str | None, integrations: list[Integration] | None =

# OpenFoodFacts
# ------------------------------------------------------------------------------
OFF_FIELDS = ["product_name", "product_quantity", "brands", "image_url"]
# Product fields copied from the Open Food Facts product payload into the
# local ``products`` table (missing fields are stored as None by callers).
OFF_FIELDS = [
    "product_name",
    "product_quantity",
    "brands",
    "image_url",
    "unique_scans_n",
]


def openfoodfacts_product_search(code: str):
Expand All @@ -41,7 +48,7 @@ def openfoodfacts_product_search(code: str):
country=Country.world,
flavor=Flavor.off,
version=APIVersion.v2,
environment=Environment.org,
environment=settings.environment,
)
return client.product.get(code)

Expand Down
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ services:
<<: *api-common
volumes:
- ./static:/opt/open-prices/static

scheduler:
<<: *api-common
command: ["python", "-m", "app", "run-scheduler"]
volumes:
- ./static:/opt/open-prices/static

postgres:
restart: $RESTART_POLICY
Expand Down
4 changes: 4 additions & 0 deletions docker/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ services:
<<: *api-base
# uvicorn in reload mode
command: ["uvicorn", "app.api:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "8000", "--reload"]

scheduler:
<<: *api-base
command: ["python", "-m", "app", "run-scheduler"]
Loading

0 comments on commit b60c3ac

Please sign in to comment.