Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with flink application list command #2909

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions internal/flink/command_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
)

type flinkApplicationSummaryOut struct {
Name string `human:"Name" serialized:"name"`
Environment string `human:"Environment" serialized:"environment"`
Copy link
Member

@sgagniere sgagniere Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a breaking change (environment).

JobName string `human:"Job Name" serialized:"job_name"`
JobStatus string `human:"Job Status" serialized:"job_status"`
Name string `human:"Name" serialized:"name"`
JobName string `human:"Job Name" serialized:"job_name"`
JobStatus string `human:"Job Status" serialized:"job_status"`
}

func (c *command) newApplicationCommand() *cobra.Command {
Expand Down
47 changes: 33 additions & 14 deletions internal/flink/command_application_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package flink
import (
"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)
Expand Down Expand Up @@ -43,23 +45,40 @@ func (c *command) applicationList(cmd *cobra.Command, _ []string) error {
if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
for _, app := range applications {
jobStatus, ok := app.Status["jobStatus"].(map[string]any)
if !ok {
jobStatus = map[string]any{}
}
envInApp, ok := app.Spec["environment"].(string)
if !ok {
envInApp = environment
}
list.Add(&flinkApplicationSummaryOut{
Name: app.Metadata["name"].(string),
Environment: envInApp,
JobName: jobStatus["jobName"].(string),
JobStatus: jobStatus["state"].(string),
})
appSummary := populateFlinkApplicationSummaryOut(app)
list.Add(appSummary)
}
return list.Print()
}
// if the output format is not human, we serialize the output as it is (JSON or YAML)
return output.SerializedOutput(cmd, applications)
}

func populateFlinkApplicationSummaryOut(application cmfsdk.Application) *flinkApplicationSummaryOut {
var appSummary *flinkApplicationSummaryOut

var jobStatus map[string]any = getOrDefault(application.Status, "jobStatus", map[string]any{})
jobNameString := getOrDefault(jobStatus, "jobName", "")
jobStatusString := getOrDefault(jobStatus, "state", "")
name := getOrDefault(application.Metadata, "name", "")

appSummary = &flinkApplicationSummaryOut{
Name: name,
JobName: jobNameString,
JobStatus: jobStatusString,
}

return appSummary
}

func getOrDefault[T any](m map[string]any, key string, d T) T {
value, ok := m[key]
if !ok {
return d
}
valueCast, ok := value.(T)
if !ok {
return d
}
return valueCast
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ spec:
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249-9250
taskmanager.numberOfTaskSlots: "8"
flinkEnvironmentName: default
flinkVersion: v1_19
image: confluentinc/cp-flink:1.19.1-cp1
job:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"metrics.reporter.prom.port": "9249-9250",
"taskmanager.numberOfTaskSlots": "8"
},
"flinkEnvironmentName": "default",
"flinkVersion": "v1_19",
"image": "confluentinc/cp-flink:1.19.1-cp1",
"job": {
Expand Down
8 changes: 4 additions & 4 deletions test/fixtures/output/flink/application/list-human.golden
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Name | Environment | Job Name | Job Status
------------------------+-------------+-------------------+--------------
default-application-1 | default | State machine job | RECONCILING
default-application-2 | default | State machine job | RECONCILING
Name | Job Name | Job Status
------------------------+-------------------+--------------
default-application-1 | State machine job | RECONCILING
default-application-2 | State machine job | RECONCILING
2 changes: 0 additions & 2 deletions test/fixtures/output/flink/application/list-json.golden
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
"metrics.reporter.prom.port": "9249-9250",
"taskmanager.numberOfTaskSlots": "8"
},
"flinkEnvironmentName": "default",
"flinkVersion": "v1_19",
"image": "confluentinc/cp-flink:1.19.1-cp1",
"job": {
Expand Down Expand Up @@ -93,7 +92,6 @@
"metrics.reporter.prom.port": "9249-9250",
"taskmanager.numberOfTaskSlots": "8"
},
"flinkEnvironmentName": "default",
"flinkVersion": "v1_19",
"image": "confluentinc/cp-flink:1.19.1-cp1",
"job": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Name | Job Name | Job Status
----------------------+----------+-------------
new-env-application | |
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
[
{
"apiVersion": "cmf.confluent.io/v1alpha1",
"kind": "FlinkApplication",
"metadata": {
"name": "new-env-application"
},
"spec": {
"flinkConfiguration": {
"metrics.reporter.prom.factory.class": "org.apache.flink.metrics.prometheus.PrometheusReporterFactory",
"metrics.reporter.prom.port": "9249-9250",
"taskmanager.numberOfTaskSlots": "8"
},
"flinkVersion": "v1_19",
"image": "confluentinc/cp-flink:1.19.1-cp1",
"job": {
"jarURI": "local:///opt/flink/examples/streaming/StateMachineExample.jar",
"parallelism": 3,
"state": "running",
"upgradeMode": "stateless"
},
"jobManager": {
"resource": {
"cpu": 1,
"memory": "1048m"
}
},
"serviceAccount": "flink",
"taskManager": {
"resource": {
"cpu": 1,
"memory": "1048m"
}
}
},
"status": {
"clusterInfo": {
"flink-revision": "89d0b8f @ 2024-06-22T13:19:31+02:00",
"flink-version": "1.19.1-cp1",
"total-cpu": "3.0",
"total-memory": "3296722944"
},
"error": null,
"jobManagerDeploymentStatus": "DEPLOYING",
"jobStatus": {
"checkpointInfo": {
"formatType": null,
"lastCheckpoint": null,
"lastPeriodicCheckpointTimestamp": 0,
"triggerId": null,
"triggerTimestamp": null,
"triggerType": null
},
"jobId": "dcabb1ad6c40495bc2d7fa7a0097c5aa",
"savepointInfo": {
"formatType": null,
"lastPeriodicSavepointTimestamp": 0,
"lastSavepoint": null,
"savepointHistory": [],
"triggerId": null,
"triggerTimestamp": null,
"triggerType": null
},
"startTime": "1726640263746",
"updateTime": "1726640280561"
},
"lifecycleState": "DEPLOYED",
"observedGeneration": 4,
"reconciliationStatus": {
"lastReconciledSpec": "",
"lastStableSpec": "",
"reconciliationTimestamp": 1726640346899,
"state": "DEPLOYED"
},
"taskManager": {
"labelSelector": "component=taskmanager,app=basic-example",
"replicas": 1
}
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
- apiversion: cmf.confluent.io/v1alpha1
kind: FlinkApplication
metadata:
name: new-env-application
spec:
flinkConfiguration:
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249-9250
taskmanager.numberOfTaskSlots: "8"
flinkVersion: v1_19
image: confluentinc/cp-flink:1.19.1-cp1
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 3
state: running
upgradeMode: stateless
jobManager:
resource:
cpu: 1
memory: 1048m
serviceAccount: flink
taskManager:
resource:
cpu: 1
memory: 1048m
status:
clusterInfo:
flink-revision: 89d0b8f @ 2024-06-22T13:19:31+02:00
flink-version: 1.19.1-cp1
total-cpu: "3.0"
total-memory: "3296722944"
error: null
jobManagerDeploymentStatus: DEPLOYING
jobStatus:
checkpointInfo:
formatType: null
lastCheckpoint: null
lastPeriodicCheckpointTimestamp: 0
triggerId: null
triggerTimestamp: null
triggerType: null
jobId: dcabb1ad6c40495bc2d7fa7a0097c5aa
savepointInfo:
formatType: null
lastPeriodicSavepointTimestamp: 0
lastSavepoint: null
savepointHistory: []
triggerId: null
triggerTimestamp: null
triggerType: null
startTime: "1726640263746"
updateTime: "1726640280561"
lifecycleState: DEPLOYED
observedGeneration: 4
reconciliationStatus:
lastReconciledSpec: ""
lastStableSpec: ""
reconciliationTimestamp: 1.726640346899e+12
state: DEPLOYED
taskManager:
labelSelector: component=taskmanager,app=basic-example
replicas: 1
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"metrics.reporter.prom.port": "9249-9250",
"taskmanager.numberOfTaskSlots": "8"
},
"flinkEnvironmentName": "default",
"flinkVersion": "v1_19",
"image": "confluentinc/cp-flink:1.19.1-cp1",
"job": {
Expand Down
3 changes: 3 additions & 0 deletions test/flink_onprem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ func (s *CLITestSuite) TestFlinkApplicationList() {
{args: "flink application list --environment test", fixture: "flink/application/list-empty-env.golden"},
{args: "flink application list --environment default --output json", fixture: "flink/application/list-json.golden"},
{args: "flink application list --environment default --output human", fixture: "flink/application/list-human.golden"},
{args: "flink application list --environment new-env", fixture: "flink/application/list-success-human-missing-attribute.golden"},
{args: "flink application list --environment new-env --output yaml", fixture: "flink/application/list-success-yaml-missing-attribute.golden"},
{args: "flink application list --environment new-env --output json", fixture: "flink/application/list-success-json-missing-attribute.golden"},
}

for _, test := range tests {
Expand Down
30 changes: 20 additions & 10 deletions test/test-server/flink_onprem_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@ import (
)

// Helper function to create a Flink application.
func createApplication(name string, environment string) cmfsdk.Application {
func createApplication(name string) cmfsdk.Application {
return cmfsdk.Application{
ApiVersion: "cmf.confluent.io/v1alpha1",
Kind: "FlinkApplication",
Metadata: map[string]interface{}{
"name": name,
},
Spec: map[string]interface{}{
"flinkEnvironmentName": environment,
"image": "confluentinc/cp-flink:1.19.1-cp1",
"flinkVersion": "v1_19",
"image": "confluentinc/cp-flink:1.19.1-cp1",
"flinkVersion": "v1_19",
"flinkConfiguration": map[string]interface{}{
"taskmanager.numberOfTaskSlots": "8",
"metrics.reporter.prom.factory.class": "org.apache.flink.metrics.prometheus.PrometheusReporterFactory",
Expand Down Expand Up @@ -224,7 +223,7 @@ func handleCmfApplications(t *testing.T) http.HandlerFunc {
environment := vars["environment"]
switch r.Method {
case http.MethodGet:
if environment != "default" && environment != "test" && environment != "update-failure" {
if environment != "default" && environment != "test" && environment != "update-failure" && environment != "new-env" {
http.Error(w, "Environment not found", http.StatusNotFound)
return
}
Expand All @@ -239,14 +238,25 @@ func handleCmfApplications(t *testing.T) http.HandlerFunc {
page := r.URL.Query().Get("page")

if environment == "default" && page == "0" {
items := []cmfsdk.Application{createApplication("default-application-1", "default"), createApplication("default-application-2", "default")}
items := []cmfsdk.Application{createApplication("default-application-1"), createApplication("default-application-2")}
applicationsPage = map[string]interface{}{
"items": items,
}
}

if environment == "update-failure" && page == "0" {
items := []cmfsdk.Application{createApplication("update-failure-application", "update-failure")}
items := []cmfsdk.Application{createApplication("update-failure-application")}
applicationsPage = map[string]interface{}{
"items": items,
}
}

// for new-env, return an application where some of the fields are missing.
if environment == "new-env" && page == "0" {
newApplication := createApplication("new-env-application")
delete(newApplication.Status["jobStatus"].(map[string]interface{}), "jobName")
delete(newApplication.Status["jobStatus"].(map[string]interface{}), "state")
items := []cmfsdk.Application{newApplication}
applicationsPage = map[string]interface{}{
"items": items,
}
Expand Down Expand Up @@ -278,7 +288,7 @@ func handleCmfApplications(t *testing.T) http.HandlerFunc {
if applicationName == "default-application-1" || applicationName == "default-application-2" {
// The 'update' is going to be spec.serviceAccount. This is just a dummy update,
// and we don't do any actual merge logic.
outputApplication := createApplication(applicationName, environment)
outputApplication := createApplication(applicationName)
outputApplication.Spec["serviceAccount"] = application.Spec["serviceAccount"]
err = json.NewEncoder(w).Encode(outputApplication)
require.NoError(t, err)
Expand Down Expand Up @@ -313,14 +323,14 @@ func handleCmfApplication(t *testing.T) http.HandlerFunc {
case http.MethodGet:
// In case the application actually exists, let the handler return the application.
if (application == "default-application-1" || application == "default-application-2") && environment == "default" {
outputApplication := createApplication(application, environment)
outputApplication := createApplication(application)
err := json.NewEncoder(w).Encode(outputApplication)
require.NoError(t, err)
return
}

if application == "update-failure-application" && environment == "update-failure" {
outputApplication := createApplication(application, environment)
outputApplication := createApplication(application)
err := json.NewEncoder(w).Encode(outputApplication)
require.NoError(t, err)
return
Expand Down