fix: issue with non-partitioned tables
- script can now be run without installing it first (my bad)
H-Max committed Jul 5, 2024
1 parent 11bfc7d commit c81f4ba
Showing 1 changed file with 22 additions and 19 deletions.
bq2dbt/bq2dbt.py (22 additions, 19 deletions)
@@ -17,7 +17,7 @@

 logging.basicConfig(
     level=logging.INFO,
-    format='%(asctime)s | %(name)s | %(levelname)s | %(message)s',
+    format='%(asctime)s | %(levelname)s | %(message)s',
 )

 logger = logging.getLogger(__name__)
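Note: dropping `%(name)s` removes the logger-name column from every log line. A minimal sketch of the new format in isolation (timestamp illustrative):

    import logging

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s | %(levelname)s | %(message)s',
    )
    logging.getLogger(__name__).info("Operation complete")
    # 2024-07-05 10:30:00,000 | INFO | Operation complete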
@@ -59,7 +59,6 @@ def bq2dbt():
     args = parse_command_line()

     tables = []
-    dataset_id = None
     target_split = args.target.split(".")
     if len(target_split) == 3:  # Complete project.dataset.table
         project_id = target_split[0]
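Note: with `dataset_id = None` removed, `dataset_id` is only ever bound inside the parsing branches. A hedged sketch of that parsing; the two-part branch is elided by the diff, so its exact shape is an assumption:

    parts = "my-project.analytics.events".split(".")  # hypothetical target
    if len(parts) == 3:    # complete project.dataset.table
        project_id, dataset_id, table_id = parts
        tables = [table_id]
    elif len(parts) == 2:  # assumed: project.dataset, tables enumerated later
        project_id, dataset_id = parts
        tables = []
    else:
        raise SystemExit("Invalid BigQuery dataset or table ID.")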
@@ -72,10 +71,6 @@ def bq2dbt():
         logger.error("Invalid BigQuery dataset or table ID.")
         sys.exit(1)

-    if not dataset_id:
-        logger.error("Dataset ID cannot be empty.")
-        sys.exit(1)
-
     client = bigquery.Client(project=project_id)

     if len(tables) == 0:
@@ -95,20 +90,24 @@ def bq2dbt():
         logger.info("Starting generation of YAML and SQL for table \"%s\"...", table_id)

         fields_query = f"""
-            SELECT C.field_path, C.data_type, C.description, C2.is_nullable
-            FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` AS C
-            LEFT JOIN `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS` AS C2
-                ON C.table_catalog = C2.table_catalog
-                AND C.table_schema = C2.table_schema
-                AND C.table_name = C2.table_name
-                AND C.column_name = C2.column_name
-                AND C.field_path = C2.column_name
-            WHERE C.table_name = '{table_id}'
+            SELECT
+                CFP.field_path,
+                CFP.data_type,
+                CFP.description,
+                COLUMNS.is_nullable
+            FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` AS CFP
+            LEFT JOIN `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS` AS COLUMNS
+                ON CFP.table_catalog = COLUMNS.table_catalog
+                AND CFP.table_schema = COLUMNS.table_schema
+                AND CFP.table_name = COLUMNS.table_name
+                AND CFP.column_name = COLUMNS.column_name
+                AND CFP.field_path = COLUMNS.column_name
+            WHERE CFP.table_name = '{table_id}'
         """

         pk_query = f"""
-            SELECT C.column_name
-            FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE` AS C
+            SELECT column_name
+            FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE`
             WHERE table_name = '{table_id}'
             AND constraint_name = '{table_id}.pk$'
             ORDER BY ordinal_position
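Note: both statements are ordinary INFORMATION_SCHEMA queries, so they can be exercised directly with google-cloud-bigquery. A hedged usage sketch, reusing `fields_query` and `pk_query` from the hunk above (project ID hypothetical):

    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")
    for row in client.query(fields_query).result():
        print(row.field_path, row.data_type, row.description, row.is_nullable)
    pk_columns = [row.column_name for row in client.query(pk_query).result()]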
@@ -188,7 +187,7 @@ def bq2dbt():

         # Support for time partioning retrieval
         table_time_partitioning = table.time_partitioning
-        table_partition_expiration = table.time_partitioning.expiration_ms
+        table_partition_expiration = table.time_partitioning.expiration_ms if table_time_partitioning else None
         table_require_partition_filter = table.require_partition_filter
         logger.info("Time partition : %s", table_time_partitioning)
         logger.info("Partition expiration : %s", table_partition_expiration)
@@ -218,7 +217,7 @@ def bq2dbt():
         sql_columns_statement = f"\n{SQL_INDENTATION}, ".join(sql_columns)
         sql_from_statement = f"\n{SQL_INDENTATION}`{project_id}.{dataset_id}.{table_id}`"
         sql_output = (f"SELECT\n{SQL_INDENTATION}{sql_columns_statement}\nFROM{sql_from_statement}"
-                      f" -- Replace this with ref() or source() macro")
+                      f" -- Replace this with ref() or source() macro\n")

         output_path = f"./{output_folder}/{project_id}/{dataset_id}"
         os.makedirs(output_path, exist_ok=True)
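Note: the added trailing `\n` makes the generated SQL file end with a newline. Assuming `SQL_INDENTATION` is four spaces and two hypothetical columns, the rendered output would look like:

    SELECT
        id
        , created_at
    FROM
        `my-project.analytics.events` -- Replace this with ref() or source() macro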
@@ -232,3 +231,7 @@ def bq2dbt():
         logger.info("YAML and SQL files wrote in path: %s", output_path)

     logger.info("Operation complete")
+
+
+if __name__ == "__main__":
+    bq2dbt()
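Note: the new `__main__` guard is what the commit message means by running without installing: the module can now be invoked straight from a checkout, e.g. `python bq2dbt/bq2dbt.py my-project.analytics.events` (hypothetical target), instead of requiring an installed entry point first.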
