Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: fix sink retain ordering and ignore custom runtime options server set vals #112

Merged
merged 10 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@ gen
.DS_Store

.vscode/
.terraform.lock.hcl
43 changes: 37 additions & 6 deletions pulsar/resource_pulsar_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,31 @@ func resourcePulsarFunction() *schema.Resource {
resourceFunctionSubscriptionPositionKey: {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: resourceFunctionDescriptions[resourceFunctionSubscriptionPositionKey],
ValidateFunc: func(val interface{}, key string) ([]string, []error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems can use a common method for functions/sins both

v := val.(string)
subscriptionPositionSupported := []string{
SubscriptionPositionEarliest,
SubscriptionPositionLatest,
}

found := false
for _, item := range subscriptionPositionSupported {
if v == item {
found = true
break
}
}
if !found {
return nil, []error{
fmt.Errorf("%s is unsupported, shold be one of %s", v,
strings.Join(subscriptionPositionSupported, ",")),
}
}

return nil, nil
},
},
resourceFunctionCleanupSubscriptionKey: {
Type: schema.TypeBool,
Expand Down Expand Up @@ -327,19 +351,19 @@ func resourcePulsarFunction() *schema.Resource {
resourceFunctionCPUKey: {
Type: schema.TypeFloat,
Optional: true,
Default: 0.5,
Computed: true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the default value is removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the resources will be returned by worker service apis and it may changed by the worker service, so it should be marked as computed to prevent the resources been frequently update.

Description: resourceFunctionDescriptions[resourceFunctionCPUKey],
},
resourceFunctionRAMKey: {
Type: schema.TypeInt,
Optional: true,
Default: 128,
Computed: true,
Description: resourceFunctionDescriptions[resourceFunctionRAMKey],
},
resourceFunctionDiskKey: {
Type: schema.TypeInt,
Optional: true,
Default: 128,
Computed: true,
Description: resourceFunctionDescriptions[resourceFunctionDiskKey],
},
resourceFunctionUserConfig: {
Expand Down Expand Up @@ -822,9 +846,16 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso
}

if functionConfig.CustomRuntimeOptions != "" {
err = d.Set(resourceFunctionCustomRuntimeOptionsKey, functionConfig.CustomRuntimeOptions)
if err != nil {
return err
orig, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey)
if ok {
s, err := ignoreServerSetCustomRuntimeOptions(orig.(string), functionConfig.CustomRuntimeOptions)
if err != nil {
return err
}
err = d.Set(resourceFunctionCustomRuntimeOptionsKey, s)
if err != nil {
return err
}
}
}

Expand Down
71 changes: 58 additions & 13 deletions pulsar/resource_pulsar_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,31 @@ func resourcePulsarSink() *schema.Resource {
resourceSinkSubscriptionPositionKey: {
Type: schema.TypeString,
Optional: true,
Default: "Earliest",
Default: SubscriptionPositionEarliest,
Description: resourceSinkDescriptions[resourceSinkSubscriptionPositionKey],
ValidateFunc: func(val interface{}, key string) ([]string, []error) {
v := val.(string)
subscriptionPositionSupported := []string{
SubscriptionPositionEarliest,
SubscriptionPositionLatest,
}

found := false
for _, item := range subscriptionPositionSupported {
if v == item {
found = true
break
}
}
if !found {
return nil, []error{
fmt.Errorf("%s is unsupported, shold be one of %s", v,
strings.Join(subscriptionPositionSupported, ",")),
}
}

return nil, nil
},
},
resourceSinkCustomSerdeInputsKey: {
Type: schema.TypeMap,
Expand Down Expand Up @@ -260,19 +283,19 @@ func resourcePulsarSink() *schema.Resource {
resourceSinkCPUKey: {
Type: schema.TypeFloat,
Optional: true,
Default: utils.NewDefaultResources().CPU,
Computed: true,
Description: resourceSinkDescriptions[resourceSinkCPUKey],
},
resourceSinkRAMKey: {
Type: schema.TypeInt,
Optional: true,
Default: int(bytesize.FormBytes(uint64(utils.NewDefaultResources().RAM)).ToMegaBytes()),
Computed: true,
Description: resourceSinkDescriptions[resourceSinkRAMKey],
},
resourceSinkDiskKey: {
Type: schema.TypeInt,
Optional: true,
Default: int(bytesize.FormBytes(uint64(utils.NewDefaultResources().Disk)).ToMegaBytes()),
Computed: true,
Description: resourceSinkDescriptions[resourceSinkDiskKey],
},
resourceSinkConfigsKey: {
Expand All @@ -294,6 +317,7 @@ func resourcePulsarSink() *schema.Resource {
resourceSinkCustomRuntimeOptionsKey: {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: resourceSinkDescriptions[resourceSinkCustomRuntimeOptionsKey],
ValidateFunc: jsonValidateFunc,
},
Expand Down Expand Up @@ -353,11 +377,6 @@ func resourcePulsarSinkCreate(ctx context.Context, d *schema.ResourceData, meta
}

func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
// NOTE: Pulsar cannot returns the fields correctly, so ignore these:
// - resourceSinkSubscriptionPositionKey
// - resourceSinkProcessingGuaranteesKey
// - resourceSinkRetainOrderingKey

client := meta.(admin.Client).Sinks()

tenant := d.Get(resourceSinkTenantKey).(string)
Expand Down Expand Up @@ -500,10 +519,17 @@ func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta in
}
}

if len(sinkConfig.CustomRuntimeOptions) != 0 {
err = d.Set(resourceSinkCustomRuntimeOptionsKey, sinkConfig.CustomRuntimeOptions)
if err != nil {
return diag.FromErr(err)
if sinkConfig.CustomRuntimeOptions != "" {
orig, ok := d.GetOk(resourceSinkCustomRuntimeOptionsKey)
if ok {
s, err := ignoreServerSetCustomRuntimeOptions(orig.(string), sinkConfig.CustomRuntimeOptions)
if err != nil {
return diag.FromErr(err)
}
err = d.Set(resourceSinkCustomRuntimeOptionsKey, s)
if err != nil {
return diag.FromErr(err)
}
}
}

Expand Down Expand Up @@ -547,6 +573,25 @@ func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta in
}
}

err = d.Set(resourceSinkRetainOrderingKey, sinkConfig.RetainOrdering)
if err != nil {
return diag.FromErr(err)
}

if sinkConfig.ProcessingGuarantees != "" {
err = d.Set(resourceSinkProcessingGuaranteesKey, sinkConfig.ProcessingGuarantees)
if err != nil {
return diag.FromErr(err)
}
}

if sinkConfig.SourceSubscriptionPosition != "" {
err = d.Set(resourceSinkSubscriptionPositionKey, sinkConfig.SourceSubscriptionPosition)
if err != nil {
return diag.FromErr(err)
}
}

return nil
}

Expand Down
58 changes: 53 additions & 5 deletions pulsar/resource_pulsar_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package pulsar
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"
"testing"

Expand All @@ -44,7 +44,7 @@ func init() {
}

func TestSink(t *testing.T) {
configBytes, err := ioutil.ReadFile("testdata/sink/main.tf")
configBytes, err := os.ReadFile("testdata/sink/main.tf")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -142,8 +142,8 @@ func testSinkImported() resource.ImportStateCheckFunc {
return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s)
}

if len(s[0].Attributes) != 27 {
return fmt.Errorf("expected %d attrs, got %d: %#v", 24, len(s[0].Attributes), s[0].Attributes)
if len(s[0].Attributes) != 30 {
return fmt.Errorf("expected %d attrs, got %d: %#v", 30, len(s[0].Attributes), s[0].Attributes)
}

return nil
Expand Down Expand Up @@ -173,7 +173,7 @@ func createSampleSink(name string) error {

config := &utils.SinkConfig{
CleanupSubscription: false,
RetainOrdering: false,
RetainOrdering: true,
AutoAck: true,
Parallelism: 1,
Tenant: "public",
Expand Down Expand Up @@ -223,6 +223,7 @@ resource "pulsar_sink" "test" {
max_redeliver_count = 5
negative_ack_redelivery_delay_ms = 3000
retain_key_ordering = false
retain_ordering = true
secrets ="{\"SECRET1\": {\"path\": \"sectest\", \"key\": \"hello\"}}"

processing_guarantees = "EFFECTIVELY_ONCE"
Expand All @@ -236,3 +237,50 @@ resource "pulsar_sink" "test" {
}
`, name, testdataArchive)
}

func TestSinkUpdate(t *testing.T) {
configBytes, err := os.ReadFile("testdata/sink/main.tf")
if err != nil {
t.Fatal(err)
}
configString := string(configBytes)
configString = strings.ReplaceAll(configString, "sink-1", "update-sink-test-1")

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheckWithAPIVersion(t, config.V3) },
ProviderFactories: testAccProviderFactories,
PreventPostDestroyRefresh: false,
CheckDestroy: testPulsarSinkDestroy,
Steps: []resource.TestStep{
{
Config: configString,
Check: resource.ComposeTestCheckFunc(func(s *terraform.State) error {
name := "pulsar_sink.update-sink-test-1"
rs, ok := s.RootModule().Resources[name]
if !ok {
return fmt.Errorf("%s not be found", name)
}

client := getClientFromMeta(testAccProvider.Meta()).Sinks()

parts := strings.Split(rs.Primary.ID, "/")
if len(parts) != 3 {
return errors.New("resource id should be tenant/namespace/name format")
}

_, err := client.GetSink(parts[0], parts[1], parts[2])
if err != nil {
return err
}

return nil
}),
},
{
Config: configString,
PlanOnly: true,
ExpectNonEmptyPlan: false,
},
},
})
}
21 changes: 14 additions & 7 deletions pulsar/resource_pulsar_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,20 +203,20 @@ func resourcePulsarSource() *schema.Resource {
resourceSourceCPUKey: {
Type: schema.TypeFloat,
Optional: true,
Computed: true,
Description: resourceSourceDescriptions[resourceSourceCPUKey],
Default: utils.NewDefaultResources().CPU,
},
resourceSourceRAMKey: {
Type: schema.TypeInt,
Optional: true,
Computed: true,
Description: resourceSourceDescriptions[resourceSourceRAMKey],
Default: int(bytesize.FormBytes(uint64(utils.NewDefaultResources().RAM)).ToMegaBytes()),
},
resourceSourceDiskKey: {
Type: schema.TypeInt,
Optional: true,
Computed: true,
Description: resourceSourceDescriptions[resourceSourceDiskKey],
Default: int(bytesize.FormBytes(uint64(utils.NewDefaultResources().Disk)).ToMegaBytes()),
},
resourceSourceConfigsKey: {
Type: schema.TypeString,
Expand Down Expand Up @@ -414,10 +414,17 @@ func resourcePulsarSourceRead(ctx context.Context, d *schema.ResourceData, meta
}
}

if len(sourceConfig.CustomRuntimeOptions) != 0 {
err = d.Set(resourceSourceCustomRuntimeOptionsKey, sourceConfig.CustomRuntimeOptions)
if err != nil {
return diag.FromErr(err)
if sourceConfig.CustomRuntimeOptions != "" {
orig, ok := d.GetOk(resourceSourceCustomRuntimeOptionsKey)
if ok {
s, err := ignoreServerSetCustomRuntimeOptions(orig.(string), sourceConfig.CustomRuntimeOptions)
if err != nil {
return diag.FromErr(err)
}
err = d.Set(resourceSourceCustomRuntimeOptionsKey, s)
if err != nil {
return diag.FromErr(err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar/resource_pulsar_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func testSourceImported() resource.ImportStateCheckFunc {
return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s)
}

count := 20
count := 19
if len(s[0].Attributes) != count {
return fmt.Errorf("expected %d attrs, got %d: %#v", count, len(s[0].Attributes), s[0].Attributes)
}
Expand Down
3 changes: 2 additions & 1 deletion pulsar/testdata/sink/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ resource "pulsar_sink" "sink-1" {
parallelism = 1
auto_ack = true

processing_guarantees = "EFFECTIVELY_ONCE"
processing_guarantees = "ATLEAST_ONCE"
retain_ordering = false

archive = "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.10.4/connectors/pulsar-io-jdbc-postgres-2.10.4.nar"
configs = "{\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink\",\"password\":\"password\",\"tableName\":\"pulsar_postgres_jdbc_sink\",\"userName\":\"postgres\"}"
Expand Down
Loading
Loading