diff --git a/internal/flink/command_application.go b/internal/flink/command_application.go index bb93f4915b..3703d5fdf1 100644 --- a/internal/flink/command_application.go +++ b/internal/flink/command_application.go @@ -5,10 +5,9 @@ import ( ) type flinkApplicationSummaryOut struct { - Name string `human:"Name" serialized:"name"` - Environment string `human:"Environment" serialized:"environment"` - JobName string `human:"Job Name" serialized:"job_name"` - JobStatus string `human:"Job Status" serialized:"job_status"` + Name string `human:"Name" serialized:"name"` + JobName string `human:"Job Name" serialized:"job_name"` + JobStatus string `human:"Job Status" serialized:"job_status"` } func (c *command) newApplicationCommand() *cobra.Command { diff --git a/internal/flink/command_application_list.go b/internal/flink/command_application_list.go index 9b695ee564..4a81b4b91e 100644 --- a/internal/flink/command_application_list.go +++ b/internal/flink/command_application_list.go @@ -3,6 +3,8 @@ package flink import ( "github.com/spf13/cobra" + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/output" ) @@ -43,23 +45,40 @@ func (c *command) applicationList(cmd *cobra.Command, _ []string) error { if output.GetFormat(cmd) == output.Human { list := output.NewList(cmd) for _, app := range applications { - jobStatus, ok := app.Status["jobStatus"].(map[string]any) - if !ok { - jobStatus = map[string]any{} - } - envInApp, ok := app.Spec["environment"].(string) - if !ok { - envInApp = environment - } - list.Add(&flinkApplicationSummaryOut{ - Name: app.Metadata["name"].(string), - Environment: envInApp, - JobName: jobStatus["jobName"].(string), - JobStatus: jobStatus["state"].(string), - }) + appSummary := populateFlinkApplicationSummaryOut(app) + list.Add(appSummary) } return list.Print() } // if the output format is not human, we serialize the output as it is (JSON or YAML) return output.SerializedOutput(cmd, applications) } + +func populateFlinkApplicationSummaryOut(application cmfsdk.Application) *flinkApplicationSummaryOut { + var appSummary *flinkApplicationSummaryOut + + var jobStatus map[string]any = getOrDefault(application.Status, "jobStatus", map[string]any{}) + jobNameString := getOrDefault(jobStatus, "jobName", "") + jobStatusString := getOrDefault(jobStatus, "state", "") + name := getOrDefault(application.Metadata, "name", "") + + appSummary = &flinkApplicationSummaryOut{ + Name: name, + JobName: jobNameString, + JobStatus: jobStatusString, + } + + return appSummary +} + +func getOrDefault[T any](m map[string]any, key string, d T) T { + value, ok := m[key] + if !ok { + return d + } + valueCast, ok := value.(T) + if !ok { + return d + } + return valueCast +} diff --git a/test/fixtures/output/flink/application/describe-success-yaml.golden b/test/fixtures/output/flink/application/describe-success-yaml.golden index 985e67d7f4..a04104add4 100644 --- a/test/fixtures/output/flink/application/describe-success-yaml.golden +++ b/test/fixtures/output/flink/application/describe-success-yaml.golden @@ -7,7 +7,6 @@ spec: metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory metrics.reporter.prom.port: 9249-9250 taskmanager.numberOfTaskSlots: "8" - flinkEnvironmentName: default flinkVersion: v1_19 image: confluentinc/cp-flink:1.19.1-cp1 job: diff --git a/test/fixtures/output/flink/application/describe-success.golden b/test/fixtures/output/flink/application/describe-success.golden index a8ef879eeb..1fdc7c0485 100644 --- a/test/fixtures/output/flink/application/describe-success.golden +++ b/test/fixtures/output/flink/application/describe-success.golden @@ -10,7 +10,6 @@ "metrics.reporter.prom.port": "9249-9250", "taskmanager.numberOfTaskSlots": "8" }, - "flinkEnvironmentName": "default", "flinkVersion": "v1_19", "image": "confluentinc/cp-flink:1.19.1-cp1", "job": { diff --git a/test/fixtures/output/flink/application/list-human.golden b/test/fixtures/output/flink/application/list-human.golden index 308aa0cf09..81c45eb0a2 100644 --- a/test/fixtures/output/flink/application/list-human.golden +++ b/test/fixtures/output/flink/application/list-human.golden @@ -1,4 +1,4 @@ - Name | Environment | Job Name | Job Status -------------------------+-------------+-------------------+-------------- - default-application-1 | default | State machine job | RECONCILING - default-application-2 | default | State machine job | RECONCILING + Name | Job Name | Job Status +------------------------+-------------------+-------------- + default-application-1 | State machine job | RECONCILING + default-application-2 | State machine job | RECONCILING diff --git a/test/fixtures/output/flink/application/list-json.golden b/test/fixtures/output/flink/application/list-json.golden index e4cd405db1..4ad97a6f63 100644 --- a/test/fixtures/output/flink/application/list-json.golden +++ b/test/fixtures/output/flink/application/list-json.golden @@ -11,7 +11,6 @@ "metrics.reporter.prom.port": "9249-9250", "taskmanager.numberOfTaskSlots": "8" }, - "flinkEnvironmentName": "default", "flinkVersion": "v1_19", "image": "confluentinc/cp-flink:1.19.1-cp1", "job": { @@ -93,7 +92,6 @@ "metrics.reporter.prom.port": "9249-9250", "taskmanager.numberOfTaskSlots": "8" }, - "flinkEnvironmentName": "default", "flinkVersion": "v1_19", "image": "confluentinc/cp-flink:1.19.1-cp1", "job": { diff --git a/test/fixtures/output/flink/application/list-success-human-missing-attribute.golden b/test/fixtures/output/flink/application/list-success-human-missing-attribute.golden new file mode 100644 index 0000000000..33c599273b --- /dev/null +++ b/test/fixtures/output/flink/application/list-success-human-missing-attribute.golden @@ -0,0 +1,3 @@ + Name | Job Name | Job Status +----------------------+----------+------------- + new-env-application | | diff --git a/test/fixtures/output/flink/application/list-success-json-missing-attribute.golden b/test/fixtures/output/flink/application/list-success-json-missing-attribute.golden new file mode 100644 index 0000000000..05b26019f4 --- /dev/null +++ b/test/fixtures/output/flink/application/list-success-json-missing-attribute.golden @@ -0,0 +1,81 @@ +[ + { + "apiVersion": "cmf.confluent.io/v1alpha1", + "kind": "FlinkApplication", + "metadata": { + "name": "new-env-application" + }, + "spec": { + "flinkConfiguration": { + "metrics.reporter.prom.factory.class": "org.apache.flink.metrics.prometheus.PrometheusReporterFactory", + "metrics.reporter.prom.port": "9249-9250", + "taskmanager.numberOfTaskSlots": "8" + }, + "flinkVersion": "v1_19", + "image": "confluentinc/cp-flink:1.19.1-cp1", + "job": { + "jarURI": "local:///opt/flink/examples/streaming/StateMachineExample.jar", + "parallelism": 3, + "state": "running", + "upgradeMode": "stateless" + }, + "jobManager": { + "resource": { + "cpu": 1, + "memory": "1048m" + } + }, + "serviceAccount": "flink", + "taskManager": { + "resource": { + "cpu": 1, + "memory": "1048m" + } + } + }, + "status": { + "clusterInfo": { + "flink-revision": "89d0b8f @ 2024-06-22T13:19:31+02:00", + "flink-version": "1.19.1-cp1", + "total-cpu": "3.0", + "total-memory": "3296722944" + }, + "error": null, + "jobManagerDeploymentStatus": "DEPLOYING", + "jobStatus": { + "checkpointInfo": { + "formatType": null, + "lastCheckpoint": null, + "lastPeriodicCheckpointTimestamp": 0, + "triggerId": null, + "triggerTimestamp": null, + "triggerType": null + }, + "jobId": "dcabb1ad6c40495bc2d7fa7a0097c5aa", + "savepointInfo": { + "formatType": null, + "lastPeriodicSavepointTimestamp": 0, + "lastSavepoint": null, + "savepointHistory": [], + "triggerId": null, + "triggerTimestamp": null, + "triggerType": null + }, + "startTime": "1726640263746", + "updateTime": "1726640280561" + }, + "lifecycleState": "DEPLOYED", + "observedGeneration": 4, + "reconciliationStatus": { + "lastReconciledSpec": "", + "lastStableSpec": "", + "reconciliationTimestamp": 1726640346899, + "state": "DEPLOYED" + }, + "taskManager": { + "labelSelector": "component=taskmanager,app=basic-example", + "replicas": 1 + } + } + } +] diff --git a/test/fixtures/output/flink/application/list-success-yaml-missing-attribute.golden b/test/fixtures/output/flink/application/list-success-yaml-missing-attribute.golden new file mode 100644 index 0000000000..6e6f9ce520 --- /dev/null +++ b/test/fixtures/output/flink/application/list-success-yaml-missing-attribute.golden @@ -0,0 +1,62 @@ +- apiversion: cmf.confluent.io/v1alpha1 + kind: FlinkApplication + metadata: + name: new-env-application + spec: + flinkConfiguration: + metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory + metrics.reporter.prom.port: 9249-9250 + taskmanager.numberOfTaskSlots: "8" + flinkVersion: v1_19 + image: confluentinc/cp-flink:1.19.1-cp1 + job: + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 3 + state: running + upgradeMode: stateless + jobManager: + resource: + cpu: 1 + memory: 1048m + serviceAccount: flink + taskManager: + resource: + cpu: 1 + memory: 1048m + status: + clusterInfo: + flink-revision: 89d0b8f @ 2024-06-22T13:19:31+02:00 + flink-version: 1.19.1-cp1 + total-cpu: "3.0" + total-memory: "3296722944" + error: null + jobManagerDeploymentStatus: DEPLOYING + jobStatus: + checkpointInfo: + formatType: null + lastCheckpoint: null + lastPeriodicCheckpointTimestamp: 0 + triggerId: null + triggerTimestamp: null + triggerType: null + jobId: dcabb1ad6c40495bc2d7fa7a0097c5aa + savepointInfo: + formatType: null + lastPeriodicSavepointTimestamp: 0 + lastSavepoint: null + savepointHistory: [] + triggerId: null + triggerTimestamp: null + triggerType: null + startTime: "1726640263746" + updateTime: "1726640280561" + lifecycleState: DEPLOYED + observedGeneration: 4 + reconciliationStatus: + lastReconciledSpec: "" + lastStableSpec: "" + reconciliationTimestamp: 1.726640346899e+12 + state: DEPLOYED + taskManager: + labelSelector: component=taskmanager,app=basic-example + replicas: 1 diff --git a/test/fixtures/output/flink/application/update-successful.golden b/test/fixtures/output/flink/application/update-successful.golden index d65aa0aa09..27e2b99940 100644 --- a/test/fixtures/output/flink/application/update-successful.golden +++ b/test/fixtures/output/flink/application/update-successful.golden @@ -10,7 +10,6 @@ "metrics.reporter.prom.port": "9249-9250", "taskmanager.numberOfTaskSlots": "8" }, - "flinkEnvironmentName": "default", "flinkVersion": "v1_19", "image": "confluentinc/cp-flink:1.19.1-cp1", "job": { diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 98984a2936..ec33ba7247 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -9,6 +9,9 @@ func (s *CLITestSuite) TestFlinkApplicationList() { {args: "flink application list --environment test", fixture: "flink/application/list-empty-env.golden"}, {args: "flink application list --environment default --output json", fixture: "flink/application/list-json.golden"}, {args: "flink application list --environment default --output human", fixture: "flink/application/list-human.golden"}, + {args: "flink application list --environment new-env", fixture: "flink/application/list-success-human-missing-attribute.golden"}, + {args: "flink application list --environment new-env --output yaml", fixture: "flink/application/list-success-yaml-missing-attribute.golden"}, + {args: "flink application list --environment new-env --output json", fixture: "flink/application/list-success-json-missing-attribute.golden"}, } for _, test := range tests { diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index ff55810b5d..30386e61d3 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -16,7 +16,7 @@ import ( ) // Helper function to create a Flink application. -func createApplication(name string, environment string) cmfsdk.Application { +func createApplication(name string) cmfsdk.Application { return cmfsdk.Application{ ApiVersion: "cmf.confluent.io/v1alpha1", Kind: "FlinkApplication", @@ -24,9 +24,8 @@ func createApplication(name string, environment string) cmfsdk.Application { "name": name, }, Spec: map[string]interface{}{ - "flinkEnvironmentName": environment, - "image": "confluentinc/cp-flink:1.19.1-cp1", - "flinkVersion": "v1_19", + "image": "confluentinc/cp-flink:1.19.1-cp1", + "flinkVersion": "v1_19", "flinkConfiguration": map[string]interface{}{ "taskmanager.numberOfTaskSlots": "8", "metrics.reporter.prom.factory.class": "org.apache.flink.metrics.prometheus.PrometheusReporterFactory", @@ -224,7 +223,7 @@ func handleCmfApplications(t *testing.T) http.HandlerFunc { environment := vars["environment"] switch r.Method { case http.MethodGet: - if environment != "default" && environment != "test" && environment != "update-failure" { + if environment != "default" && environment != "test" && environment != "update-failure" && environment != "new-env" { http.Error(w, "Environment not found", http.StatusNotFound) return } @@ -239,14 +238,25 @@ func handleCmfApplications(t *testing.T) http.HandlerFunc { page := r.URL.Query().Get("page") if environment == "default" && page == "0" { - items := []cmfsdk.Application{createApplication("default-application-1", "default"), createApplication("default-application-2", "default")} + items := []cmfsdk.Application{createApplication("default-application-1"), createApplication("default-application-2")} applicationsPage = map[string]interface{}{ "items": items, } } if environment == "update-failure" && page == "0" { - items := []cmfsdk.Application{createApplication("update-failure-application", "update-failure")} + items := []cmfsdk.Application{createApplication("update-failure-application")} + applicationsPage = map[string]interface{}{ + "items": items, + } + } + + // for new-env, return an application where some of the fields are missing. + if environment == "new-env" && page == "0" { + newApplication := createApplication("new-env-application") + delete(newApplication.Status["jobStatus"].(map[string]interface{}), "jobName") + delete(newApplication.Status["jobStatus"].(map[string]interface{}), "state") + items := []cmfsdk.Application{newApplication} applicationsPage = map[string]interface{}{ "items": items, } @@ -278,7 +288,7 @@ func handleCmfApplications(t *testing.T) http.HandlerFunc { if applicationName == "default-application-1" || applicationName == "default-application-2" { // The 'update' is going to be spec.serviceAccount. This is just a dummy update, // and we don't do any actual merge logic. - outputApplication := createApplication(applicationName, environment) + outputApplication := createApplication(applicationName) outputApplication.Spec["serviceAccount"] = application.Spec["serviceAccount"] err = json.NewEncoder(w).Encode(outputApplication) require.NoError(t, err) @@ -313,14 +323,14 @@ func handleCmfApplication(t *testing.T) http.HandlerFunc { case http.MethodGet: // In case the application actually exists, let the handler return the application. if (application == "default-application-1" || application == "default-application-2") && environment == "default" { - outputApplication := createApplication(application, environment) + outputApplication := createApplication(application) err := json.NewEncoder(w).Encode(outputApplication) require.NoError(t, err) return } if application == "update-failure-application" && environment == "update-failure" { - outputApplication := createApplication(application, environment) + outputApplication := createApplication(application) err := json.NewEncoder(w).Encode(outputApplication) require.NoError(t, err) return