Skip to content

Commit

Permalink
feat: Add topic auto creation policy for namespace resource (#117)
Browse files Browse the repository at this point in the history
* Support topic_auto_creation

* Fix errors

* Fix errors

* Add testcase

* Add document

* Fix tests
  • Loading branch information
jiangpengcheng authored Feb 1, 2024
1 parent 60223d9 commit 3e0e204
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 5 deletions.
12 changes: 12 additions & 0 deletions docs/resources/namespace.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ description: |-
- `permission_grant` (Block Set) (see [below for nested schema](#nestedblock--permission_grant))
- `persistence_policies` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--persistence_policies))
- `retention_policies` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--retention_policies))
- `topic_auto_creation` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--topic_auto_creation))

### Read-Only

Expand Down Expand Up @@ -98,4 +99,15 @@ Required:
- `retention_minutes` (String)
- `retention_size_in_mb` (String)

<a id="nestedblock--topic_auto_creation"></a>

### Nested Schema for `topic_auto_creation`

Required:

- `enable` (Boolean)

Optional:

- `type` (String)
- `partitions` (Number)
6 changes: 6 additions & 0 deletions examples/namespaces/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,10 @@ resource "pulsar_namespace" "test" {
limit_bytes = "10000000000"
policy = "producer_request_hold"
}

topic_auto_creation {
enable = true
type = "partitioned"
partitions = 3
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/streamnative/terraform-provider-pulsar
go 1.18

require (
github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf
github.com/apache/pulsar-client-go v0.12.0
github.com/cenkalti/backoff/v4 v4.1.2
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/terraform-plugin-log v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki
github.com/agext/levenshtein v1.2.2 h1:0S/Yg6LYmFJ5stwQeRp6EeOcCbj7xiqQSdNelsXvaqE=
github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf h1:k9hqsKPh5ncKf0e3CkzvBTYXLwCYNYFb1Vtk3qnYAvk=
github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf/go.mod h1:Ea/yiZA7plgiaWRyOuO1B0k5/hjpl1thmiKig+D9PBQ=
github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I=
github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/apparentlymart/go-cidr v1.1.0 h1:2mAhrMoF+nhXqxTzSZMUzDHkLjmIHC+Zzn4tdgBZjnU=
github.com/apparentlymart/go-cidr v1.1.0/go.mod h1:EBcsNrHc3zQeuaeCeCtQruQm+n9/YjEn/vI25Lg7Gwc=
github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM=
Expand Down
93 changes: 93 additions & 0 deletions pulsar/resource_pulsar_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,30 @@ func resourcePulsarNamespace() *schema.Resource {
},
},
},
"topic_auto_creation": {
Type: schema.TypeSet,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enable": {
Type: schema.TypeBool,
Required: true,
},
"type": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validatePartitionedTopicType,
Default: "non-partitioned",
},
"partitions": {
Type: schema.TypeInt,
Optional: true,
},
},
},
Set: topicAutoCreationPoliciesToHash,
},
},
}
}
Expand Down Expand Up @@ -411,6 +435,23 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me
setPermissionGrant(d, grants)
}

if topicAutoCreation, ok := d.GetOk("topic_auto_creation"); ok && topicAutoCreation.(*schema.Set).Len() > 0 {
autoCreation, err := client.GetTopicAutoCreation(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetTopicAutoCreation: %w", err))
}

data := map[string]interface{}{
"enable": autoCreation.Allow,
"type": autoCreation.Type.String(),
}
if autoCreation.Partitions != nil {
data["partitions"] = *autoCreation.Partitions
}

_ = d.Set("topic_auto_creation", schema.NewSet(topicAutoCreationPoliciesToHash, []interface{}{data}))
}

return nil
}

Expand All @@ -426,6 +467,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set)
persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set)
permissionGrantConfig := d.Get("permission_grant").(*schema.Set)
topicAutoCreation := d.Get("topic_auto_creation").(*schema.Set)

nsName, err := utils.GetNameSpaceName(tenant, namespace)
if err != nil {
Expand Down Expand Up @@ -562,6 +604,21 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
}
}

if topicAutoCreation.Len() > 0 {
topicAutoCreationPolicy, err := unmarshalTopicAutoCreation(topicAutoCreation)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err))
} else {
if err = client.SetTopicAutoCreation(*nsName, *topicAutoCreationPolicy); err != nil {
errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err))
}
}
} else { // remove the topicAutoCreation
if err = client.RemoveTopicAutoCreation(*nsName); err != nil {
errs = multierror.Append(errs, fmt.Errorf("RemoveTopicAutoCreation: %w", err))
}
}

if errs != nil {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_NAMESPACE_CONFIG: %w", errs))
}
Expand Down Expand Up @@ -591,6 +648,7 @@ func resourcePulsarNamespaceDelete(ctx context.Context, d *schema.ResourceData,
_ = d.Set("dispatch_rate", nil)
_ = d.Set("persistence_policies", nil)
_ = d.Set("permission_grant", nil)
_ = d.Set("topic_auto_creation", nil)

return nil
}
Expand Down Expand Up @@ -644,6 +702,19 @@ func persistencePoliciesToHash(v interface{}) int {
return hashcode.String(buf.String())
}

func topicAutoCreationPoliciesToHash(v interface{}) int {
var buf bytes.Buffer
m := v.(map[string]interface{})

buf.WriteString(fmt.Sprintf("%t-", m["enable"].(bool)))
buf.WriteString(fmt.Sprintf("%s-", m["type"].(string)))
if m["partitions"] != nil {
buf.WriteString(fmt.Sprintf("%d-", m["partitions"].(int)))
}

return hashcode.String(buf.String())
}

func unmarshalDispatchRate(v *schema.Set) *utils.DispatchRate {
var dispatchRate utils.DispatchRate

Expand Down Expand Up @@ -710,3 +781,25 @@ func unmarshalPersistencePolicies(v *schema.Set) *utils.PersistencePolicies {

return &persPolicies
}

func unmarshalTopicAutoCreation(v *schema.Set) (*utils.TopicAutoCreationConfig, error) {
var topicAutoCreation utils.TopicAutoCreationConfig

for _, policy := range v.List() {
data := policy.(map[string]interface{})

topicAutoCreation.Allow = data["enable"].(bool)
topicAutoCreation.Type = utils.TopicType(data["type"].(string))
if topicAutoCreation.Type == utils.Partitioned {
partitions := data["partitions"].(int)
if partitions <= 0 {
return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: partitions must be greater than 0")
}
topicAutoCreation.Partitions = &partitions
} else if topicAutoCreation.Type != utils.NonPartitioned {
return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: unknown topic type %s", topicAutoCreation.Type)
}
}

return &topicAutoCreation, nil
}
114 changes: 112 additions & 2 deletions pulsar/resource_pulsar_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func TestNamespaceWithUpdate(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "permission_grant.1.actions.#", "2"),
resource.TestCheckResourceAttr(resourceName, "permission_grant.1.actions.0", "consume"),
resource.TestCheckResourceAttr(resourceName, "permission_grant.1.actions.1", "produce"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "true"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.type", "partitioned"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.partitions", "3"),
),
},
},
Expand Down Expand Up @@ -245,6 +249,75 @@ func TestNamespaceWithPermissionGrantUpdate(t *testing.T) {
})
}

func TestNamespaceWithTopicAutoCreationUpdate(t *testing.T) {

resourceName := "pulsar_namespace.test"
cName := acctest.RandString(10)
tName := acctest.RandString(10)
nsName := acctest.RandString(10)

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
IDRefreshName: resourceName,
CheckDestroy: testPulsarNamespaceDestroy,
Steps: []resource.TestStep{
{
Config: testPulsarNamespaceWithoutOptionals(testWebServiceURL, cName, tName, nsName),
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckNoResourceAttr(resourceName, "topic_auto_creation.#"),
),
},
{
Config: testPulsarNamespaceWithTopicAutoCreation(testWebServiceURL, cName, tName, nsName,
`topic_auto_creation {
enable = "false"
}`),
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "false"),
),
},
{
Config: testPulsarNamespaceWithTopicAutoCreation(testWebServiceURL, cName, tName, nsName,
`topic_auto_creation {
enable = "true"
}`),
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "true"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.type", "non-partitioned"),
),
},
{
Config: testPulsarNamespaceWithTopicAutoCreation(testWebServiceURL, cName, tName, nsName,
`topic_auto_creation {
enable = "true"
type = "partitioned"
partitions = 3
}`),
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.#", "1"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.enable", "true"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.type", "partitioned"),
resource.TestCheckResourceAttr(resourceName, "topic_auto_creation.0.partitions", "3"),
),
},
{
Config: testPulsarNamespaceWithoutOptionals(testWebServiceURL, cName, tName, nsName),
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckNoResourceAttr(resourceName, "topic_auto_creation.#"),
),
},
},
})
}

func TestImportExistingNamespace(t *testing.T) {
tname := "public"
ns := acctest.RandString(10)
Expand Down Expand Up @@ -322,8 +395,8 @@ func testNamespaceImported() resource.ImportStateCheckFunc {
return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s)
}

if len(s[0].Attributes) != 10 {
return fmt.Errorf("expected %d attrs, got %d: %#v", 10, len(s[0].Attributes), s[0].Attributes)
if len(s[0].Attributes) != 11 {
return fmt.Errorf("expected %d attrs, got %d: %#v", 11, len(s[0].Attributes), s[0].Attributes)
}

return nil
Expand Down Expand Up @@ -462,6 +535,12 @@ resource "pulsar_namespace" "test" {
role = "some-role-2"
actions = ["produce", "consume"]
}
topic_auto_creation {
enable = true
type = "partitioned"
partitions = 3
}
}
`, wsURL, cluster, tenant, ns)
}
Expand Down Expand Up @@ -544,3 +623,34 @@ resource "pulsar_namespace" "test" {
}
`, wsURL, cluster, tenant, ns, permissionGrants)
}

func testPulsarNamespaceWithTopicAutoCreation(wsURL, cluster, tenant, ns string, topicAutoCreation string) string {
return fmt.Sprintf(`
provider "pulsar" {
web_service_url = "%s"
}
resource "pulsar_cluster" "test_cluster" {
cluster = "%s"
cluster_data {
web_service_url = "http://localhost:8080"
broker_service_url = "http://localhost:6050"
peer_clusters = ["standalone"]
}
}
resource "pulsar_tenant" "test_tenant" {
tenant = "%s"
allowed_clusters = [pulsar_cluster.test_cluster.cluster, "standalone"]
}
resource "pulsar_namespace" "test" {
tenant = pulsar_tenant.test_tenant.tenant
namespace = "%s"
%s
}
`, wsURL, cluster, tenant, ns, topicAutoCreation)
}
9 changes: 9 additions & 0 deletions pulsar/validate_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,12 @@ func validateAuthAction(val interface{}, key string) (warns []string, errs []err
}
return
}

func validatePartitionedTopicType(val interface{}, key string) (warns []string, errs []error) {
v := val.(string)
_, err := utils.ParseTopicType(v)
if err != nil {
errs = append(errs, fmt.Errorf("%q must be a valid topic type (got: %s): %w", key, v, err))
}
return
}

0 comments on commit 3e0e204

Please sign in to comment.