From 3e0e204841ddf3ea22bee36710d423ea76b0ecb4 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Fri, 2 Feb 2024 02:45:25 +0800 Subject: [PATCH] feat: Add topic auto creation policy for namespace resource (#117) * Support topic_auto_creation * Fix errors * Fix errors * Add testcase * Add document * Fix tests --- docs/resources/namespace.md | 12 +++ examples/namespaces/main.tf | 6 ++ go.mod | 2 +- go.sum | 4 +- pulsar/resource_pulsar_namespace.go | 93 ++++++++++++++++++ pulsar/resource_pulsar_namespace_test.go | 114 ++++++++++++++++++++++- pulsar/validate_helpers.go | 9 ++ 7 files changed, 235 insertions(+), 5 deletions(-) diff --git a/docs/resources/namespace.md b/docs/resources/namespace.md index 04c77a9..d19f28b 100644 --- a/docs/resources/namespace.md +++ b/docs/resources/namespace.md @@ -27,6 +27,7 @@ description: |- - `permission_grant` (Block Set) (see [below for nested schema](#nestedblock--permission_grant)) - `persistence_policies` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--persistence_policies)) - `retention_policies` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--retention_policies)) +- `topic_auto_creation` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--topic_auto_creation)) ### Read-Only @@ -98,4 +99,15 @@ Required: - `retention_minutes` (String) - `retention_size_in_mb` (String) + +### Nested Schema for `topic_auto_creation` + +Required: + +- `enable` (Boolean) + +Optional: + +- `type` (String) +- `partitions` (Number) diff --git a/examples/namespaces/main.tf b/examples/namespaces/main.tf index e3a6768..4d45735 100644 --- a/examples/namespaces/main.tf +++ b/examples/namespaces/main.tf @@ -73,4 +73,10 @@ resource "pulsar_namespace" "test" { limit_bytes = "10000000000" policy = "producer_request_hold" } + + topic_auto_creation { + enable = true + type = "partitioned" + partitions = 3 + } } diff --git a/go.mod b/go.mod index bed91e0..b9ad84e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/streamnative/terraform-provider-pulsar go 1.18 require ( - github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf + github.com/apache/pulsar-client-go v0.12.0 github.com/cenkalti/backoff/v4 v4.1.2 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/terraform-plugin-log v0.4.0 diff --git a/go.sum b/go.sum index ac1b044..4878009 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki github.com/agext/levenshtein v1.2.2 h1:0S/Yg6LYmFJ5stwQeRp6EeOcCbj7xiqQSdNelsXvaqE= github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= -github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf h1:k9hqsKPh5ncKf0e3CkzvBTYXLwCYNYFb1Vtk3qnYAvk= -github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf/go.mod h1:Ea/yiZA7plgiaWRyOuO1B0k5/hjpl1thmiKig+D9PBQ= +github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I= +github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/apparentlymart/go-cidr v1.1.0 h1:2mAhrMoF+nhXqxTzSZMUzDHkLjmIHC+Zzn4tdgBZjnU= github.com/apparentlymart/go-cidr v1.1.0/go.mod h1:EBcsNrHc3zQeuaeCeCtQruQm+n9/YjEn/vI25Lg7Gwc= github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM= diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 8c391bd..54a6e83 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -230,6 +230,30 @@ func resourcePulsarNamespace() *schema.Resource { }, }, }, + "topic_auto_creation": { + Type: schema.TypeSet, + Optional: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "enable": { + Type: schema.TypeBool, + Required: true, + }, + "type": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validatePartitionedTopicType, + Default: "non-partitioned", + }, + "partitions": { + Type: schema.TypeInt, + Optional: true, + }, + }, + }, + Set: topicAutoCreationPoliciesToHash, + }, }, } } @@ -411,6 +435,23 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me setPermissionGrant(d, grants) } + if topicAutoCreation, ok := d.GetOk("topic_auto_creation"); ok && topicAutoCreation.(*schema.Set).Len() > 0 { + autoCreation, err := client.GetTopicAutoCreation(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetTopicAutoCreation: %w", err)) + } + + data := map[string]interface{}{ + "enable": autoCreation.Allow, + "type": autoCreation.Type.String(), + } + if autoCreation.Partitions != nil { + data["partitions"] = *autoCreation.Partitions + } + + _ = d.Set("topic_auto_creation", schema.NewSet(topicAutoCreationPoliciesToHash, []interface{}{data})) + } + return nil } @@ -426,6 +467,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set) persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set) permissionGrantConfig := d.Get("permission_grant").(*schema.Set) + topicAutoCreation := d.Get("topic_auto_creation").(*schema.Set) nsName, err := utils.GetNameSpaceName(tenant, namespace) if err != nil { @@ -562,6 +604,21 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, } } + if topicAutoCreation.Len() > 0 { + topicAutoCreationPolicy, err := unmarshalTopicAutoCreation(topicAutoCreation) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err)) + } else { + if err = client.SetTopicAutoCreation(*nsName, *topicAutoCreationPolicy); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err)) + } + } + } else { // remove the topicAutoCreation + if err = client.RemoveTopicAutoCreation(*nsName); err != nil { + errs = multierror.Append(errs, fmt.Errorf("RemoveTopicAutoCreation: %w", err)) + } + } + if errs != nil { return diag.FromErr(fmt.Errorf("ERROR_UPDATE_NAMESPACE_CONFIG: %w", errs)) } @@ -591,6 +648,7 @@ func resourcePulsarNamespaceDelete(ctx context.Context, d *schema.ResourceData, _ = d.Set("dispatch_rate", nil) _ = d.Set("persistence_policies", nil) _ = d.Set("permission_grant", nil) + _ = d.Set("topic_auto_creation", nil) return nil } @@ -644,6 +702,19 @@ func persistencePoliciesToHash(v interface{}) int { return hashcode.String(buf.String()) } +func topicAutoCreationPoliciesToHash(v interface{}) int { + var buf bytes.Buffer + m := v.(map[string]interface{}) + + buf.WriteString(fmt.Sprintf("%t-", m["enable"].(bool))) + buf.WriteString(fmt.Sprintf("%s-", m["type"].(string))) + if m["partitions"] != nil { + buf.WriteString(fmt.Sprintf("%d-", m["partitions"].(int))) + } + + return hashcode.String(buf.String()) +} + func unmarshalDispatchRate(v *schema.Set) *utils.DispatchRate { var dispatchRate utils.DispatchRate @@ -710,3 +781,25 @@ func unmarshalPersistencePolicies(v *schema.Set) *utils.PersistencePolicies { return &persPolicies } + +func unmarshalTopicAutoCreation(v *schema.Set) (*utils.TopicAutoCreationConfig, error) { + var topicAutoCreation utils.TopicAutoCreationConfig + + for _, policy := range v.List() { + data := policy.(map[string]interface{}) + + topicAutoCreation.Allow = data["enable"].(bool) + topicAutoCreation.Type = utils.TopicType(data["type"].(string)) + if topicAutoCreation.Type == utils.Partitioned { + partitions := data["partitions"].(int) + if partitions <= 0 { + return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: partitions must be greater than 0") + } + topicAutoCreation.Partitions = &partitions + } else if topicAutoCreation.Type != utils.NonPartitioned { + return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: unknown topic type %s", topicAutoCreation.Type) + } + } + + return &topicAutoCreation, nil +} diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index 7afac7a..9260157 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -134,6 +134,10 @@ func TestNamespaceWithUpdate(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "permission_grant.1.actions.#", "2"), resource.TestCheckResourceAttr(resourceName, "permission_grant.1.actions.0", "consume"), resource.TestCheckResourceAttr(resourceName, "permission_grant.1.actions.1", "produce"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "true"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.type", "partitioned"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.partitions", "3"), ), }, }, @@ -245,6 +249,75 @@ func TestNamespaceWithPermissionGrantUpdate(t *testing.T) { }) } +func TestNamespaceWithTopicAutoCreationUpdate(t *testing.T) { + + resourceName := "pulsar_namespace.test" + cName := acctest.RandString(10) + tName := acctest.RandString(10) + nsName := acctest.RandString(10) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + IDRefreshName: resourceName, + CheckDestroy: testPulsarNamespaceDestroy, + Steps: []resource.TestStep{ + { + Config: testPulsarNamespaceWithoutOptionals(testWebServiceURL, cName, tName, nsName), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckNoResourceAttr(resourceName, "topic_auto_creation.#"), + ), + }, + { + Config: testPulsarNamespaceWithTopicAutoCreation(testWebServiceURL, cName, tName, nsName, + `topic_auto_creation { + enable = "false" + }`), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "false"), + ), + }, + { + Config: testPulsarNamespaceWithTopicAutoCreation(testWebServiceURL, cName, tName, nsName, + `topic_auto_creation { + enable = "true" + }`), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "true"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.type", "non-partitioned"), + ), + }, + { + Config: testPulsarNamespaceWithTopicAutoCreation(testWebServiceURL, cName, tName, nsName, + `topic_auto_creation { + enable = "true" + type = "partitioned" + partitions = 3 + }`), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "true"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.type", "partitioned"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.partitions", "3"), + ), + }, + { + Config: testPulsarNamespaceWithoutOptionals(testWebServiceURL, cName, tName, nsName), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckNoResourceAttr(resourceName, "topic_auto_creation.#"), + ), + }, + }, + }) +} + func TestImportExistingNamespace(t *testing.T) { tname := "public" ns := acctest.RandString(10) @@ -322,8 +395,8 @@ func testNamespaceImported() resource.ImportStateCheckFunc { return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s) } - if len(s[0].Attributes) != 10 { - return fmt.Errorf("expected %d attrs, got %d: %#v", 10, len(s[0].Attributes), s[0].Attributes) + if len(s[0].Attributes) != 11 { + return fmt.Errorf("expected %d attrs, got %d: %#v", 11, len(s[0].Attributes), s[0].Attributes) } return nil @@ -462,6 +535,12 @@ resource "pulsar_namespace" "test" { role = "some-role-2" actions = ["produce", "consume"] } + + topic_auto_creation { + enable = true + type = "partitioned" + partitions = 3 + } } `, wsURL, cluster, tenant, ns) } @@ -544,3 +623,34 @@ resource "pulsar_namespace" "test" { } `, wsURL, cluster, tenant, ns, permissionGrants) } + +func testPulsarNamespaceWithTopicAutoCreation(wsURL, cluster, tenant, ns string, topicAutoCreation string) string { + return fmt.Sprintf(` +provider "pulsar" { + web_service_url = "%s" +} + +resource "pulsar_cluster" "test_cluster" { + cluster = "%s" + + cluster_data { + web_service_url = "http://localhost:8080" + broker_service_url = "http://localhost:6050" + peer_clusters = ["standalone"] + } + +} + +resource "pulsar_tenant" "test_tenant" { + tenant = "%s" + allowed_clusters = [pulsar_cluster.test_cluster.cluster, "standalone"] +} + +resource "pulsar_namespace" "test" { + tenant = pulsar_tenant.test_tenant.tenant + namespace = "%s" + + %s +} +`, wsURL, cluster, tenant, ns, topicAutoCreation) +} diff --git a/pulsar/validate_helpers.go b/pulsar/validate_helpers.go index a08f53f..3904eb0 100644 --- a/pulsar/validate_helpers.go +++ b/pulsar/validate_helpers.go @@ -50,3 +50,12 @@ func validateAuthAction(val interface{}, key string) (warns []string, errs []err } return } + +func validatePartitionedTopicType(val interface{}, key string) (warns []string, errs []error) { + v := val.(string) + _, err := utils.ParseTopicType(v) + if err != nil { + errs = append(errs, fmt.Errorf("%q must be a valid topic type (got: %s): %w", key, v, err)) + } + return +}