Skip to content

Commit

Permalink
Merge branch 'master' into propagate-annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
kobzonega committed Jun 26, 2024
2 parents 62e8736 + 36ee6cf commit 04817aa
Show file tree
Hide file tree
Showing 22 changed files with 333 additions and 146 deletions.
4 changes: 2 additions & 2 deletions deploy/ydb-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.5.15
version: 0.5.16

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.5.15"
appVersion: "0.5.16"
22 changes: 13 additions & 9 deletions e2e/tests/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,15 +474,19 @@ var _ = Describe("Operator smoke test", func() {
By("checking that all the storage pods are running and ready...")
checkPodsRunningAndReady(ctx, "ydb-cluster", "kind-storage", storageSample.Spec.Nodes)

By("database can be healthily created after Frozen storage...")
Expect(k8sClient.Create(ctx, databaseSample)).Should(Succeed())
defer func() {
Expect(k8sClient.Delete(ctx, databaseSample)).Should(Succeed())
}()
By("waiting until database is ready...")
waitUntilDatabaseReady(ctx, databaseSample.Name, testobjects.YdbNamespace)
By("checking that all the database pods are running and ready...")
checkPodsRunningAndReady(ctx, "ydb-cluster", "kind-database", databaseSample.Spec.Nodes)
/*
// This test suite attempts to create a database on uninitialised storage
By("database can be healthily created after Frozen storage...")
Expect(k8sClient.Create(ctx, databaseSample)).Should(Succeed())
defer func() {
Expect(k8sClient.Delete(ctx, databaseSample)).Should(Succeed())
}()
By("waiting until database is ready...")
waitUntilDatabaseReady(ctx, databaseSample.Name, testobjects.YdbNamespace)
By("checking that all the database pods are running and ready...")
checkPodsRunningAndReady(ctx, "ydb-cluster", "kind-database", databaseSample.Spec.Nodes)
*/
})

It("create storage and database with nodeSets", func() {
Expand Down
84 changes: 50 additions & 34 deletions internal/cms/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@ import (
"fmt"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Cms_V1"
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Cms"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
ydbCredentials "github.com/ydb-platform/ydb-go-sdk/v3/credentials"
"sigs.k8s.io/controller-runtime/pkg/log"

ydbv1alpha1 "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1"
"github.com/ydb-platform/ydb-kubernetes-operator/internal/connection"
"github.com/ydb-platform/ydb-kubernetes-operator/internal/resources"
)

var ErrEmptyReplyFromStorage = errors.New("empty reply from storage")

type Tenant struct {
StorageEndpoint string
Domain string
Path string
StorageUnits []ydbv1alpha1.StorageUnit
Shared bool
Expand All @@ -29,43 +30,29 @@ type Tenant struct {

func (t *Tenant) Create(
ctx context.Context,
database *resources.DatabaseBuilder,
creds ydbCredentials.Credentials,
opts ...ydb.Option,
) error {
) (string, error) {
logger := log.FromContext(ctx)
createDatabaseURL := fmt.Sprintf(
"%s/%s",
t.StorageEndpoint,
database.Spec.Domain,
)

db, err := connection.Open(ctx,
createDatabaseURL,
ydb.WithCredentials(creds),
ydb.MergeOptions(opts...),
)
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
conn, err := connection.Open(ctx, url, opts...)
if err != nil {
logger.Error(err, "Error connecting to YDB storage")
return err
return "", err
}
defer func() {
connection.Close(ctx, db)
connection.Close(ctx, conn)
}()

client := Ydb_Cms_V1.NewCmsServiceClient(ydb.GRPCConn(db))
logger.Info(fmt.Sprintf("creating tenant, url: %s", createDatabaseURL))
client := Ydb_Cms_V1.NewCmsServiceClient(ydb.GRPCConn(conn))
logger.Info(fmt.Sprintf("creating tenant, url: %s", url))
request := t.makeCreateDatabaseRequest()
logger.Info(fmt.Sprintf("creating tenant, request: %s", request))
response, err := client.CreateDatabase(ctx, request)
if err != nil {
return err
}
if _, err := processDatabaseCreationResponse(response); err != nil {
return err
return "", err
}
logger.Info(fmt.Sprintf("creating tenant, response: %s", response))
return nil
return processDatabaseCreationOperation(response.Operation)
}

func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
Expand Down Expand Up @@ -101,17 +88,46 @@ func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
return request
}

func processDatabaseCreationResponse(response *Ydb_Cms.CreateDatabaseResponse) (bool, error) {
if response.Operation == nil {
return false, ErrEmptyReplyFromStorage
func processDatabaseCreationOperation(operation *Ydb_Operations.Operation) (string, error) {
if operation == nil {
return "", ErrEmptyReplyFromStorage
}

if response.Operation.Status == Ydb.StatusIds_ALREADY_EXISTS || response.Operation.Status == Ydb.StatusIds_SUCCESS {
return true, nil
if !operation.Ready {
return operation.Id, nil
}
if operation.Status == Ydb.StatusIds_ALREADY_EXISTS || operation.Status == Ydb.StatusIds_SUCCESS {
return "", nil
}
if response.Operation.Status == Ydb.StatusIds_STATUS_CODE_UNSPECIFIED && len(response.Operation.Issues) == 0 {
return true, nil
return "", fmt.Errorf("YDB response error: %v %v", operation.Status, operation.Issues)
}

func (t *Tenant) CheckCreateOperation(
ctx context.Context,
operationID string,
opts ...ydb.Option,
) (bool, error, error) {
logger := log.FromContext(ctx)
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
conn, err := connection.Open(ctx, url, opts...)
if err != nil {
logger.Error(err, "Error connecting to YDB storage")
return false, nil, err
}
defer func() {
connection.Close(ctx, conn)
}()

return false, fmt.Errorf("YDB response error: %v %v", response.Operation.Status, response.Operation.Issues)
client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(conn))
request := &Ydb_Operations.GetOperationRequest{Id: operationID}
logger.Info(fmt.Sprintf("checking operation, url: %s, operationId: %s, request: %s", url, operationID, request))
response, err := client.GetOperation(ctx, request)
if err != nil {
return false, nil, err
}
logger.Info(fmt.Sprintf("checking operation, response: %s", response))
if response.Operation == nil {
return false, nil, ErrEmptyReplyFromStorage
}
oid, err := processDatabaseCreationOperation(response.Operation)
return len(oid) == 0, err, nil
}
11 changes: 6 additions & 5 deletions internal/controllers/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ const (
StoragePausedCondition = "StoragePaused"
StorageReadyCondition = "StorageReady"

DatabasePreparedCondition = "DatabasePrepared"
DatabaseInitializedCondition = "DatabaseInitialized"
DatabaseProvisionedCondition = "DatabaseProvisioned"
DatabasePausedCondition = "DatabasePaused"
DatabaseReadyCondition = "DatabaseReady"
DatabasePreparedCondition = "DatabasePrepared"
DatabaseInitializedCondition = "DatabaseInitialized"
DatabaseProvisionedCondition = "DatabaseProvisioned"
DatabasePausedCondition = "DatabasePaused"
DatabaseReadyCondition = "DatabaseReady"
CreateDatabaseOperationCondition = "CreateDatabaseOperation"

NodeSetPreparedCondition = "NodeSetPrepared"
NodeSetProvisionedCondition = "NodeSetProvisioned"
Expand Down
2 changes: 1 addition & 1 deletion internal/controllers/database/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
).
WithEventFilter(predicate.Or(
predicate.GenerationChangedPredicate{},
resources.IgnoreDeletetionPredicate(),
resources.LastAppliedAnnotationPredicate(),
resources.IsServicePredicate(),
resources.IsSecretPredicate(),
)).
WithEventFilter(resources.IgnoreDeletetionPredicate()).
Complete(r)
}

Expand Down
93 changes: 88 additions & 5 deletions internal/controllers/database/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"

"github.com/ydb-platform/ydb-go-sdk/v3"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -66,9 +67,76 @@ func (r *Reconciler) setInitDatabaseCompleted(
Reason: ReasonCompleted,
Message: message,
})
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
Type: CreateDatabaseOperationCondition,
Status: metav1.ConditionTrue,
Reason: ReasonCompleted,
Message: "Tenant creation operation is completed",
})
return r.updateStatus(ctx, database, StatusUpdateRequeueDelay)
}

func (r *Reconciler) checkCreateTenantOperation(
ctx context.Context,
database *resources.DatabaseBuilder,
tenant *cms.Tenant,
ydbOptions ydb.Option,
) (bool, ctrl.Result, error) {
condition := meta.FindStatusCondition(database.Status.Conditions, CreateDatabaseOperationCondition)
if condition == nil || len(condition.Message) == 0 {
// Something is wrong with the condition where we save operation id
// retry create tenant
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
Type: CreateDatabaseOperationCondition,
Status: metav1.ConditionTrue,
Reason: ReasonNotRequired,
})
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
}
operationID := condition.Message
finished, operationErr, err := tenant.CheckCreateOperation(ctx, operationID, ydbOptions)
if err != nil {
r.Recorder.Event(
database,
corev1.EventTypeWarning,
"InitializingFailed",
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
)
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
}
if operationErr != nil {
// Creation operation failed - retry Create Tenant
r.Recorder.Event(
database,
corev1.EventTypeWarning,
"InitializingFailed",
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, operationErr),
)
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
Type: CreateDatabaseOperationCondition,
Status: metav1.ConditionTrue,
Reason: ReasonNotRequired,
})
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
}
if !finished {
r.Recorder.Event(
database,
corev1.EventTypeWarning,
"Pending",
fmt.Sprintf("Tenant creation operation is not completed, operationID: %s", operationID),
)
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, nil
}
r.Recorder.Event(
database,
corev1.EventTypeNormal,
"Initialized",
fmt.Sprintf("Tenant %s created", tenant.Path),
)
return r.setInitDatabaseCompleted(ctx, database, "Database initialized successfully")
}

func (r *Reconciler) initializeTenant(
ctx context.Context,
database *resources.DatabaseBuilder,
Expand Down Expand Up @@ -143,8 +211,9 @@ func (r *Reconciler) initializeTenant(
return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, ErrIncorrectDatabaseResourcesConfiguration
}

tenant := cms.Tenant{
tenant := &cms.Tenant{
StorageEndpoint: database.Spec.StorageEndpoint,
Domain: database.Spec.Domain,
Path: path,
StorageUnits: storageUnits,
Shared: shared,
Expand All @@ -171,19 +240,33 @@ func (r *Reconciler) initializeTenant(
)
return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err
}
ydbOpts := ydb.MergeOptions(ydb.WithCredentials(creds), tlsOptions)

err = tenant.Create(ctx, database, creds, tlsOptions)
if meta.IsStatusConditionFalse(database.Status.Conditions, CreateDatabaseOperationCondition) {
return r.checkCreateTenantOperation(ctx, database, tenant, ydbOpts)
}
operationID, err := tenant.Create(ctx, ydb.WithCredentials(creds), tlsOptions)
if err != nil {
r.Recorder.Event(
database,
corev1.EventTypeWarning,
"InitializingFailed",
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
)
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
}
if len(operationID) > 0 {
r.Recorder.Event(
database,
corev1.EventTypeWarning,
"Pending",
fmt.Sprintf("Tenant creation operation in progress, operationID: %s", operationID),
)
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
Type: DatabaseInitializedCondition,
Status: metav1.ConditionFalse,
Reason: ReasonInProgress,
Type: CreateDatabaseOperationCondition,
Status: metav1.ConditionFalse,
Reason: ReasonInProgress,
Message: operationID,
})
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
}
Expand Down
Loading

0 comments on commit 04817aa

Please sign in to comment.