diff --git a/.gitignore b/.gitignore index a1c2b27..d0c3c84 100644 --- a/.gitignore +++ b/.gitignore @@ -70,3 +70,4 @@ gen .DS_Store .vscode/ +.terraform.lock.hcl diff --git a/pulsar/resource_pulsar_function.go b/pulsar/resource_pulsar_function.go index 1ee200d..abc1d81 100644 --- a/pulsar/resource_pulsar_function.go +++ b/pulsar/resource_pulsar_function.go @@ -214,7 +214,31 @@ func resourcePulsarFunction() *schema.Resource { resourceFunctionSubscriptionPositionKey: { Type: schema.TypeString, Optional: true, + Computed: true, Description: resourceFunctionDescriptions[resourceFunctionSubscriptionPositionKey], + ValidateFunc: func(val interface{}, key string) ([]string, []error) { + v := val.(string) + subscriptionPositionSupported := []string{ + SubscriptionPositionEarliest, + SubscriptionPositionLatest, + } + + found := false + for _, item := range subscriptionPositionSupported { + if v == item { + found = true + break + } + } + if !found { + return nil, []error{ + fmt.Errorf("%s is unsupported, shold be one of %s", v, + strings.Join(subscriptionPositionSupported, ",")), + } + } + + return nil, nil + }, }, resourceFunctionCleanupSubscriptionKey: { Type: schema.TypeBool, @@ -327,19 +351,19 @@ func resourcePulsarFunction() *schema.Resource { resourceFunctionCPUKey: { Type: schema.TypeFloat, Optional: true, - Default: 0.5, + Computed: true, Description: resourceFunctionDescriptions[resourceFunctionCPUKey], }, resourceFunctionRAMKey: { Type: schema.TypeInt, Optional: true, - Default: 128, + Computed: true, Description: resourceFunctionDescriptions[resourceFunctionRAMKey], }, resourceFunctionDiskKey: { Type: schema.TypeInt, Optional: true, - Default: 128, + Computed: true, Description: resourceFunctionDescriptions[resourceFunctionDiskKey], }, resourceFunctionUserConfig: { @@ -822,9 +846,16 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso } if functionConfig.CustomRuntimeOptions != "" { - err = d.Set(resourceFunctionCustomRuntimeOptionsKey, functionConfig.CustomRuntimeOptions) - if err != nil { - return err + orig, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey) + if ok { + s, err := ignoreServerSetCustomRuntimeOptions(orig.(string), functionConfig.CustomRuntimeOptions) + if err != nil { + return err + } + err = d.Set(resourceFunctionCustomRuntimeOptionsKey, s) + if err != nil { + return err + } } } diff --git a/pulsar/resource_pulsar_sink.go b/pulsar/resource_pulsar_sink.go index e95eaa8..0c286fb 100644 --- a/pulsar/resource_pulsar_sink.go +++ b/pulsar/resource_pulsar_sink.go @@ -173,8 +173,31 @@ func resourcePulsarSink() *schema.Resource { resourceSinkSubscriptionPositionKey: { Type: schema.TypeString, Optional: true, - Default: "Earliest", + Default: SubscriptionPositionEarliest, Description: resourceSinkDescriptions[resourceSinkSubscriptionPositionKey], + ValidateFunc: func(val interface{}, key string) ([]string, []error) { + v := val.(string) + subscriptionPositionSupported := []string{ + SubscriptionPositionEarliest, + SubscriptionPositionLatest, + } + + found := false + for _, item := range subscriptionPositionSupported { + if v == item { + found = true + break + } + } + if !found { + return nil, []error{ + fmt.Errorf("%s is unsupported, shold be one of %s", v, + strings.Join(subscriptionPositionSupported, ",")), + } + } + + return nil, nil + }, }, resourceSinkCustomSerdeInputsKey: { Type: schema.TypeMap, @@ -260,19 +283,19 @@ func resourcePulsarSink() *schema.Resource { resourceSinkCPUKey: { Type: schema.TypeFloat, Optional: true, - Default: utils.NewDefaultResources().CPU, + Computed: true, Description: resourceSinkDescriptions[resourceSinkCPUKey], }, resourceSinkRAMKey: { Type: schema.TypeInt, Optional: true, - Default: int(bytesize.FormBytes(uint64(utils.NewDefaultResources().RAM)).ToMegaBytes()), + Computed: true, Description: resourceSinkDescriptions[resourceSinkRAMKey], }, resourceSinkDiskKey: { Type: schema.TypeInt, Optional: true, - Default: int(bytesize.FormBytes(uint64(utils.NewDefaultResources().Disk)).ToMegaBytes()), + Computed: true, Description: resourceSinkDescriptions[resourceSinkDiskKey], }, resourceSinkConfigsKey: { @@ -294,6 +317,7 @@ func resourcePulsarSink() *schema.Resource { resourceSinkCustomRuntimeOptionsKey: { Type: schema.TypeString, Optional: true, + Computed: true, Description: resourceSinkDescriptions[resourceSinkCustomRuntimeOptionsKey], ValidateFunc: jsonValidateFunc, }, @@ -353,11 +377,6 @@ func resourcePulsarSinkCreate(ctx context.Context, d *schema.ResourceData, meta } func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { - // NOTE: Pulsar cannot returns the fields correctly, so ignore these: - // - resourceSinkSubscriptionPositionKey - // - resourceSinkProcessingGuaranteesKey - // - resourceSinkRetainOrderingKey - client := meta.(admin.Client).Sinks() tenant := d.Get(resourceSinkTenantKey).(string) @@ -500,10 +519,17 @@ func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta in } } - if len(sinkConfig.CustomRuntimeOptions) != 0 { - err = d.Set(resourceSinkCustomRuntimeOptionsKey, sinkConfig.CustomRuntimeOptions) - if err != nil { - return diag.FromErr(err) + if sinkConfig.CustomRuntimeOptions != "" { + orig, ok := d.GetOk(resourceSinkCustomRuntimeOptionsKey) + if ok { + s, err := ignoreServerSetCustomRuntimeOptions(orig.(string), sinkConfig.CustomRuntimeOptions) + if err != nil { + return diag.FromErr(err) + } + err = d.Set(resourceSinkCustomRuntimeOptionsKey, s) + if err != nil { + return diag.FromErr(err) + } } } @@ -547,6 +573,25 @@ func resourcePulsarSinkRead(ctx context.Context, d *schema.ResourceData, meta in } } + err = d.Set(resourceSinkRetainOrderingKey, sinkConfig.RetainOrdering) + if err != nil { + return diag.FromErr(err) + } + + if sinkConfig.ProcessingGuarantees != "" { + err = d.Set(resourceSinkProcessingGuaranteesKey, sinkConfig.ProcessingGuarantees) + if err != nil { + return diag.FromErr(err) + } + } + + if sinkConfig.SourceSubscriptionPosition != "" { + err = d.Set(resourceSinkSubscriptionPositionKey, sinkConfig.SourceSubscriptionPosition) + if err != nil { + return diag.FromErr(err) + } + } + return nil } diff --git a/pulsar/resource_pulsar_sink_test.go b/pulsar/resource_pulsar_sink_test.go index b246837..f5a3584 100644 --- a/pulsar/resource_pulsar_sink_test.go +++ b/pulsar/resource_pulsar_sink_test.go @@ -20,7 +20,7 @@ package pulsar import ( "encoding/json" "fmt" - "io/ioutil" + "os" "strings" "testing" @@ -44,7 +44,7 @@ func init() { } func TestSink(t *testing.T) { - configBytes, err := ioutil.ReadFile("testdata/sink/main.tf") + configBytes, err := os.ReadFile("testdata/sink/main.tf") if err != nil { t.Fatal(err) } @@ -142,8 +142,8 @@ func testSinkImported() resource.ImportStateCheckFunc { return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s) } - if len(s[0].Attributes) != 27 { - return fmt.Errorf("expected %d attrs, got %d: %#v", 24, len(s[0].Attributes), s[0].Attributes) + if len(s[0].Attributes) != 30 { + return fmt.Errorf("expected %d attrs, got %d: %#v", 30, len(s[0].Attributes), s[0].Attributes) } return nil @@ -173,7 +173,7 @@ func createSampleSink(name string) error { config := &utils.SinkConfig{ CleanupSubscription: false, - RetainOrdering: false, + RetainOrdering: true, AutoAck: true, Parallelism: 1, Tenant: "public", @@ -223,6 +223,7 @@ resource "pulsar_sink" "test" { max_redeliver_count = 5 negative_ack_redelivery_delay_ms = 3000 retain_key_ordering = false + retain_ordering = true secrets ="{\"SECRET1\": {\"path\": \"sectest\", \"key\": \"hello\"}}" processing_guarantees = "EFFECTIVELY_ONCE" @@ -236,3 +237,50 @@ resource "pulsar_sink" "test" { } `, name, testdataArchive) } + +func TestSinkUpdate(t *testing.T) { + configBytes, err := os.ReadFile("testdata/sink/main.tf") + if err != nil { + t.Fatal(err) + } + configString := string(configBytes) + configString = strings.ReplaceAll(configString, "sink-1", "update-sink-test-1") + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheckWithAPIVersion(t, config.V3) }, + ProviderFactories: testAccProviderFactories, + PreventPostDestroyRefresh: false, + CheckDestroy: testPulsarSinkDestroy, + Steps: []resource.TestStep{ + { + Config: configString, + Check: resource.ComposeTestCheckFunc(func(s *terraform.State) error { + name := "pulsar_sink.update-sink-test-1" + rs, ok := s.RootModule().Resources[name] + if !ok { + return fmt.Errorf("%s not be found", name) + } + + client := getClientFromMeta(testAccProvider.Meta()).Sinks() + + parts := strings.Split(rs.Primary.ID, "/") + if len(parts) != 3 { + return errors.New("resource id should be tenant/namespace/name format") + } + + _, err := client.GetSink(parts[0], parts[1], parts[2]) + if err != nil { + return err + } + + return nil + }), + }, + { + Config: configString, + PlanOnly: true, + ExpectNonEmptyPlan: false, + }, + }, + }) +} diff --git a/pulsar/resource_pulsar_source.go b/pulsar/resource_pulsar_source.go index 46da224..8f98753 100644 --- a/pulsar/resource_pulsar_source.go +++ b/pulsar/resource_pulsar_source.go @@ -203,20 +203,20 @@ func resourcePulsarSource() *schema.Resource { resourceSourceCPUKey: { Type: schema.TypeFloat, Optional: true, + Computed: true, Description: resourceSourceDescriptions[resourceSourceCPUKey], - Default: utils.NewDefaultResources().CPU, }, resourceSourceRAMKey: { Type: schema.TypeInt, Optional: true, + Computed: true, Description: resourceSourceDescriptions[resourceSourceRAMKey], - Default: int(bytesize.FormBytes(uint64(utils.NewDefaultResources().RAM)).ToMegaBytes()), }, resourceSourceDiskKey: { Type: schema.TypeInt, Optional: true, + Computed: true, Description: resourceSourceDescriptions[resourceSourceDiskKey], - Default: int(bytesize.FormBytes(uint64(utils.NewDefaultResources().Disk)).ToMegaBytes()), }, resourceSourceConfigsKey: { Type: schema.TypeString, @@ -414,10 +414,17 @@ func resourcePulsarSourceRead(ctx context.Context, d *schema.ResourceData, meta } } - if len(sourceConfig.CustomRuntimeOptions) != 0 { - err = d.Set(resourceSourceCustomRuntimeOptionsKey, sourceConfig.CustomRuntimeOptions) - if err != nil { - return diag.FromErr(err) + if sourceConfig.CustomRuntimeOptions != "" { + orig, ok := d.GetOk(resourceSourceCustomRuntimeOptionsKey) + if ok { + s, err := ignoreServerSetCustomRuntimeOptions(orig.(string), sourceConfig.CustomRuntimeOptions) + if err != nil { + return diag.FromErr(err) + } + err = d.Set(resourceSourceCustomRuntimeOptionsKey, s) + if err != nil { + return diag.FromErr(err) + } } } diff --git a/pulsar/resource_pulsar_source_test.go b/pulsar/resource_pulsar_source_test.go index 52bf8b0..9822d6c 100644 --- a/pulsar/resource_pulsar_source_test.go +++ b/pulsar/resource_pulsar_source_test.go @@ -159,7 +159,7 @@ func testSourceImported() resource.ImportStateCheckFunc { return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s) } - count := 20 + count := 19 if len(s[0].Attributes) != count { return fmt.Errorf("expected %d attrs, got %d: %#v", count, len(s[0].Attributes), s[0].Attributes) } diff --git a/pulsar/testdata/sink/main.tf b/pulsar/testdata/sink/main.tf index e5ed563..af78548 100644 --- a/pulsar/testdata/sink/main.tf +++ b/pulsar/testdata/sink/main.tf @@ -32,7 +32,8 @@ resource "pulsar_sink" "sink-1" { parallelism = 1 auto_ack = true - processing_guarantees = "EFFECTIVELY_ONCE" + processing_guarantees = "ATLEAST_ONCE" + retain_ordering = false archive = "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.10.4/connectors/pulsar-io-jdbc-postgres-2.10.4.nar" configs = "{\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink\",\"password\":\"password\",\"tableName\":\"pulsar_postgres_jdbc_sink\",\"userName\":\"postgres\"}" diff --git a/pulsar/util.go b/pulsar/util.go index 97139a3..305ce30 100644 --- a/pulsar/util.go +++ b/pulsar/util.go @@ -12,6 +12,11 @@ const ( ProcessingGuaranteesEffectivelyOnce = "EFFECTIVELY_ONCE" ) +const ( + SubscriptionPositionEarliest = "Earliest" + SubscriptionPositionLatest = "Latest" +) + func isPackageURLSupported(functionPkgURL string) bool { return strings.HasPrefix(functionPkgURL, "http://") || strings.HasPrefix(functionPkgURL, "https://") || @@ -31,3 +36,27 @@ func jsonValidateFunc(i interface{}, s string) ([]string, []error) { } return nil, nil } + +func ignoreServerSetCustomRuntimeOptions(tfGenString string, readString string) (string, error) { + tfGenMap := make(map[string]interface{}) + err := json.Unmarshal([]byte(tfGenString), &tfGenMap) + if err != nil { + return "", err + } + readMap := make(map[string]interface{}) + computedMap := make(map[string]interface{}) + err = json.Unmarshal([]byte(readString), &readMap) + if err != nil { + return "", err + } + for k := range readMap { + if _, has := tfGenMap[k]; has { + computedMap[k] = readMap[k] + } + } + s, err := json.Marshal(computedMap) + if err != nil { + return "", err + } + return string(s), nil +}