-
Notifications
You must be signed in to change notification settings - Fork 1
/
clientv1_roles.go
120 lines (108 loc) · 3.62 KB
/
clientv1_roles.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package sams
import (
"context"
"io"
"slices"
"connectrpc.com/connect"
"golang.org/x/oauth2"
"github.com/google/uuid"
clientsv1 "github.com/sourcegraph/sourcegraph-accounts-sdk-go/clients/v1"
"github.com/sourcegraph/sourcegraph-accounts-sdk-go/clients/v1/clientsv1connect"
"github.com/sourcegraph/sourcegraph-accounts-sdk-go/roles"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// RolesServiceV1 provides client methods to interact with the
// RolesService API v1.
type RolesServiceV1 struct {
client *ClientV1
}
func (s *RolesServiceV1) newClient(ctx context.Context) clientsv1connect.RolesServiceClient {
return clientsv1connect.NewRolesServiceClient(
oauth2.NewClient(ctx, s.client.tokenSource),
s.client.gRPCURL(),
connect.WithInterceptors(s.client.defaultInterceptors...),
)
}
// RegisterResourcesMetadata is the metadata for a set of resources to be registered.
type RegisterResourcesMetadata struct {
ResourceType roles.ResourceType
}
func (r RegisterResourcesMetadata) validate() error {
if !slices.Contains(roles.ResourceTypes(), r.ResourceType) {
return errors.Newf("invalid resource type: %q", r.ResourceType)
}
return nil
}
// RegisterRoleResources registers the resources for a given resource type.
// `resourcesIterator` is a function that returns a page of resources to register.
// The function is invoked repeatedly until it produces an empty slice or an error.
// If another replica is already registering resources for the same resource type
// the function will return 0 with ErrAborted.
// ErrAborted means the request is safe to retry at a later time.
//
// Required scope: sams::roles.resources::write
func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata RegisterResourcesMetadata, resourcesIterator func() ([]*clientsv1.RoleResource, error)) (uint64, error) {
err := metadata.validate()
if err != nil {
return 0, errors.Wrap(err, "invalid metadata")
}
/// Generate a new revision for the request metadata.
revision, err := uuid.NewV7()
if err != nil {
return 0, errors.Wrap(err, "failed to generate revision for request metadata")
}
client := s.newClient(ctx)
stream := client.RegisterRoleResources(ctx)
// Metadata must be submitted first in the stream.
err = stream.Send(&clientsv1.RegisterRoleResourcesRequest{
Payload: &clientsv1.RegisterRoleResourcesRequest_Metadata{
Metadata: &clientsv1.RegisterRoleResourcesRequestMetadata{
ResourceType: string(metadata.ResourceType),
Revision: revision.String(),
},
},
})
sendResources := true
if err != nil {
// The stream has been closed; skip sending resources.
if errors.Is(err, io.EOF) {
sendResources = false
} else {
return 0, errors.Wrap(err, "failed to send metadata")
}
}
for sendResources {
resources, err := resourcesIterator()
if err != nil {
return 0, errors.Wrap(err, "failed to get resources")
}
if len(resources) == 0 {
sendResources = false
continue
}
err = stream.Send(&clientsv1.RegisterRoleResourcesRequest{
Payload: &clientsv1.RegisterRoleResourcesRequest_Resources_{
Resources: &clientsv1.RegisterRoleResourcesRequest_Resources{
Resources: resources,
},
},
})
if err != nil {
// The stream has been closed, so we stop sending resources.
if errors.Is(err, io.EOF) {
sendResources = false
continue
}
return 0, errors.Wrap(err, "failed to send resources")
}
}
resp, err := parseResponseAndError(stream.CloseAndReceive())
if err != nil {
// Stream closed due to another replica registering the same resources.
if errors.Is(err, ErrAborted) {
return 0, nil
}
return 0, err
}
return resp.Msg.GetResourceCount(), nil
}