diff --git a/core/asset/asset.go b/core/asset/asset.go index 683fca9d..13dc8152 100644 --- a/core/asset/asset.go +++ b/core/asset/asset.go @@ -35,6 +35,7 @@ type Asset struct { Name string `json:"name" diff:"name"` Description string `json:"description" diff:"description"` Data map[string]interface{} `json:"data" diff:"data"` + URL string `json:"url" diff:"url"` Labels map[string]string `json:"labels" diff:"labels"` Owners []user.User `json:"owners,omitempty" diff:"owners"` CreatedAt time.Time `json:"created_at" diff:"-"` diff --git a/core/asset/asset_test.go b/core/asset/asset_test.go index 41ffdcd3..2036edc2 100644 --- a/core/asset/asset_test.go +++ b/core/asset/asset_test.go @@ -292,6 +292,7 @@ func TestAssetPatch(t *testing.T) { "service": "firehose", "description": "new-description", "name": "new-name", + "url": "https://sample-url.com", "labels": map[string]string{ "bar": "foo", "bar2": "foo2", @@ -307,6 +308,7 @@ func TestAssetPatch(t *testing.T) { Service: "firehose", Description: "new-description", Name: "new-name", + URL: "https://sample-url.com", Labels: map[string]string{ "bar": "foo", "bar2": "foo2", @@ -325,6 +327,7 @@ func TestAssetPatch(t *testing.T) { Service: "optimus", Description: "sample-description", Name: "old-name", + URL: "https://sample-url-old.com", Labels: map[string]string{ "foo": "bar", }, @@ -338,6 +341,7 @@ func TestAssetPatch(t *testing.T) { "service": "firehose", "description": "new-description", "name": "new-name", + "url": "https://sample-url.com", "labels": map[string]string{ "bar": "foo", "bar2": "foo2", @@ -353,6 +357,7 @@ func TestAssetPatch(t *testing.T) { Service: "firehose", Description: "new-description", Name: "new-name", + URL: "https://sample-url.com", Labels: map[string]string{ "bar": "foo", "bar2": "foo2", diff --git a/core/asset/patch.go b/core/asset/patch.go index b396568e..f2788ff5 100644 --- a/core/asset/patch.go +++ b/core/asset/patch.go @@ -12,6 +12,7 @@ func patchAsset(a *Asset, patchData map[string]interface{}) { a.Service = patchString("service", patchData, a.Service) a.Name = patchString("name", patchData, a.Name) a.Description = patchString("description", patchData, a.Description) + a.URL = patchString("url", patchData, a.URL) labels, exists := patchData["labels"] if exists { diff --git a/internal/server/v1beta1/asset.go b/internal/server/v1beta1/asset.go index 3bf0d5fa..3adb6018 100644 --- a/internal/server/v1beta1/asset.go +++ b/internal/server/v1beta1/asset.go @@ -422,6 +422,7 @@ func (server *APIServer) buildAsset(baseAsset *compassv1beta1.UpsertAssetRequest Name: baseAsset.GetName(), Description: baseAsset.GetDescription(), Data: baseAsset.GetData().AsMap(), + URL: baseAsset.Url, Labels: baseAsset.GetLabels(), } @@ -500,6 +501,9 @@ func decodePatchAssetToMap(pb *compassv1beta1.UpsertPatchAssetRequest_Asset) map if pb.GetData() != nil { m["data"] = pb.GetData().AsMap() } + if len(pb.Url) > 0 { + m["url"] = pb.Url + } if pb.GetLabels() != nil { m["labels"] = pb.GetLabels() } @@ -578,6 +582,7 @@ func assetToProto(a asset.Asset, withChangelog bool) (assetPB *compassv1beta1.As Name: a.Name, Description: a.Description, Data: data, + Url: a.URL, Labels: a.Labels, Owners: owners, Version: a.Version, diff --git a/internal/server/v1beta1/asset_test.go b/internal/server/v1beta1/asset_test.go index ef376618..7f28cff0 100644 --- a/internal/server/v1beta1/asset_test.go +++ b/internal/server/v1beta1/asset_test.go @@ -332,6 +332,7 @@ func TestUpsertAsset(t *testing.T) { Name: "new-name", Service: "kafka", Data: &structpb.Struct{}, + Url: "https://sample-url.com", Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "email@email.com", Provider: "provider"}}, }, Upstreams: []*compassv1beta1.LineageNode{ @@ -450,6 +451,7 @@ func TestUpsertAsset(t *testing.T) { Service: "kafka", UpdatedBy: user.User{ID: userID}, Data: map[string]interface{}{}, + URL: "https://sample-url.com", Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}}, } upstreams := []string{"upstream-1"} @@ -520,6 +522,7 @@ func TestUpsertPatchAsset(t *testing.T) { Name: wrapperspb.String("new-name"), Service: "kafka", Data: &structpb.Struct{}, + Url: "https://sample-url.com", Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "email@email.com", Provider: "provider"}}, }, Upstreams: []*compassv1beta1.LineageNode{ @@ -549,6 +552,7 @@ func TestUpsertPatchAsset(t *testing.T) { Service: "kafka", UpdatedBy: user.User{ID: userID}, Data: map[string]interface{}{}, + URL: "https://sample-url-old.com", Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}}, } ) @@ -652,6 +656,7 @@ func TestUpsertPatchAsset(t *testing.T) { Service: "kafka", UpdatedBy: user.User{ID: userID}, Data: map[string]interface{}{}, + URL: "https://sample-url.com", Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}}, } upstreams := []string{"upstream-1"} @@ -688,6 +693,7 @@ func TestUpsertPatchAsset(t *testing.T) { Service: "kafka", UpdatedBy: user.User{ID: userID}, Data: map[string]interface{}{}, + URL: "https://sample-url-old.com", Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}}, } @@ -733,6 +739,7 @@ func TestUpsertPatchAsset(t *testing.T) { Service: "kafka", UpdatedBy: user.User{ID: userID}, Data: map[string]interface{}{}, + URL: "https://sample-url-old.com", Owners: []user.User{{ID: "id", UUID: "", Email: "email@email.com", Provider: "provider"}}, } diff --git a/internal/store/postgres/asset_model.go b/internal/store/postgres/asset_model.go index 90761ad6..5dac27dc 100644 --- a/internal/store/postgres/asset_model.go +++ b/internal/store/postgres/asset_model.go @@ -21,6 +21,7 @@ type AssetModel struct { Service string `db:"service"` Description string `db:"description"` Data JSONMap `db:"data"` + URL string `db:"url"` Labels JSONMap `db:"labels"` Version string `db:"version"` UpdatedBy UserModel `db:"updated_by"` @@ -41,6 +42,7 @@ func (a *AssetModel) toAsset(owners []user.User) asset.Asset { Service: a.Service, Description: a.Description, Data: a.Data, + URL: a.URL, Labels: a.buildLabels(), Owners: owners, Version: a.Version, diff --git a/internal/store/postgres/asset_repository.go b/internal/store/postgres/asset_repository.go index 603b6781..339f5e5d 100644 --- a/internal/store/postgres/asset_repository.go +++ b/internal/store/postgres/asset_repository.go @@ -453,8 +453,8 @@ func (r *AssetRepository) deleteWithPredicate(ctx context.Context, pred sq.Eq) ( func (r *AssetRepository) insert(ctx context.Context, ast *asset.Asset) (id string, err error) { err = r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error { query, args, err := sq.Insert("assets"). - Columns("urn", "type", "service", "name", "description", "data", "labels", "updated_by", "version"). - Values(ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.Labels, ast.UpdatedBy.ID, asset.BaseVersion). + Columns("urn", "type", "service", "name", "description", "data", "url", "labels", "updated_by", "version"). + Values(ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.URL, ast.Labels, ast.UpdatedBy.ID, asset.BaseVersion). Suffix("RETURNING \"id\""). PlaceholderFormat(sq.Dollar). ToSql() @@ -490,7 +490,6 @@ func (r *AssetRepository) insert(ctx context.Context, ast *asset.Asset) (id stri } func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset *asset.Asset, oldAsset *asset.Asset, clog diff.Changelog) error { - if !isValidUUID(assetID) { return asset.InvalidError{AssetID: assetID} } @@ -508,22 +507,24 @@ func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset * newAsset.Version = newVersion newAsset.ID = oldAsset.ID - err = r.execContext(ctx, tx, - `UPDATE assets - SET urn = $1, - type = $2, - service = $3, - name = $4, - description = $5, - data = $6, - labels = $7, - updated_at = $8, - updated_by = $9, - version = $10 - WHERE id = $11; - `, - newAsset.URN, newAsset.Type, newAsset.Service, newAsset.Name, newAsset.Description, newAsset.Data, newAsset.Labels, time.Now(), newAsset.UpdatedBy.ID, newAsset.Version, assetID) + query, args, err := r.buildSQL(sq.Update("assets"). + Set("urn", newAsset.URN). + Set("type", newAsset.Type). + Set("service", newAsset.Service). + Set("name", newAsset.Name). + Set("description", newAsset.Description). + Set("data", newAsset.Data). + Set("url", newAsset.URL). + Set("labels", newAsset.Labels). + Set("updated_at", time.Now()). + Set("updated_by", newAsset.UpdatedBy.ID). + Set("version", newAsset.Version). + Where(sq.Eq{"id": assetID})) if err != nil { + return fmt.Errorf("build query: %w", err) + } + + if err := r.execContext(ctx, tx, query, args...); err != nil { return fmt.Errorf("error running update asset query: %w", err) } @@ -775,6 +776,7 @@ func (r *AssetRepository) getAssetSQL() sq.SelectBuilder { a.service as service, a.description as description, a.data as data, + COALESCE(a.url, '') as url, a.labels as labels, a.version as version, a.created_at as created_at, diff --git a/internal/store/postgres/asset_repository_test.go b/internal/store/postgres/asset_repository_test.go index cd0f4a31..9147cb58 100644 --- a/internal/store/postgres/asset_repository_test.go +++ b/internal/store/postgres/asset_repository_test.go @@ -930,6 +930,7 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { Type: "table", Service: "bigquery", Version: "0.1", + URL: "https://sample-url.com", UpdatedBy: r.users[0], } id, err := r.repository.Upsert(r.ctx, &ast) @@ -1019,6 +1020,36 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { r.Equal(ast.ID, identicalAsset.ID) }) + r.Run("should update the asset if asset is not identical", func() { + ast := asset.Asset{ + URN: "urn-u-2", + Type: "table", + Service: "bigquery", + URL: "https://sample-url-old.com", + UpdatedBy: r.users[0], + } + + id, err := r.repository.Upsert(r.ctx, &ast) + r.Require().NoError(err) + r.NotEmpty(id) + ast.ID = id + + updated := ast + updated.URL = "https://sample-url.com" + + id, err = r.repository.Upsert(r.ctx, &updated) + r.Require().NoError(err) + r.NotEmpty(id) + updated.ID = id + + r.Equal(ast.ID, updated.ID) + + actual, err := r.repository.GetByID(r.ctx, ast.ID) + r.NoError(err) + + r.Equal(updated.URL, actual.URL) + }) + r.Run("should delete old owners if it does not exist on new asset", func() { ast := asset.Asset{ URN: "urn-u-4", diff --git a/internal/store/postgres/migrations/000013_alter_assets_table.down.sql b/internal/store/postgres/migrations/000013_alter_assets_table.down.sql new file mode 100644 index 00000000..2b6ad57c --- /dev/null +++ b/internal/store/postgres/migrations/000013_alter_assets_table.down.sql @@ -0,0 +1 @@ +ALTER TABLE assets DROP COLUMN IF EXISTS url; diff --git a/internal/store/postgres/migrations/000013_alter_assets_table.up.sql b/internal/store/postgres/migrations/000013_alter_assets_table.up.sql new file mode 100644 index 00000000..cf561e1c --- /dev/null +++ b/internal/store/postgres/migrations/000013_alter_assets_table.up.sql @@ -0,0 +1 @@ +ALTER TABLE assets ADD COLUMN IF NOT EXISTS url TEXT;