From 71e45aef879beaf3ae3806c6fcf3b898f19c3ec1 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Mon, 29 Jan 2024 10:17:27 +0800 Subject: [PATCH 1/6] Support topic_auto_creation --- go.mod | 2 +- go.sum | 4 +- pulsar/resource_pulsar_namespace.go | 76 +++++++++++++++++++++++++++++ pulsar/validate_helpers.go | 9 ++++ 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index bed91e0..b9ad84e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/streamnative/terraform-provider-pulsar go 1.18 require ( - github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf + github.com/apache/pulsar-client-go v0.12.0 github.com/cenkalti/backoff/v4 v4.1.2 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/terraform-plugin-log v0.4.0 diff --git a/go.sum b/go.sum index ac1b044..4878009 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki github.com/agext/levenshtein v1.2.2 h1:0S/Yg6LYmFJ5stwQeRp6EeOcCbj7xiqQSdNelsXvaqE= github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= -github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf h1:k9hqsKPh5ncKf0e3CkzvBTYXLwCYNYFb1Vtk3qnYAvk= -github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf/go.mod h1:Ea/yiZA7plgiaWRyOuO1B0k5/hjpl1thmiKig+D9PBQ= +github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I= +github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/apparentlymart/go-cidr v1.1.0 h1:2mAhrMoF+nhXqxTzSZMUzDHkLjmIHC+Zzn4tdgBZjnU= github.com/apparentlymart/go-cidr v1.1.0/go.mod h1:EBcsNrHc3zQeuaeCeCtQruQm+n9/YjEn/vI25Lg7Gwc= github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM= diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 8c391bd..0656e02 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -230,6 +230,29 @@ func resourcePulsarNamespace() *schema.Resource { }, }, }, + "topic_auto_creation": { + Type: schema.TypeSet, + Optional: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "enable": { + Type: schema.TypeBool, + Required: true, + }, + "type": { + Type: schema.TypeString, + Required: false, + ValidateFunc: validatePartitionedTopicType, + }, + "partitions": { + Type: schema.TypeInt, + Required: false, + }, + }, + }, + Set: topicAutoCreationPoliciesToHash, + }, }, } } @@ -411,6 +434,21 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me setPermissionGrant(d, grants) } + if topicAutoCreation, ok := d.GetOk("topic_auto_creation"); ok && topicAutoCreation.(*schema.Set).Len() > 0 { + autoCreation, err := client.GetTopicAutoCreation(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetTopicAutoCreation: %w", err)) + } + + _ = d.Set("topic_auto_creation", schema.NewSet(topicAutoCreationPoliciesToHash, []interface{}{ + map[string]interface{}{ + "enable": autoCreation.Allow, + "type": autoCreation.Type, + "partitions": autoCreation.Partitions, + }, + })) + } + return nil } @@ -426,6 +464,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set) persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set) permissionGrantConfig := d.Get("permission_grant").(*schema.Set) + topicAutoCreation := d.Get("topic_auto_creation").(*schema.Set) nsName, err := utils.GetNameSpaceName(tenant, namespace) if err != nil { @@ -562,6 +601,17 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, } } + if topicAutoCreation.Len() > 0 { + topicAutoCreationPolicy := unmarshalTopicAutoCreation(topicAutoCreation) + if err = client.SetTopicAutoCreation(*nsName, *topicAutoCreationPolicy); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err)) + } + } else { // remove the topicAutoCreation + if err = client.RemoveTopicAutoCreation(*nsName); err != nil { + errs = multierror.Append(errs, fmt.Errorf("RemoveTopicAutoCreation: %w", err)) + } + } + if errs != nil { return diag.FromErr(fmt.Errorf("ERROR_UPDATE_NAMESPACE_CONFIG: %w", errs)) } @@ -591,6 +641,7 @@ func resourcePulsarNamespaceDelete(ctx context.Context, d *schema.ResourceData, _ = d.Set("dispatch_rate", nil) _ = d.Set("persistence_policies", nil) _ = d.Set("permission_grant", nil) + _ = d.Set("topic_auto_creation", nil) return nil } @@ -644,6 +695,17 @@ func persistencePoliciesToHash(v interface{}) int { return hashcode.String(buf.String()) } +func topicAutoCreationPoliciesToHash(v interface{}) int { + var buf bytes.Buffer + m := v.(map[string]interface{}) + + buf.WriteString(fmt.Sprintf("%t-", m["enable"].(bool))) + buf.WriteString(fmt.Sprintf("%s-", m["type"].(string))) + buf.WriteString(fmt.Sprintf("%d-", m["partitions"].(int))) + + return hashcode.String(buf.String()) +} + func unmarshalDispatchRate(v *schema.Set) *utils.DispatchRate { var dispatchRate utils.DispatchRate @@ -710,3 +772,17 @@ func unmarshalPersistencePolicies(v *schema.Set) *utils.PersistencePolicies { return &persPolicies } + +func unmarshalTopicAutoCreation(v *schema.Set) *utils.TopicAutoCreationConfig { + var topicAutoCreation utils.TopicAutoCreationConfig + + for _, policy := range v.List() { + data := policy.(map[string]interface{}) + + topicAutoCreation.Allow = data["allow"].(bool) + topicAutoCreation.Type = utils.TopicType(data["type"].(string)) + topicAutoCreation.Partitions = data["partitions"].(*int) + } + + return &topicAutoCreation +} diff --git a/pulsar/validate_helpers.go b/pulsar/validate_helpers.go index a08f53f..3904eb0 100644 --- a/pulsar/validate_helpers.go +++ b/pulsar/validate_helpers.go @@ -50,3 +50,12 @@ func validateAuthAction(val interface{}, key string) (warns []string, errs []err } return } + +func validatePartitionedTopicType(val interface{}, key string) (warns []string, errs []error) { + v := val.(string) + _, err := utils.ParseTopicType(v) + if err != nil { + errs = append(errs, fmt.Errorf("%q must be a valid topic type (got: %s): %w", key, v, err)) + } + return +} From 6dabcd92007f353daeb15e6365d297de436b61f2 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Mon, 29 Jan 2024 14:08:58 +0800 Subject: [PATCH 2/6] Fix errors --- pulsar/resource_pulsar_namespace.go | 53 ++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 0656e02..d81b18d 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -242,12 +242,13 @@ func resourcePulsarNamespace() *schema.Resource { }, "type": { Type: schema.TypeString, - Required: false, + Optional: true, ValidateFunc: validatePartitionedTopicType, + Default: "non-partitioned", }, "partitions": { Type: schema.TypeInt, - Required: false, + Optional: true, }, }, }, @@ -440,13 +441,15 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetTopicAutoCreation: %w", err)) } - _ = d.Set("topic_auto_creation", schema.NewSet(topicAutoCreationPoliciesToHash, []interface{}{ - map[string]interface{}{ - "enable": autoCreation.Allow, - "type": autoCreation.Type, - "partitions": autoCreation.Partitions, - }, - })) + data := map[string]interface{}{ + "enable": autoCreation.Allow, + "type": autoCreation.Type.String(), + } + if autoCreation.Partitions != nil { + data["partitions"] = *autoCreation.Partitions + } + + _ = d.Set("topic_auto_creation", schema.NewSet(topicAutoCreationPoliciesToHash, []interface{}{data})) } return nil @@ -602,9 +605,13 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, } if topicAutoCreation.Len() > 0 { - topicAutoCreationPolicy := unmarshalTopicAutoCreation(topicAutoCreation) - if err = client.SetTopicAutoCreation(*nsName, *topicAutoCreationPolicy); err != nil { + topicAutoCreationPolicy, err := unmarshalTopicAutoCreation(topicAutoCreation) + if err != nil { errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err)) + } else { + if err = client.SetTopicAutoCreation(*nsName, *topicAutoCreationPolicy); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err)) + } } } else { // remove the topicAutoCreation if err = client.RemoveTopicAutoCreation(*nsName); err != nil { @@ -701,7 +708,9 @@ func topicAutoCreationPoliciesToHash(v interface{}) int { buf.WriteString(fmt.Sprintf("%t-", m["enable"].(bool))) buf.WriteString(fmt.Sprintf("%s-", m["type"].(string))) - buf.WriteString(fmt.Sprintf("%d-", m["partitions"].(int))) + if m["partitions"] != nil { + buf.WriteString(fmt.Sprintf("%d-", m["partitions"].(int))) + } return hashcode.String(buf.String()) } @@ -773,16 +782,28 @@ func unmarshalPersistencePolicies(v *schema.Set) *utils.PersistencePolicies { return &persPolicies } -func unmarshalTopicAutoCreation(v *schema.Set) *utils.TopicAutoCreationConfig { +func unmarshalTopicAutoCreation(v *schema.Set) (*utils.TopicAutoCreationConfig, error) { var topicAutoCreation utils.TopicAutoCreationConfig for _, policy := range v.List() { data := policy.(map[string]interface{}) - topicAutoCreation.Allow = data["allow"].(bool) + topicAutoCreation.Allow = data["enable"].(bool) topicAutoCreation.Type = utils.TopicType(data["type"].(string)) - topicAutoCreation.Partitions = data["partitions"].(*int) + if topicAutoCreation.Type == utils.Partitioned { + if data["partitions"] == nil { + return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: partitions is required for partitioned topic") + } + partitions := data["partitions"].(int) + topicAutoCreation.Partitions = &partitions + } else if topicAutoCreation.Type == utils.NonPartitioned { + if data["partitions"] != nil { + return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: partitions is not allowed for non-partitioned topic") + } + } else { + return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: unknown topic type %s", topicAutoCreation.Type) + } } - return &topicAutoCreation + return &topicAutoCreation, nil } From dc9cf43605e0ff1aaa702b71fb6152433f65d5ec Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Mon, 29 Jan 2024 14:18:12 +0800 Subject: [PATCH 3/6] Fix errors --- pulsar/resource_pulsar_namespace.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index d81b18d..e06f919 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -791,16 +791,9 @@ func unmarshalTopicAutoCreation(v *schema.Set) (*utils.TopicAutoCreationConfig, topicAutoCreation.Allow = data["enable"].(bool) topicAutoCreation.Type = utils.TopicType(data["type"].(string)) if topicAutoCreation.Type == utils.Partitioned { - if data["partitions"] == nil { - return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: partitions is required for partitioned topic") - } partitions := data["partitions"].(int) topicAutoCreation.Partitions = &partitions - } else if topicAutoCreation.Type == utils.NonPartitioned { - if data["partitions"] != nil { - return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: partitions is not allowed for non-partitioned topic") - } - } else { + } else if topicAutoCreation.Type != utils.NonPartitioned { return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: unknown topic type %s", topicAutoCreation.Type) } } From d3bd080641c961075e59b0e2b0456e06ca9c3903 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Mon, 29 Jan 2024 15:11:03 +0800 Subject: [PATCH 4/6] Add testcase --- pulsar/resource_pulsar_namespace.go | 3 + pulsar/resource_pulsar_namespace_test.go | 110 +++++++++++++++++++++++ 2 files changed, 113 insertions(+) diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index e06f919..54a6e83 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -792,6 +792,9 @@ func unmarshalTopicAutoCreation(v *schema.Set) (*utils.TopicAutoCreationConfig, topicAutoCreation.Type = utils.TopicType(data["type"].(string)) if topicAutoCreation.Type == utils.Partitioned { partitions := data["partitions"].(int) + if partitions <= 0 { + return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: partitions must be greater than 0") + } topicAutoCreation.Partitions = &partitions } else if topicAutoCreation.Type != utils.NonPartitioned { return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: unknown topic type %s", topicAutoCreation.Type) diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index 7afac7a..193e09c 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -134,6 +134,10 @@ func TestNamespaceWithUpdate(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "permission_grant.1.actions.#", "2"), resource.TestCheckResourceAttr(resourceName, "permission_grant.1.actions.0", "consume"), resource.TestCheckResourceAttr(resourceName, "permission_grant.1.actions.1", "produce"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "true"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.type", "partitioned"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.partitions", "3"), ), }, }, @@ -245,6 +249,75 @@ func TestNamespaceWithPermissionGrantUpdate(t *testing.T) { }) } +func TestNamespaceWithTopicAutoCreationUpdate(t *testing.T) { + + resourceName := "pulsar_namespace.test" + cName := acctest.RandString(10) + tName := acctest.RandString(10) + nsName := acctest.RandString(10) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + ProviderFactories: testAccProviderFactories, + IDRefreshName: resourceName, + CheckDestroy: testPulsarNamespaceDestroy, + Steps: []resource.TestStep{ + { + Config: testPulsarNamespaceWithoutOptionals(testWebServiceURL, cName, tName, nsName), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckNoResourceAttr(resourceName, "topic_auto_creation.#"), + ), + }, + { + Config: testPulsarNamespaceWithTopicAutoCreation(testWebServiceURL, cName, tName, nsName, + `topic_auto_creation { + enable = "false" + }`), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "false"), + ), + }, + { + Config: testPulsarNamespaceWithTopicAutoCreation(testWebServiceURL, cName, tName, nsName, + `topic_auto_creation { + enable = "true" + }`), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "true"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.type", "non-partitioned"), + ), + }, + { + Config: testPulsarNamespaceWithTopicAutoCreation(testWebServiceURL, cName, tName, nsName, + `topic_auto_creation { + enable = "true" + type = "partitioned" + partitions = 3 + }`), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "true"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.type", "partitioned"), + resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.partitions", "3"), + ), + }, + { + Config: testPulsarNamespaceWithoutOptionals(testWebServiceURL, cName, tName, nsName), + Check: resource.ComposeTestCheckFunc( + testPulsarNamespaceExists(resourceName), + resource.TestCheckNoResourceAttr(resourceName, "topic_auto_creation.#"), + ), + }, + }, + }) +} + func TestImportExistingNamespace(t *testing.T) { tname := "public" ns := acctest.RandString(10) @@ -462,6 +535,12 @@ resource "pulsar_namespace" "test" { role = "some-role-2" actions = ["produce", "consume"] } + + topic_auto_creation { + enable = true + type = "partitioned" + partitions = 3 + } } `, wsURL, cluster, tenant, ns) } @@ -544,3 +623,34 @@ resource "pulsar_namespace" "test" { } `, wsURL, cluster, tenant, ns, permissionGrants) } + +func testPulsarNamespaceWithTopicAutoCreation(wsURL, cluster, tenant, ns string, topicAutoCreation string) string { + return fmt.Sprintf(` +provider "pulsar" { + web_service_url = "%s" +} + +resource "pulsar_cluster" "test_cluster" { + cluster = "%s" + + cluster_data { + web_service_url = "http://localhost:8080" + broker_service_url = "http://localhost:6050" + peer_clusters = ["standalone"] + } + +} + +resource "pulsar_tenant" "test_tenant" { + tenant = "%s" + allowed_clusters = [pulsar_cluster.test_cluster.cluster, "standalone"] +} + +resource "pulsar_namespace" "test" { + tenant = pulsar_tenant.test_tenant.tenant + namespace = "%s" + + %s +} +`, wsURL, cluster, tenant, ns, topicAutoCreation) +} From 7585613b840a8dbb99e0ec56f3e530d7a4b6479b Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Mon, 29 Jan 2024 15:35:39 +0800 Subject: [PATCH 5/6] Add document --- docs/resources/namespace.md | 12 ++++++++++++ examples/namespaces/main.tf | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/docs/resources/namespace.md b/docs/resources/namespace.md index 04c77a9..d19f28b 100644 --- a/docs/resources/namespace.md +++ b/docs/resources/namespace.md @@ -27,6 +27,7 @@ description: |- - `permission_grant` (Block Set) (see [below for nested schema](#nestedblock--permission_grant)) - `persistence_policies` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--persistence_policies)) - `retention_policies` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--retention_policies)) +- `topic_auto_creation` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--topic_auto_creation)) ### Read-Only @@ -98,4 +99,15 @@ Required: - `retention_minutes` (String) - `retention_size_in_mb` (String) + +### Nested Schema for `topic_auto_creation` + +Required: + +- `enable` (Boolean) + +Optional: + +- `type` (String) +- `partitions` (Number) diff --git a/examples/namespaces/main.tf b/examples/namespaces/main.tf index e3a6768..4d45735 100644 --- a/examples/namespaces/main.tf +++ b/examples/namespaces/main.tf @@ -73,4 +73,10 @@ resource "pulsar_namespace" "test" { limit_bytes = "10000000000" policy = "producer_request_hold" } + + topic_auto_creation { + enable = true + type = "partitioned" + partitions = 3 + } } From 1f8be19ab31d998f58fc3b04ec7957f13c3c6126 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Mon, 29 Jan 2024 15:50:19 +0800 Subject: [PATCH 6/6] Fix tests --- pulsar/resource_pulsar_namespace_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go index 193e09c..9260157 100644 --- a/pulsar/resource_pulsar_namespace_test.go +++ b/pulsar/resource_pulsar_namespace_test.go @@ -395,8 +395,8 @@ func testNamespaceImported() resource.ImportStateCheckFunc { return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s) } - if len(s[0].Attributes) != 10 { - return fmt.Errorf("expected %d attrs, got %d: %#v", 10, len(s[0].Attributes), s[0].Attributes) + if len(s[0].Attributes) != 11 { + return fmt.Errorf("expected %d attrs, got %d: %#v", 11, len(s[0].Attributes), s[0].Attributes) } return nil