diff --git a/backend/components/flink-connector/Dockerfile.result-sender b/backend/components/flink-connector/Dockerfile.result-sender
new file mode 100644
index 000000000..6580476fa
--- /dev/null
+++ b/backend/components/flink-connector/Dockerfile.result-sender
@@ -0,0 +1,16 @@
+FROM golang:1.17
+
+WORKDIR /app
+
+COPY ./src/types.go ./src/tools.go ./src/result-sender.go ./
+
+RUN go mod init main && \
+    go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
+    go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
+    go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
+    go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
+    go get golang.org/x/net
+
+RUN go build -o app
+
+CMD ["./app"]
diff --git a/backend/components/flink-connector/Dockerfile.statements-executor b/backend/components/flink-connector/Dockerfile.statements-executor
new file mode 100644
index 000000000..3280b144f
--- /dev/null
+++ b/backend/components/flink-connector/Dockerfile.statements-executor
@@ -0,0 +1,16 @@
+FROM golang:1.17
+
+WORKDIR /app
+
+COPY ./src/types.go ./src/tools.go ./src/statements-executor.go ./
+
+RUN go mod init main && \
+    go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
+    go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
+    go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
+    go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
+    go get golang.org/x/net
+
+RUN go build -o app
+
+CMD ["./app"]
diff --git a/backend/components/flink-connector/Makefile b/backend/components/flink-connector/Makefile
new file mode 100644
index 000000000..1e7dcec8d
--- /dev/null
+++ b/backend/components/flink-connector/Makefile
@@ -0,0 +1,13 @@
+build-statements-executor:
+	docker build -t flink-connector/statements-executor -f Dockerfile.statements-executor .
+
+release-statements-executor: build-statements-executor
+	docker tag flink-connector/statements-executor ghcr.io/airyhq/connectors/flink/statements-executor:release
+	docker push ghcr.io/airyhq/connectors/flink/statements-executor:release
+
+build-result-sender:
+	docker build -t flink-connector/result-sender -f Dockerfile.result-sender .
+
+release-result-sender: build-result-sender
+	docker tag flink-connector/result-sender ghcr.io/airyhq/connectors/flink/result-sender:release
+	docker push ghcr.io/airyhq/connectors/flink/result-sender:release
diff --git a/backend/components/flink-connector/helm/BUILD b/backend/components/flink-connector/helm/BUILD
new file mode 100644
index 000000000..45805d5f1
--- /dev/null
+++ b/backend/components/flink-connector/helm/BUILD
@@ -0,0 +1,3 @@
+load("//tools/build:helm.bzl", "helm_ruleset_core_version")
+
+helm_ruleset_core_version()
diff --git a/backend/components/flink-connector/helm/Chart.yaml b/backend/components/flink-connector/helm/Chart.yaml
new file mode 100644
index 000000000..4eb993fb2
--- /dev/null
+++ b/backend/components/flink-connector/helm/Chart.yaml
@@ -0,0 +1,6 @@
+
+apiVersion: v2
+appVersion: "1.0"
+description: Flink connector
+name: flink-connector
+version: 1.0
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/configmap.yaml b/backend/components/flink-connector/helm/templates/configmap.yaml
new file mode 100644
index 000000000..d8301a65d
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/configmap.yaml
@@ -0,0 +1,10 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: {{ .Values.component }}
+  labels:
+    core.airy.co/managed: "true"
+    core.airy.co/mandatory: "{{ .Values.mandatory }}"
+    core.airy.co/component: "{{ .Values.component }}"
+  annotations:
+    core.airy.co/enabled: "{{ .Values.enabled }}"
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/result-sender/deployment.yaml b/backend/components/flink-connector/helm/templates/result-sender/deployment.yaml
new file mode 100644
index 000000000..0f50fc554
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/result-sender/deployment.yaml
@@ -0,0 +1,50 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{ .Values.component }}-{{ .Values.resultSender.name }}
+  labels:
+    app: {{ .Values.component }}
+    core.airy.co/managed: "true"
+    core.airy.co/mandatory: "{{ .Values.mandatory }}"
+    core.airy.co/component: {{ .Values.component }}
+spec:
+  replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
+  selector:
+    matchLabels:
+      app: {{ .Values.component }}-{{ .Values.resultSender.name }}
+  strategy:
+    rollingUpdate:
+      maxSurge: 1
+      maxUnavailable: 1
+    type: RollingUpdate
+  template:
+    metadata:
+      labels:
+        app: {{ .Values.component }}-{{ .Values.resultSender.name }}
+    spec:
+      containers:
+        - name: app
+          image: "ghcr.io/airyhq/{{ .Values.resultSender.image }}:release"
+          imagePullPolicy: Always
+          envFrom:
+            - configMapRef:
+                name: security
+            - configMapRef:
+                name: kafka-config
+            - configMapRef:
+                name: {{ .Values.component }}
+          env:
+            - name: KAFKA_TOPIC_NAME
+              value: {{ .Values.resultSender.topic }}
+            - name: API_COMMUNICATION_URL
+              value: {{ .Values.apiCommunicationUrl }}
+          livenessProbe:
+            httpGet:
+              path: /actuator/health
+              port: {{ .Values.port }}
+              httpHeaders:
+                - name: Health-Check
+                  value: health-check
+            initialDelaySeconds: 43200
+            periodSeconds: 10
+            failureThreshold: 3
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/result-sender/service.yaml b/backend/components/flink-connector/helm/templates/result-sender/service.yaml
new file mode 100644
index 000000000..cdf73d72b
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/result-sender/service.yaml
@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: {{ .Values.component }}-{{ .Values.resultSender.name }}
+  labels:
+    app: {{ .Values.component }}-{{ .Values.resultSender.name }}
+spec:
+  type: ClusterIP
+  clusterIP: None
+  ports:
+    - name: {{ .Values.component }}-{{ .Values.resultSender.name }}
+      port: 80
+      targetPort: {{ .Values.port }}
+  selector:
+    app: {{ .Values.component }}-{{ .Values.resultSender.name }}
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/statements-executor/deployment.yaml b/backend/components/flink-connector/helm/templates/statements-executor/deployment.yaml
new file mode 100644
index 000000000..44c7b6e59
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/statements-executor/deployment.yaml
@@ -0,0 +1,50 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{ .Values.component }}-{{ .Values.executor.name }}
+  labels:
+    app: {{ .Values.component }}
+    core.airy.co/managed: "true"
+    core.airy.co/mandatory: "{{ .Values.mandatory }}"
+    core.airy.co/component: {{ .Values.component }}
+spec:
+  replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
+  selector:
+    matchLabels:
+      app: {{ .Values.component }}-{{ .Values.executor.name }}
+  strategy:
+    rollingUpdate:
+      maxSurge: 1
+      maxUnavailable: 1
+    type: RollingUpdate
+  template:
+    metadata:
+      labels:
+        app: {{ .Values.component }}-{{ .Values.executor.name }}
+    spec:
+      containers:
+        - name: app
+          image: "ghcr.io/airyhq/{{ .Values.executor.image }}:release"
+          imagePullPolicy: Always
+          envFrom:
+            - configMapRef:
+                name: security
+            - configMapRef:
+                name: kafka-config
+            - configMapRef:
+                name: {{ .Values.component }}
+          env:
+            - name: KAFKA_TOPIC_NAME
+              value: {{ .Values.executor.topic }}
+            - name: FLINK_GATEWAY_URL
+              value: {{ .Values.gatewayUrl }}
+          livenessProbe:
+            httpGet:
+              path: /actuator/health
+              port: {{ .Values.port }}
+              httpHeaders:
+                - name: Health-Check
+                  value: health-check
+            initialDelaySeconds: 43200
+            periodSeconds: 10
+            failureThreshold: 3
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/statements-executor/service.yaml b/backend/components/flink-connector/helm/templates/statements-executor/service.yaml
new file mode 100644
index 000000000..3e5fbfc30
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/statements-executor/service.yaml
@@ -0,0 +1,16 @@
+
+apiVersion: v1
+kind: Service
+metadata:
+  name: {{ .Values.component }}-{{ .Values.executor.name }}
+  labels:
+    app: {{ .Values.component }}-{{ .Values.executor.name }}
+spec:
+  type: ClusterIP
+  clusterIP: None
+  ports:
+    - name: {{ .Values.component }}-{{ .Values.executor.name }}
+      port: 80
+      targetPort: {{ .Values.port }}
+  selector:
+    app: {{ .Values.component }}-{{ .Values.executor.name }}
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/values.yaml b/backend/components/flink-connector/helm/values.yaml
new file mode 100644
index 000000000..71f4475a3
--- /dev/null
+++ b/backend/components/flink-connector/helm/values.yaml
@@ -0,0 +1,16 @@
+
+component: flink-connector
+mandatory: false
+enabled: false
+port: 8080
+resources:
+gatewayUrl: "http://flink-jobmanager:8083"
+apiCommunicationUrl: "http://api-communication/messages.send"
+executor:
+  name: statements-executor
+  image: connectors/flink/statements-executor
+  topic: flink.statements
+resultSender:
+  name: result-sender
+  image: connectors/flink/result-sender
+  topic: flink.outputs
\ No newline at end of file
diff --git a/backend/components/flink-connector/src/result-sender.go b/backend/components/flink-connector/src/result-sender.go
new file mode 100644
index 000000000..7dd5f91a8
--- /dev/null
+++ b/backend/components/flink-connector/src/result-sender.go
@@ -0,0 +1,158 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+func main() {
+
+	// Create Kafka consumer to read the statements
+	kafkaURL := os.Getenv("KAFKA_BROKERS")
+	schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
+	topicName := os.Getenv("KAFKA_TOPIC_NAME")
+	systemToken := os.Getenv("systemToken")
+	authUsername := os.Getenv("AUTH_JAAS_USERNAME")
+	authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
+	flinkProvider := os.Getenv("flinkProvider")
+	groupID := "result-sender"
+	msgNormal := false
+	msgDebug := true
+
+	if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
+		fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
+		return
+	}
+
+	var confluentConnection ConfluentConnection
+	confluentConnection.Token = os.Getenv("CONFLUENT_TOKEN")
+	confluentConnection.ComputePoolID = os.Getenv("CONFLUENT_COMPUTE_POOL_ID")
+	confluentConnection.Principal = os.Getenv("CONFLUENT_PRINCIPAL")
+	confluentConnection.SQLCurrentCatalog = os.Getenv("CONFLUENT_SQL_CURRENT_CATALOG")
+	confluentConnection.SQLCurrentDatabase = os.Getenv("CONFLUENT_SQL_CURRENT_DATABASE")
+
+	// Healthcheck
+	http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]string{"status": "UP"}
+		jsonResponse, err := json.Marshal(response)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		w.Write(jsonResponse)
+	})
+
+	go func() {
+		// Listen on the port that the liveness probe and Service targetPort
+		// expect (.Values.port in the Helm chart).
+		if err := http.ListenAndServe(":8080", nil); err != nil {
+			panic(err)
+		}
+	}()
+
+	fmt.Println("Health-check started")
+
+	// Create Kafka consumer configuration
+	fmt.Println("Creating Kafka consumer for topic: ", topicName)
+
+	c, err := kafka.NewConsumer(&kafka.ConfigMap{
+		"bootstrap.servers": kafkaURL,
+		"group.id":          groupID,
+		"auto.offset.reset": "earliest",
+		"security.protocol": "SASL_SSL",
+		"sasl.mechanisms":   "PLAIN",
+		"sasl.username":     authUsername,
+		"sasl.password":     authPassword,
+	})
+	if err != nil {
+		fmt.Printf("Error creating consumer: %v\n", err)
+		return
+	}
+	c.SubscribeTopics([]string{topicName}, nil)
+	// Channel for signals
+	signals := make(chan os.Signal, 1)
+	done := make(chan bool, 1)
+
+	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
+
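+	// Consume FlinkOutput envelopes from the topic: fetch the query result
+	// from the configured provider (Flink SQL Gateway or Confluent Cloud),
+	// render it as Markdown, and post it back to the conversation.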
fmt.Println("Flink gateway: ", confluentGatewayURL) + fmt.Println("Waiting 20 seconds...") + time.Sleep(20 * time.Second) + headerConfluent, resultConfluent, err = getFlinkResultConfluent(confluentGatewayURL, flinkOutput.SessionID, confluentConnection) + } + if err != nil { + fmt.Println("Unable to get Flink result:", err) + sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug) + return + } + if flinkProvider == "flink" { + sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", result), flinkOutput.ConversationID, systemToken, msgDebug) + sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug) + response, err := convertResultToMarkdown(result) + if err != nil { + fmt.Println("Unable to generate Markdown from result:", err) + sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug) + sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal) + return + } + sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal) + } else { + sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", resultConfluent), flinkOutput.ConversationID, systemToken, msgDebug) + sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug) + response, err := convertConfluentResultToMarkdown(headerConfluent, resultConfluent) + if err != nil { + fmt.Println("Unable to generate Markdown from result:", err) + sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug) + sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal) + return + } + sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal) + } + } + } else { + fmt.Printf("Consumer error: %v\n", err) + } + } + } + }() + <-done + c.Close() + fmt.Println("Consumer closed") +} diff --git a/backend/components/flink-connector/src/statements-executor.go b/backend/components/flink-connector/src/statements-executor.go new file mode 100644 index 000000000..ebe12159e --- /dev/null +++ b/backend/components/flink-connector/src/statements-executor.go @@ -0,0 +1,140 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func main() { + + kafkaURL := os.Getenv("KAFKA_BROKERS") + schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL") + topicName := os.Getenv("KAFKA_TOPIC_NAME") + systemToken := os.Getenv("systemToken") + authUsername := os.Getenv("AUTH_JAAS_USERNAME") + authPassword := os.Getenv("AUTH_JAAS_PASSWORD") + flinkProvider := os.Getenv("flnkProvider") + groupID := "statement-executor-" + msgNormal := false + msgDebug := true + + if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" { + fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set") + return + } + + var confluentConnection ConfluentConnection + confluentConnection.Token = os.Getenv("CONFLUENT_TOKEN") + confluentConnection.ComputePoolID = os.Getenv("CONFLUENT_COMPUTE_POOL_ID") + confluentConnection.Principal = os.Getenv("CONFLUENT_PRINCIPAL") + confluentConnection.SQLCurrentCatalog = os.Getenv("CONFLUENT_SQL_CURRENT_CATALOG") + confluentConnection.SQLCurrentDatabase = os.Getenv("CONFLUENT_SQL_CURRENT_DATABASE") + + // Healthcheck + http.HandleFunc("/actuator/health", 
diff --git a/backend/components/flink-connector/src/statements-executor.go b/backend/components/flink-connector/src/statements-executor.go
new file mode 100644
index 000000000..ebe12159e
--- /dev/null
+++ b/backend/components/flink-connector/src/statements-executor.go
@@ -0,0 +1,140 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
+
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+func main() {
+
+	kafkaURL := os.Getenv("KAFKA_BROKERS")
+	schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
+	topicName := os.Getenv("KAFKA_TOPIC_NAME")
+	systemToken := os.Getenv("systemToken")
+	authUsername := os.Getenv("AUTH_JAAS_USERNAME")
+	authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
+	flinkProvider := os.Getenv("flinkProvider")
+	groupID := "statement-executor"
+	msgNormal := false
+	msgDebug := true
+
+	if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
+		fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
+		return
+	}
+
+	var confluentConnection ConfluentConnection
+	confluentConnection.Token = os.Getenv("CONFLUENT_TOKEN")
+	confluentConnection.ComputePoolID = os.Getenv("CONFLUENT_COMPUTE_POOL_ID")
+	confluentConnection.Principal = os.Getenv("CONFLUENT_PRINCIPAL")
+	confluentConnection.SQLCurrentCatalog = os.Getenv("CONFLUENT_SQL_CURRENT_CATALOG")
+	confluentConnection.SQLCurrentDatabase = os.Getenv("CONFLUENT_SQL_CURRENT_DATABASE")
+
+	// Healthcheck
+	http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]string{"status": "UP"}
+		jsonResponse, err := json.Marshal(response)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		w.Write(jsonResponse)
+	})
+
+	go func() {
+		// Listen on the port that the liveness probe and Service targetPort
+		// expect (.Values.port in the Helm chart).
+		if err := http.ListenAndServe(":8080", nil); err != nil {
+			panic(err)
+		}
+	}()
+
+	fmt.Println("Health-check started")
+
+	// Create Kafka consumer configuration
+	fmt.Println("Creating Kafka consumer for topic: ", topicName)
+
+	c, err := kafka.NewConsumer(&kafka.ConfigMap{
+		"bootstrap.servers": kafkaURL,
+		"group.id":          groupID,
+		"auto.offset.reset": "earliest",
+		"security.protocol": "SASL_SSL",
+		"sasl.mechanisms":   "PLAIN",
+		"sasl.username":     authUsername,
+		"sasl.password":     authPassword,
+	})
+	if err != nil {
+		fmt.Printf("Error creating consumer: %v\n", err)
+		return
+	}
+	c.SubscribeTopics([]string{topicName}, nil)
+	// Channel for signals
+	signals := make(chan os.Signal, 1)
+	done := make(chan bool, 1)
+
+	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
+
+	go func() {
+		for {
+			select {
+			case sig := <-signals:
+				fmt.Printf("Caught signal %v: terminating\n", sig)
+				done <- true
+				return
+			default:
+				msg, err := c.ReadMessage(-1)
+				if err == nil {
+					var statementSet FlinkStatementSet
+					if err := json.Unmarshal(msg.Value, &statementSet); err != nil {
+						fmt.Printf("Error unmarshalling message: %v\n", err)
+						continue
+					} else {
+						fmt.Printf("Received message: %+v\n", statementSet)
+
+						flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")
+						confluentGatewayURL := os.Getenv("CONFLUENT_GATEWAY_URL")
+						var sessionID string
+						if flinkProvider == "flink" {
+							sessionID, err = sendFlinkSQL(flinkGatewayURL, statementSet)
+						} else {
+							sessionID, err = sendFlinkSQLConfluent(confluentGatewayURL, statementSet, confluentConnection)
+						}
+
+						if err != nil {
+							fmt.Println("Error running Flink statement:", err)
+							sendMessage("Error: "+err.Error(), statementSet.ConversationID, systemToken, msgDebug)
+							sendMessage("I am sorry, I am unable to answer that question.", statementSet.ConversationID, systemToken, msgNormal)
+							// Keep consuming instead of terminating the loop on a single failure
+							continue
+						}
+						fmt.Println("Successfully executed the Flink statement.")
+						sendMessage("FlinkSessionID: "+sessionID, statementSet.ConversationID, systemToken, msgDebug)
+						var flinkOutput FlinkOutput
+						flinkOutput.SessionID = sessionID
+						flinkOutput.Question = statementSet.Question
+						flinkOutput.MessageID = statementSet.MessageID
+						flinkOutput.ConversationID = statementSet.ConversationID
+						err = produceFlinkOutput(flinkOutput, kafkaURL, "flink-producer-"+groupID, authUsername, authPassword)
+						if err != nil {
+							fmt.Printf("error producing message to Kafka: %v\n", err)
+							sendMessage("Error: "+err.Error(), statementSet.ConversationID, systemToken, msgDebug)
+							sendMessage("I am sorry, I am unable to answer that question.", statementSet.ConversationID, systemToken, msgNormal)
+							// Do not report success when the output could not be produced
+							continue
+						}
+						sendMessage("Message produced to topic: flink.outputs", statementSet.ConversationID, systemToken, msgDebug)
+					}
+				} else {
+					fmt.Printf("Consumer error: %v\n", err)
+				}
+			}
+		}
+	}()
+	<-done
+	c.Close()
+	fmt.Println("Consumer closed")
+}
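The executor is driven by FlinkStatementSet messages on the flink.statements topic; sendFlinkSQL later substitutes the {PROPERTIES_*} placeholders before submission. A hedged sketch of a local test producer for that topic (the broker address and the statement text are assumptions, not part of this change):

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// FlinkStatementSet is duplicated from types.go so this sketch stands alone.
type FlinkStatementSet struct {
	Statements     []string `json:"statements"`
	Question       string   `json:"question"`
	MessageID      string   `json:"message_id"`
	ConversationID string   `json:"conversation_id"`
}

func main() {
	topic := "flink.statements"
	// The {PROPERTIES_*} placeholders are replaced by sendFlinkSQL at execution time.
	set := FlinkStatementSet{
		Statements: []string{
			"CREATE TABLE orders (id STRING) WITH ('properties.group.id' = '{PROPERTIES_GROUP_ID}', 'properties.bootstrap.servers' = '{PROPERTIES_BOOTSTRAP_SERVERS}')",
			"select * from output;",
		},
		Question:       "How many orders were placed today?", // invented example
		MessageID:      "msg-123",
		ConversationID: "conv-456",
	}
	payload, err := json.Marshal(set)
	if err != nil {
		panic(err)
	}

	// Assumed local, unauthenticated broker for testing only.
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	if err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          payload,
	}, nil); err != nil {
		panic(err)
	}
	p.Flush(15 * 1000)
	fmt.Println("test statement set produced")
}
```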
"io" + "io/ioutil" + "net/http" + "os" + "strings" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) { + timestamp := time.Now().Unix() + strTimestamp := fmt.Sprintf("%d", timestamp) + replacements := map[string]string{ + "{PROPERTIES_GROUP_ID}": "flink-" + strTimestamp, + "{PROPERTIES_BOOTSTRAP_SERVERS}": os.Getenv("KAFKA_BROKERS"), + "{PROPERTIES_SASL_JAAS_CONFIG}": fmt.Sprintf("org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", os.Getenv("AUTH_JAAS_USERNAME"), os.Getenv("AUTH_JAAS_PASSWORD")), + } + for i, stmt := range statementSet.Statements { + for placeholder, value := range replacements { + stmt = strings.Replace(stmt, placeholder, value, -1) + } + statementSet.Statements[i] = stmt + } + fmt.Println("Updated StatementSet: %+v\n", statementSet.Statements) + + req, err := http.NewRequest("POST", url+"/v1/sessions/", bytes.NewReader([]byte(""))) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return "", err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Response: ", string(body)) + var sessionResponse FlinkSessionResponse + if err := json.Unmarshal(body, &sessionResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return "", err + } + defer resp.Body.Close() + + fmt.Println("The Flink session is: ", sessionResponse.SessionHandle) + for _, statement := range statementSet.Statements { + payload := FlinkSQLRequest{ + Statement: statement, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return "", err + } + + req, err = http.NewRequest("POST", url+"/v1/sessions/"+sessionResponse.SessionHandle+"/statements/", bytes.NewReader(payloadBytes)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + + client = &http.Client{} + resp, err = client.Do(req) + if err != nil { + return "", err + } + body, err = io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. 
Response: ", string(body)) + var statementResponse FlinkStatementResponse + if err := json.Unmarshal(body, &statementResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return "", err + } + fmt.Printf("Check status on: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionResponse.SessionHandle, statementResponse.OperationHandle) + defer resp.Body.Close() + } + + return sessionResponse.SessionHandle, nil +} + +func produceFlinkOutput(flinkOutput FlinkOutput, kafkaURL, groupID, authUsername, authPassword string) error { + + kafkaTopic := "flink.outputs" + + flinkOutputJSON, err := json.Marshal(flinkOutput) + if err != nil { + return fmt.Errorf("error marshaling query to JSON: %w", err) + } + + configMap := kafka.ConfigMap{ + "bootstrap.servers": kafkaURL, + } + if authUsername != "" && authPassword != "" { + configMap.SetKey("security.protocol", "SASL_SSL") + configMap.SetKey("sasl.mechanisms", "PLAIN") + configMap.SetKey("sasl.username", authUsername) + configMap.SetKey("sasl.password", authPassword) + } + + producer, err := kafka.NewProducer(&configMap) + if err != nil { + return fmt.Errorf("failed to create producer: %w", err) + } + defer producer.Close() + + // Produce the message + message := kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny}, + Key: []byte(flinkOutput.SessionID), + Value: flinkOutputJSON, + } + + err = producer.Produce(&message, nil) + if err != nil { + return fmt.Errorf("failed to produce message: %w", err) + } + fmt.Println("message scheduled for production") + producer.Flush(15 * 1000) + fmt.Println("message flushed") + return nil +} + +func sendMessage(message string, conversationId string, systemToken string, debug bool) (int, string, error) { + messageContent := messageContent{ + Text: message, + Debug: debug, + } + messageToSend := ApplicationCommunicationSendMessage{ + ConversationID: conversationId, + Message: messageContent, + } + messageJSON, err := json.Marshal(messageToSend) + if err != nil { + fmt.Printf("Error encoding response to JSON: %v\n", err) + return 0, "", errors.New("The message could not be encoded to JSON for sending.") + } + + req, err := http.NewRequest("POST", "http://api-communication/messages.send", bytes.NewReader(messageJSON)) + if err != nil { + fmt.Printf("Error creating request: %v\n", err) + return 0, "", errors.New("The message could not be sent.") + } + req.Header.Add("Authorization", "Bearer "+systemToken) + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + fmt.Printf("Error sending POST request: %v\n", err) + return 0, "", errors.New("Error sending POST request.") + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + return 0, "", errors.New("Error reading response body.") + } + + var response SendMessageResponse + err = json.Unmarshal(body, &response) + if err != nil { + fmt.Println("Error unmarshaling response:", err) + return 0, "", errors.New("Response couldn't be unmarshaled.") + } + + fmt.Printf("Message sent with status code: %d\n", resp.StatusCode) + return resp.StatusCode, response.ID, nil +} + +func sendFlinkSQLConfluent(url string, statementSet FlinkStatementSet, connection ConfluentConnection) (string, error) { + timestamp := time.Now().Unix() + strTimestamp := fmt.Sprintf("%d", timestamp) + statementName := "airy-" + strTimestamp + payload := ConfluentFlink{ + 
+func getFlinkResult(url, sessionID string) (FlinkResult, error) {
+	fmt.Println("The Flink session is: ", sessionID)
+	payload := FlinkSQLRequest{
+		Statement: "select * from output;",
+	}
+	payloadBytes, err := json.Marshal(payload)
+	if err != nil {
+		return FlinkResult{}, err
+	}
+
+	req, err := http.NewRequest("POST", url+"/v1/sessions/"+sessionID+"/statements/", bytes.NewReader(payloadBytes))
+	if err != nil {
+		return FlinkResult{}, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	client := &http.Client{}
+	resp, err := client.Do(req)
+	if err != nil {
+		return FlinkResult{}, err
+	}
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		fmt.Printf("Error reading response body from the API: %v\n", err)
+	}
+	fmt.Println("Statement submitted. Response: ", string(body))
+	var statementResponse FlinkStatementResponse
+	if err := json.Unmarshal(body, &statementResponse); err != nil {
+		fmt.Printf("Error unmarshalling message: %v\n", err)
+		return FlinkResult{}, err
+	}
+
+	fmt.Printf("Fetching result from: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionID, statementResponse.OperationHandle)
+	time.Sleep(20 * time.Second)
+	req, err = http.NewRequest("GET", url+"/v1/sessions/"+sessionID+"/operations/"+statementResponse.OperationHandle+"/result/0", nil)
+	if err != nil {
+		return FlinkResult{}, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	client = &http.Client{}
+	resp, err = client.Do(req)
+	if err != nil {
+		return FlinkResult{}, err
+	}
+	body, err = io.ReadAll(resp.Body)
+	if err != nil {
+		fmt.Printf("Error reading response body from the API: %v\n", err)
+	}
+	fmt.Println("Result response: ", string(body))
+	var flinkResultResponse FlinkResultResponse
+	if err := json.Unmarshal(body, &flinkResultResponse); err != nil {
+		fmt.Printf("Error unmarshalling message: %v\n", err)
+		return FlinkResult{}, err
+	}
+	defer resp.Body.Close()
+
+	if flinkResultResponse.Errors != nil {
+		statementError := errors.New(strings.Join(flinkResultResponse.Errors, ","))
+		return FlinkResult{}, statementError
+	}
+	return flinkResultResponse.Results, nil
+}
+
+func markdown(message string) (string, error) {
+	return message, nil
+}
+
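+// convertResultToMarkdown renders a FlinkResult as a Markdown table: a header
+// row built from the column names, a separator row, and one row per data record.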
Response: ", string(body)) + var flinkResultResponse FlinkResultResponse + if err := json.Unmarshal(body, &flinkResultResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return FlinkResult{}, err + } + defer resp.Body.Close() + + if flinkResultResponse.Errors != nil { + statementError := errors.New(strings.Join(flinkResultResponse.Errors, ",")) + return FlinkResult{}, statementError + } + return flinkResultResponse.Results, nil +} + +func markdown(message string) (string, error) { + return message, nil +} + +func convertResultToMarkdown(result FlinkResult) (string, error) { + var builder strings.Builder + + if len(result.Columns) == 0 { + return "", errors.New("No columns found for generating the Markdown table.") + } + for _, col := range result.Columns { + builder.WriteString("| " + col.Name + " ") + } + builder.WriteString("|\n") + + for range result.Columns { + builder.WriteString("|---") + } + builder.WriteString("|\n") + + for _, d := range result.Data { + for _, field := range d.Fields { + builder.WriteString(fmt.Sprintf("| %v ", field)) + } + builder.WriteString("|\n") + } + + return builder.String(), nil +} + +func getFlinkResultConfluent(url, sessionID string, connection ConfluentConnection) ([]string, string, error) { + req, err := http.NewRequest("GET", url+"/"+sessionID, bytes.NewReader([]byte(""))) + if err != nil { + return []string{}, "", err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Add("Authorization", "Basic "+connection.Token) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return []string{}, "", err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. Response: ", string(body)) + var statementResponse ConfluentFlinkStatementResponse + if err := json.Unmarshal(body, &statementResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return []string{}, "", err + } + fmt.Printf("Received result for statement: %s\n", sessionID) + fmt.Println("Phase: ", statementResponse.Status.Phase, " Detail: ", statementResponse.Status.Detail) + defer resp.Body.Close() + + if statementResponse.Status.Phase == "RUNNING" || statementResponse.Status.Phase == "COMPLETED" { + columns, err := getColumnNames(statementResponse.Status.ResultSchema) + if err != nil { + fmt.Println("Extracting of the column names failed.") + return []string{}, "", err + } + req, err := http.NewRequest("GET", url+"/"+sessionID+"/results", bytes.NewReader([]byte(""))) + if err != nil { + return []string{}, "", err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Add("Authorization", "Basic "+connection.Token) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return []string{}, "", err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. 
Response: ", string(body)) + var result ConfluentFlinkResultsResponse + if err := json.Unmarshal(body, &result); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return []string{}, "", err + } + nextResult := result.Metadata.Next + fmt.Println("Next result: ", nextResult) + fmt.Println("Result: ", result.Results.Data) + data, err := dataToString(result.Results.Data) + if data != "" { + return columns, data, nil + } else { + req, err := http.NewRequest("GET", nextResult, bytes.NewReader([]byte(""))) + if err != nil { + return []string{}, "", err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Add("Authorization", "Basic "+connection.Token) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return []string{}, "", err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. Response: ", string(body)) + var result ConfluentFlinkResultsResponse + if err := json.Unmarshal(body, &result); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return []string{}, "", err + } + data, err := dataToString(result.Results.Data) + return columns, data, err + } + } else { + err := errors.New("Flink statement failed. Status: " + statementResponse.Status.Phase) + return []string{}, "", err + } +} + +func dataToString(data interface{}) (string, error) { + if slice, ok := data.([]interface{}); ok && len(slice) > 0 { + dataBytes, err := json.Marshal(data) + if err != nil { + return "", err + } + return string(dataBytes), nil + } + return "", nil +} + +func convertConfluentResultToMarkdown(headerNames []string, jsonStr string) (string, error) { + var dataRows []ConfluentDataRow + err := json.Unmarshal([]byte(jsonStr), &dataRows) + if err != nil { + return "", err + } + + var sb strings.Builder + + header := generateMarkdownHeader(headerNames) + sb.WriteString(header) + sb.WriteString("\n") + + separator := strings.Repeat("| --- ", strings.Count(header, "|")-1) + "|" + sb.WriteString(separator) + sb.WriteString("\n") + + for _, dataRow := range dataRows { + sb.WriteString("|") + for _, cell := range dataRow.Row { + sb.WriteString(" ") + sb.WriteString(cell) + sb.WriteString(" |") + } + sb.WriteString("\n") + } + + return sb.String(), nil +} + +func extractColumnNames(jsonStr string) ([]string, error) { + var schema ConfluentResultSchema + err := json.Unmarshal([]byte(jsonStr), &schema) + if err != nil { + return nil, err + } + + var columnNames []string + for _, column := range schema.Columns { + columnNames = append(columnNames, column.Name) + } + + return columnNames, nil +} + +func generateMarkdownHeader(columnNames []string) string { + var header string + + for _, name := range columnNames { + header += "| " + name + " " + } + header += "|" + + return header +} + +func ResultsToString(rs ConfluentResultSchema) string { + var columnNames []string + for _, column := range rs.Columns { + columnNames = append(columnNames, column.Name) + } + return strings.Join(columnNames, ", ") +} + +func getColumnNames(schema ConfluentResultSchema) ([]string, error) { + var columnNames []string + for _, column := range schema.Columns { + columnNames = append(columnNames, column.Name) + } + return columnNames, nil +} diff --git a/backend/components/flink-connector/src/types.go b/backend/components/flink-connector/src/types.go new file mode 100644 index 000000000..c67ee3944 --- /dev/null +++ 
diff --git a/backend/components/flink-connector/src/types.go b/backend/components/flink-connector/src/types.go
new file mode 100644
index 000000000..c67ee3944
--- /dev/null
+++ b/backend/components/flink-connector/src/types.go
@@ -0,0 +1,137 @@
+package main
+
+type ApplicationCommunicationSendMessage struct {
+	ConversationID string            `json:"conversation_id"`
+	Message        messageContent    `json:"message"`
+	Metadata       map[string]string `json:"metadata"`
+}
+
+type messageContent struct {
+	Text  string `json:"text"`
+	Debug bool   `json:"debug"`
+}
+
+type SendMessageResponse struct {
+	ID    string `json:"id"`
+	State string `json:"state"`
+}
+
+type FlinkOutput struct {
+	SessionID      string `json:"session_id"`
+	Question       string `json:"question"`
+	MessageID      string `json:"message_id"`
+	ConversationID string `json:"conversation_id"`
+}
+
+type FlinkSQLRequest struct {
+	Statement string `json:"statement"`
+}
+
+type FlinkSessionResponse struct {
+	SessionHandle string `json:"sessionHandle"`
+}
+
+type FlinkStatementResponse struct {
+	OperationHandle string `json:"operationHandle"`
+}
+
+type Column struct {
+	Name        string `json:"name"`
+	LogicalType struct {
+		Type     string `json:"type"`
+		Nullable bool   `json:"nullable"`
+		Length   int    `json:"length,omitempty"`
+	} `json:"logicalType"`
+	Comment interface{} `json:"comment"`
+}
+
+type Data struct {
+	Kind   string        `json:"kind"`
+	Fields []interface{} `json:"fields"`
+}
+
+type FlinkResult struct {
+	Columns   []Column `json:"columns"`
+	RowFormat string   `json:"rowFormat"`
+	Data      []Data   `json:"data"`
+}
+
+type FlinkResultResponse struct {
+	ResultType    string      `json:"resultType"`
+	IsQueryResult bool        `json:"isQueryResult"`
+	JobID         string      `json:"jobID"`
+	ResultKind    string      `json:"resultKind"`
+	Results       FlinkResult `json:"results"`
+	NextResultUri string      `json:"nextResultUri"`
+	Errors        []string    `json:"errors"`
+}
+
+type ConfluentFlink struct {
+	Name string             `json:"name"`
+	Spec ConfluentFlinkSpec `json:"spec"`
+}
+
+type ConfluentFlinkSpec struct {
+	Statement     string              `json:"statement"`
+	ComputePoolID string              `json:"compute_pool_id"`
+	Principal     string              `json:"principal"`
+	Properties    FlinkSpecProperties `json:"properties"`
+	Stopped       bool                `json:"stopped"`
+}
+
+type FlinkSpecProperties struct {
+	SQLCurrentCatalog  string `json:"sql.current-catalog"`
+	SQLCurrentDatabase string `json:"sql.current-database"`
+}
+
+type ConfluentFlinkStatementResponse struct {
+	Name   string                        `json:"name"`
+	Status ConfluentFlinkStatementStatus `json:"status"`
+}
+
+type ConfluentFlinkStatementStatus struct {
+	Detail       string                `json:"detail"`
+	Phase        string                `json:"phase"`
+	ResultSchema ConfluentResultSchema `json:"result_schema"`
+}
+
+type ConfluentResultSchema struct {
+	Columns []struct {
+		Name string `json:"name"`
+	} `json:"columns"`
+}
+
+type ConfluentFlinkResultsResponse struct {
+	Metadata ResultResponseMetadata `json:"metadata"`
+	Results  ResultResponseResults  `json:"results"`
+}
+
+type ResultResponseMetadata struct {
+	CreatedAt string `json:"created_at"`
+	Next      string `json:"next"`
+	Self      string `json:"self"`
+}
+
+type ResultResponseResults struct {
+	Data interface{} `json:"data"`
+}
+
+type ConfluentDataRow struct {
+	Op  int      `json:"op"`
+	Row []string `json:"row"`
+}
+
+type FlinkStatementSet struct {
+	Statements     []string `json:"statements"`
+	Question       string   `json:"question"`
+	MessageID      string   `json:"message_id"`
+	ConversationID string   `json:"conversation_id"`
+}
+
+type ConfluentConnection struct {
+	Token              string
+	ComputePoolID      string
+	Principal          string
+	SQLCurrentCatalog  string
+	SQLCurrentDatabase string
+}
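For reference, a minimal sketch of how a Confluent results page maps onto these types (structs duplicated from above so the example stands alone; the payload values are invented, and the real API response may carry additional fields, which json.Unmarshal simply ignores):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Duplicated from types.go for a self-contained illustration.
type ResultResponseMetadata struct {
	CreatedAt string `json:"created_at"`
	Next      string `json:"next"`
	Self      string `json:"self"`
}

type ResultResponseResults struct {
	Data interface{} `json:"data"`
}

type ConfluentFlinkResultsResponse struct {
	Metadata ResultResponseMetadata `json:"metadata"`
	Results  ResultResponseResults  `json:"results"`
}

func main() {
	// Invented sample page; the "next" link is what getFlinkResultConfluent
	// follows when the first page comes back empty.
	raw := []byte(`{
	  "metadata": {
	    "created_at": "2024-01-01T00:00:00Z",
	    "next": "https://example.invalid/results?page_token=abc",
	    "self": "https://example.invalid/results"
	  },
	  "results": {"data": [{"op": 0, "row": ["t-shirt", "42"]}]}
	}`)

	var resp ConfluentFlinkResultsResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		panic(err)
	}
	fmt.Printf("next page: %s\ndata: %v\n", resp.Metadata.Next, resp.Results.Data)
}
```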