Skip to content

Commit

Permalink
fix: Patch asset's lineage (#195)
Browse files Browse the repository at this point in the history
- For the UpsertPatchAsset RPC, when no upstreams and downstreams are
  specified in the request, skip overwriting the asset's lineage unless
  OverwriteLineage flag is set to true.
- Bump up the proton commit in Makefile to pull in changes from
  raystack/proton#238 + generate protos.
  • Loading branch information
sudo-suhas authored Jan 17, 2023
1 parent 7df7acd commit a587fff
Show file tree
Hide file tree
Showing 9 changed files with 1,467 additions and 1,211 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
NAME="github.com/odpf/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "838f2a8c9ddc8fa6dfbd6f3ebe6201e76e2368f2"
PROTON_COMMIT := "c7639b42da0679b2340a52155d2fe577b9d45aa2"
.PHONY: all build test clean install proto

all: build
Expand Down
24 changes: 15 additions & 9 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,27 @@ func (s *Service) GetAllAssets(ctx context.Context, flt Filter, withTotal bool)
}

func (s *Service) UpsertAsset(ctx context.Context, ast *Asset, upstreams, downstreams []string) (string, error) {
var assetID string
var err error
assetID, err := s.UpsertAssetWithoutLineage(ctx, ast)
if err != nil {
return "", err
}

if err := s.lineageRepository.Upsert(ctx, ast.URN, upstreams, downstreams); err != nil {
return "", err
}

assetID, err = s.assetRepository.Upsert(ctx, ast)
return assetID, nil
}

func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (string, error) {
assetID, err := s.assetRepository.Upsert(ctx, ast)
if err != nil {
return assetID, err
return "", err
}

ast.ID = assetID
if err := s.discoveryRepository.Upsert(ctx, *ast); err != nil {
return assetID, err
}

if err := s.lineageRepository.Upsert(ctx, ast.URN, upstreams, downstreams); err != nil {
return assetID, err
return "", err
}

return assetID, nil
Expand Down
66 changes: 62 additions & 4 deletions core/asset/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,70 @@ func TestService_UpsertAsset(t *testing.T) {

svc := asset.NewService(mockAssetRepo, mockDiscoveryRepo, mockLineageRepo)
rid, err := svc.UpsertAsset(ctx, tc.Asset, tc.Upstreams, tc.Downstreams)
if err != nil && errors.Is(tc.Err, err) {
t.Fatalf("got error %v, expected error was %v", err, tc.Err)
if tc.Err != nil {
assert.EqualError(t, err, tc.Err.Error())
return
}
assert.NoError(t, err)
assert.Equal(t, tc.ReturnedID, rid)
})
}
}

func TestService_UpsertAssetWithoutLineage(t *testing.T) {
sampleAsset := &asset.Asset{ID: "some-id", URN: "some-urn", Type: asset.TypeDashboard, Service: "some-service"}
var testCases = []struct {
Description string
Asset *asset.Asset
Err error
ReturnedID string
Setup func(context.Context, *mocks.AssetRepository, *mocks.DiscoveryRepository)
}{
{
Description: `should return error if asset repository upsert return error`,
Asset: sampleAsset,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.EXPECT().Upsert(ctx, sampleAsset).Return("", errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `should return error if discovery repository upsert return error`,
Asset: sampleAsset,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.EXPECT().Upsert(ctx, sampleAsset).Return(sampleAsset.ID, nil)
dr.EXPECT().Upsert(ctx, *sampleAsset).Return(errors.New("unknown error"))
},
Err: errors.New("unknown error"),
},
{
Description: `should return no error if all repositories upsert return no error`,
Asset: sampleAsset,
Setup: func(ctx context.Context, ar *mocks.AssetRepository, dr *mocks.DiscoveryRepository) {
ar.EXPECT().Upsert(ctx, sampleAsset).Return(sampleAsset.ID, nil)
dr.EXPECT().Upsert(ctx, *sampleAsset).Return(nil)
},
ReturnedID: sampleAsset.ID,
},
}
for _, tc := range testCases {
t.Run(tc.Description, func(t *testing.T) {
ctx := context.Background()

mockAssetRepo := mocks.NewAssetRepository(t)
mockDiscoveryRepo := mocks.NewDiscoveryRepository(t)
if tc.Setup != nil {
tc.Setup(ctx, mockAssetRepo, mockDiscoveryRepo)
}
if tc.ReturnedID != rid {
t.Fatalf("got returned id %v, expected returned id was %v", rid, tc.ReturnedID)

svc := asset.NewService(mockAssetRepo, mockDiscoveryRepo, mocks.NewLineageRepository(t))
rid, err := svc.UpsertAssetWithoutLineage(ctx, tc.Asset)
if tc.Err != nil {
assert.EqualError(t, err, tc.Err.Error())
return
}
assert.NoError(t, err)
assert.Equal(t, tc.ReturnedID, rid)
})
}
}
Expand Down
46 changes: 39 additions & 7 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type AssetService interface {
GetAssetByVersion(ctx context.Context, id string, version string) (asset.Asset, error)
GetAssetVersionHistory(ctx context.Context, flt asset.Filter, id string) ([]asset.Asset, error)
UpsertAsset(ctx context.Context, ast *asset.Asset, upstreams, downstreams []string) (string, error)
UpsertAssetWithoutLineage(ctx context.Context, ast *asset.Asset) (string, error)
DeleteAsset(ctx context.Context, id string) error

GetLineage(ctx context.Context, urn string, query asset.LineageQuery) (asset.Lineage, error)
Expand Down Expand Up @@ -268,13 +269,14 @@ func (server *APIServer) UpsertPatchAsset(ctx context.Context, req *compassv1bet
ast.Patch(patchAssetMap)
ast.UpdatedBy.ID = userID

assetID, err := server.upsertAsset(
ctx,
ast,
"asset_upsert_patch",
req.GetUpstreams(),
req.GetDownstreams(),
)
var assetID string
if len(req.Upstreams) != 0 || len(req.Downstreams) != 0 || req.OverwriteLineage {
assetID, err = server.upsertAsset(
ctx, ast, "asset_upsert_patch", req.GetUpstreams(), req.GetDownstreams(),
)
} else {
assetID, err = server.upsertAssetWithoutLineage(ctx, ast)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -382,6 +384,36 @@ func (server *APIServer) upsertAsset(
return
}

func (server *APIServer) upsertAssetWithoutLineage(ctx context.Context, ast asset.Asset) (string, error) {
const mode = "asset_upsert_patch_without_lineage"

if err := server.validateAsset(ast); err != nil {
return "", status.Error(codes.InvalidArgument, err.Error())
}

assetID, err := server.assetService.UpsertAssetWithoutLineage(ctx, &ast)
if err != nil {
switch {
case errors.As(err, new(asset.InvalidError)):
return "", status.Error(codes.InvalidArgument, err.Error())

case errors.As(err, new(asset.DiscoveryError)):
server.sendStatsDCounterMetric("discovery_error",
map[string]string{
"method": mode,
})
}

return "", internalServerError(server.logger, err.Error())
}

server.sendStatsDCounterMetric(mode, map[string]string{
"type": ast.Type.String(),
"service": ast.Service,
})
return assetID, nil
}

func (server *APIServer) buildAsset(baseAsset *compassv1beta1.UpsertAssetRequest_Asset) asset.Asset {
ast := asset.Asset{
URN: baseAsset.GetUrn(),
Expand Down
91 changes: 91 additions & 0 deletions internal/server/v1beta1/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,97 @@ func TestUpsertPatchAsset(t *testing.T) {

},
},
{
Description: "without explicit overwrite_lineage, should upsert asset without lineage",
Setup: func(ctx context.Context, as *mocks.AssetService) {
patchedAsset := asset.Asset{
URN: "test dagger",
Type: asset.TypeTable,
Name: "new-name",
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
}

assetWithID := patchedAsset
assetWithID.ID = assetID

as.EXPECT().GetAssetByID(ctx, "test dagger").Return(currentAsset, nil)
as.EXPECT().UpsertAssetWithoutLineage(ctx, &patchedAsset).
Return(assetWithID.ID, nil).
Run(func(ctx context.Context, ast *asset.Asset) {
patchedAsset.ID = assetWithID.ID
})
},
Request: &compassv1beta1.UpsertPatchAssetRequest{
Asset: &compassv1beta1.UpsertPatchAssetRequest_Asset{
Urn: "test dagger",
Type: "table",
Name: wrapperspb.String("new-name"),
Service: "kafka",
Data: &structpb.Struct{},
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "[email protected]", Provider: "provider"}},
},
},
ExpectStatus: codes.OK,
PostCheck: func(resp *compassv1beta1.UpsertPatchAssetResponse) error {
expected := &compassv1beta1.UpsertPatchAssetResponse{
Id: assetID,
}
if diff := cmp.Diff(resp, expected, protocmp.Transform()); diff != "" {
return fmt.Errorf("expected response to be %+v, was %+v", expected, resp)
}
return nil

},
},
{
Description: "with explicit overwrite_lineage, should upsert asset when lineage is not in the request",
Setup: func(ctx context.Context, as *mocks.AssetService) {
patchedAsset := asset.Asset{
URN: "test dagger",
Type: asset.TypeTable,
Name: "new-name",
Service: "kafka",
UpdatedBy: user.User{ID: userID},
Data: map[string]interface{}{},
Owners: []user.User{{ID: "id", UUID: "", Email: "[email protected]", Provider: "provider"}},
}

assetWithID := patchedAsset
assetWithID.ID = assetID

as.EXPECT().GetAssetByID(ctx, "test dagger").Return(currentAsset, nil)
as.EXPECT().UpsertAsset(ctx, &patchedAsset, []string{}, []string{}).
Return(assetWithID.ID, nil).
Run(func(ctx context.Context, ast *asset.Asset, _, _ []string) {
patchedAsset.ID = assetWithID.ID
})
},
Request: &compassv1beta1.UpsertPatchAssetRequest{
Asset: &compassv1beta1.UpsertPatchAssetRequest_Asset{
Urn: "test dagger",
Type: "table",
Name: wrapperspb.String("new-name"),
Service: "kafka",
Data: &structpb.Struct{},
Owners: []*compassv1beta1.User{{Id: "id", Uuid: "", Email: "[email protected]", Provider: "provider"}},
},
OverwriteLineage: true,
},
ExpectStatus: codes.OK,
PostCheck: func(resp *compassv1beta1.UpsertPatchAssetResponse) error {
expected := &compassv1beta1.UpsertPatchAssetResponse{
Id: assetID,
}
if diff := cmp.Diff(resp, expected, protocmp.Transform()); diff != "" {
return fmt.Errorf("expected response to be %+v, was %+v", expected, resp)
}
return nil

},
},
}
for _, tc := range testCases {
t.Run(tc.Description, func(t *testing.T) {
Expand Down
45 changes: 45 additions & 0 deletions internal/server/v1beta1/mocks/asset_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions proto/compass.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2278,6 +2278,13 @@ definitions:
type: array
items:
$ref: '#/definitions/LineageNode'
overwrite_lineage:
type: boolean
description: |-
overwrite_lineage determines whether the asset's lineage should be
overwritten with the upstreams and downstreams specified in the request.
Currently, it is only applicable when both upstreams and downstreams are
empty/not specified.
upstreams:
type: array
items:
Expand Down
Loading

0 comments on commit a587fff

Please sign in to comment.