diff --git a/.github/workflows/integration_test.yaml b/.github/workflows/integration_test.yaml index 0981bdf..778a6d6 100644 --- a/.github/workflows/integration_test.yaml +++ b/.github/workflows/integration_test.yaml @@ -12,6 +12,7 @@ jobs: uses: canonical/operator-workflows/.github/workflows/integration_test.yaml@main secrets: inherit with: + test-timeout: 60 channel: 1.28-strict/stable modules: '["test_charm.py"]' juju-channel: 3.4/stable diff --git a/airbyte_rock/local-files/pod-sweeper.sh b/airbyte_rock/local-files/pod-sweeper.sh index d5e0d0b..187d6ca 100644 --- a/airbyte_rock/local-files/pod-sweeper.sh +++ b/airbyte_rock/local-files/pod-sweeper.sh @@ -3,7 +3,6 @@ # See LICENSE file for licensing details. # https://github.com/airbytehq/airbyte-platform/blob/main/charts/airbyte-pod-sweeper/templates/configmap.yaml -# TODO(kelkawi-a): Move this to Airbyte ROCK get_job_pods() { # echo "Running kubectl command to get job pods..." @@ -12,6 +11,17 @@ get_job_pods() { -o=jsonpath='{range .items[*]} {.metadata.name} {.status.phase} {.status.conditions[0].lastTransitionTime} {.status.startTime}{"\n"}{end}' } +# Useful function when debugging +fetch_pod_logs() { + pod_name="$1" + echo "Fetching logs for pod: ${pod_name}" + kubectl -n "${JOB_KUBE_NAMESPACE}" describe pod "$pod_name" + kubectl -n "${JOB_KUBE_NAMESPACE}" get pod "$pod_name" -o yaml | grep serviceAccount + kubectl -n "${JOB_KUBE_NAMESPACE}" logs "$pod_name" + kubectl -n "${JOB_KUBE_NAMESPACE}" logs "$pod_name" -c init + kubectl -n "${JOB_KUBE_NAMESPACE}" logs "$pod_name" -c main +} + delete_pod() { printf "From status '%s' since '%s', " "$2" "$3" echo "$1" | grep -v "STATUS" | awk '{print $1}' | xargs --no-run-if-empty kubectl -n "${JOB_KUBE_NAMESPACE}" delete pod @@ -20,6 +30,7 @@ delete_pod() { while : do echo "Starting pod sweeper cycle:" + sleep 120 if [ -n "${RUNNING_TTL_MINUTES}" ]; then # Time window for running pods diff --git a/airbyte_rock/rockcraft.yaml b/airbyte_rock/rockcraft.yaml index 5dc6b4b..e07034a 100644 --- a/airbyte_rock/rockcraft.yaml +++ b/airbyte_rock/rockcraft.yaml @@ -4,7 +4,7 @@ name: airbyte summary: Airbyte rock description: Airbyte OCI image for the Airbyte charm -version: "1.0" +version: "1.0.0" base: ubuntu@22.04 license: Apache-2.0 platforms: @@ -49,7 +49,7 @@ parts: plugin: dump source: https://github.com/airbytehq/airbyte-platform.git # yamllint disable-line source-type: git - source-tag: v0.63.8 + source-tag: v1.4.0 override-build: | cp -r . ${CRAFT_PART_INSTALL}/airbyte-platform stage: @@ -66,10 +66,14 @@ parts: - gradle - openjdk-21-jdk-headless - npm + - libpq-dev + - python3-dev build-snaps: - docker stage-packages: - openjdk-21-jdk-headless + - libpq-dev + - python3-dev override-build: | cd ${CRAFT_STAGE}/airbyte-platform ./gradlew assemble -x dockerBuildImage --continue --max-workers 1 @@ -80,25 +84,28 @@ parts: plugin: nil override-build: | mkdir ${CRAFT_PART_INSTALL}/airbyte-server - mkdir ${CRAFT_PART_INSTALL}/airbyte-api-server mkdir ${CRAFT_PART_INSTALL}/airbyte-workers mkdir ${CRAFT_PART_INSTALL}/airbyte-bootloader mkdir ${CRAFT_PART_INSTALL}/airbyte-cron mkdir ${CRAFT_PART_INSTALL}/airbyte-connector-builder-server + mkdir ${CRAFT_PART_INSTALL}/airbyte-workload-api-server + mkdir ${CRAFT_PART_INSTALL}/airbyte-workload-launcher tar -xvf ${CRAFT_STAGE}/airbyte-platform/airbyte-server/build/distributions/airbyte-app.tar -C ${CRAFT_PART_INSTALL}/airbyte-server - tar -xvf ${CRAFT_STAGE}/airbyte-platform/airbyte-api-server/build/distributions/airbyte-app.tar -C ${CRAFT_PART_INSTALL}/airbyte-api-server tar -xvf ${CRAFT_STAGE}/airbyte-platform/airbyte-workers/build/distributions/airbyte-app.tar -C ${CRAFT_PART_INSTALL}/airbyte-workers tar -xvf ${CRAFT_STAGE}/airbyte-platform/airbyte-bootloader/build/distributions/airbyte-app.tar -C ${CRAFT_PART_INSTALL}/airbyte-bootloader tar -xvf ${CRAFT_STAGE}/airbyte-platform/airbyte-cron/build/distributions/airbyte-app.tar -C ${CRAFT_PART_INSTALL}/airbyte-cron tar -xvf ${CRAFT_STAGE}/airbyte-platform/airbyte-connector-builder-server/build/distributions/airbyte-app.tar -C ${CRAFT_PART_INSTALL}/airbyte-connector-builder-server + tar -xvf ${CRAFT_STAGE}/airbyte-platform/airbyte-workload-api-server/build/distributions/airbyte-app.tar -C ${CRAFT_PART_INSTALL}/airbyte-workload-api-server + tar -xvf ${CRAFT_STAGE}/airbyte-platform/airbyte-workload-launcher/build/distributions/airbyte-app.tar -C ${CRAFT_PART_INSTALL}/airbyte-workload-launcher stage: - airbyte-server - - airbyte-api-server - airbyte-workers - airbyte-bootloader - airbyte-cron - airbyte-connector-builder-server + - airbyte-workload-api-server + - airbyte-workload-launcher local-files: after: [organize-tars] diff --git a/charmcraft.yaml b/charmcraft.yaml index d9dbb42..14145f9 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -428,7 +428,9 @@ config: # Your workload’s containers. containers: - airbyte-api-server: + airbyte-workload-api-server: + resource: airbyte-image + airbyte-workload-launcher: resource: airbyte-image airbyte-bootloader: resource: airbyte-image @@ -448,4 +450,3 @@ resources: airbyte-image: type: oci-image description: OCI image for Airbyte - diff --git a/src/charm.py b/src/charm.py index 5fa79ec..94df7f7 100755 --- a/src/charm.py +++ b/src/charm.py @@ -24,6 +24,8 @@ INTERNAL_API_PORT, LOGS_BUCKET_CONFIG, REQUIRED_S3_PARAMETERS, + WORKLOAD_API_PORT, + WORKLOAD_LAUNCHER_PORT, ) from log import log_event_handler from relations.airbyte_ui import AirbyteServerProvider @@ -62,6 +64,13 @@ def get_pebble_layer(application_name, context): }, } + if application_name == "airbyte-bootloader": + pebble_layer["services"][application_name].update( + { + "on-success": "ignore", + } + ) + application_info = CONTAINER_HEALTH_CHECK_MAP[application_name] if application_info is not None: pebble_layer["services"][application_name].update( @@ -172,7 +181,7 @@ def _on_update_status(self, event): check = container.get_check("up") if check.status != CheckStatus.UP: logger.error(f"check failed for {container_name}") - self.unit.status = MaintenanceStatus("Status check: DOWN") + self.unit.status = MaintenanceStatus(f"Status check: {container_name!r} DOWN") return if not all_valid_plans: @@ -284,7 +293,13 @@ def _update(self, event): self.unit.status = BlockedStatus(f"failed to create buckets: {str(e)}") return - self.model.unit.set_ports(AIRBYTE_API_PORT, INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT) + self.model.unit.set_ports( + AIRBYTE_API_PORT, + INTERNAL_API_PORT, + CONNECTOR_BUILDER_SERVER_API_PORT, + WORKLOAD_API_PORT, + WORKLOAD_LAUNCHER_PORT, + ) for container_name in CONTAINER_HEALTH_CHECK_MAP: container = self.unit.get_container(container_name) diff --git a/src/charm_helpers.py b/src/charm_helpers.py index aa2c76f..c7d2168 100644 --- a/src/charm_helpers.py +++ b/src/charm_helpers.py @@ -11,6 +11,7 @@ BASE_ENV, CONNECTOR_BUILDER_SERVER_API_PORT, INTERNAL_API_PORT, + WORKLOAD_API_PORT, ) from structured_config import StorageType @@ -39,7 +40,7 @@ def create_env(model_name, app_name, container_name, config, state): secret_persistence = config["secret-persistence"].value # Some defaults are extracted from Helm chart: - # https://github.com/airbytehq/airbyte-platform/tree/v0.60.0/charts/airbyte + # https://github.com/airbytehq/airbyte-platform/tree/v1.3.0/charts/airbyte env = { **BASE_ENV, # Airbye services config @@ -137,11 +138,18 @@ def create_env(model_name, app_name, container_name, config, state): "CONNECTOR_BUILDER_SERVER_API_HOST": f"{app_name}:{CONNECTOR_BUILDER_SERVER_API_PORT}", "CONNECTOR_BUILDER_API_HOST": f"{app_name}:{CONNECTOR_BUILDER_SERVER_API_PORT}", "AIRBYTE_API_HOST": f"{app_name}:{AIRBYTE_API_PORT}/api/public", + "WORKLOAD_API_HOST": f"{app_name}:{WORKLOAD_API_PORT}", + "WORKLOAD_API_BEARER_TOKEN": ".Values.workload-api.bearerToken", } # https://github.com/airbytehq/airbyte/issues/29506#issuecomment-1775148609 - if container_name == "airbyte-api-server": - env.update({"INTERNAL_API_HOST": f"http://{app_name}:{INTERNAL_API_PORT}"}) + if container_name in ["airbyte-workload-launcher", "airbyte-workers"]: + env.update( + { + "INTERNAL_API_HOST": f"http://{app_name}:{INTERNAL_API_PORT}", + "WORKLOAD_API_HOST": f"http://{app_name}:{WORKLOAD_API_PORT}", + } + ) if config["storage-type"].value == StorageType.minio and state.minio: minio_endpoint = construct_svc_endpoint( diff --git a/src/literals.py b/src/literals.py index 91ce1b5..fc9ca48 100644 --- a/src/literals.py +++ b/src/literals.py @@ -7,13 +7,18 @@ INTERNAL_API_PORT = 8001 AIRBYTE_API_PORT = 8006 WORKLOAD_API_PORT = 8007 -AIRBYTE_VERSION = "0.63.8" +WORKLOAD_LAUNCHER_PORT = 8016 +AIRBYTE_VERSION = "1.4.0" DB_NAME = "airbyte-k8s_db" CONTAINER_HEALTH_CHECK_MAP = { - "airbyte-api-server": { - "port": AIRBYTE_API_PORT, + "airbyte-workload-api-server": { + "port": WORKLOAD_API_PORT, + "health_endpoint": "/health", + }, + "airbyte-workload-launcher": { + "port": WORKLOAD_LAUNCHER_PORT, "health_endpoint": "/health", }, "airbyte-bootloader": None, @@ -62,4 +67,11 @@ "CONTAINER_ORCHESTRATOR_ENABLED": "true", "CONTAINER_ORCHESTRATOR_IMAGE": f"airbyte/container-orchestrator:{AIRBYTE_VERSION}", "LOG4J_CONFIGURATION_FILE": "log4j2-minio.xml", + "ENTERPRISE_SOURCE_STUBS_URL": "https://connectors.airbyte.com/files/resources/connector_stubs/v0/connector_stubs.json", + "PUB_SUB_ENABLED": "false", + "PUB_SUB_TOPIC_NAME": "", + "DATA_PLANE_ID": "local", + "LOCAL_ROOT": "/tmp/airbyte_local", # nosec + "RUN_DATABASE_MIGRATION_ON_STARTUP": "true", + "API_AUTHORIZATION_ENABLED": "false", } diff --git a/src/structured_config.py b/src/structured_config.py index 9d3f0c5..d9dfc70 100644 --- a/src/structured_config.py +++ b/src/structured_config.py @@ -42,8 +42,6 @@ class SecretPersistenceType(str, Enum): TESTING_CONFIG_DB_TABLE = "TESTING_CONFIG_DB_TABLE" VAULT = "VAULT" - secret_persistence: Optional["SecretPersistenceType"] = None # Optional field - class VaultAuthType(str, Enum): """Enum for the `vault-auth-method` field.""" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f36a6d9..9d4a0f6 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -56,7 +56,7 @@ async def deploy(ops_test: OpsTest, charm: str, charm_image: str): config={"num-history-shards": 4}, ) await ops_test.model.deploy(APP_NAME_TEMPORAL_ADMIN, channel="edge") - await ops_test.model.deploy("postgresql-k8s", channel="14/stable", trust=True) + await ops_test.model.deploy("postgresql-k8s", channel="14/stable", trust=True, revision=381) await ops_test.model.deploy("minio", channel="edge") async with ops_test.fast_forward(): diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index c7289d0..558bede 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -5,11 +5,15 @@ """Charm integration test helpers.""" import logging +import os import time from pathlib import Path +import psycopg2 import requests import yaml +from psycopg2 import sql +from psycopg2.extras import RealDictCursor from pytest_operator.plugin import OpsTest from temporal_client.activities import say_hello from temporal_client.workflows import SayHello @@ -128,7 +132,7 @@ async def perform_airbyte_integrations(ops_test: OpsTest): raise_on_blocked=False, wait_for_active=True, idle_period=60, - timeout=600, + timeout=300, ) assert ops_test.model.applications[APP_NAME_AIRBYTE_SERVER].units[0].workload_status == "active" @@ -151,6 +155,37 @@ def get_airbyte_workspace_id(api_url): return response.json().get("data")[0]["workspaceId"] +def update_pokeapi_connector_version(db_host, db_password): + """Updates pokeapi connector version. + + Args: + db_host: Database host. + db_password: Database password. + """ + with psycopg2.connect( + host=db_host, + dbname="airbyte-k8s_db", + user="operator", + password=db_password, + port=5432, + ) as conn: + with conn.cursor(cursor_factory=RealDictCursor) as cursor: + update_connector = sql.SQL( + """ + UPDATE {adv_table} + SET docker_image_tag = '0.2.1' + WHERE id IN ( + SELECT adv.id + FROM {ad_table} ad + JOIN {adv_table} adv ON ad.default_version_id = adv.id + WHERE ad.name LIKE '%Poke%' + ); + """ + ).format(adv_table=sql.Identifier("actor_definition_version"), ad_table=sql.Identifier("actor_definition")) + cursor.execute(update_connector) + conn.commit() + + def create_airbyte_source(api_url, workspace_id): """Create Airbyte sample source. @@ -167,8 +202,19 @@ def create_airbyte_source(api_url, workspace_id): "name": "API Test", "workspaceId": workspace_id, } + # payload = { + # "configuration": { + # "sourceType": "jira", + # "api_token": os.getenv("JIRA_SANDBOX_API_TOKEN"), + # "domain": os.getenv("JIRA_SANDBOX_DOMAIN"), + # "email": os.getenv("JIRA_SANDBOX_EMAIL"), + # }, + # "name": "JIRA API Test", + # "workspaceId": workspace_id, + # } logger.info("creating Airbyte source") + logger.info(f"Jira email: {os.getenv('JIRA_SANDBOX_EMAIL')}") response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) logger.info(response.json()) @@ -224,6 +270,17 @@ def create_airbyte_connection(api_url, source_id, destination_id): Returns: Created connection ID. """ + # logger.info("creating Airbyte connection") + # s = airbyte_api.AirbyteAPI(server_url=f"{api_url}/api/public/v1") + # res = s.connections.create_connection( + # request=models.ConnectionCreateRequest( + # destination_id=destination_id, + # source_id=source_id, + # name="Pokeapi-to-postgres", + # ) + # ) + + # logger.info(res) url = f"{api_url}/api/public/v1/connections" payload = { "schedule": {"scheduleType": "manual"}, @@ -236,7 +293,7 @@ def create_airbyte_connection(api_url, source_id, destination_id): } logger.info("creating Airbyte connection") - response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300) + response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=1800) logger.info(response.json()) assert response.status_code == 200 @@ -326,11 +383,19 @@ async def run_test_sync_job(ops_test): """ # Create connection api_url = await get_unit_url(ops_test, application=APP_NAME_AIRBYTE_SERVER, unit=0, port=8001) + + # Get DB URL + status = await ops_test.model.get_status() # noqa: F821 + # db_host = status["applications"]["postgresql-k8s"]["units"]["postgresql-k8s/0"]["address"] + logger.info("curling app address: %s", api_url) workspace_id = get_airbyte_workspace_id(api_url) db_password = await get_db_password(ops_test) assert db_password + # Update Pokeapi connector version, latest version does not work. + # update_pokeapi_connector_version(db_host, db_password) + # Create Source source_id = create_airbyte_source(api_url, workspace_id) @@ -341,13 +406,13 @@ async def run_test_sync_job(ops_test): connection_id = create_airbyte_connection(api_url, source_id, destination_id) # Trigger sync job - for i in range(2): + for i in range(4): logger.info(f"attempt {i + 1} to trigger new job") job_id = trigger_airbyte_connection(api_url, connection_id) # Wait until job is successful job_successful = False - for j in range(7): + for j in range(15): logger.info(f"job {i + 1} attempt {j + 1}: getting job status") status = check_airbyte_job_status(api_url, job_id) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 8803bf3..f826219 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -195,7 +195,9 @@ def test_update_status_down(self): container.get_check.return_value.status = CheckStatus.DOWN harness.charm.on.update_status.emit() - self.assertEqual(harness.model.unit.status, MaintenanceStatus("Status check: DOWN")) + self.assertEqual( + harness.model.unit.status, MaintenanceStatus("Status check: 'airbyte-workload-api-server' DOWN") + ) def test_incomplete_pebble_plan(self): """The charm re-applies the pebble plan if incomplete.""" @@ -408,14 +410,20 @@ def create_plan(container_name, storage_type): "WEBAPP_URL": "http://airbyte-ui-k8s:8080", "WORKER_LOGS_STORAGE_TYPE": storage_type, "WORKER_STATE_STORAGE_TYPE": storage_type, - "WORKLOAD_API_HOST": "localhost", + "WORKLOAD_API_HOST": "airbyte-k8s:8007", + "WORKLOAD_API_BEARER_TOKEN": ".Values.workload-api.bearerToken", }, }, }, } - if container_name == "airbyte-api-server": - want_plan["services"][container_name]["environment"].update({"INTERNAL_API_HOST": "http://airbyte-k8s:8001"}) + if container_name == "airbyte-bootloader": + want_plan["services"][container_name].update({"on-success": "ignore"}) + + if container_name in ["airbyte-workload-launcher", "airbyte-workers"]: + want_plan["services"][container_name]["environment"].update( + {"INTERNAL_API_HOST": "http://airbyte-k8s:8001", "WORKLOAD_API_HOST": "http://airbyte-k8s:8007"} + ) if storage_type == StorageType.minio: want_plan["services"][container_name]["environment"].update( diff --git a/tox.ini b/tox.ini index 8812d52..fa26344 100644 --- a/tox.ini +++ b/tox.ini @@ -99,11 +99,13 @@ commands = description = Run integration tests deps = ipdb==0.13.9 - juju==3.5.2.0 + juju==3.5.2.1 pytest==7.1.3 pytest-operator==0.35.0 temporalio==1.6.0 pytest-asyncio==0.21 + airbyte-api==0.52.2 + psycopg2-binary==2.9.10 -r{toxinidir}/requirements.txt commands = pytest -v \