Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4139] Add Flink connector #4147

Merged
merged 4 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions backend/components/flink-connector/Dockerfile.result-sender
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Build image for the flink-connector result-sender service.
FROM golang:1.17

WORKDIR /app

# Only the three source files the result-sender binary needs are copied in;
# the component does not check in a go.mod of its own.
COPY ./src/types.go ./src/tools.go ./src/result-sender.go ./

# Initialise a module on the fly and fetch the Kafka / Schema Registry client
# libraries at build time (versions are unpinned — builds are not reproducible).
RUN go mod init main && \
go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
go get golang.org/x/net

# Compile all copied sources into a single binary named "app".
RUN go build -o app

CMD ["./app"]
16 changes: 16 additions & 0 deletions backend/components/flink-connector/Dockerfile.statements-executor
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Build image for the flink-connector statements-executor service.
FROM golang:1.17

WORKDIR /app

# Only the three source files the statements-executor binary needs are copied
# in; the component does not check in a go.mod of its own.
COPY ./src/types.go ./src/tools.go ./src/statements-executor.go ./

# Initialise a module on the fly and fetch the Kafka / Schema Registry client
# libraries at build time (versions are unpinned — builds are not reproducible).
RUN go mod init main && \
go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
go get golang.org/x/net

# Compile all copied sources into a single binary named "app".
RUN go build -o app

CMD ["./app"]
13 changes: 13 additions & 0 deletions backend/components/flink-connector/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Build the statements-executor Docker image locally.
build-statements-executor:
docker build -t flink-connector/statements-executor -f Dockerfile.statements-executor .

# Tag and push the statements-executor image to GHCR under the "release" tag.
release-statements-executor: build-statements-executor
docker tag flink-connector/statements-executor ghcr.io/airyhq/connectors/flink/statements-executor:release
docker push ghcr.io/airyhq/connectors/flink/statements-executor:release

# Build the result-sender Docker image locally.
build-result-sender:
docker build -t flink-connector/result-sender -f Dockerfile.result-sender .

# Tag and push the result-sender image to GHCR under the "release" tag.
release-result-sender: build-result-sender
docker tag flink-connector/result-sender ghcr.io/airyhq/connectors/flink/result-sender:release
docker push ghcr.io/airyhq/connectors/flink/result-sender:release
3 changes: 3 additions & 0 deletions backend/components/flink-connector/helm/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Bazel build file: packages this component's Helm chart via the shared
# helm_ruleset_core_version macro from //tools/build.
load("//tools/build:helm.bzl", "helm_ruleset_core_version")

helm_ruleset_core_version()
6 changes: 6 additions & 0 deletions backend/components/flink-connector/helm/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

# Helm chart metadata for the flink-connector component.
apiVersion: v2
appVersion: "1.0"
description: Flink connector
name: flink-connector
version: 1.0
10 changes: 10 additions & 0 deletions backend/components/flink-connector/helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ConfigMap carrying this component's settings; the core.airy.co labels let
# the Airy platform discover and manage the component.
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .Values.component }}
labels:
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: "{{ .Values.component }}"
annotations:
# Components are toggled via this annotation rather than install/uninstall.
core.airy.co/enabled: "{{ .Values.enabled }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Deployment for the result-sender worker: consumes Flink query results from
# Kafka and forwards them to the messaging API (see src/result-sender.go).
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.component }}-{{ .Values.resultSender.name }}
labels:
app: {{ .Values.component }}
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: {{ .Values.component }}
spec:
# Scale to zero when the component is disabled instead of uninstalling it.
replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
selector:
matchLabels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
spec:
containers:
- name: app
image: "ghcr.io/airyhq/{{ .Values.resultSender.image }}:release"
imagePullPolicy: Always
# Cluster-wide security and Kafka settings plus this component's own
# ConfigMap are injected as environment variables.
envFrom:
- configMapRef:
name: security
- configMapRef:
name: kafka-config
- configMapRef:
name: {{ .Values.component }}
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.resultSender.topic }}
- name: API_COMMUNICATION_URL
value: {{ .Values.apiCommunicationUrl }}
# NOTE(review): this probe targets .Values.port, but src/result-sender.go
# binds its health endpoint to :80 — confirm the two agree.
livenessProbe:
httpGet:
path: /actuator/health
port: {{ .Values.port }}
httpHeaders:
- name: Health-Check
value: health-check
# NOTE(review): 43200s = 12 hours before the first probe — effectively
# disables liveness checking for half a day; presumably intentional, verify.
initialDelaySeconds: 43200
periodSeconds: 10
failureThreshold: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Headless Service fronting the result-sender pod (clusterIP: None — DNS
# resolves directly to the pod; no load-balancing VIP is allocated).
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.component }}-{{ .Values.resultSender.name }}
labels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
spec:
type: ClusterIP
clusterIP: None
ports:
- name: {{ .Values.component }}-{{ .Values.resultSender.name }}
port: 80
# NOTE(review): targets .Values.port, while src/result-sender.go listens
# on :80 — confirm the two agree.
targetPort: {{ .Values.port }}
selector:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Deployment for the statements-executor worker: consumes SQL statements from
# Kafka and submits them to the Flink gateway.
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.component }}-{{ .Values.executor.name }}
labels:
app: {{ .Values.component }}
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: {{ .Values.component }}
spec:
# Scale to zero when the component is disabled instead of uninstalling it.
replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
selector:
matchLabels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
spec:
containers:
- name: app
image: "ghcr.io/airyhq/{{ .Values.executor.image }}:release"
imagePullPolicy: Always
# Cluster-wide security and Kafka settings plus this component's own
# ConfigMap are injected as environment variables.
envFrom:
- configMapRef:
name: security
- configMapRef:
name: kafka-config
- configMapRef:
name: {{ .Values.component }}
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.executor.topic }}
- name: FLINK_GATEWAY_URL
value: {{ .Values.gatewayUrl }}
# NOTE(review): probe targets .Values.port — confirm it matches the port
# the executor binary actually listens on (result-sender uses :80).
livenessProbe:
httpGet:
path: /actuator/health
port: {{ .Values.port }}
httpHeaders:
- name: Health-Check
value: health-check
# NOTE(review): 43200s = 12 hours before the first probe — effectively
# disables liveness checking for half a day; presumably intentional, verify.
initialDelaySeconds: 43200
periodSeconds: 10
failureThreshold: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

# Headless Service fronting the statements-executor pod (clusterIP: None —
# DNS resolves directly to the pod; no load-balancing VIP is allocated).
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.component }}-{{ .Values.executor.name }}
labels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
spec:
type: ClusterIP
clusterIP: None
ports:
- name: {{ .Values.component }}-{{ .Values.executor.name }}
port: 80
# NOTE(review): targets .Values.port — confirm it matches the port the
# executor binary actually listens on.
targetPort: {{ .Values.port }}
selector:
app: {{ .Values.component }}-{{ .Values.executor.name }}
16 changes: 16 additions & 0 deletions backend/components/flink-connector/helm/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

# Default values for the flink-connector chart.
component: flink-connector
mandatory: false
enabled: false
# Port the liveness probes and Services target on the connector containers.
# FIX: was 8080, but src/result-sender.go binds its health endpoint to :80
# (http.ListenAndServe(":80", nil)), so probes against 8080 could never
# succeed. Confirm the statements-executor binary also listens on :80.
port: 80
resources:
# Flink SQL gateway used by the statements-executor.
gatewayUrl: "http://flink-jobmanager:8083"
# Airy messaging endpoint the result-sender posts results to.
apiCommunicationUrl: "http://api-communication/messages.send"
executor:
  name: statements-executor
  image: connectors/flink/statements-executor
  topic: flink.statements
resultSender:
  name: result-sender
  image: connectors/flink/result-sender
  topic: flink.output
158 changes: 158 additions & 0 deletions backend/components/flink-connector/src/result-sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package main

import (
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

// Create Kafka consumer to read the statements
kafkaURL := os.Getenv("KAFKA_BROKERS")
schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
topicName := os.Getenv("KAFKA_TOPIC_NAME")
systemToken := os.Getenv("systemToken")
authUsername := os.Getenv("AUTH_JAAS_USERNAME")
authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
flinkProvider := os.Getenv("provider")
groupID := "result-sender"
msgNormal := false
msgDebug := true

if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
return
}

var confluentConnection ConfluentConnection
confluentConnection.Token = os.Getenv("confluentToken")
confluentConnection.ComputePoolID = os.Getenv("confluentComputePoolID")
confluentConnection.Principal = os.Getenv("confluentPrincipal")
confluentConnection.SQLCurrentCatalog = os.Getenv("confluentSQLCurrentCatalog")
confluentConnection.SQLCurrentDatabase = os.Getenv("confluentSQLCurrentDatabase")

// Healthcheck
http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
response := map[string]string{"status": "UP"}
jsonResponse, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(jsonResponse)
})

go func() {
if err := http.ListenAndServe(":80", nil); err != nil {
panic(err)
}
}()

fmt.Println("Health-check started")

// Create Kafka consumer configuration
fmt.Println("Creating Kafka consumer for topic: ", topicName)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": kafkaURL,
"group.id": groupID,
"auto.offset.reset": "earliest",
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": authUsername,
"sasl.password": authPassword,
})
if err != nil {
fmt.Printf("Error creating consumer: %v\n", err)
return
}
c.SubscribeTopics([]string{topicName}, nil)
// Channel for signals
signals := make(chan os.Signal, 1)
done := make(chan bool, 1)

signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

go func() {
for {
select {
case sig := <-signals:
// If an interrupt signal is received, break the loop
fmt.Printf("Caught signal %v: terminating\n", sig)
done <- true
return
default:
msg, err := c.ReadMessage(-1)
if err == nil {
var flinkOutput FlinkOutput
if err := json.Unmarshal(msg.Value, &flinkOutput); err != nil {
fmt.Printf("Error unmarshalling message: %v\n", err)
continue
} else {
fmt.Printf("Received message: %+v\n", flinkOutput)

flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")
confluentGatewayURL := os.Getenv("CONFLUENT_GATEWAY_URL")

var result FlinkResult
var headerConfluent []string
var resultConfluent string

if flinkProvider == "flink" {
fmt.Println("Flink gateway: ", flinkGatewayURL)
result, err = getFlinkResult(flinkGatewayURL, flinkOutput.SessionID)
headerConfluent = []string{}
} else {
fmt.Println("Flink gateway: ", confluentGatewayURL)
fmt.Println("Waiting 20 seconds...")
time.Sleep(20 * time.Second)
headerConfluent, resultConfluent, err = getFlinkResultConfluent(confluentGatewayURL, flinkOutput.SessionID, confluentConnection)
}
if err != nil {
fmt.Println("Unable to get Flink result:", err)
sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
return
}
if flinkProvider == "flink" {
sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", result), flinkOutput.ConversationID, systemToken, msgDebug)
sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug)
response, err := convertResultToMarkdown(result)
if err != nil {
fmt.Println("Unable to generate Markdown from result:", err)
sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal)
return
}
sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal)
} else {
sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", resultConfluent), flinkOutput.ConversationID, systemToken, msgDebug)
sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug)
response, err := convertConfluentResultToMarkdown(headerConfluent, resultConfluent)
if err != nil {
fmt.Println("Unable to generate Markdown from result:", err)
sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal)
return
}
sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal)
}
}
} else {
fmt.Printf("Consumer error: %v\n", err)
}
}
}
}()
<-done
c.Close()
fmt.Println("Consumer closed")
}
Loading
Loading