Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

Commit

Permalink
COSI-44: Add RPC server to COSI
Browse files Browse the repository at this point in the history
Source: sigs.k8s.io/container-object-storage-interface-provisioner-sidecar/pkg/provisioner
  • Loading branch information
anurag4DSB committed Nov 9, 2024
1 parent 0037a72 commit 2114837
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 1 deletion.
2 changes: 1 addition & 1 deletion cmd/scality-cosi-driver/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/scality/cosi/pkg/driver"
"k8s.io/klog/v2"

"sigs.k8s.io/container-object-storage-interface-provisioner-sidecar/pkg/provisioner"
"github.com/scality/cosi/pkg/provisioner"
)

const (
Expand Down
71 changes: 71 additions & 0 deletions pkg/provisioner/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2024 Scality, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provisioner

import (
"context"

"google.golang.org/grpc"
cosi "sigs.k8s.io/container-object-storage-interface-spec"
)

var (
_ cosi.IdentityClient = &COSIProvisionerClient{}
_ cosi.ProvisionerClient = &COSIProvisionerClient{}
)

type COSIProvisionerClient struct {
address string
conn *grpc.ClientConn
identityClient cosi.IdentityClient
provisionerClient cosi.ProvisionerClient
}

func (c *COSIProvisionerClient) DriverGetInfo(ctx context.Context,
in *cosi.DriverGetInfoRequest,
opts ...grpc.CallOption) (*cosi.DriverGetInfoResponse, error) {

return c.identityClient.DriverGetInfo(ctx, in, opts...)
}

func (c *COSIProvisionerClient) DriverCreateBucket(ctx context.Context,
in *cosi.DriverCreateBucketRequest,
opts ...grpc.CallOption) (*cosi.DriverCreateBucketResponse, error) {

return c.provisionerClient.DriverCreateBucket(ctx, in, opts...)
}

func (c *COSIProvisionerClient) DriverDeleteBucket(ctx context.Context,
in *cosi.DriverDeleteBucketRequest,
opts ...grpc.CallOption) (*cosi.DriverDeleteBucketResponse, error) {

return c.provisionerClient.DriverDeleteBucket(ctx, in, opts...)
}

func (c *COSIProvisionerClient) DriverGrantBucketAccess(ctx context.Context,
in *cosi.DriverGrantBucketAccessRequest,
opts ...grpc.CallOption) (*cosi.DriverGrantBucketAccessResponse, error) {

return c.provisionerClient.DriverGrantBucketAccess(ctx, in, opts...)
}

func (c *COSIProvisionerClient) DriverRevokeBucketAccess(ctx context.Context,
in *cosi.DriverRevokeBucketAccessRequest,
opts ...grpc.CallOption) (*cosi.DriverRevokeBucketAccessResponse, error) {

return c.provisionerClient.DriverRevokeBucketAccess(ctx, in, opts...)
}
47 changes: 47 additions & 0 deletions pkg/provisioner/interceptors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2024 Scality, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provisioner

import (
"context"
"encoding/json"
"time"

"google.golang.org/grpc"
"k8s.io/klog/v2"
)

func apiLogger(ctx context.Context, api string,
req, resp interface{},
grpcConn *grpc.ClientConn,
apiCall grpc.UnaryInvoker,
opts ...grpc.CallOption) error {

if jsonReq, err := json.MarshalIndent(req, "", " "); err != nil {
klog.InfoS("Request", "api", api, "req", string(jsonReq))
}

start := time.Now()
err := apiCall(ctx, api, req, resp, grpcConn, opts...)
end := time.Now()

if jsonRes, err := json.MarshalIndent(resp, "", " "); err != nil {
klog.InfoS("Response", "api", api, "elapsed", end.Sub(start), "resp", jsonRes)
}

return err
}
121 changes: 121 additions & 0 deletions pkg/provisioner/provisioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright 2024 Scality, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package provisioner

import (
"context"
"fmt"
"net/url"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"

"k8s.io/klog/v2"
cosi "sigs.k8s.io/container-object-storage-interface-spec"
)

const (
maxGrpcBackoff = 5 * 30 * time.Second
grpcDialTimeout = 30 * time.Second
)

func NewDefaultCOSIProvisionerClient(ctx context.Context, address string, debug bool) (*COSIProvisionerClient, error) {
backoffConfiguration := backoff.DefaultConfig
backoffConfiguration.MaxDelay = maxGrpcBackoff

dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()), // strictly restricting to local Unix domain socket
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoffConfiguration,
MinConnectTimeout: grpcDialTimeout,
}),
grpc.WithBlock(), // block until connection succeeds
}

interceptors := []grpc.UnaryClientInterceptor{}

if debug {
interceptors = append(interceptors, apiLogger)
}
return NewCOSIProvisionerClient(ctx, address, dialOpts, interceptors)
}

// NewCOSIProvisionerClient creates a new GRPCClient that only supports unix domain sockets
func NewCOSIProvisionerClient(ctx context.Context, address string, dialOpts []grpc.DialOption, interceptors []grpc.UnaryClientInterceptor) (*COSIProvisionerClient, error) {
addr, err := url.Parse(address)
if err != nil {
return nil, err
}

if addr.Scheme != "unix" {
err := errors.New("Address must be a unix domain socket")
klog.ErrorS(err, "Unsupported scheme", "expected", "unix", "found", addr.Scheme)
return nil, fmt.Errorf("unsupported scheme: %w", err)
}

for _, interceptor := range interceptors {
dialOpts = append(dialOpts, grpc.WithChainUnaryInterceptor(interceptor))
}

ctx, cancel := context.WithTimeout(ctx, maxGrpcBackoff)
defer cancel()

conn, err := grpc.DialContext(ctx, address, dialOpts...)
if err != nil {
klog.ErrorS(err, "Connection failed", "address", address)
return nil, err
}
return &COSIProvisionerClient{
address: address,
conn: conn,
identityClient: cosi.NewIdentityClient(conn),
provisionerClient: cosi.NewProvisionerClient(conn),
}, nil
}

func NewDefaultCOSIProvisionerServer(address string,
identityServer cosi.IdentityServer,
provisionerServer cosi.ProvisionerServer) (*COSIProvisionerServer, error) {

return NewCOSIProvisionerServer(address, identityServer, provisionerServer, []grpc.ServerOption{})
}

func NewCOSIProvisionerServer(address string,
identityServer cosi.IdentityServer,
provisionerServer cosi.ProvisionerServer,
listenOpts []grpc.ServerOption) (*COSIProvisionerServer, error) {

if identityServer == nil {
err := errors.New("Identity server cannot be nil")
klog.ErrorS(err, "Invalid argument")
return nil, err
}
if provisionerServer == nil {
err := errors.New("Provisioner server cannot be nil")
klog.ErrorS(err, "Invalid argument")
return nil, err
}

return &COSIProvisionerServer{
address: address,
identityServer: identityServer,
provisionerServer: provisionerServer,
listenOpts: listenOpts,
}, nil
}
82 changes: 82 additions & 0 deletions pkg/provisioner/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
Copyright 2024 Scality, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provisioner

import (
"context"
"fmt"
"net"
"net/url"

"github.com/pkg/errors"
"google.golang.org/grpc"

"k8s.io/klog/v2"
cosi "sigs.k8s.io/container-object-storage-interface-spec"
)

type COSIProvisionerServer struct {
address string
identityServer cosi.IdentityServer
provisionerServer cosi.ProvisionerServer

listenOpts []grpc.ServerOption
}

func (s *COSIProvisionerServer) Run(ctx context.Context) error {
addr, err := url.Parse(s.address)
if err != nil {
return err
}

if addr.Scheme != "unix" {
err := errors.New("Address must be a unix domain socket")
klog.ErrorS(err, "Unsupported scheme", "expected", "unix", "found", addr.Scheme)
return fmt.Errorf("invalid argument: %w", err)
}

listenConfig := net.ListenConfig{}
listener, err := listenConfig.Listen(ctx, "unix", addr.Path)
if err != nil {
klog.ErrorS(err, "Failed to start server")
return fmt.Errorf("failed to start server: %w", err)
}

server := grpc.NewServer(s.listenOpts...)

if s.provisionerServer == nil || s.identityServer == nil {
err := errors.New("ProvisionerServer and IdentityServer cannot be nil")
klog.ErrorS(err, "Invalid args")
return fmt.Errorf("invalid args: %w", err)
}

cosi.RegisterIdentityServer(server, s.identityServer)
cosi.RegisterProvisionerServer(server, s.provisionerServer)

errChan := make(chan error)
go func() {
errChan <- server.Serve(listener)
}()

select {
case <-ctx.Done():
server.GracefulStop()
return ctx.Err()
case err := <-errChan:
return err
}
}

0 comments on commit 2114837

Please sign in to comment.