From a79fdde1481f7817958cba07d1ed90f47843b12e Mon Sep 17 00:00:00 2001 From: anjali9791 Date: Fri, 24 Feb 2023 10:49:52 +0530 Subject: [PATCH] feat(bigquery): add clustering fields and extra fields for partition (#472) * add clustering fields * add partition details for different kinds of partitioning in bigquery - time and range --------- Co-authored-by: Anjali Aggarwal --- plugins/extractors/bigquery/README.md | 38 ++++++++++++++++++------- plugins/extractors/bigquery/bigquery.go | 30 +++++++++++++++++-- 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/plugins/extractors/bigquery/README.md b/plugins/extractors/bigquery/README.md index 93b4dd8c9..cf15fc973 100644 --- a/plugins/extractors/bigquery/README.md +++ b/plugins/extractors/bigquery/README.md @@ -60,17 +60,33 @@ source: ## Outputs -| Field | Sample Value | -|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `resource.urn` | `project_id.dataset_name.table_name` | -| `resource.name` | `table_name` | -| `resource.service` | `bigquery` | -| `description` | `table description` | -| `profile.total_rows` | `2100` | -| `profile.usage_count` | `15` | -| `profile.joins` | [][Join](#Join) | -| `profile.filters` | [`"WHERE t.param_3 = 'the_param' AND t.column_1 = \"xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx\""`,`"WHERE event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"`] | -| `schema` | [][Column](#column) | +| Field | Sample Value | Description | 
+|:-------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------| +| `resource.urn` | `project_id.dataset_name.table_name` | | +| `resource.name` | `table_name` | | +| `resource.service` | `bigquery` | | +| `description` | `table description` | | +| `profile.total_rows` | `2100` | | +| `profile.usage_count` | `15` | | +| `profile.joins` | [][Join](#Join) | | +| `profile.filters` | [`"WHERE t.param_3 = 'the_param' AND t.column_1 = \"xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx\""`,`"WHERE event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"`] | | +| `schema` | [][Column](#column) | | +| `properties.partition_data` | `"partition_data": {"partition_field": "data_date", "require_partition_filter": false, "time_partition": {"partition_by": "DAY","partition_expire": 0 } }` | partition related data for time and range partitioning. 
| +| `properties.clustering_fields` | `['created_at', 'updated_at']` | list of fields on which the table is clustered | +| `properties.partition_field` | `created_at` | returns the field on which the table is time partitioned | + +### Partition Data + +| Field | Sample Value | Description | +|:------------------------------------------|:-------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `partition_field` | `created_at` | field on which the table is partitioned, either by TimePartitioning or RangePartitioning. If the field is empty for TimePartitioning, `_PARTITIONTIME` is returned instead of an empty value. | +| `require_partition_filter` | `true` | boolean value which denotes if every query on the bigquery table must include at least one predicate that only references the partitioning column | +| `time_partition.partition_by` | `HOUR` | returns partition type HOUR/DAY/MONTH/YEAR | +| `time_partition.partition_expire_seconds` | `0` | time in seconds after which data in a partition expires. If 0, it does not expire. 
| +| `range_partition.interval` | `10` | width of each interval in the range | +| `range_partition.start` | `0` | start value of the partition range (inclusive) | +| `range_partition.end` | `100` | end value of the partition range (exclusive) | + ### Column diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index 1e3af9d3a..d835d0a9a 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -257,10 +257,34 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu tableURN := plugins.BigQueryURN(t.ProjectID, t.DatasetID, t.TableID) tableProfile := e.buildTableProfile(tableURN, tableStats) - var partitionField string + partitionData := make(map[string]interface{}) if md.TimePartitioning != nil { partitionField = md.TimePartitioning.Field + if partitionField == "" { + partitionField = "_PARTITIONTIME" + } + partitionData["partition_field"] = partitionField + partitionData["time_partition"] = map[string]interface{}{ + "partition_by": string(md.TimePartitioning.Type), + "partition_expire_seconds": md.TimePartitioning.Expiration.Seconds(), + } + } else if md.RangePartitioning != nil { + partitionData["partition_field"] = md.RangePartitioning.Field + partitionData["range_partition"] = map[string]interface{}{ + "start": md.RangePartitioning.Range.Start, + "end": md.RangePartitioning.Range.End, + "interval": md.RangePartitioning.Range.Interval, + } + } + partitionData["require_partition_filter"] = md.RequirePartitionFilter + + var clusteringFields []interface{} + if md.Clustering != nil && len(md.Clustering.Fields) > 0 { + clusteringFields = make([]interface{}, len(md.Clustering.Fields)) + for idx, field := range md.Clustering.Fields { + clusteringFields[idx] = field + } } var previewFields []string @@ -271,7 +295,6 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu if err != nil { e.logger.Warn("error building preview", "err", err, 
"table", tableFQN) } - } table, err := anypb.New(&v1beta2.Table{ @@ -284,7 +307,8 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu "dataset": t.DatasetID, "project": t.ProjectID, "type": string(md.Type), - "partition_field": partitionField, + "partition_data": partitionData, + "clustering_fields": clusteringFields, }), CreateTime: timestamppb.New(md.CreationTime), UpdateTime: timestamppb.New(md.LastModifiedTime),